Merge pull request #950 from TheBlueMatt/2021-06-changelog
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Wed, 23 Jun 2021 17:22:11 +0000 (17:22 +0000)
committerGitHub <noreply@github.com>
Wed, 23 Jun 2021 17:22:11 +0000 (17:22 +0000)
Add a dummy first CHANGELOG entry for future tracking

21 files changed:
.github/workflows/build.yml
ci/check-compiles.sh
lightning-block-sync/src/poll.rs
lightning-invoice/src/lib.rs
lightning-net-tokio/src/lib.rs
lightning/Cargo.toml
lightning/src/chain/chainmonitor.rs
lightning/src/chain/channelmonitor.rs
lightning/src/chain/keysinterface.rs
lightning/src/chain/onchaintx.rs
lightning/src/lib.rs
lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/peer_handler.rs
lightning/src/ln/reorg_tests.rs
lightning/src/routing/router.rs
lightning/src/util/message_signing.rs
lightning/src/util/ser.rs
lightning/src/util/test_utils.rs

index d24b2be7a45dc3fd7a0b4f95f741552b6a8424a4..85aaac6a3a0d378e5cef84a1f7ef307d121a8658 100644 (file)
@@ -14,21 +14,34 @@ jobs:
                      # 1.41.0 is Debian stable
                      1.41.0,
                      # 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, and coverage generation
-                     1.45.2]
+                     1.45.2,
+                     # 1.49.0 is MSRV for no_std builds using hashbrown
+                     1.49.0]
         include:
           - toolchain: stable
             build-net-tokio: true
+            build-no-std: true
           - toolchain: stable
             platform: macos-latest
             build-net-tokio: true
+            build-no-std: true
           - toolchain: stable
             platform: windows-latest
             build-net-tokio: true
+            build-no-std: true
           - toolchain: beta
             build-net-tokio: true
+            build-no-std: true
+          - toolchain: 1.36.0
+            build-no-std: false
+          - toolchain: 1.41.0
+            build-no-std: false
           - toolchain: 1.45.2
             build-net-tokio: true
+            build-no-std: false
             coverage: true
+          - toolchain: 1.49.0
+            build-no-std: true
     runs-on: ${{ matrix.platform }}
     steps:
       - name: Checkout source code
@@ -47,7 +60,10 @@ jobs:
         run: RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always
       - name: Build on Rust ${{ matrix.toolchain }}
         if: "! matrix.build-net-tokio"
-        run: cargo build --verbose  --color always -p lightning && cargo build --verbose  --color always -p lightning-invoice && cargo build --verbose  --color always -p lightning-persister
+        run: |
+          cargo build --verbose  --color always -p lightning
+          cargo build --verbose  --color always -p lightning-invoice
+          cargo build --verbose  --color always -p lightning-persister
       - name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features
         if: "matrix.build-net-tokio && !matrix.coverage"
         run: |
@@ -56,7 +72,6 @@ jobs:
           cargo build --verbose --color always --features rpc-client
           cargo build --verbose --color always --features rpc-client,rest-client
           cargo build --verbose --color always --features rpc-client,rest-client,tokio
-          cd ..
       - name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation
         if: matrix.coverage
         run: |
@@ -65,16 +80,30 @@ jobs:
           RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client
           RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client
           RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client,tokio
-          cd ..
       - name: Test on Rust ${{ matrix.toolchain }} with net-tokio
         if: "matrix.build-net-tokio && !matrix.coverage"
         run: cargo test --verbose --color always
       - name: Test on Rust ${{ matrix.toolchain }} with net-tokio and full code-linking for coverage generation
         if: matrix.coverage
         run: RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always
+      - name: Test on no_std bullds Rust ${{ matrix.toolchain }}
+        if: "matrix.build-no-std && !matrix.coverage"
+        run: |
+          cd lightning
+          cargo test --verbose --color always --features hashbrown
+          cd ..
+      - name: Test on no_std bullds Rust ${{ matrix.toolchain }} and full code-linking for coverage generation
+        if: "matrix.build-no-std && matrix.coverage"
+        run: |
+          cd lightning
+          RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features hashbrown
+          cd ..
       - name: Test on Rust ${{ matrix.toolchain }}
         if: "! matrix.build-net-tokio"
-        run: cargo test --verbose --color always  -p lightning && cargo test --verbose --color always  -p lightning-invoice && cargo build --verbose  --color always -p lightning-persister
+        run: |
+          cargo test --verbose --color always  -p lightning
+          cargo test --verbose --color always  -p lightning-invoice
+          cargo build --verbose  --color always -p lightning-persister
       - name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features
         if: "matrix.build-net-tokio && !matrix.coverage"
         run: |
@@ -83,7 +112,6 @@ jobs:
           cargo test --verbose --color always --features rpc-client
           cargo test --verbose --color always --features rpc-client,rest-client
           cargo test --verbose --color always --features rpc-client,rest-client,tokio
-          cd ..
       - name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation
         if: matrix.coverage
         run: |
@@ -92,7 +120,6 @@ jobs:
           RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client
           RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client
           RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client,tokio
-          cd ..
       - name: Install deps for kcov
         if: matrix.coverage
         run: |
@@ -157,6 +184,7 @@ jobs:
         run: |
           cd lightning
           RUSTFLAGS="--cfg=require_route_graph_test" cargo test
+          RUSTFLAGS="--cfg=require_route_graph_test" cargo test --features hashbrown
           cd ..
       - name: Run benchmarks on Rust ${{ matrix.toolchain }}
         run: |
@@ -165,7 +193,9 @@ jobs:
   check_commits:
     runs-on: ubuntu-latest
     env:
-      TOOLCHAIN: stable
+      # rustc 1.53 regressed and panics when building our (perfectly fine) docs.
+      # See https://github.com/rust-lang/rust/issues/84738
+      TOOLCHAIN: 1.52.1
     steps:
       - name: Checkout source code
         uses: actions/checkout@v2
index 9222f609cc082ced5a1ac3806e8c9b05c7531379..79c2d92b761ce409cc31fda64d3667bc29bf1cfe 100755 (executable)
@@ -6,3 +6,4 @@ cargo check
 cargo doc
 cargo doc --document-private-items
 cd fuzz && cargo check --features=stdin_fuzz
+cd ../lightning && cargo check --no-default-features --features=no_std
index fbe803b4cf3f12aafd17b5fb7776346f9686570b..c59652ea51716db9c7d24c3d9015c2e73f13833b 100644 (file)
@@ -44,7 +44,9 @@ pub enum ChainTip {
 }
 
 /// The `Validate` trait defines behavior for validating chain data.
