Merge pull request #2060 from TheBlueMatt/2023-02-peers-disconnect-consistency
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Thu, 2 Mar 2023 23:44:23 +0000 (23:44 +0000)
committerGitHub <noreply@github.com>
Thu, 2 Mar 2023 23:44:23 +0000 (23:44 +0000)
Remove peers from the node_id_to_descriptor even without init

fuzz/src/process_network_graph.rs
lightning-background-processor/src/lib.rs
lightning-block-sync/src/rpc.rs
lightning-rapid-gossip-sync/src/lib.rs
lightning-rapid-gossip-sync/src/processing.rs
lightning/src/util/indexed_map.rs
lightning/src/util/macro_logger.rs
lightning/src/util/ser.rs
lightning/src/util/wakers.rs

index c900a7d38d5ac0529b0912e05717f9d0f0e4c693..b4c6a29e8a99f47744fdb9fd0caf2468c7604e57 100644 (file)
@@ -7,7 +7,7 @@ use crate::utils::test_logger;
 fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
        let logger = test_logger::TestLogger::new("".to_owned(), out);
        let network_graph = lightning::routing::gossip::NetworkGraph::new(bitcoin::Network::Bitcoin, &logger);
-       let rapid_sync = RapidGossipSync::new(&network_graph);
+       let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
        let _ = rapid_sync.update_network_graph(data);
 }
 
index 8711a4aeb5898e393f173886ca8e20de4b9b0d6f..f7f1296b98e66e737c6c81283a8a7e459a7b8160 100644 (file)
@@ -953,7 +953,7 @@ mod tests {
                        let params = ChainParameters { network, best_block };
                        let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
                        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
-                       let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
+                       let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
                        let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
index f04769560246f8537e1e022efa22c8b7a815eab4..6b4397a6b0fbe87d19a130fa18002a27042c9864 100644 (file)
@@ -13,8 +13,27 @@ use serde_json;
 
 use std::convert::TryFrom;
 use std::convert::TryInto;
+use std::error::Error;
+use std::fmt;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
+/// An error returned by the RPC server.
+#[derive(Debug)]
+pub struct RpcError {
+       /// The error code.
+       pub code: i64,
+       /// The error message.
+       pub message: String,
+}
+
+impl fmt::Display for RpcError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "RPC error {}: {}", self.code, self.message)
+    }
+}
+
+impl Error for RpcError {}
+
 /// A simple RPC client for calling methods using HTTP `POST`.
 pub struct RpcClient {
        basic_auth: String,
@@ -69,8 +88,11 @@ impl RpcClient {
                let error = &response["error"];
                if !error.is_null() {
                        // TODO: Examine error code for a more precise std::io::ErrorKind.
-                       let message = error["message"].as_str().unwrap_or("unknown error");
-                       return Err(std::io::Error::new(std::io::ErrorKind::Other, message));
+                       let rpc_error = RpcError { 
+                               code: error["code"].as_i64().unwrap_or(-1), 
+                               message: error["message"].as_str().unwrap_or("unknown error").to_string() 
+                       };
+                       return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error));
                }
 
                let result = &mut response["result"];
@@ -163,7 +185,9 @@ mod tests {
                match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
                        Err(e) => {
                                assert_eq!(e.kind(), std::io::ErrorKind::Other);
-                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid parameter");
+                               let rpc_error: Box<RpcError> = e.into_inner().unwrap().downcast().unwrap();
+                               assert_eq!(rpc_error.code, -8);
+                               assert_eq!(rpc_error.message, "invalid parameter");
                        },
                        Ok(_) => panic!("Expected error"),
                }
index 3bceb2e28e9239c0a7e8b34790275d4f51f064e2..af235b1c4224d990f2a1d71881ab23e659d61e28 100644 (file)
@@ -54,7 +54,7 @@
 //! # let logger = FakeLogger {};
 //!
 //! let network_graph = NetworkGraph::new(Network::Bitcoin, &logger);
-//! let rapid_sync = RapidGossipSync::new(&network_graph);
+//! let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
 //! let snapshot_contents: &[u8] = &[0; 0];
 //! let new_last_sync_timestamp_result = rapid_sync.update_network_graph(snapshot_contents);
 //! ```
