From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Wed, 23 Jun 2021 17:22:11 +0000 (+0000) Subject: Merge pull request #950 from TheBlueMatt/2021-06-changelog X-Git-Tag: v0.0.99~17 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=bfd9646092e3c63f07d96df35dd72488af55a176;hp=66726fb508bce61dc5443bf93e8d780bb99f3b75;p=rust-lightning Merge pull request #950 from TheBlueMatt/2021-06-changelog Add a dummy first CHANGELOG entry for future tracking --- diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d24b2be7..85aaac6a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,21 +14,34 @@ jobs: # 1.41.0 is Debian stable 1.41.0, # 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, and coverage generation - 1.45.2] + 1.45.2, + # 1.49.0 is MSRV for no_std builds using hashbrown + 1.49.0] include: - toolchain: stable build-net-tokio: true + build-no-std: true - toolchain: stable platform: macos-latest build-net-tokio: true + build-no-std: true - toolchain: stable platform: windows-latest build-net-tokio: true + build-no-std: true - toolchain: beta build-net-tokio: true + build-no-std: true + - toolchain: 1.36.0 + build-no-std: false + - toolchain: 1.41.0 + build-no-std: false - toolchain: 1.45.2 build-net-tokio: true + build-no-std: false coverage: true + - toolchain: 1.49.0 + build-no-std: true runs-on: ${{ matrix.platform }} steps: - name: Checkout source code @@ -47,7 +60,10 @@ jobs: run: RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always - name: Build on Rust ${{ matrix.toolchain }} if: "! matrix.build-net-tokio" - run: cargo build --verbose --color always -p lightning && cargo build --verbose --color always -p lightning-invoice && cargo build --verbose --color always -p lightning-persister + run: | + cargo build --verbose --color always -p lightning + cargo build --verbose --color always -p lightning-invoice + cargo build --verbose --color always -p lightning-persister - name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features if: "matrix.build-net-tokio && !matrix.coverage" run: | @@ -56,7 +72,6 @@ jobs: cargo build --verbose --color always --features rpc-client cargo build --verbose --color always --features rpc-client,rest-client cargo build --verbose --color always --features rpc-client,rest-client,tokio - cd .. - name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation if: matrix.coverage run: | @@ -65,16 +80,30 @@ jobs: RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client,tokio - cd .. - name: Test on Rust ${{ matrix.toolchain }} with net-tokio if: "matrix.build-net-tokio && !matrix.coverage" run: cargo test --verbose --color always - name: Test on Rust ${{ matrix.toolchain }} with net-tokio and full code-linking for coverage generation if: matrix.coverage run: RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always + - name: Test on no_std bullds Rust ${{ matrix.toolchain }} + if: "matrix.build-no-std && !matrix.coverage" + run: | + cd lightning + cargo test --verbose --color always --features hashbrown + cd .. + - name: Test on no_std bullds Rust ${{ matrix.toolchain }} and full code-linking for coverage generation + if: "matrix.build-no-std && matrix.coverage" + run: | + cd lightning + RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features hashbrown + cd .. - name: Test on Rust ${{ matrix.toolchain }} if: "! matrix.build-net-tokio" - run: cargo test --verbose --color always -p lightning && cargo test --verbose --color always -p lightning-invoice && cargo build --verbose --color always -p lightning-persister + run: | + cargo test --verbose --color always -p lightning + cargo test --verbose --color always -p lightning-invoice + cargo build --verbose --color always -p lightning-persister - name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features if: "matrix.build-net-tokio && !matrix.coverage" run: | @@ -83,7 +112,6 @@ jobs: cargo test --verbose --color always --features rpc-client cargo test --verbose --color always --features rpc-client,rest-client cargo test --verbose --color always --features rpc-client,rest-client,tokio - cd .. - name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation if: matrix.coverage run: | @@ -92,7 +120,6 @@ jobs: RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client,tokio - cd .. - name: Install deps for kcov if: matrix.coverage run: | @@ -157,6 +184,7 @@ jobs: run: | cd lightning RUSTFLAGS="--cfg=require_route_graph_test" cargo test + RUSTFLAGS="--cfg=require_route_graph_test" cargo test --features hashbrown cd .. - name: Run benchmarks on Rust ${{ matrix.toolchain }} run: | @@ -165,7 +193,9 @@ jobs: check_commits: runs-on: ubuntu-latest env: - TOOLCHAIN: stable + # rustc 1.53 regressed and panics when building our (perfectly fine) docs. + # See https://github.com/rust-lang/rust/issues/84738 + TOOLCHAIN: 1.52.1 steps: - name: Checkout source code uses: actions/checkout@v2 diff --git a/ci/check-compiles.sh b/ci/check-compiles.sh index 9222f609..79c2d92b 100755 --- a/ci/check-compiles.sh +++ b/ci/check-compiles.sh @@ -6,3 +6,4 @@ cargo check cargo doc cargo doc --document-private-items cd fuzz && cargo check --features=stdin_fuzz +cd ../lightning && cargo check --no-default-features --features=no_std diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index fbe803b4..c59652ea 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -44,7 +44,9 @@ pub enum ChainTip { } /// The `Validate` trait defines behavior for validating chain data. -pub(crate) trait Validate { +/// +/// This trait is sealed and not meant to be implemented outside of this crate. +pub trait Validate: sealed::Validate { /// The validated data wrapper which can be dereferenced to obtain the validated data. type T: std::ops::Deref; @@ -156,16 +158,24 @@ impl std::ops::Deref for ValidatedBlock { } } +mod sealed { + /// Used to prevent implementing [`super::Validate`] outside the crate but still allow its use. + pub trait Validate {} + + impl Validate for crate::BlockHeaderData {} + impl Validate for bitcoin::blockdata::block::Block {} +} + /// The canonical `Poll` implementation used for a single `BlockSource`. /// -/// Other `Poll` implementations must be built using `ChainPoller` as it provides the only means of -/// validating chain data. -pub struct ChainPoller + Sized , T: BlockSource> { +/// Other `Poll` implementations should be built using `ChainPoller` as it provides the simplest way +/// of validating chain data and checking consistency. +pub struct ChainPoller + Sized, T: BlockSource> { block_source: B, network: Network, } -impl + Sized , T: BlockSource> ChainPoller { +impl + Sized, T: BlockSource> ChainPoller { /// Creates a new poller for the given block source. /// /// If the `network` parameter is mainnet, then the difficulty between blocks is checked for diff --git a/lightning-invoice/src/lib.rs b/lightning-invoice/src/lib.rs index fa6a8eed..2ce58f29 100644 --- a/lightning-invoice/src/lib.rs +++ b/lightning-invoice/src/lib.rs @@ -60,12 +60,16 @@ const MAX_EXPIRY_TIME: u64 = 60 * 60 * 24 * 356; /// Default expiry time as defined by [BOLT 11]. /// /// [BOLT 11]: https://github.com/lightningnetwork/lightning-rfc/blob/master/11-payment-encoding.md -const DEFAULT_EXPIRY_TIME: u64 = 3600; +pub const DEFAULT_EXPIRY_TIME: u64 = 3600; /// Default minimum final CLTV expiry as defined by [BOLT 11]. /// +/// Note that this is *not* the same value as rust-lightning's minimum CLTV expiry, which is +/// provided in [`MIN_FINAL_CLTV_EXPIRY`]. +/// /// [BOLT 11]: https://github.com/lightningnetwork/lightning-rfc/blob/master/11-payment-encoding.md -const DEFAULT_MIN_FINAL_CLTV_EXPIRY: u64 = 18; +/// [`MIN_FINAL_CLTV_EXPIRY`]: lightning::ln::channelmanager::MIN_FINAL_CLTV_EXPIRY +pub const DEFAULT_MIN_FINAL_CLTV_EXPIRY: u64 = 18; /// This function is used as a static assert for the size of `SystemTime`. If the crate fails to /// compile due to it this indicates that your system uses unexpected bounds for `SystemTime`. You diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index d102778a..5f5fece0 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -83,7 +83,7 @@ use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::util::logger::Logger; -use std::{task, thread}; +use std::task; use std::net::SocketAddr; use std::net::TcpStream as StdTcpStream; use std::sync::{Arc, Mutex}; @@ -114,11 +114,6 @@ struct Connection { // socket. To wake it up (without otherwise changing its state, we can push a value into this // Sender. read_waker: mpsc::Sender<()>, - // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we - // are sure we won't call any more read/write PeerManager functions with the same connection. - // This is set to true if we're in such a condition (with disconnect checked before with the - // top-level mutex held) and false when we can return. - block_disconnect_socket: bool, read_paused: bool, rl_requested_disconnect: bool, id: u64, @@ -153,31 +148,24 @@ impl Connection { } } } - macro_rules! prepare_read_write_call { - () => { { - let mut us_lock = us.lock().unwrap(); - if us_lock.rl_requested_disconnect { - shutdown_socket!("disconnect_socket() call from RL", Disconnect::CloseConnection); - } - us_lock.block_disconnect_socket = true; - } } - } - - let read_paused = us.lock().unwrap().read_paused; + let read_paused = { + let us_lock = us.lock().unwrap(); + if us_lock.rl_requested_disconnect { + shutdown_socket!("disconnect_socket() call from RL", Disconnect::CloseConnection); + } + us_lock.read_paused + }; tokio::select! { v = write_avail_receiver.recv() => { assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc! - prepare_read_write_call!(); if let Err(e) = peer_manager.write_buffer_space_avail(&mut our_descriptor) { shutdown_socket!(e, Disconnect::CloseConnection); } - us.lock().unwrap().block_disconnect_socket = false; }, _ = read_wake_receiver.recv() => {}, read = reader.read(&mut buf), if !read_paused => match read { Ok(0) => shutdown_socket!("Connection closed", Disconnect::PeerDisconnected), Ok(len) => { - prepare_read_write_call!(); let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]); let mut us_lock = us.lock().unwrap(); match read_res { @@ -188,7 +176,6 @@ impl Connection { }, Err(e) => shutdown_socket!(e, Disconnect::CloseConnection), } - us_lock.block_disconnect_socket = false; }, Err(e) => shutdown_socket!(e, Disconnect::PeerDisconnected), }, @@ -223,7 +210,7 @@ impl Connection { (reader, write_receiver, read_receiver, Arc::new(Mutex::new(Self { writer: Some(writer), write_avail, read_waker, read_paused: false, - block_disconnect_socket: false, rl_requested_disconnect: false, + rl_requested_disconnect: false, id: ID_COUNTER.fetch_add(1, Ordering::AcqRel) }))) } @@ -450,18 +437,10 @@ impl peer_handler::SocketDescriptor for SocketDescriptor { } fn disconnect_socket(&mut self) { - { - let mut us = self.conn.lock().unwrap(); - us.rl_requested_disconnect = true; - us.read_paused = true; - // Wake up the sending thread, assuming it is still alive - let _ = us.write_avail.try_send(()); - // Happy-path return: - if !us.block_disconnect_socket { return; } - } - while self.conn.lock().unwrap().block_disconnect_socket { - thread::yield_now(); - } + let mut us = self.conn.lock().unwrap(); + us.rl_requested_disconnect = true; + // Wake up the sending thread, assuming it is still alive + let _ = us.write_avail.try_send(()); } } impl Clone for SocketDescriptor { diff --git a/lightning/Cargo.toml b/lightning/Cargo.toml index affe46e3..71d85872 100644 --- a/lightning/Cargo.toml +++ b/lightning/Cargo.toml @@ -25,10 +25,12 @@ max_level_debug = [] # This is unsafe to use in production because it may result in the counterparty publishing taking our funds. unsafe_revoked_tx_signing = [] unstable = [] +no_std = ["hashbrown"] [dependencies] bitcoin = "0.26" +hashbrown = { version = "0.11", optional = true } hex = { version = "0.3", optional = true } regex = { version = "0.1.80", optional = true } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index a3365f2b..23fc42f5 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -38,7 +38,6 @@ use util::events; use util::events::EventHandler; use prelude::*; -use std::collections::{HashMap, hash_map}; use std::sync::RwLock; use core::ops::Deref; diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index a0f5eb71..0bd21270 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -52,7 +52,6 @@ use util::byte_utils; use util::events::Event; use prelude::*; -use std::collections::{HashMap, HashSet}; use core::{cmp, mem}; use std::io::Error; use core::ops::Deref; diff --git a/lightning/src/chain/keysinterface.rs b/lightning/src/chain/keysinterface.rs index 84d83196..c5ad6f28 100644 --- a/lightning/src/chain/keysinterface.rs +++ b/lightning/src/chain/keysinterface.rs @@ -38,7 +38,6 @@ use ln::chan_utils::{HTLCOutputInCommitment, make_funding_redeemscript, ChannelP use ln::msgs::UnsignedChannelAnnouncement; use prelude::*; -use std::collections::HashSet; use core::sync::atomic::{AtomicUsize, Ordering}; use std::io::Error; use ln::msgs::{DecodeError, MAX_VALUE_MSAT}; diff --git a/lightning/src/chain/onchaintx.rs b/lightning/src/chain/onchaintx.rs index dd4d1d8f..30493eb5 100644 --- a/lightning/src/chain/onchaintx.rs +++ b/lightning/src/chain/onchaintx.rs @@ -34,7 +34,6 @@ use util::byte_utils; use prelude::*; use alloc::collections::BTreeMap; -use std::collections::HashMap; use core::cmp; use core::ops::Deref; use core::mem::replace; diff --git a/lightning/src/lib.rs b/lightning/src/lib.rs index a9f46b43..c84e3d2d 100644 --- a/lightning/src/lib.rs +++ b/lightning/src/lib.rs @@ -44,5 +44,12 @@ pub mod ln; pub mod routing; mod prelude { - pub use alloc::{vec, vec::Vec, string::String}; -} \ No newline at end of file + #[cfg(feature = "hashbrown")] + extern crate hashbrown; + + pub use alloc::{vec, vec::Vec, string::String, collections::VecDeque}; + #[cfg(not(feature = "hashbrown"))] + pub use std::collections::{HashMap, HashSet, hash_map}; + #[cfg(feature = "hashbrown")] + pub use self::hashbrown::{HashMap, HashSet, hash_map}; +} diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index e945f051..fdcf7c2d 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -41,7 +41,6 @@ use ln::functional_test_utils::*; use util::test_utils; use prelude::*; -use std::collections::HashMap; use std::sync::{Arc, Mutex}; // If persister_fail is true, we have the persister return a PermanentFailure diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 13a993fd..ee266849 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -64,7 +64,6 @@ use util::errors::APIError; use prelude::*; use core::{cmp, mem}; use core::cell::RefCell; -use std::collections::{HashMap, hash_map, HashSet}; use std::io::{Cursor, Read}; use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard}; use core::sync::atomic::{AtomicUsize, Ordering}; diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index fb0435a0..166e943e 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -44,7 +44,6 @@ use core::cell::RefCell; use std::rc::Rc; use std::sync::{Arc, Mutex}; use core::mem; -use std::collections::HashMap; pub const CHAN_CONFIRM_DEPTH: u32 = 10; diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 95d114aa..6298f654 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -52,7 +52,6 @@ use regex; use prelude::*; use alloc::collections::BTreeSet; -use std::collections::{HashMap, HashSet}; use core::default::Default; use std::sync::{Arc, Mutex}; diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 33c4050b..5546227e 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -32,7 +32,6 @@ use routing::network_graph::NetGraphMsgHandler; use prelude::*; use alloc::collections::LinkedList; -use std::collections::{HashMap,hash_map,HashSet}; use std::sync::{Arc, Mutex}; use core::sync::atomic::{AtomicUsize, Ordering}; use core::{cmp, hash, fmt, mem}; @@ -161,10 +160,15 @@ pub struct MessageHandler where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler { /// A message handler which handles messages specific to channels. Usually this is just a - /// ChannelManager object or a ErroringMessageHandler. + /// [`ChannelManager`] object or an [`ErroringMessageHandler`]. + /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub chan_handler: CM, /// A message handler which handles messages updating our knowledge of the network channel - /// graph. Usually this is just a NetGraphMsgHandlerMonitor object or an IgnoringMessageHandler. + /// graph. Usually this is just a [`NetGraphMsgHandler`] object or an + /// [`IgnoringMessageHandler`]. + /// + /// [`NetGraphMsgHandler`]: crate::routing::network_graph::NetGraphMsgHandler pub route_handler: RM, } @@ -174,32 +178,35 @@ pub struct MessageHandler where /// /// For efficiency, Clone should be relatively cheap for this type. /// -/// You probably want to just extend an int and put a file descriptor in a struct and implement -/// send_data. Note that if you are using a higher-level net library that may call close() itself, -/// be careful to ensure you don't have races whereby you might register a new connection with an -/// fd which is the same as a previous one which has yet to be removed via -/// PeerManager::socket_disconnected(). +/// Two descriptors may compare equal (by [`cmp::Eq`] and [`hash::Hash`]) as long as the original +/// has been disconnected, the [`PeerManager`] has been informed of the disconnection (either by it +/// having triggered the disconnection or a call to [`PeerManager::socket_disconnected`]), and no +/// further calls to the [`PeerManager`] related to the original socket occur. This allows you to +/// use a file descriptor for your SocketDescriptor directly, however for simplicity you may wish +/// to simply use another value which is guaranteed to be globally unique instead. pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone { /// Attempts to send some data from the given slice to the peer. /// /// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected. - /// Note that in the disconnected case, socket_disconnected must still fire and further write - /// attempts may occur until that time. + /// Note that in the disconnected case, [`PeerManager::socket_disconnected`] must still be + /// called and further write attempts may occur until that time. /// - /// If the returned size is smaller than data.len(), a write_available event must - /// trigger the next time more data can be written. Additionally, until the a send_data event - /// completes fully, no further read_events should trigger on the same peer! + /// If the returned size is smaller than `data.len()`, a + /// [`PeerManager::write_buffer_space_avail`] call must be made the next time more data can be + /// written. Additionally, until a `send_data` event completes fully, no further + /// [`PeerManager::read_event`] calls should be made for the same peer! Because this is to + /// prevent denial-of-service issues, you should not read or buffer any data from the socket + /// until then. /// - /// If a read_event on this descriptor had previously returned true (indicating that read - /// events should be paused to prevent DoS in the send buffer), resume_read may be set - /// indicating that read events on this descriptor should resume. A resume_read of false does - /// *not* imply that further read events should be paused. + /// If a [`PeerManager::read_event`] call on this descriptor had previously returned true + /// (indicating that read events should be paused to prevent DoS in the send buffer), + /// `resume_read` may be set indicating that read events on this descriptor should resume. A + /// `resume_read` of false carries no meaning, and should not cause any action. fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize; - /// Disconnect the socket pointed to by this SocketDescriptor. Once this function returns, no - /// more calls to write_buffer_space_avail, read_event or socket_disconnected may be made with - /// this descriptor. No socket_disconnected call should be generated as a result of this call, - /// though races may occur whereby disconnect_socket is called after a call to - /// socket_disconnected but prior to socket_disconnected returning. + /// Disconnect the socket pointed to by this SocketDescriptor. + /// + /// You do *not* need to call [`PeerManager::socket_disconnected`] with this socket after this + /// call (doing so is a noop). fn disconnect_socket(&mut self); } @@ -234,6 +241,15 @@ enum InitSyncTracker{ NodesSyncing(PublicKey), } +/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until +/// we have fewer than this many messages in the outbound buffer again. +/// We also use this as the target number of outbound gossip messages to keep in the write buffer, +/// refilled as we send bytes. +const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10; +/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to +/// the peer. +const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20; + struct Peer { channel_encryptor: PeerChannelEncryptor, their_node_id: Option, @@ -279,9 +295,6 @@ impl Peer { struct PeerHolder { peers: HashMap, - /// Added to by do_read_event for cases where we pushed a message onto the send buffer but - /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events() - peers_needing_send: HashSet, /// Only add to this set when noise completes: node_id_to_descriptor: HashMap, } @@ -307,14 +320,25 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L>; -/// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket -/// events into messages which it passes on to its MessageHandlers. +/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls +/// socket events into messages which it passes on to its [`MessageHandler`]. +/// +/// Locks are taken internally, so you must never assume that reentrancy from a +/// [`SocketDescriptor`] call back into [`PeerManager`] methods will not deadlock. +/// +/// Calls to [`read_event`] will decode relevant messages and pass them to the +/// [`ChannelMessageHandler`], likely doing message processing in-line. Thus, the primary form of +/// parallelism in Rust-Lightning is in calls to [`read_event`]. Note, however, that calls to any +/// [`PeerManager`] functions related to the same connection must occur only in serial, making new +/// calls only after previous ones have returned. /// /// Rather than using a plain PeerManager, it is preferable to use either a SimpleArcPeerManager /// a SimpleRefPeerManager, for conciseness. See their documentation for more details, but /// essentially you should default to using a SimpleRefPeerManager, and use a /// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when /// you're using lightning-net-tokio. +/// +/// [`read_event`]: PeerManager::read_event pub struct PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, @@ -395,8 +419,6 @@ impl PeerManager PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, @@ -412,7 +434,6 @@ impl PeerManager PeerManager Result, PeerHandleError> { let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key()); let res = peer_encryptor.get_act_one().to_vec(); @@ -494,8 +517,10 @@ impl PeerManager Result<(), PeerHandleError> { let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret); let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes @@ -532,13 +557,12 @@ impl PeerManager {}, InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => { - let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8; + let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8; let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps); for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() { encode_and_send_msg!(announce); @@ -555,7 +579,7 @@ impl PeerManager { - let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8; + let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8; let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps); for msg in all_messages.iter() { encode_and_send_msg!(msg); @@ -567,7 +591,7 @@ impl PeerManager unreachable!(), InitSyncTracker::NodesSyncing(key) => { - let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8; + let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8; let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps); for msg in all_messages.iter() { encode_and_send_msg!(msg); @@ -586,7 +610,7 @@ impl PeerManager buff, }; - let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE; + let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE; let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; let data_sent = descriptor.send_data(pending, should_be_reading); peer.pending_outbound_buffer_first_msg_offset += data_sent; @@ -604,16 +628,23 @@ impl PeerManager Result<(), PeerHandleError> { let mut peers = self.peers.lock().unwrap(); match peers.peers.get_mut(descriptor) { - None => panic!("Descriptor for write_event is not already known to PeerManager"), + None => { + // This is most likely a simple race condition where the user found that the socket + // was writeable, then we told the user to `disconnect_socket()`, then they called + // this method. Return an error to make sure we get disconnected. + return Err(PeerHandleError { no_connection_possible: false }); + }, Some(peer) => { peer.awaiting_write_event = false; self.do_attempt_write_data(descriptor, peer); @@ -626,14 +657,16 @@ impl PeerManager Result { match self.do_read_event(peer_descriptor, data) { Ok(res) => Ok(res), @@ -645,22 +678,28 @@ impl PeerManager(&self, peers_needing_send: &mut HashSet, peer: &mut Peer, descriptor: Descriptor, message: &M) { + fn enqueue_message(&self, peer: &mut Peer, message: &M) { let mut buffer = VecWriter(Vec::new()); wire::write(message, &mut buffer).unwrap(); // crash if the write failed let encoded_message = buffer.0; log_trace!(self.logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap())); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..])); - peers_needing_send.insert(descriptor); } fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result { let pause_read = { let mut peers_lock = self.peers.lock().unwrap(); let peers = &mut *peers_lock; + let mut msgs_to_forward = Vec::new(); + let mut peer_node_id = None; let pause_read = match peers.peers.get_mut(peer_descriptor) { - None => panic!("Descriptor for read_event is not already known to PeerManager"), + None => { + // This is most likely a simple race condition where the user read some bytes + // from the socket, then we told the user to `disconnect_socket()`, then they + // called this method. Return an error to make sure we get disconnected. + return Err(PeerHandleError { no_connection_possible: false }); + }, Some(peer) => { assert!(peer.pending_read_buffer.len() > 0); assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos); @@ -694,7 +733,7 @@ impl PeerManager { log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err); - self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &msg); + self.enqueue_message(peer, &msg); continue; }, } @@ -736,7 +775,7 @@ impl PeerManager { let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..])); @@ -746,7 +785,7 @@ impl PeerManager { if peer.pending_read_is_header { @@ -794,13 +833,18 @@ impl PeerManager match handling_error { MessageHandlingError::PeerHandleError(e) => { return Err(e) }, MessageHandlingError::LightningError(e) => { try_potential_handleerror!(Err(e)); }, - } + }, + Ok(Some(msg)) => { + peer_node_id = Some(peer.their_node_id.expect("After noise is complete, their_node_id is always set")); + msgs_to_forward.push(msg); + }, + Ok(None) => {}, } } } @@ -808,12 +852,14 @@ impl PeerManager 10 // pause_read + peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE // pause_read } }; + for msg in msgs_to_forward.drain(..) { + self.forward_broadcast_msg(peers, &msg, peer_node_id.as_ref()); + } + pause_read }; @@ -821,7 +867,8 @@ impl PeerManager, peer: &mut Peer, peer_descriptor: Descriptor, message: wire::Message) -> Result<(), MessageHandlingError> { + /// Returns the message back if it needs to be broadcasted to all other peers. + fn handle_message(&self, peer: &mut Peer, message: wire::Message) -> Result, MessageHandlingError> { log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap())); // Need an Init as first message @@ -831,6 +878,8 @@ impl PeerManager { @@ -854,7 +903,6 @@ impl PeerManager PeerManager { if msg.ponglen < 65532 { let resp = msgs::Pong { byteslen: msg.ponglen }; - self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp); + self.enqueue_message(peer, &resp); } }, wire::Message::Pong(_msg) => { @@ -953,34 +1001,22 @@ impl PeerManager { - let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) { - Ok(v) => v, - Err(e) => { return Err(e.into()); }, - }; - - if should_forward { - // TODO: forward msg along to all our other peers! + if self.message_handler.route_handler.handle_channel_announcement(&msg) + .map_err(|e| -> MessageHandlingError { e.into() })? { + should_forward = Some(wire::Message::ChannelAnnouncement(msg)); } }, wire::Message::NodeAnnouncement(msg) => { - let should_forward = match self.message_handler.route_handler.handle_node_announcement(&msg) { - Ok(v) => v, - Err(e) => { return Err(e.into()); }, - }; - - if should_forward { - // TODO: forward msg along to all our other peers! + if self.message_handler.route_handler.handle_node_announcement(&msg) + .map_err(|e| -> MessageHandlingError { e.into() })? { + should_forward = Some(wire::Message::NodeAnnouncement(msg)); } }, wire::Message::ChannelUpdate(msg) => { self.message_handler.chan_handler.handle_channel_update(&peer.their_node_id.unwrap(), &msg); - let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) { - Ok(v) => v, - Err(e) => { return Err(e.into()); }, - }; - - if should_forward { - // TODO: forward msg along to all our other peers! + if self.message_handler.route_handler.handle_channel_update(&msg) + .map_err(|e| -> MessageHandlingError { e.into() })? { + should_forward = Some(wire::Message::ChannelUpdate(msg)); } }, wire::Message::QueryShortChannelIds(msg) => { @@ -1009,12 +1045,83 @@ impl PeerManager, msg: &wire::Message, except_node: Option<&PublicKey>) { + match msg { + wire::Message::ChannelAnnouncement(ref msg) => { + let encoded_msg = encode_msg!(msg); + + for (_, peer) in peers.peers.iter_mut() { + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || + !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { + continue + } + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP { + continue; + } + if peer.their_node_id.as_ref() == Some(&msg.contents.node_id_1) || + peer.their_node_id.as_ref() == Some(&msg.contents.node_id_2) { + continue; + } + if except_node.is_some() && peer.their_node_id.as_ref() == except_node { + continue; + } + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); + } + }, + wire::Message::NodeAnnouncement(ref msg) => { + let encoded_msg = encode_msg!(msg); + + for (_, peer) in peers.peers.iter_mut() { + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || + !peer.should_forward_node_announcement(msg.contents.node_id) { + continue + } + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP { + continue; + } + if peer.their_node_id.as_ref() == Some(&msg.contents.node_id) { + continue; + } + if except_node.is_some() && peer.their_node_id.as_ref() == except_node { + continue; + } + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); + } + }, + wire::Message::ChannelUpdate(ref msg) => { + let encoded_msg = encode_msg!(msg); + + for (_, peer) in peers.peers.iter_mut() { + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || + !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { + continue + } + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP { + continue; + } + if except_node.is_some() && peer.their_node_id.as_ref() == except_node { + continue; + } + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); + } + }, + _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"), + } } /// Checks for any events generated by our handlers and processes them. Includes sending most /// response messages as well as messages generated by calls to handler functions directly (eg - /// functions like ChannelManager::process_pending_htlc_forward or send_payment). + /// functions like [`ChannelManager::process_pending_htlc_forwards`] or [`send_payment`]). + /// + /// May call [`send_data`] on [`SocketDescriptor`]s. Thus, be very careful with reentrancy + /// issues! + /// + /// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment + /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards + /// [`send_data`]: SocketDescriptor::send_data pub fn process_events(&self) { { // TODO: There are some DoS attacks here where you can flood someone's outbound send @@ -1027,24 +1134,21 @@ impl PeerManager { + ($node_id: expr) => { { - let descriptor = match peers.node_id_to_descriptor.get($node_id) { - Some(descriptor) => descriptor.clone(), + match peers.node_id_to_descriptor.get($node_id) { + Some(descriptor) => match peers.peers.get_mut(&descriptor) { + Some(peer) => { + if peer.their_features.is_none() { + continue; + } + peer + }, + None => panic!("Inconsistent peers set state!"), + }, None => { - $handle_no_such_peer; continue; }, - }; - match peers.peers.get_mut(&descriptor) { - Some(peer) => { - if peer.their_features.is_none() { - $handle_no_such_peer; - continue; - } - (descriptor, peer) - }, - None => panic!("Inconsistent peers set state!"), } } } @@ -1054,65 +1158,46 @@ impl PeerManager { log_trace!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.temporary_channel_id)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: Drop the pending channel? (or just let it timeout, but that sucks) - }); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", log_pubkey!(node_id), log_bytes!(msg.temporary_channel_id), log_funding_channel_id!(msg.funding_txid, msg.funding_output_index)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: generate a DiscardFunding event indicating to the wallet that - //they should just throw away this funding transaction - }); + // TODO: If the peer is gone we should generate a DiscardFunding event + // indicating to the wallet that they should just throw away this funding transaction + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: generate a DiscardFunding event indicating to the wallet that - //they should just throw away this funding transaction - }); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: Do whatever we're gonna do for handling dropped messages - }); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: generate a DiscardFunding event indicating to the wallet that - //they should just throw away this funding transaction - }); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { log_trace!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", @@ -1121,9 +1206,7 @@ impl PeerManager PeerManager { log_trace!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: Do whatever we're gonna do for handling dropped messages - }); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: Do whatever we're gonna do for handling dropped messages - }); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_trace!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: Do whatever we're gonna do for handling dropped messages - }); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { - //TODO: Do whatever we're gonna do for handling dropped messages - }); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, - MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { + MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); - if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { - let encoded_msg = encode_msg!(msg); - let encoded_update_msg = encode_msg!(update_msg); - - for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || - !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { - continue - } - match peer.their_node_id { - None => continue, - Some(their_node_id) => { - if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 { - continue - } - } - } - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..])); - self.do_attempt_write_data(&mut (*descriptor).clone(), peer); - } + if self.message_handler.route_handler.handle_channel_announcement(&msg).is_ok() && self.message_handler.route_handler.handle_channel_update(&update_msg).is_ok() { + self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None); + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None); } }, - MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => { + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { log_trace!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler"); - if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() { - let encoded_msg = encode_msg!(msg); - - for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || - !peer.should_forward_node_announcement(msg.contents.node_id) { - continue - } - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); - self.do_attempt_write_data(&mut (*descriptor).clone(), peer); - } + if self.message_handler.route_handler.handle_node_announcement(&msg).is_ok() { + self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None); } }, - MessageSendEvent::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { msg } => { log_trace!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); - if self.message_handler.route_handler.handle_channel_update(msg).is_ok() { - let encoded_msg = encode_msg!(msg); - - for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || - !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { - continue - } - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); - self.do_attempt_write_data(&mut (*descriptor).clone(), peer); - } + if self.message_handler.route_handler.handle_channel_update(&msg).is_ok() { + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None); } }, MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => { @@ -1244,7 +1278,6 @@ impl PeerManager { if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) { - peers.peers_needing_send.remove(&descriptor); if let Some(mut peer) = peers.peers.remove(&descriptor) { if let Some(ref msg) = *msg { log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", @@ -1267,23 +1300,18 @@ impl PeerManager { - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {}); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { - let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {}); + let peer = get_peer_for_forwarding!(node_id); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); - self.do_attempt_write_data(&mut descriptor, peer); } MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", @@ -1292,40 +1320,32 @@ impl PeerManager self.do_attempt_write_data(&mut descriptor, peer), - None => panic!("Inconsistent peers set state!"), - } + for (descriptor, peer) in peers.peers.iter_mut() { + self.do_attempt_write_data(&mut (*descriptor).clone(), peer); } } } /// Indicates that the given socket descriptor's connection is now closed. - /// - /// This must only be called if the socket has been disconnected by the peer or your own - /// decision to disconnect it and must NOT be called in any case where other parts of this - /// library (eg PeerHandleError, explicit disconnect_socket calls) instruct you to disconnect - /// the peer. - /// - /// Panics if the descriptor was not previously registered in a successful new_*_connection event. pub fn socket_disconnected(&self, descriptor: &Descriptor) { self.disconnect_event_internal(descriptor, false); } fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { let mut peers = self.peers.lock().unwrap(); - peers.peers_needing_send.remove(descriptor); let peer_option = peers.peers.remove(descriptor); match peer_option { - None => panic!("Descriptor for disconnect_event is not already known to PeerManager"), + None => { + // This is most likely a simple race condition where the user found that the socket + // was disconnected, then we told the user to `disconnect_socket()`, then they + // called this method. Either way we're disconnected, return. + }, Some(peer) => { match peer.their_node_id { Some(node_id) => { @@ -1340,38 +1360,41 @@ impl PeerManager { @@ -1495,7 +1518,9 @@ mod tests { let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap(); peer_a.new_inbound_connection(fd_a.clone()).unwrap(); assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); + peer_a.process_events(); assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); + peer_b.process_events(); assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); (fd_a.clone(), fd_b.clone()) } @@ -1534,10 +1559,12 @@ mod tests { // peers[0] awaiting_pong is set to true, but the Peer is still connected peers[0].timer_tick_occurred(); + peers[0].process_events(); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected peers[0].timer_tick_occurred(); + peers[0].process_events(); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); } @@ -1558,7 +1585,9 @@ mod tests { let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); // Make each peer to read the messages that the other peer just wrote to them. + peers[0].process_events(); peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(); + peers[1].process_events(); peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(); // Check that each peer has received the expected number of channel updates and channel diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index fb316caf..9946cc24 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -23,7 +23,6 @@ use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hash_types::BlockHash; use prelude::*; -use std::collections::HashMap; use core::mem; use ln::functional_test_utils::*; diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index 9f4b8ac3..0edeef0c 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -24,7 +24,6 @@ use util::logger::Logger; use prelude::*; use alloc::collections::BinaryHeap; use core::cmp; -use std::collections::HashMap; use core::ops::Deref; /// A hop in a route @@ -3843,6 +3842,7 @@ mod tests { } } + #[cfg(not(feature = "no_std"))] pub(super) fn random_init_seed() -> u64 { // Because the default HashMap in std pulls OS randomness, we can use it as a (bad) RNG. use core::hash::{BuildHasher, Hasher}; @@ -3850,9 +3850,11 @@ mod tests { println!("Using seed of {}", seed); seed } + #[cfg(not(feature = "no_std"))] use util::ser::Readable; #[test] + #[cfg(not(feature = "no_std"))] fn generate_routes() { let mut d = match super::test_utils::get_route_file() { Ok(f) => f, @@ -3880,6 +3882,7 @@ mod tests { } #[test] + #[cfg(not(feature = "no_std"))] fn generate_routes_mpp() { let mut d = match super::test_utils::get_route_file() { Ok(f) => f, @@ -3907,7 +3910,7 @@ mod tests { } } -#[cfg(test)] +#[cfg(all(test, not(feature = "no_std")))] pub(crate) mod test_utils { use std::fs::File; /// Tries to open a network graph file, or panics with a URL to fetch it. @@ -3934,7 +3937,7 @@ pub(crate) mod test_utils { } } -#[cfg(all(test, feature = "unstable"))] +#[cfg(all(test, feature = "unstable", not(feature = "no_std")))] mod benches { use super::*; use util::logger::{Logger, Record}; diff --git a/lightning/src/util/message_signing.rs b/lightning/src/util/message_signing.rs index c39da1d5..69c802e7 100644 --- a/lightning/src/util/message_signing.rs +++ b/lightning/src/util/message_signing.rs @@ -12,13 +12,13 @@ //! Note this is not part of the specs, but follows lnd's signing and verifying protocol, which can is defined as follows: //! //! signature = zbase32(SigRec(sha256d(("Lightning Signed Message:" + msg))) -//! zbase32 from https://philzimmermann.com/docs/human-oriented-base-32-encoding.txt +//! zbase32 from //! SigRec has first byte 31 + recovery id, followed by 64 byte sig. //! //! This implementation is compatible with both lnd's and c-lightning's //! -//! https://lightning.readthedocs.io/lightning-signmessage.7.html -//! https://api.lightning.community/#signmessage +//! +//! use prelude::*; use crate::util::zbase32; diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index 4e69a37d..b02fef27 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -12,7 +12,6 @@ use prelude::*; use std::io::{Read, Write}; -use std::collections::HashMap; use core::hash::Hash; use std::sync::Mutex; use core::cmp; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 60aafd91..b4d274f6 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -42,7 +42,6 @@ use core::time::Duration; use std::sync::{Mutex, Arc}; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use core::{cmp, mem}; -use std::collections::{HashMap, HashSet, VecDeque}; use chain::keysinterface::InMemorySigner; pub struct TestVecWriter(pub Vec);