-pub(crate) trait Validate {
+///
+/// This trait is sealed and not meant to be implemented outside of this crate.
+pub trait Validate: sealed::Validate {
        /// The validated data wrapper which can be dereferenced to obtain the validated data.
        type T: std::ops::Deref<Target = Self>;
 
@@ -156,16 +158,24 @@ impl std::ops::Deref for ValidatedBlock {
        }
 }
 
+mod sealed {
+       /// Used to prevent implementing [`super::Validate`] outside the crate but still allow its use.
+       pub trait Validate {}
+
+       impl Validate for crate::BlockHeaderData {}
+       impl Validate for bitcoin::blockdata::block::Block {}
+}
+
 /// The canonical `Poll` implementation used for a single `BlockSource`.
 ///
-/// Other `Poll` implementations must be built using `ChainPoller` as it provides the only means of
-/// validating chain data.
-pub struct ChainPoller<B: DerefMut<Target=T> + Sized , T: BlockSource> {
+/// Other `Poll` implementations should be built using `ChainPoller` as it provides the simplest way
+/// of validating chain data and checking consistency.
+pub struct ChainPoller<B: DerefMut<Target=T> + Sized, T: BlockSource> {
        block_source: B,
        network: Network,
 }
 
-impl<B: DerefMut<Target=T> + Sized , T: BlockSource> ChainPoller<B, T> {
+impl<B: DerefMut<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
        /// Creates a new poller for the given block source.
        ///
        /// If the `network` parameter is mainnet, then the difficulty between blocks is checked for
index fa6a8eedb0fb434385ebe25b553a2e03a3543675..2ce58f296f9b26b06ec4c92d8b634a57f680d5b3 100644 (file)
@@ -60,12 +60,16 @@ const MAX_EXPIRY_TIME: u64 = 60 * 60 * 24 * 356;
 /// Default expiry time as defined by [BOLT 11].
 ///
 /// [BOLT 11]: https://github.com/lightningnetwork/lightning-rfc/blob/master/11-payment-encoding.md
-const DEFAULT_EXPIRY_TIME: u64 = 3600;
+pub const DEFAULT_EXPIRY_TIME: u64 = 3600;
 
 /// Default minimum final CLTV expiry as defined by [BOLT 11].
 ///
+/// Note that this is *not* the same value as rust-lightning's minimum CLTV expiry, which is
+/// provided in [`MIN_FINAL_CLTV_EXPIRY`].
+///
 /// [BOLT 11]: https://github.com/lightningnetwork/lightning-rfc/blob/master/11-payment-encoding.md
-const DEFAULT_MIN_FINAL_CLTV_EXPIRY: u64 = 18;
+/// [`MIN_FINAL_CLTV_EXPIRY`]: lightning::ln::channelmanager::MIN_FINAL_CLTV_EXPIRY
+pub const DEFAULT_MIN_FINAL_CLTV_EXPIRY: u64 = 18;
 
 /// This function is used as a static assert for the size of `SystemTime`. If the crate fails to
 /// compile due to it this indicates that your system uses unexpected bounds for `SystemTime`. You
index d102778a460f20d202fe772920d1c0ee3418e40b..5f5fece0d2ed34e447e2905f9f3bd6d3f42ce102 100644 (file)
@@ -83,7 +83,7 @@ use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
 use lightning::util::logger::Logger;
 
-use std::{task, thread};
+use std::task;
 use std::net::SocketAddr;
 use std::net::TcpStream as StdTcpStream;
 use std::sync::{Arc, Mutex};
@@ -114,11 +114,6 @@ struct Connection {
        // socket. To wake it up (without otherwise changing its state, we can push a value into this
        // Sender.
        read_waker: mpsc::Sender<()>,
-       // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
-       // are sure we won't call any more read/write PeerManager functions with the same connection.
-       // This is set to true if we're in such a condition (with disconnect checked before with the
-       // top-level mutex held) and false when we can return.
-       block_disconnect_socket: bool,
        read_paused: bool,
        rl_requested_disconnect: bool,
        id: u64,
@@ -153,31 +148,24 @@ impl Connection {
                                } }
                        }
 
-                       macro_rules! prepare_read_write_call {
-                               () => { {
-                                       let mut us_lock = us.lock().unwrap();
-                                       if us_lock.rl_requested_disconnect {
-                                               shutdown_socket!("disconnect_socket() call from RL", Disconnect::CloseConnection);
-                                       }
-                                       us_lock.block_disconnect_socket = true;
-                               } }
-                       }
-
-                       let read_paused = us.lock().unwrap().read_paused;
+                       let read_paused = {
+                               let us_lock = us.lock().unwrap();
+                               if us_lock.rl_requested_disconnect {
+                                       shutdown_socket!("disconnect_socket() call from RL", Disconnect::CloseConnection);
+                               }
+                               us_lock.read_paused
+                       };
                        tokio::select! {
                                v = write_avail_receiver.recv() => {
                                        assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
-                                       prepare_read_write_call!();
                                        if let Err(e) = peer_manager.write_buffer_space_avail(&mut our_descriptor) {
                                                shutdown_socket!(e, Disconnect::CloseConnection);
                                        }
-                                       us.lock().unwrap().block_disconnect_socket = false;
                                },
                                _ = read_wake_receiver.recv() => {},
                                read = reader.read(&mut buf), if !read_paused => match read {
                                        Ok(0) => shutdown_socket!("Connection closed", Disconnect::PeerDisconnected),
                                        Ok(len) => {
-                                               prepare_read_write_call!();
                                                let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]);
                                                let mut us_lock = us.lock().unwrap();
                                                match read_res {
@@ -188,7 +176,6 @@ impl Connection {
                                                        },
                                                        Err(e) => shutdown_socket!(e, Disconnect::CloseConnection),
                                                }
-                                               us_lock.block_disconnect_socket = false;
                                        },
                                        Err(e) => shutdown_socket!(e, Disconnect::PeerDisconnected),
                                },
@@ -223,7 +210,7 @@ impl Connection {
                (reader, write_receiver, read_receiver,
                Arc::new(Mutex::new(Self {
                        writer: Some(writer), write_avail, read_waker, read_paused: false,
-                       block_disconnect_socket: false, rl_requested_disconnect: false,
+                       rl_requested_disconnect: false,
                        id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
                })))
        }
@@ -450,18 +437,10 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
        }
 
        fn disconnect_socket(&mut self) {
-               {
-                       let mut us = self.conn.lock().unwrap();
-                       us.rl_requested_disconnect = true;
-                       us.read_paused = true;
-                       // Wake up the sending thread, assuming it is still alive
-                       let _ = us.write_avail.try_send(());
-                       // Happy-path return:
-                       if !us.block_disconnect_socket { return; }
-               }
-               while self.conn.lock().unwrap().block_disconnect_socket {
-                       thread::yield_now();
-               }
+               let mut us = self.conn.lock().unwrap();
+               us.rl_requested_disconnect = true;
+               // Wake up the sending thread, assuming it is still alive
+               let _ = us.write_avail.try_send(());
        }
 }
 impl Clone for SocketDescriptor {
index affe46e3d03e2606bc58ea2a74c9b8e7feb4bbc5..71d85872398037881d5cc46c3f0ea8316b7c247c 100644 (file)
@@ -25,10 +25,12 @@ max_level_debug = []
 # This is unsafe to use in production because it may result in the counterparty publishing taking our funds.
 unsafe_revoked_tx_signing = []
 unstable = []
+no_std = ["hashbrown"]
 
 [dependencies]
 bitcoin = "0.26"
 
+hashbrown = { version = "0.11", optional = true }
 hex = { version = "0.3", optional = true }
 regex = { version = "0.1.80", optional = true }
 
index a3365f2b64fd58302eef8e292961e53384743896..23fc42f54a4fc16c43d149f4e117ff42ec46d623 100644 (file)
@@ -38,7 +38,6 @@ use util::events;
 use util::events::EventHandler;
 
 use prelude::*;
-use std::collections::{HashMap, hash_map};
 use std::sync::RwLock;
 use core::ops::Deref;
 
index a0f5eb71d7b3fdf69ef076183fdb1ccc76ff0fc4..0bd212705d5f14009cf0fe19a3e750f8b3f39d96 100644 (file)
@@ -52,7 +52,6 @@ use util::byte_utils;
 use util::events::Event;
 
 use prelude::*;
-use std::collections::{HashMap, HashSet};
 use core::{cmp, mem};
 use std::io::Error;
 use core::ops::Deref;
index 84d83196472142d4c4334dcba5bf2d1e0ffafb0b..c5ad6f28b263d85dd8559c0df82b5e6e9e91f1e7 100644 (file)
@@ -38,7 +38,6 @@ use ln::chan_utils::{HTLCOutputInCommitment, make_funding_redeemscript, ChannelP
 use ln::msgs::UnsignedChannelAnnouncement;
 
 use prelude::*;
-use std::collections::HashSet;
 use core::sync::atomic::{AtomicUsize, Ordering};
 use std::io::Error;
 use ln::msgs::{DecodeError, MAX_VALUE_MSAT};
index dd4d1d8f0b37f844c8262673cafc6e1b98407585..30493eb56c696ed2f055d477db033a4cecfe8500 100644 (file)
@@ -34,7 +34,6 @@ use util::byte_utils;
 
 use prelude::*;
 use alloc::collections::BTreeMap;
-use std::collections::HashMap;
 use core::cmp;
 use core::ops::Deref;
 use core::mem::replace;
index a9f46b43a4026f650069cb97f9baf1dd62a0d968..c84e3d2d8da24aaeb826cc2e258410d4c7c79c67 100644 (file)
@@ -44,5 +44,12 @@ pub mod ln;
 pub mod routing;
 
 mod prelude {
-       pub use alloc::{vec, vec::Vec, string::String};
-}
\ No newline at end of file
+       #[cfg(feature = "hashbrown")]
+       extern crate hashbrown;
+
+       pub use alloc::{vec, vec::Vec, string::String, collections::VecDeque};
+       #[cfg(not(feature = "hashbrown"))]
+       pub use std::collections::{HashMap, HashSet, hash_map};
+       #[cfg(feature = "hashbrown")]
+       pub use self::hashbrown::{HashMap, HashSet, hash_map};
+}
index e945f05144bae0532623b21250b8868452bdf52d..fdcf7c2de70ee71edb5bd8e3d2ff4dfdafa67606 100644 (file)
@@ -41,7 +41,6 @@ use ln::functional_test_utils::*;
 use util::test_utils;
 
 use prelude::*;
-use std::collections::HashMap;
 use std::sync::{Arc, Mutex};
 
 // If persister_fail is true, we have the persister return a PermanentFailure
index 13a993fd965596dc41e12a387bc178ea31372b35..ee266849b8bd8d813cf3757279ebacc5018dc60c 100644 (file)
@@ -64,7 +64,6 @@ use util::errors::APIError;
 use prelude::*;
 use core::{cmp, mem};
 use core::cell::RefCell;
-use std::collections::{HashMap, hash_map, HashSet};
 use std::io::{Cursor, Read};
 use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
 use core::sync::atomic::{AtomicUsize, Ordering};
index fb0435a0d3d94e313502ff15132a3bb162522444..166e943e66ddb1007dafbab2f49c843e6db63467 100644 (file)
@@ -44,7 +44,6 @@ use core::cell::RefCell;
 use std::rc::Rc;
 use std::sync::{Arc, Mutex};
 use core::mem;
-use std::collections::HashMap;
 
 pub const CHAN_CONFIRM_DEPTH: u32 = 10;
 
index 95d114aab5b6b2b563fb60b2656879b56221adb9..6298f654965f2ddf1fbea336b366315186246646 100644 (file)
@@ -52,7 +52,6 @@ use regex;
 
 use prelude::*;
 use alloc::collections::BTreeSet;
-use std::collections::{HashMap, HashSet};
 use core::default::Default;
 use std::sync::{Arc, Mutex};
 
index 33c4050b374dd21fa4eeb3486bb179ae91c85f4e..5546227eff2e4086a4762de8e71ed9b65418240d 100644 (file)
@@ -32,7 +32,6 @@ use routing::network_graph::NetGraphMsgHandler;
 
 use prelude::*;
 use alloc::collections::LinkedList;
-use std::collections::{HashMap,hash_map,HashSet};
 use std::sync::{Arc, Mutex};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use core::{cmp, hash, fmt, mem};
@@ -161,10 +160,15 @@ pub struct MessageHandler<CM: Deref, RM: Deref> where
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler {
        /// A message handler which handles messages specific to channels. Usually this is just a
-       /// ChannelManager object or a ErroringMessageHandler.
+       /// [`ChannelManager`] object or an [`ErroringMessageHandler`].
+       ///
+       /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
        pub chan_handler: CM,
        /// A message handler which handles messages updating our knowledge of the network channel
-       /// graph. Usually this is just a NetGraphMsgHandlerMonitor object or an IgnoringMessageHandler.
+       /// graph. Usually this is just a [`NetGraphMsgHandler`] object or an
+       /// [`IgnoringMessageHandler`].
+       ///
+       /// [`NetGraphMsgHandler`]: crate::routing::network_graph::NetGraphMsgHandler
        pub route_handler: RM,
 }
 
@@ -174,32 +178,35 @@ pub struct MessageHandler<CM: Deref, RM: Deref> where
 ///
 /// For efficiency, Clone should be relatively cheap for this type.
 ///
-/// You probably want to just extend an int and put a file descriptor in a struct and implement
-/// send_data. Note that if you are using a higher-level net library that may call close() itself,
-/// be careful to ensure you don't have races whereby you might register a new connection with an
-/// fd which is the same as a previous one which has yet to be removed via
-/// PeerManager::socket_disconnected().
+/// Two descriptors may compare equal (by [`cmp::Eq`] and [`hash::Hash`]) as long as the original
+/// has been disconnected, the [`PeerManager`] has been informed of the disconnection (either by it
+/// having triggered the disconnection or a call to [`PeerManager::socket_disconnected`]), and no
+/// further calls to the [`PeerManager`] related to the original socket occur. This allows you to
+/// use a file descriptor for your SocketDescriptor directly, however for simplicity you may wish
+/// to simply use another value which is guaranteed to be globally unique instead.
 pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
        /// Attempts to send some data from the given slice to the peer.
        ///
        /// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected.
-       /// Note that in the disconnected case, socket_disconnected must still fire and further write
-       /// attempts may occur until that time.
+       /// Note that in the disconnected case, [`PeerManager::socket_disconnected`] must still be
+       /// called and further write attempts may occur until that time.
        ///
-       /// If the returned size is smaller than data.len(), a write_available event must
-       /// trigger the next time more data can be written. Additionally, until the a send_data event
-       /// completes fully, no further read_events should trigger on the same peer!
+       /// If the returned size is smaller than `data.len()`, a
+       /// [`PeerManager::write_buffer_space_avail`] call must be made the next time more data can be
+       /// written. Additionally, until a `send_data` event completes fully, no further
+       /// [`PeerManager::read_event`] calls should be made for the same peer! Because this is to
+       /// prevent denial-of-service issues, you should not read or buffer any data from the socket
+       /// until then.
        ///
-       /// If a read_event on this descriptor had previously returned true (indicating that read
-       /// events should be paused to prevent DoS in the send buffer), resume_read may be set
-       /// indicating that read events on this descriptor should resume. A resume_read of false does
-       /// *not* imply that further read events should be paused.
+       /// If a [`PeerManager::read_event`] call on this descriptor had previously returned true
+       /// (indicating that read events should be paused to prevent DoS in the send buffer),
+       /// `resume_read` may be set indicating that read events on this descriptor should resume. A
+       /// `resume_read` of false carries no meaning, and should not cause any action.
        fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize;
-       /// Disconnect the socket pointed to by this SocketDescriptor. Once this function returns, no
-       /// more calls to write_buffer_space_avail, read_event or socket_disconnected may be made with
-       /// this descriptor. No socket_disconnected call should be generated as a result of this call,
-       /// though races may occur whereby disconnect_socket is called after a call to
-       /// socket_disconnected but prior to socket_disconnected returning.
+       /// Disconnect the socket pointed to by this SocketDescriptor.
+       ///
+       /// You do *not* need to call [`PeerManager::socket_disconnected`] with this socket after this
+       /// call (doing so is a noop).
        fn disconnect_socket(&mut self);
 }
 