@@ -94,14 +94,16 @@ mod processing;
 pub struct RapidGossipSync<NG: Deref<Target=NetworkGraph<L>>, L: Deref>
 where L::Target: Logger {
        network_graph: NG,
+       logger: L,
        is_initial_sync_complete: AtomicBool
 }
 
 impl<NG: Deref<Target=NetworkGraph<L>>, L: Deref> RapidGossipSync<NG, L> where L::Target: Logger {
        /// Instantiate a new [`RapidGossipSync`] instance.
-       pub fn new(network_graph: NG) -> Self {
+       pub fn new(network_graph: NG, logger: L) -> Self {
                Self {
                        network_graph,
+                       logger,
                        is_initial_sync_complete: AtomicBool::new(false)
                }
        }
@@ -228,7 +230,7 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let sync_result = rapid_sync.sync_network_graph_with_file_path(&graph_sync_test_file);
 
                if sync_result.is_err() {
@@ -260,7 +262,7 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let start = std::time::Instant::now();
                let sync_result = rapid_sync
                        .sync_network_graph_with_file_path("./res/full_graph.lngossip");
@@ -299,7 +301,7 @@ pub mod bench {
                let logger = TestLogger::new();
                b.iter(|| {
                        let network_graph = NetworkGraph::new(Network::Bitcoin, &logger);
-                       let rapid_sync = RapidGossipSync::new(&network_graph);
+                       let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                        let sync_result = rapid_sync.sync_network_graph_with_file_path("./res/full_graph.lngossip");
                        if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
                                let error_string = format!("Input file lightning-rapid-gossip-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
index 4b6de04c6556a5ef302d4dced78f1b833f5e5380..8a52d234388052a38b540bfa30a6621387007ec5 100644 (file)
@@ -10,6 +10,7 @@ use lightning::ln::msgs::{
 };
 use lightning::routing::gossip::NetworkGraph;
 use lightning::util::logger::Logger;
+use lightning::{log_warn, log_trace, log_given_level};
 use lightning::util::ser::{BigSize, Readable};
 use lightning::io;
 
@@ -120,6 +121,7 @@ impl<NG: Deref<Target=NetworkGraph<L>>, L: Deref> RapidGossipSync<NG, L> where L
                                if let ErrorAction::IgnoreDuplicateGossip = lightning_error.action {
                                        // everything is fine, just a duplicate channel announcement
                                } else {
+                                       log_warn!(self.logger, "Failed to process channel announcement: {:?}", lightning_error);
                                        return Err(lightning_error.into());
                                }
                        }
@@ -169,24 +171,19 @@ impl<NG: Deref<Target=NetworkGraph<L>>, L: Deref> RapidGossipSync<NG, L> where L
                        if (channel_flags & 0b_1000_0000) != 0 {
                                // incremental update, field flags will indicate mutated values
                                let read_only_network_graph = network_graph.read_only();
-                               if let Some(channel) = read_only_network_graph
-                                       .channels()
-                                       .get(&short_channel_id) {
-
-                                       let directional_info = channel
-                                               .get_directional_info(channel_flags)
-                                               .ok_or(LightningError {
-                                                       err: "Couldn't find previous directional data for update".to_owned(),
-                                                       action: ErrorAction::IgnoreError,
-                                               })?;
-
+                               if let Some(directional_info) =
+                                       read_only_network_graph.channels().get(&short_channel_id)
+                                       .and_then(|channel| channel.get_directional_info(channel_flags))
+                               {
                                        synthetic_update.cltv_expiry_delta = directional_info.cltv_expiry_delta;
                                        synthetic_update.htlc_minimum_msat = directional_info.htlc_minimum_msat;
                                        synthetic_update.htlc_maximum_msat = directional_info.htlc_maximum_msat;
                                        synthetic_update.fee_base_msat = directional_info.fees.base_msat;
                                        synthetic_update.fee_proportional_millionths = directional_info.fees.proportional_millionths;
-
                                } else {
+                                       log_trace!(self.logger,
+                                               "Skipping application of channel update for chan {} with flags {} as original data is missing.",
+                                               short_channel_id, channel_flags);
                                        skip_update_for_unknown_channel = true;
                                }
                        };
@@ -223,7 +220,9 @@ impl<NG: Deref<Target=NetworkGraph<L>>, L: Deref> RapidGossipSync<NG, L> where L
                        match network_graph.update_channel_unsigned(&synthetic_update) {
                                Ok(_) => {},
                                Err(LightningError { action: ErrorAction::IgnoreDuplicateGossip, .. }) => {},
-                               Err(LightningError { action: ErrorAction::IgnoreAndLog(_), .. }) => {},
+                               Err(LightningError { action: ErrorAction::IgnoreAndLog(level), err }) => {
+                                       log_given_level!(self.logger, level, "Failed to apply channel update: {:?}", err);
+                               },
                                Err(LightningError { action: ErrorAction::IgnoreError, .. }) => {},
                                Err(e) => return Err(e.into()),
                        }
@@ -287,7 +286,7 @@ mod tests {
                        0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1, 24, 0,
                        0, 3, 232, 0, 0, 0,
                ];
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let update_result = rapid_sync.update_network_graph(&example_input[..]);
                assert!(update_result.is_err());
                if let Err(GraphSyncError::DecodeError(DecodeError::ShortRead)) = update_result {
@@ -312,7 +311,7 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let update_result = rapid_sync.update_network_graph(&incremental_update_input[..]);
                assert!(update_result.is_ok());
        }
@@ -340,17 +339,8 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
-               let update_result = rapid_sync.update_network_graph(&announced_update_input[..]);
-               assert!(update_result.is_err());
-               if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
-                       assert_eq!(
-                               lightning_error.err,
-                               "Couldn't find previous directional data for update"
-                       );
-               } else {
-                       panic!("Unexpected update result: {:?}", update_result)
-               }
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
+               rapid_sync.update_network_graph(&announced_update_input[..]).unwrap();
        }
 
        #[test]
@@ -376,7 +366,7 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let initialization_result = rapid_sync.update_network_graph(&initialization_input[..]);
                if initialization_result.is_err() {
                        panic!(
@@ -405,16 +395,7 @@ mod tests {
                        0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
                        68, 226, 0, 6, 11, 0, 1, 128,
                ];
-               let update_result = rapid_sync.update_network_graph(&opposite_direction_incremental_update_input[..]);
-               assert!(update_result.is_err());
-               if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
-                       assert_eq!(
-                               lightning_error.err,
-                               "Couldn't find previous directional data for update"
-                       );
-               } else {
-                       panic!("Unexpected update result: {:?}", update_result)
-               }
+               rapid_sync.update_network_graph(&opposite_direction_incremental_update_input[..]).unwrap();
        }
 
        #[test]
@@ -442,7 +423,7 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let initialization_result = rapid_sync.update_network_graph(&initialization_input[..]);
                assert!(initialization_result.is_ok());
 
@@ -501,7 +482,7 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let initialization_result = rapid_sync.update_network_graph(&initialization_input[..]);
                assert!(initialization_result.is_ok());
 
@@ -526,7 +507,7 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let update_result = rapid_sync.update_network_graph(&VALID_RGS_BINARY);
                if update_result.is_err() {
                        panic!("Unexpected update result: {:?}", update_result)
@@ -557,7 +538,7 @@ mod tests {
 
                assert_eq!(network_graph.read_only().channels().len(), 0);
 
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                // this is mostly for checking uint underflow issues before the fuzzer does
                let update_result = rapid_sync.update_network_graph_no_std(&VALID_RGS_BINARY, Some(0));
                assert!(update_result.is_ok());
@@ -576,7 +557,7 @@ mod tests {
                        let network_graph = NetworkGraph::new(Network::Bitcoin, &logger);
                        assert_eq!(network_graph.read_only().channels().len(), 0);
 
-                       let rapid_sync = RapidGossipSync::new(&network_graph);
+                       let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                        let update_result = rapid_sync.update_network_graph_no_std(&VALID_RGS_BINARY, Some(latest_succeeding_time));
                        assert!(update_result.is_ok());
                        assert_eq!(network_graph.read_only().channels().len(), 2);
@@ -586,7 +567,7 @@ mod tests {
                        let network_graph = NetworkGraph::new(Network::Bitcoin, &logger);
                        assert_eq!(network_graph.read_only().channels().len(), 0);
 
-                       let rapid_sync = RapidGossipSync::new(&network_graph);
+                       let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                        let update_result = rapid_sync.update_network_graph_no_std(&VALID_RGS_BINARY, Some(earliest_failing_time));
                        assert!(update_result.is_err());
                        if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
@@ -622,7 +603,7 @@ mod tests {
 
                let logger = TestLogger::new();
                let network_graph = NetworkGraph::new(Network::Bitcoin, &logger);
-               let rapid_sync = RapidGossipSync::new(&network_graph);
+               let rapid_sync = RapidGossipSync::new(&network_graph, &logger);
                let update_result = rapid_sync.update_network_graph(&unknown_version_input[..]);
 
                assert!(update_result.is_err());
index 3d45172517db3e706dcbf4884dd75923c872c8a5..2b5bbac0ddc583e188a9b36b2833cbb42bb74eea 100644 (file)
@@ -21,6 +21,8 @@ use core::ops::{Bound, RangeBounds};
 /// actually backed by a [`HashMap`], with some additional tracking to ensure we can iterate over
 /// keys in the order defined by [`Ord`].
 ///
+/// (C-not exported) as bindings provide alternate accessors rather than exposing maps directly.
+///
 /// [`BTreeMap`]: alloc::collections::BTreeMap
 #[derive(Clone, Debug, Eq)]
 pub struct IndexedMap<K: Hash + Ord, V> {
@@ -147,6 +149,8 @@ impl<K: Hash + Ord + PartialEq, V: PartialEq> PartialEq for IndexedMap<K, V> {
 }
 
 /// An iterator over a range of values in an [`IndexedMap`]
+///
+/// (C-not exported) as bindings provide alternate accessors rather than exposing maps directly.
 pub struct Range<'a, K: Hash + Ord, V> {
        inner_range: Iter<'a, K>,
        map: &'a HashMap<K, V>,
@@ -161,6 +165,8 @@ impl<'a, K: Hash + Ord, V: 'a> Iterator for Range<'a, K, V> {
 }
 
 /// An [`Entry`] for a key which currently has no value
+///
+/// (C-not exported) as bindings provide alternate accessors rather than exposing maps directly.
 pub struct VacantEntry<'a, K: Hash + Ord, V> {
        #[cfg(feature = "hashbrown")]
        underlying_entry: hash_map::VacantEntry<'a, K, V, hash_map::DefaultHashBuilder>,
@@ -171,6 +177,8 @@ pub struct VacantEntry<'a, K: Hash + Ord, V> {
 }
 
 /// An [`Entry`] for an existing key-value pair
+///
+/// (C-not exported) as bindings provide alternate accessors rather than exposing maps directly.
 pub struct OccupiedEntry<'a, K: Hash + Ord, V> {
        #[cfg(feature = "hashbrown")]
        underlying_entry: hash_map::OccupiedEntry<'a, K, V, hash_map::DefaultHashBuilder>,
@@ -181,6 +189,8 @@ pub struct OccupiedEntry<'a, K: Hash + Ord, V> {
 
 /// A mutable reference to a position in the map. This can be used to reference, add, or update the
 /// value at a fixed key.
+///
+/// (C-not exported) as bindings provide alternate accessors rather than exposing maps directly.
 pub enum Entry<'a, K: Hash + Ord, V> {
        /// A mutable reference to a position within the map where there is no value.
        Vacant(VacantEntry<'a, K, V>),
index e83e6e2ee48ea27d40327c4e56cb50e2827e94fc..6e98272f3171d2bc47082c4171b6e4013800e911 100644 (file)
@@ -167,17 +167,17 @@ macro_rules! log_given_level {
        ($logger: expr, $lvl:expr, $($arg:tt)+) => (
                match $lvl {
                        #[cfg(not(any(feature = "max_level_off")))]
-                       $crate::util::logger::Level::Error => log_internal!($logger, $lvl, $($arg)*),
+                       $crate::util::logger::Level::Error => $crate::log_internal!($logger, $lvl, $($arg)*),
                        #[cfg(not(any(feature = "max_level_off", feature = "max_level_error")))]
-                       $crate::util::logger::Level::Warn => log_internal!($logger, $lvl, $($arg)*),
+                       $crate::util::logger::Level::Warn => $crate::log_internal!($logger, $lvl, $($arg)*),
                        #[cfg(not(any(feature = "max_level_off", feature = "max_level_error", feature = "max_level_warn")))]
-                       $crate::util::logger::Level::Info => log_internal!($logger, $lvl, $($arg)*),
+                       $crate::util::logger::Level::Info => $crate::log_internal!($logger, $lvl, $($arg)*),
                        #[cfg(not(any(feature = "max_level_off", feature = "max_level_error", feature = "max_level_warn", feature = "max_level_info")))]
-                       $crate::util::logger::Level::Debug => log_internal!($logger, $lvl, $($arg)*),
+                       $crate::util::logger::Level::Debug => $crate::log_internal!($logger, $lvl, $($arg)*),
                        #[cfg(not(any(feature = "max_level_off", feature = "max_level_error", feature = "max_level_warn", feature = "max_level_info", feature = "max_level_debug")))]
-                       $crate::util::logger::Level::Trace => log_internal!($logger, $lvl, $($arg)*),
+                       $crate::util::logger::Level::Trace => $crate::log_internal!($logger, $lvl, $($arg)*),
                        #[cfg(not(any(feature = "max_level_off", feature = "max_level_error", feature = "max_level_warn", feature = "max_level_info", feature = "max_level_debug", feature = "max_level_trace")))]
-                       $crate::util::logger::Level::Gossip => log_internal!($logger, $lvl, $($arg)*),
+                       $crate::util::logger::Level::Gossip => $crate::log_internal!($logger, $lvl, $($arg)*),
 
                        #[cfg(any(feature = "max_level_off", feature = "max_level_error", feature = "max_level_warn", feature = "max_level_info", feature = "max_level_debug", feature = "max_level_trace"))]
                        _ => {
@@ -191,7 +191,7 @@ macro_rules! log_given_level {
 #[macro_export]
 macro_rules! log_error {
        ($logger: expr, $($arg:tt)*) => (
-               log_given_level!($logger, $crate::util::logger::Level::Error, $($arg)*);
+               $crate::log_given_level!($logger, $crate::util::logger::Level::Error, $($arg)*);
        )
 }
 
@@ -199,7 +199,7 @@ macro_rules! log_error {
 #[macro_export]
 macro_rules! log_warn {
        ($logger: expr, $($arg:tt)*) => (
-               log_given_level!($logger, $crate::util::logger::Level::Warn, $($arg)*);
+               $crate::log_given_level!($logger, $crate::util::logger::Level::Warn, $($arg)*);
        )
 }
 
@@ -207,7 +207,7 @@ macro_rules! log_warn {
 #[macro_export]
 macro_rules! log_info {
        ($logger: expr, $($arg:tt)*) => (
-               log_given_level!($logger, $crate::util::logger::Level::Info, $($arg)*);
+               $crate::log_given_level!($logger, $crate::util::logger::Level::Info, $($arg)*);
        )
 }
 
@@ -215,7 +215,7 @@ macro_rules! log_info {
 #[macro_export]
 macro_rules! log_debug {
        ($logger: expr, $($arg:tt)*) => (
-               log_given_level!($logger, $crate::util::logger::Level::Debug, $($arg)*);
+               $crate::log_given_level!($logger, $crate::util::logger::Level::Debug, $($arg)*);
        )
 }
 
@@ -223,7 +223,7 @@ macro_rules! log_debug {
 #[macro_export]
 macro_rules! log_trace {
        ($logger: expr, $($arg:tt)*) => (
-               log_given_level!($logger, $crate::util::logger::Level::Trace, $($arg)*)
+               $crate::log_given_level!($logger, $crate::util::logger::Level::Trace, $($arg)*)
        )
 }
 
@@ -231,6 +231,6 @@ macro_rules! log_trace {
 #[macro_export]
 macro_rules! log_gossip {
        ($logger: expr, $($arg:tt)*) => (
-               log_given_level!($logger, $crate::util::logger::Level::Gossip, $($arg)*);
+               $crate::log_given_level!($logger, $crate::util::logger::Level::Gossip, $($arg)*);
        )
 }
index 14c25775174b19dc50f4dabe5ecb434ce0ea259b..bef192585b5e486696068174b92539c0faa895ff 100644 (file)
@@ -89,6 +89,8 @@ impl Writer for VecWriter {
 
 /// Writer that only tracks the amount of data written - useful if you need to calculate the length
 /// of some data when serialized but don't yet need the full data.
+///
+/// (C-not exported) as manual TLV building is not currently supported in bindings
 pub struct LengthCalculatingWriter(pub usize);
 impl Writer for LengthCalculatingWriter {
        #[inline]
@@ -100,6 +102,8 @@ impl Writer for LengthCalculatingWriter {
 
 /// Essentially [`std::io::Take`] but a bit simpler and with a method to walk the underlying stream
 /// forward to ensure we always consume exactly the fixed length specified.
+///
+/// (C-not exported) as manual TLV building is not currently supported in bindings
 pub struct FixedLengthReader<R: Read> {
        read: R,
        bytes_read: u64,
@@ -155,6 +159,8 @@ impl<R: Read> LengthRead for FixedLengthReader<R> {
 
 /// A [`Read`] implementation which tracks whether any bytes have been read at all. This allows us to distinguish
 /// between "EOF reached before we started" and "EOF reached mid-read".
+///
+/// (C-not exported) as manual TLV building is not currently supported in bindings
 pub struct ReadTrackingReader<R: Read> {
        read: R,
        /// Returns whether we have read from this reader or not yet.
@@ -289,6 +295,8 @@ impl<T: Readable> MaybeReadable for T {
 }
 
 /// Wrapper to read a required (non-optional) TLV record.
+///
+/// (C-not exported) as manual TLV building is not currently supported in bindings
 pub struct RequiredWrapper<T>(pub Option<T>);
 impl<T: Readable> Readable for RequiredWrapper<T> {
        #[inline]
@@ -311,6 +319,8 @@ impl<T> From<T> for RequiredWrapper<T> {
 
 /// Wrapper to read a required (non-optional) TLV record that may have been upgraded without
 /// backwards compat.
+///
+/// (C-not exported) as manual TLV building is not currently supported in bindings
 pub struct UpgradableRequired<T: MaybeReadable>(pub Option<T>);
 impl<T: MaybeReadable> MaybeReadable for UpgradableRequired<T> {
        #[inline]
@@ -591,6 +601,8 @@ impl Readable for [u16; 8] {
 
 /// A type for variable-length values within TLV record where the length is encoded as part of the record.
 /// Used to prevent encoding the length twice.
+///
+/// (C-not exported) as manual TLV building is not currently supported in bindings
 pub struct WithoutLength<T>(pub T);
 
 impl Writeable for WithoutLength<&String> {
index fdbc22f116600b7162bdbdcafa4a6f314af944e1..f86fc376cee0202323b9924057157aaf8379f86a 100644 (file)
@@ -105,7 +105,10 @@ impl Notifier {
        pub(crate) fn notify(&self) {
                let mut lock = self.notify_pending.lock().unwrap();
                if let Some(future_state) = &lock.1 {
-                       future_state.lock().unwrap().complete();
+                       if future_state.lock().unwrap().complete() {
+                               lock.1 = None;
+                               return;
+                       }
                }
                lock.0 = true;
                mem::drop(lock);
@@ -161,12 +164,13 @@ pub(crate) struct FutureState {
 }
 
 impl FutureState {
-       fn complete(&mut self) {
+       fn complete(&mut self) -> bool {
                for (counts_as_call, callback) in self.callbacks.drain(..) {
                        callback.call();
                        self.callbacks_made |= counts_as_call;
                }
                self.complete = true;
+               self.callbacks_made
        }
 }
 
@@ -469,4 +473,63 @@ mod tests {
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
                assert!(!notifier.wait_timeout(Duration::from_millis(1)));
        }
+
+       #[test]
+       fn test_poll_post_notify_completes() {
+               // Tests that if we have a future state that has completed, and we haven't yet requested a
+               // new future, if we get a notify prior to requesting that second future it is generated
+               // pre-completed.
+               let notifier = Notifier::new();
+
+               notifier.notify();
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+               assert!(!woken.load(Ordering::SeqCst));
+
+               notifier.notify();
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+               assert!(!woken.load(Ordering::SeqCst));
+
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+               assert!(!woken.load(Ordering::SeqCst));
+
+               notifier.notify();
+               assert!(woken.load(Ordering::SeqCst));
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+       }
+
+       #[test]
+       fn test_poll_post_notify_completes_initial_notified() {
+               // Identical to the previous test, but the first future completes via a wake rather than an
+               // immediate `Poll::Ready`.
+               let notifier = Notifier::new();
+
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+
+               notifier.notify();
+               assert!(woken.load(Ordering::SeqCst));
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+
+               notifier.notify();
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+               assert!(!woken.load(Ordering::SeqCst));
+
+               let mut future = notifier.get_future();
+               let (woken, waker) = create_waker();
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+               assert!(!woken.load(Ordering::SeqCst));
+
+               notifier.notify();
+               assert!(woken.load(Ordering::SeqCst));
+               assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+       }
 }