@@ -234,6 +241,15 @@ enum InitSyncTracker{
        NodesSyncing(PublicKey),
 }
 
+/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
+/// we have fewer than this many messages in the outbound buffer again.
+/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
+/// refilled as we send bytes.
+const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
+/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
+/// the peer.
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
+
 struct Peer {
        channel_encryptor: PeerChannelEncryptor,
        their_node_id: Option<PublicKey>,
@@ -279,9 +295,6 @@ impl Peer {
 
 struct PeerHolder<Descriptor: SocketDescriptor> {
        peers: HashMap<Descriptor, Peer>,
-       /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
-       /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
-       peers_needing_send: HashSet<Descriptor>,
        /// Only add to this set when noise completes:
        node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
 }
@@ -307,14 +320,25 @@ pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArc
 /// helps with issues such as long function definitions.
 pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L>;
 
-/// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket
-/// events into messages which it passes on to its MessageHandlers.
+/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
+/// socket events into messages which it passes on to its [`MessageHandler`].
+///
+/// Locks are taken internally, so you must never assume that reentrancy from a
+/// [`SocketDescriptor`] call back into [`PeerManager`] methods will not deadlock.
+///
+/// Calls to [`read_event`] will decode relevant messages and pass them to the
+/// [`ChannelMessageHandler`], likely doing message processing in-line. Thus, the primary form of
+/// parallelism in Rust-Lightning is in calls to [`read_event`]. Note, however, that calls to any
+/// [`PeerManager`] functions related to the same connection must occur only in serial, making new
+/// calls only after previous ones have returned.
 ///
 /// Rather than using a plain PeerManager, it is preferable to use either a SimpleArcPeerManager
 /// a SimpleRefPeerManager, for conciseness. See their documentation for more details, but
 /// essentially you should default to using a SimpleRefPeerManager, and use a
 /// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when
 /// you're using lightning-net-tokio.
+///
+/// [`read_event`]: PeerManager::read_event
 pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> where
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler,
@@ -395,8 +419,6 @@ impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor,
        }
 }
 
-/// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds.
-/// PeerIds may repeat, but only after socket_disconnected() has been called.
 impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<Descriptor, CM, RM, L> where
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler,
@@ -412,7 +434,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        message_handler,
                        peers: Mutex::new(PeerHolder {
                                peers: HashMap::new(),
-                               peers_needing_send: HashSet::new(),
                                node_id_to_descriptor: HashMap::new()
                        }),
                        our_node_secret,
@@ -457,8 +478,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
        ///
        /// Returns a small number of bytes to send to the remote node (currently always 50).
        ///
-       /// Panics if descriptor is duplicative with some other descriptor which has not yet had a
-       /// socket_disconnected().
+       /// Panics if descriptor is duplicative with some other descriptor which has not yet been
+       /// [`socket_disconnected()`].
+       ///
+       /// [`socket_disconnected()`]: PeerManager::socket_disconnected
        pub fn new_outbound_connection(&self, their_node_id: PublicKey, descriptor: Descriptor) -> Result<Vec<u8>, PeerHandleError> {
                let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key());
                let res = peer_encryptor.get_act_one().to_vec();
@@ -494,8 +517,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
        /// call socket_disconnected for the new descriptor but must disconnect the connection
        /// immediately.
        ///
-       /// Panics if descriptor is duplicative with some other descriptor which has not yet had
-       /// socket_disconnected called.
+       /// Panics if descriptor is duplicative with some other descriptor which has not yet been
+       /// [`socket_disconnected()`].
+       ///
+       /// [`socket_disconnected()`]: PeerManager::socket_disconnected
        pub fn new_inbound_connection(&self, descriptor: Descriptor) -> Result<(), PeerHandleError> {
                let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret);
                let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
@@ -532,13 +557,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                }
                        }
                }
-               const MSG_BUFF_SIZE: usize = 10;
                while !peer.awaiting_write_event {
-                       if peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE {
+                       if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
                                match peer.sync_status {
                                        InitSyncTracker::NoSyncRequested => {},
                                        InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-                                               let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
+                                               let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
                                                for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
                                                        encode_and_send_msg!(announce);
@@ -555,7 +579,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                }
                                        },
                                        InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-                                               let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
+                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
                                                for msg in all_messages.iter() {
                                                        encode_and_send_msg!(msg);
@@ -567,7 +591,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                        },
                                        InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
                                        InitSyncTracker::NodesSyncing(key) => {
-                                               let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
+                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
                                                for msg in all_messages.iter() {
                                                        encode_and_send_msg!(msg);
@@ -586,7 +610,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                        Some(buff) => buff,
                                };
 
-                               let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
+                               let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
                                let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
                                let data_sent = descriptor.send_data(pending, should_be_reading);
                                peer.pending_outbound_buffer_first_msg_offset += data_sent;
@@ -604,16 +628,23 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
        ///
        /// May return an Err to indicate that the connection should be closed.
        ///
-       /// Will most likely call send_data on the descriptor passed in (or the descriptor handed into
-       /// new_*\_connection) before returning. Thus, be very careful with reentrancy issues! The
-       /// invariants around calling write_buffer_space_avail in case a write did not fully complete
-       /// must still hold - be ready to call write_buffer_space_avail again if a write call generated
-       /// here isn't sufficient! Panics if the descriptor was not previously registered in a
-       /// new_\*_connection event.
+       /// May call [`send_data`] on the descriptor passed in (or an equal descriptor) before
+       /// returning. Thus, be very careful with reentrancy issues! The invariants around calling
+       /// [`write_buffer_space_avail`] in case a write did not fully complete must still hold - be
+       /// ready to call `[write_buffer_space_avail`] again if a write call generated here isn't
+       /// sufficient!
+       ///
+       /// [`send_data`]: SocketDescriptor::send_data
+       /// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
        pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
                let mut peers = self.peers.lock().unwrap();
                match peers.peers.get_mut(descriptor) {
-                       None => panic!("Descriptor for write_event is not already known to PeerManager"),
+                       None => {
+                               // This is most likely a simple race condition where the user found that the socket
+                               // was writeable, then we told the user to `disconnect_socket()`, then they called
+                               // this method. Return an error to make sure we get disconnected.
+                               return Err(PeerHandleError { no_connection_possible: false });
+                       },
                        Some(peer) => {
                                peer.awaiting_write_event = false;
                                self.do_attempt_write_data(descriptor, peer);
@@ -626,14 +657,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
        ///
        /// May return an Err to indicate that the connection should be closed.
        ///
-       /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
-       /// Thus, however, you almost certainly want to call process_events() after any read_event to
-       /// generate send_data calls to handle responses.
+       /// Will *not* call back into [`send_data`] on any descriptors to avoid reentrancy complexity.
+       /// Thus, however, you should call [`process_events`] after any `read_event` to generate
+       /// [`send_data`] calls to handle responses.
        ///
-       /// If Ok(true) is returned, further read_events should not be triggered until a send_data call
-       /// on this file descriptor has resume_read set (preventing DoS issues in the send buffer).
+       /// If `Ok(true)` is returned, further read_events should not be triggered until a
+       /// [`send_data`] call on this descriptor has `resume_read` set (preventing DoS issues in the
+       /// send buffer).
        ///
-       /// Panics if the descriptor was not previously registered in a new_*_connection event.
+       /// [`send_data`]: SocketDescriptor::send_data
+       /// [`process_events`]: PeerManager::process_events
        pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
                match self.do_read_event(peer_descriptor, data) {
                        Ok(res) => Ok(res),
@@ -645,22 +678,28 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
        }
 
        /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
-       fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, descriptor: Descriptor, message: &M) {
+       fn enqueue_message<M: Encode + Writeable>(&self, peer: &mut Peer, message: &M) {
                let mut buffer = VecWriter(Vec::new());
                wire::write(message, &mut buffer).unwrap(); // crash if the write failed
                let encoded_message = buffer.0;
 
                log_trace!(self.logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
-               peers_needing_send.insert(descriptor);
        }
 
        fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
                let pause_read = {
                        let mut peers_lock = self.peers.lock().unwrap();
                        let peers = &mut *peers_lock;
+                       let mut msgs_to_forward = Vec::new();
+                       let mut peer_node_id = None;
                        let pause_read = match peers.peers.get_mut(peer_descriptor) {
-                               None => panic!("Descriptor for read_event is not already known to PeerManager"),
+                               None => {
+                                       // This is most likely a simple race condition where the user read some bytes
+                                       // from the socket, then we told the user to `disconnect_socket()`, then they
+                                       // called this method. Return an error to make sure we get disconnected.
+                                       return Err(PeerHandleError { no_connection_possible: false });
+                               },
                                Some(peer) => {
                                        assert!(peer.pending_read_buffer.len() > 0);
                                        assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);
@@ -694,7 +733,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                                                },
                                                                                                msgs::ErrorAction::SendErrorMessage { msg } => {
                                                                                                        log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err);
-                                                                                                       self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &msg);
+                                                                                                       self.enqueue_message(peer, &msg);
                                                                                                        continue;
                                                                                                },
                                                                                        }
@@ -736,7 +775,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                        insert_node_id!();
                                                                        let features = InitFeatures::known();
                                                                        let resp = msgs::Init { features };
-                                                                       self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
+                                                                       self.enqueue_message(peer, &resp);
                                                                },
                                                                NextNoiseStep::ActThree => {
                                                                        let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..]));
@@ -746,7 +785,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                        insert_node_id!();
                                                                        let features = InitFeatures::known();
                                                                        let resp = msgs::Init { features };
-                                                                       self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
+                                                                       self.enqueue_message(peer, &resp);
                                                                },
                                                                NextNoiseStep::NoiseComplete => {
                                                                        if peer.pending_read_is_header {
@@ -794,13 +833,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                                        }
                                                                                };
 
-                                                                               if let Err(handling_error) = self.handle_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), message){
-                                                                                       match handling_error {
+                                                                               match self.handle_message(peer, message) {
+                                                                                       Err(handling_error) => match handling_error {
                                                                                                MessageHandlingError::PeerHandleError(e) => { return Err(e) },
                                                                                                MessageHandlingError::LightningError(e) => {
                                                                                                        try_potential_handleerror!(Err(e));
                                                                                                },
-                                                                                       }
+                                                                                       },
+                                                                                       Ok(Some(msg)) => {
+                                                                                               peer_node_id = Some(peer.their_node_id.expect("After noise is complete, their_node_id is always set"));
+                                                                                               msgs_to_forward.push(msg);
+                                                                                       },
+                                                                                       Ok(None) => {},
                                                                                }
                                                                        }
                                                                }
@@ -808,12 +852,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                }
                                        }
 
-                                       self.do_attempt_write_data(peer_descriptor, peer);
-
-                                       peer.pending_outbound_buffer.len() > 10 // pause_read
+                                       peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE // pause_read
                                }
                        };
 
+                       for msg in msgs_to_forward.drain(..) {
+                               self.forward_broadcast_msg(peers, &msg, peer_node_id.as_ref());
+                       }
+
                        pause_read
                };
 
@@ -821,7 +867,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
        }
 
        /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
-       fn handle_message(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, peer_descriptor: Descriptor, message: wire::Message) -> Result<(), MessageHandlingError> {
+       /// Returns the message back if it needs to be broadcasted to all other peers.
+       fn handle_message(&self, peer: &mut Peer, message: wire::Message) -> Result<Option<wire::Message>, MessageHandlingError> {
                log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
 
                // Need an Init as first message
@@ -831,6 +878,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        return Err(PeerHandleError{ no_connection_possible: false }.into());
                }
 
+               let mut should_forward = None;
+
                match message {
                        // Setup and Control messages:
                        wire::Message::Init(msg) => {
@@ -854,7 +903,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
 
                                if msg.features.initial_routing_sync() {
                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
-                                       peers_needing_send.insert(peer_descriptor.clone());
                                }
                                if !msg.features.supports_static_remote_key() {
                                        log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap()));
@@ -889,7 +937,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        wire::Message::Ping(msg) => {
                                if msg.ponglen < 65532 {
                                        let resp = msgs::Pong { byteslen: msg.ponglen };
-                                       self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+                                       self.enqueue_message(peer, &resp);
                                }
                        },
                        wire::Message::Pong(_msg) => {
@@ -953,34 +1001,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
                        },
                        wire::Message::ChannelAnnouncement(msg) => {
-                               let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) {
-                                       Ok(v) => v,
-                                       Err(e) => { return Err(e.into()); },
-                               };
-
-                               if should_forward {
-                                       // TODO: forward msg along to all our other peers!
+                               if self.message_handler.route_handler.handle_channel_announcement(&msg)
+                                               .map_err(|e| -> MessageHandlingError { e.into() })? {
+                                       should_forward = Some(wire::Message::ChannelAnnouncement(msg));
                                }
                        },
                        wire::Message::NodeAnnouncement(msg) => {
-                               let should_forward = match self.message_handler.route_handler.handle_node_announcement(&msg) {
-                                       Ok(v) => v,
-                                       Err(e) => { return Err(e.into()); },
-                               };
-
-                               if should_forward {
-                                       // TODO: forward msg along to all our other peers!
+                               if self.message_handler.route_handler.handle_node_announcement(&msg)
+                                               .map_err(|e| -> MessageHandlingError { e.into() })? {
+                                       should_forward = Some(wire::Message::NodeAnnouncement(msg));
                                }
                        },
                        wire::Message::ChannelUpdate(msg) => {
                                self.message_handler.chan_handler.handle_channel_update(&peer.their_node_id.unwrap(), &msg);
-                               let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) {
-                                       Ok(v) => v,
-                                       Err(e) => { return Err(e.into()); },
-                               };
-
-                               if should_forward {
-                                       // TODO: forward msg along to all our other peers!
+                               if self.message_handler.route_handler.handle_channel_update(&msg)
+                                               .map_err(|e| -> MessageHandlingError { e.into() })? {
+                                       should_forward = Some(wire::Message::ChannelUpdate(msg));
                                }
                        },
                        wire::Message::QueryShortChannelIds(msg) => {
@@ -1009,12 +1045,83 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
                        }
                };
-               Ok(())
+               Ok(should_forward)
+       }
+
+       fn forward_broadcast_msg(&self, peers: &mut PeerHolder<Descriptor>, msg: &wire::Message, except_node: Option<&PublicKey>) {
+               match msg {
+                       wire::Message::ChannelAnnouncement(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (_, peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
+                                               continue
+                                       }
+                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+                                               continue;
+                                       }
+                                       if peer.their_node_id.as_ref() == Some(&msg.contents.node_id_1) ||
+                                          peer.their_node_id.as_ref() == Some(&msg.contents.node_id_2) {
+                                               continue;
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                               }
+                       },
+                       wire::Message::NodeAnnouncement(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (_, peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_node_announcement(msg.contents.node_id) {
+                                               continue
+                                       }
+                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+                                               continue;
+                                       }
+                                       if peer.their_node_id.as_ref() == Some(&msg.contents.node_id) {
+                                               continue;
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                               }
+                       },
+                       wire::Message::ChannelUpdate(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (_, peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
+                                               continue
+                                       }
+                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+                                               continue;
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                               }
+                       },
+                       _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
+               }
        }
 
        /// Checks for any events generated by our handlers and processes them. Includes sending most
        /// response messages as well as messages generated by calls to handler functions directly (eg
-       /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
+       /// functions like [`ChannelManager::process_pending_htlc_forwards`] or [`send_payment`]).
+       ///
+       /// May call [`send_data`] on [`SocketDescriptor`]s. Thus, be very careful with reentrancy
+       /// issues!
+       ///
+       /// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
+       /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
+       /// [`send_data`]: SocketDescriptor::send_data
        pub fn process_events(&self) {
                {
                        // TODO: There are some DoS attacks here where you can flood someone's outbound send
@@ -1027,24 +1134,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        let peers = &mut *peers_lock;
                        for event in events_generated.drain(..) {
                                macro_rules! get_peer_for_forwarding {
-                                       ($node_id: expr, $handle_no_such_peer: block) => {
+                                       ($node_id: expr) => {
                                                {
-                                                       let descriptor = match peers.node_id_to_descriptor.get($node_id) {
-                                                               Some(descriptor) => descriptor.clone(),
+                                                       match peers.node_id_to_descriptor.get($node_id) {
+                                                               Some(descriptor) => match peers.peers.get_mut(&descriptor) {
+                                                                       Some(peer) => {
+                                                                               if peer.their_features.is_none() {
+                                                                                       continue;
+                                                                               }
+                                                                               peer
+                                                                       },
+                                                                       None => panic!("Inconsistent peers set state!"),
+                                                               },
                                                                None => {
-                                                                       $handle_no_such_peer;
                                                                        continue;
                                                                },
-                                                       };
-                                                       match peers.peers.get_mut(&descriptor) {
-                                                               Some(peer) => {
-                                                                       if peer.their_features.is_none() {
-                                                                               $handle_no_such_peer;
-                                                                               continue;
-                                                                       }
-                                                                       (descriptor, peer)
-                                                               },
-                                                               None => panic!("Inconsistent peers set state!"),
                                                        }
                                                }
                                        }
@@ -1054,65 +1158,46 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id),
                                                                log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               // TODO: If the peer is gone we should generate a DiscardFunding event
+                                               // indicating to the wallet that they should just throw away this funding transaction
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                log_trace!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@@ -1121,9 +1206,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                update_fulfill_htlcs.len(),
                                                                update_fail_htlcs.len(),
                                                                log_bytes!(commitment_signed.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                for msg in update_add_htlcs {
                                                        peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
@@ -1140,101 +1223,52 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                        peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
-                                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                                       MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
                                                log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
-                                               if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-                                                       let encoded_update_msg = encode_msg!(update_msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
-                                                                       continue
-                                                               }
-                                                               match peer.their_node_id {
-                                                                       None => continue,
-                                                                       Some(their_node_id) => {
-                                                                               if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 {
-                                                                                       continue
-                                                                               }
-                                                                       }
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_channel_announcement(&msg).is_ok() && self.message_handler.route_handler.handle_channel_update(&update_msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None);
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None);
                                                }
                                        },
-                                       MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+                                       MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
                                                log_trace!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler");
-                                               if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_node_announcement(msg.contents.node_id) {
-                                                                       continue
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_node_announcement(&msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None);
                                                }
                                        },
-                                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
+                                       MessageSendEvent::BroadcastChannelUpdate { msg } => {
                                                log_trace!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
-                                               if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
-                                                                       continue
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_channel_update(&msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None);
                                                }
                                        },
                                        MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
@@ -1244,7 +1278,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                match *action {
                                                        msgs::ErrorAction::DisconnectPeer { ref msg } => {
                                                                if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
-                                                                       peers.peers_needing_send.remove(&descriptor);
                                                                        if let Some(mut peer) = peers.peers.remove(&descriptor) {
                                                                                if let Some(ref msg) = *msg {
                                                                                        log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
@@ -1267,23 +1300,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
                                                                                log_pubkey!(node_id),
                                                                                msg.data);
-                                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                                       //TODO: Do whatever we're gonna do for handling dropped messages
-                                                               });
+                                                               let peer = get_peer_for_forwarding!(node_id);
                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                                               self.do_attempt_write_data(&mut descriptor, peer);
                                                        },
                                                }
                                        },
                                        MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        }
                                        MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
                                                log_trace!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
@@ -1292,40 +1320,32 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                        msg.first_blocknum,
                                                        msg.number_of_blocks,
                                                        msg.sync_complete);
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+                                               let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
-                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        }
                                }
                        }
 
-                       for mut descriptor in peers.peers_needing_send.drain() {
-                               match peers.peers.get_mut(&descriptor) {
-                                       Some(peer) => self.do_attempt_write_data(&mut descriptor, peer),
-                                       None => panic!("Inconsistent peers set state!"),
-                               }
+                       for (descriptor, peer) in peers.peers.iter_mut() {
+                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
                        }
                }
        }
 
        /// Indicates that the given socket descriptor's connection is now closed.
-       ///
-       /// This must only be called if the socket has been disconnected by the peer or your own
-       /// decision to disconnect it and must NOT be called in any case where other parts of this
-       /// library (eg PeerHandleError, explicit disconnect_socket calls) instruct you to disconnect
-       /// the peer.
-       ///
-       /// Panics if the descriptor was not previously registered in a successful new_*_connection event.
        pub fn socket_disconnected(&self, descriptor: &Descriptor) {
                self.disconnect_event_internal(descriptor, false);
        }
 
        fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
                let mut peers = self.peers.lock().unwrap();
-               peers.peers_needing_send.remove(descriptor);
                let peer_option = peers.peers.remove(descriptor);
                match peer_option {
-                       None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),
+                       None => {
+                               // This is most likely a simple race condition where the user found that the socket
+                               // was disconnected, then we told the user to `disconnect_socket()`, then they
+                               // called this method. Either way we're disconnected, return.
+                       },
                        Some(peer) => {
                                match peer.their_node_id {
                                        Some(node_id) => {
@@ -1340,38 +1360,41 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
 
        /// Disconnect a peer given its node id.
        ///
-       /// Set no_connection_possible to true to prevent any further connection with this peer,
+       /// Set `no_connection_possible` to true to prevent any further connection with this peer,
        /// force-closing any channels we have with it.
        ///
-       /// If a peer is connected, this will call `disconnect_socket` on the descriptor for the peer,
-       /// so be careful about reentrancy issues.
+       /// If a peer is connected, this will call [`disconnect_socket`] on the descriptor for the
+       /// peer. Thus, be very careful about reentrancy issues.
+       ///
+       /// [`disconnect_socket`]: SocketDescriptor::disconnect_socket
        pub fn disconnect_by_node_id(&self, node_id: PublicKey, no_connection_possible: bool) {
                let mut peers_lock = self.peers.lock().unwrap();
                if let Some(mut descriptor) = peers_lock.node_id_to_descriptor.remove(&node_id) {
                        log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
                        peers_lock.peers.remove(&descriptor);
-                       peers_lock.peers_needing_send.remove(&descriptor);
                        self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
                        descriptor.disconnect_socket();
                }
        }
 
        /// This function should be called roughly once every 30 seconds.
-       /// It will send pings to each peer and disconnect those which did not respond to the last round of pings.
-
-       /// Will most likely call send_data on all of the registered descriptors, thus, be very careful with reentrancy issues!
+       /// It will send pings to each peer and disconnect those which did not respond to the last
+       /// round of pings.
+       ///
+       /// May call [`send_data`] on all [`SocketDescriptor`]s. Thus, be very careful with reentrancy
+       /// issues!
+       ///
+       /// [`send_data`]: SocketDescriptor::send_data
        pub fn timer_tick_occurred(&self) {
                let mut peers_lock = self.peers.lock().unwrap();
                {
                        let peers = &mut *peers_lock;
-                       let peers_needing_send = &mut peers.peers_needing_send;
                        let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
                        let peers = &mut peers.peers;
                        let mut descriptors_needing_disconnect = Vec::new();
 
                        peers.retain(|descriptor, peer| {
                                if peer.awaiting_pong {
-                                       peers_needing_send.remove(descriptor);
                                        descriptors_needing_disconnect.push(descriptor.clone());
                                        match peer.their_node_id {
                                                Some(node_id) => {
@@ -1495,7 +1518,9 @@ mod tests {
                let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap();
                peer_a.new_inbound_connection(fd_a.clone()).unwrap();
                assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+               peer_a.process_events();
                assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               peer_b.process_events();
                assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
                (fd_a.clone(), fd_b.clone())
        }
@@ -1534,10 +1559,12 @@ mod tests {
 
                // peers[0] awaiting_pong is set to true, but the Peer is still connected
                peers[0].timer_tick_occurred();
+               peers[0].process_events();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
                // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
                peers[0].timer_tick_occurred();
+               peers[0].process_events();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
        }
 
@@ -1558,7 +1585,9 @@ mod tests {
                let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
 
                // Make each peer to read the messages that the other peer just wrote to them.
+               peers[0].process_events();
                peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
+               peers[1].process_events();
                peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
 
                // Check that each peer has received the expected number of channel updates and channel
index fb316caf2ef1008012ca57ffc8722ee3e9e412ef..9946cc24a3cb2485597dee37cac9c22d1efa3fa1 100644 (file)
@@ -23,7 +23,6 @@ use bitcoin::blockdata::block::{Block, BlockHeader};
 use bitcoin::hash_types::BlockHash;
 
 use prelude::*;
-use std::collections::HashMap;
 use core::mem;
 
 use ln::functional_test_utils::*;
index 9f4b8ac3589a27ef7c713751eb1d98ad18596978..0edeef0c3cb3d9629ddcfd4445dbf2f1fccf3af4 100644 (file)
@@ -24,7 +24,6 @@ use util::logger::Logger;
 use prelude::*;
 use alloc::collections::BinaryHeap;
 use core::cmp;
-use std::collections::HashMap;
 use core::ops::Deref;
 
 /// A hop in a route
@@ -3843,6 +3842,7 @@ mod tests {
                }
        }
 
+       #[cfg(not(feature = "no_std"))]
        pub(super) fn random_init_seed() -> u64 {
                // Because the default HashMap in std pulls OS randomness, we can use it as a (bad) RNG.
                use core::hash::{BuildHasher, Hasher};
@@ -3850,9 +3850,11 @@ mod tests {
                println!("Using seed of {}", seed);
                seed
        }
+       #[cfg(not(feature = "no_std"))]
        use util::ser::Readable;
 
        #[test]
+       #[cfg(not(feature = "no_std"))]
        fn generate_routes() {
                let mut d = match super::test_utils::get_route_file() {
                        Ok(f) => f,
@@ -3880,6 +3882,7 @@ mod tests {
        }
 
        #[test]
+       #[cfg(not(feature = "no_std"))]
        fn generate_routes_mpp() {
                let mut d = match super::test_utils::get_route_file() {
                        Ok(f) => f,
@@ -3907,7 +3910,7 @@ mod tests {
        }
 }
 
-#[cfg(test)]
+#[cfg(all(test, not(feature = "no_std")))]
 pub(crate) mod test_utils {
        use std::fs::File;
        /// Tries to open a network graph file, or panics with a URL to fetch it.
@@ -3934,7 +3937,7 @@ pub(crate) mod test_utils {
        }
 }
 
-#[cfg(all(test, feature = "unstable"))]
+#[cfg(all(test, feature = "unstable", not(feature = "no_std")))]
 mod benches {
        use super::*;
        use util::logger::{Logger, Record};
index c39da1d533e51b7fbc0f94f30b1947040139be54..69c802e7a0ab45a89bc904afa88f784fc8d061d1 100644 (file)
 //! Note this is not part of the specs, but follows lnd's signing and verifying protocol, which can is defined as follows:
 //!
 //! signature = zbase32(SigRec(sha256d(("Lightning Signed Message:" + msg)))
-//! zbase32 from https://philzimmermann.com/docs/human-oriented-base-32-encoding.txt
+//! zbase32 from <https://philzimmermann.com/docs/human-oriented-base-32-encoding.txt>
 //! SigRec has first byte 31 + recovery id, followed by 64 byte sig.
 //!
 //! This implementation is compatible with both lnd's and c-lightning's
 //!
-//! https://lightning.readthedocs.io/lightning-signmessage.7.html
-//! https://api.lightning.community/#signmessage
+//! <https://lightning.readthedocs.io/lightning-signmessage.7.html>
+//! <https://api.lightning.community/#signmessage>
 
 use prelude::*;
 use crate::util::zbase32;
index 4e69a37d20e9fcc5e0d093804136ef9f90be1747..b02fef275d2c08d644f1f706c8440f3d688d682c 100644 (file)
@@ -12,7 +12,6 @@
 
 use prelude::*;
 use std::io::{Read, Write};
-use std::collections::HashMap;
 use core::hash::Hash;
 use std::sync::Mutex;
 use core::cmp;
index 60aafd9169a600c8ee19414c8b41f3a19db144b9..b4d274f683cb5b414b9a174e781feb978600e815 100644 (file)
@@ -42,7 +42,6 @@ use core::time::Duration;
 use std::sync::{Mutex, Arc};
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use core::{cmp, mem};
-use std::collections::{HashMap, HashSet, VecDeque};
 use chain::keysinterface::InMemorySigner;
 
 pub struct TestVecWriter(pub Vec<u8>);