Merge pull request #537 from TheBlueMatt/2020-03-data-loss-spec-550
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Tue, 17 Mar 2020 18:49:06 +0000 (18:49 +0000)
committerGitHub <noreply@github.com>
Tue, 17 Mar 2020 18:49:06 +0000 (18:49 +0000)
Update pre-HTLC DataLossProtect to match new spec changes

17 files changed:
fuzz/src/full_stack.rs
fuzz/src/router.rs
lightning-net-tokio/Cargo.toml
lightning-net-tokio/src/lib.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/channelmonitor.rs
lightning/src/ln/features.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/msgs.rs
lightning/src/ln/onchaintx.rs
lightning/src/ln/peer_handler.rs
lightning/src/ln/router.rs
lightning/src/ln/wire.rs
lightning/src/util/config.rs
lightning/src/util/events.rs

index ab63162f54e8166415b12b2a1159bb801232edbc..3ae4e56b5f1903093d621d6f91327fcc58d79b5f 100644 (file)
@@ -99,7 +99,7 @@ impl FeeEstimator for FuzzEstimator {
                //TODO: We should actually be testing at least much more than 64k...
                match self.input.get_slice(2) {
                        Some(slice) => cmp::max(slice_to_be16(slice) as u64, 253),
-                       None => 0
+                       None => 253
                }
        }
 }
@@ -384,7 +384,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
                        3 => {
                                let peer_id = get_slice!(1)[0];
                                if !peers.borrow()[peer_id as usize] { return; }
-                               match loss_detector.handler.read_event(&mut Peer{id: peer_id, peers_connected: &peers}, get_slice!(get_slice!(1)[0]).to_vec()) {
+                               match loss_detector.handler.read_event(&mut Peer{id: peer_id, peers_connected: &peers}, get_slice!(get_slice!(1)[0])) {
                                        Ok(res) => assert!(!res),
                                        Err(_) => { peers.borrow_mut()[peer_id as usize] = false; }
                                }
index b0d7b6031486039e98f974065102b1010a5d0677..b1a766ba79149b5640a080c55a625eb338424a03 100644 (file)
@@ -124,7 +124,6 @@ pub fn do_test(data: &[u8]) {
                                        msgs::DecodeError::UnknownVersion => return,
                                        msgs::DecodeError::UnknownRequiredFeature => return,
                                        msgs::DecodeError::InvalidValue => return,
-                                       msgs::DecodeError::ExtraAddressesPerType => return,
                                        msgs::DecodeError::BadLengthDescriptor => return,
                                        msgs::DecodeError::ShortRead => panic!("We picked the length..."),
                                        msgs::DecodeError::Io(e) => panic!(format!("{}", e)),
index 1bd9805a7c901bdb609e82f43bb6ae8c7d04013b..9c84b1353aeb460f7dc9821fa6b3f76dfad5a785 100644 (file)
@@ -1,11 +1,12 @@
 [package]
 name = "lightning-net-tokio"
-version = "0.0.2"
+version = "0.0.3"
 authors = ["Matt Corallo"]
 license = "Apache-2.0"
+edition = "2018"
 description = """
 Implementation of the rust-lightning network stack using Tokio.
-For Rust-Lightning clients which wish to make direct connections to Lightning P2P nodes, this is a simple alternative to implementing the nerequired network stack, especially for those already using Tokio.
+For Rust-Lightning clients which wish to make direct connections to Lightning P2P nodes, this is a simple alternative to implementing the required network stack, especially for those already using Tokio.
 """
 
 [dependencies]
@@ -13,7 +14,7 @@ bitcoin = "0.21"
 bitcoin_hashes = "0.7"
 lightning = { version = "0.0.10", path = "../lightning" }
 secp256k1 = "0.15"
-tokio-codec = "0.1"
-futures = "0.1"
-tokio = "0.1"
-bytes = "0.4"
+tokio = { version = ">=0.2.12", features = [ "io-util", "macros", "rt-core", "sync", "tcp", "time" ] }
+
+[dev-dependencies]
+tokio = { version = ">=0.2.12", features = [ "io-util", "macros", "rt-core", "rt-threaded", "sync", "tcp", "time" ] }
index c2bac324bd3c245490797fb5daf02f3554a56859..94c3eee151164142668507a01d7081bac5c4fa04 100644 (file)
-extern crate bytes;
-extern crate tokio;
-extern crate tokio_codec;
-extern crate futures;
-extern crate lightning;
-extern crate secp256k1;
-
-use bytes::BufMut;
-
-use futures::future;
-use futures::future::Future;
-use futures::{AsyncSink, Stream, Sink};
-use futures::sync::mpsc;
+//! A socket handling library for those running in Tokio environments who wish to use
+//! rust-lightning with native TcpStreams.
+//!
+//! Designed to be as simple as possible, the high-level usage is almost as simple as "hand over a
+//! TcpStream and a reference to a PeerManager and the rest is handled", except for the
+//! [Event](../lightning/util/events/enum.Event.html) handlng mechanism, see below.
+//!
+//! The PeerHandler, due to the fire-and-forget nature of this logic, must be an Arc, and must use
+//! the SocketDescriptor provided here as the PeerHandler's SocketDescriptor.
+//!
+//! Three methods are exposed to register a new connection for handling in tokio::spawn calls, see
+//! their individual docs for more. All three take a
+//! [mpsc::Sender<()>](../tokio/sync/mpsc/struct.Sender.html) which is sent into every time
+//! something occurs which may result in lightning [Events](../lightning/util/events/enum.Event.html).
+//! The call site should, thus, look something like this:
+//! ```
+//! use tokio::sync::mpsc;
+//! use tokio::net::TcpStream;
+//! use secp256k1::key::PublicKey;
+//! use lightning::util::events::EventsProvider;
+//! use std::net::SocketAddr;
+//! use std::sync::Arc;
+//!
+//! // Define concrete types for our high-level objects:
+//! type TxBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface;
+//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
+//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor<lightning::chain::transaction::OutPoint, lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>>;
+//! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChannelMonitor, TxBroadcaster, FeeEstimator>;
+//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChannelMonitor, TxBroadcaster, FeeEstimator>;
+//!
+//! // Connect to node with pubkey their_node_id at addr:
+//! async fn connect_to_node(peer_manager: PeerManager, channel_monitor: Arc<ChannelMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
+//!     let (sender, mut receiver) = mpsc::channel(2);
+//!     lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await;
+//!     loop {
+//!         receiver.recv().await;
+//!         for _event in channel_manager.get_and_clear_pending_events().drain(..) {
+//!             // Handle the event!
+//!         }
+//!         for _event in channel_monitor.get_and_clear_pending_events().drain(..) {
+//!             // Handle the event!
+//!         }
+//!     }
+//! }
+//!
+//! // Begin reading from a newly accepted socket and talk to the peer:
+//! async fn accept_socket(peer_manager: PeerManager, channel_monitor: Arc<ChannelMonitor>, channel_manager: ChannelManager, socket: TcpStream) {
+//!     let (sender, mut receiver) = mpsc::channel(2);
+//!     lightning_net_tokio::setup_inbound(peer_manager, sender, socket);
+//!     loop {
+//!         receiver.recv().await;
+//!         for _event in channel_manager.get_and_clear_pending_events().drain(..) {
+//!             // Handle the event!
+//!         }
+//!         for _event in channel_monitor.get_and_clear_pending_events().drain(..) {
+//!             // Handle the event!
+//!         }
+//!     }
+//! }
+//! ```
 
 use secp256k1::key::PublicKey;
 
-use tokio::timer::Delay;
 use tokio::net::TcpStream;
+use tokio::{io, time};
+use tokio::sync::mpsc;
+use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
 
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
 use lightning::ln::msgs::ChannelMessageHandler;
 
-use std::mem;
+use std::{task, thread};
 use std::net::SocketAddr;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, MutexGuard};
 use std::sync::atomic::{AtomicU64, Ordering};
-use std::time::{Duration, Instant};
-use std::vec::Vec;
+use std::time::Duration;
 use std::hash::Hash;
 
 static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
 
-/// A connection to a remote peer. Can be constructed either as a remote connection using
-/// Connection::setup_outbound o
-pub struct Connection {
-       writer: Option<mpsc::Sender<bytes::Bytes>>,
+/// Connection contains all our internal state for a connection - we hold a reference to the
+/// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
+/// read future (which is returned by schedule_read).
+struct Connection {
+       writer: Option<io::WriteHalf<TcpStream>>,
        event_notify: mpsc::Sender<()>,
-       pending_read: Vec<u8>,
-       read_blocker: Option<futures::sync::oneshot::Sender<Result<(), ()>>>,
+       // Because our PeerManager is templated by user-provided types, and we can't (as far as I can
+       // tell) have a const RawWakerVTable built out of templated functions, we need some indirection
+       // between being woken up with write-ready and calling PeerManager::write_buffer_spce_avail.
+       // This provides that indirection, with a Sender which gets handed to the PeerManager Arc on
+       // the schedule_read stack.
+       //
+       // An alternative (likely more effecient) approach would involve creating a RawWakerVTable at
+       // runtime with functions templated by the Arc<PeerManager> type, calling
+       // write_buffer_space_avail directly from tokio's write wake, however doing so would require
+       // more unsafe voodo than I really feel like writing.
+       write_avail: mpsc::Sender<()>,
+       // When we are told by rust-lightning to pause read (because we have writes backing up), we do
+       // so by setting read_paused. At that point, the read task will stop reading bytes from the
+       // socket. To wake it up (without otherwise changing its state, we can push a value into this
+       // Sender.
+       read_waker: mpsc::Sender<()>,
+       // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
+       // are sure we won't call any more read/write PeerManager functions with the same connection.
+       // This is set to true if we're in such a condition (with disconnect checked before with the
+       // top-level mutex held) and false when we can return.
+       block_disconnect_socket: bool,
        read_paused: bool,
-       need_disconnect: bool,
+       rl_requested_disconnect: bool,
        id: u64,
 }
 impl Connection {
-       fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
-               let us_ref = us.clone();
-               let us_close_ref = us.clone();
+       fn event_trigger(us: &mut MutexGuard<Self>) {
+               match us.event_notify.try_send(()) {
+                       Ok(_) => {},
+                       Err(mpsc::error::TrySendError::Full(_)) => {
+                               // Ignore full errors as we just need the user to poll after this point, so if they
+                               // haven't received the last send yet, it doesn't matter.
+                       },
+                       _ => panic!()
+               }
+       }
+       async fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) {
                let peer_manager_ref = peer_manager.clone();
-               tokio::spawn(reader.for_each(move |b| {
-                       let pending_read = b.to_vec();
-                       {
-                               let mut lock = us_ref.lock().unwrap();
-                               assert!(lock.pending_read.is_empty());
-                               if lock.read_paused {
-                                       lock.pending_read = pending_read;
-                                       let (sender, blocker) = futures::sync::oneshot::channel();
-                                       lock.read_blocker = Some(sender);
-                                       return future::Either::A(blocker.then(|_| { Ok(()) }));
-                               }
-                       }
-                       //TODO: There's a race where we don't meet the requirements of socket_disconnected if its
-                       //called right here, after we release the us_ref lock in the scope above, but before we
-                       //call read_event!
-                       match peer_manager.read_event(&mut SocketDescriptor::new(us_ref.clone(), peer_manager.clone()), pending_read) {
-                               Ok(pause_read) => {
-                                       if pause_read {
-                                               let mut lock = us_ref.lock().unwrap();
-                                               lock.read_paused = true;
-                                       }
-                               },
-                               Err(e) => {
-                                       us_ref.lock().unwrap().need_disconnect = false;
-                                       return future::Either::B(future::result(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))));
-                               }
+               // 8KB is nice and big but also should never cause any issues with stack overflowing.
+               let mut buf = [0; 8192];
+
+               let mut our_descriptor = SocketDescriptor::new(us.clone());
+               // An enum describing why we did/are disconnecting:
+               enum Disconnect {
+                       // Rust-Lightning told us to disconnect, either by returning an Err or by calling
+                       // SocketDescriptor::disconnect_socket.
+                       // In this case, we do not call peer_manager.socket_disconnected() as Rust-Lightning
+                       // already knows we're disconnected.
+                       CloseConnection,
+                       // The connection was disconnected for some other reason, ie because the socket was
+                       // closed.
+                       // In this case, we do need to call peer_manager.socket_disconnected() to inform
+                       // Rust-Lightning that the socket is gone.
+                       PeerDisconnected
+               };
+               let disconnect_type = loop {
+                       macro_rules! shutdown_socket {
+                               ($err: expr, $need_disconnect: expr) => { {
+                                       println!("Disconnecting peer due to {}!", $err);
+                                       break $need_disconnect;
+                               } }
                        }
 
-                       if let Err(e) = us_ref.lock().unwrap().event_notify.try_send(()) {
-                               // Ignore full errors as we just need them to poll after this point, so if the user
-                               // hasn't received the last send yet, it doesn't matter.
-                               assert!(e.is_full());
+                       macro_rules! prepare_read_write_call {
+                               () => { {
+                                       let mut us_lock = us.lock().unwrap();
+                                       if us_lock.rl_requested_disconnect {
+                                               shutdown_socket!("disconnect_socket() call from RL", Disconnect::CloseConnection);
+                                       }
+                                       us_lock.block_disconnect_socket = true;
+                               } }
                        }
 
-                       future::Either::B(future::result(Ok(())))
-               }).then(move |_| {
-                       if us_close_ref.lock().unwrap().need_disconnect {
-                               peer_manager_ref.socket_disconnected(&SocketDescriptor::new(us_close_ref, peer_manager_ref.clone()));
-                               println!("Peer disconnected!");
-                       } else {
-                               println!("We disconnected peer!");
+                       let read_paused = us.lock().unwrap().read_paused;
+                       tokio::select! {
+                               v = write_avail_receiver.recv() => {
+                                       assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
+                                       prepare_read_write_call!();
+                                       if let Err(e) = peer_manager.write_buffer_space_avail(&mut our_descriptor) {
+                                               shutdown_socket!(e, Disconnect::CloseConnection);
+                                       }
+                                       us.lock().unwrap().block_disconnect_socket = false;
+                               },
+                               _ = read_wake_receiver.recv() => {},
+                               read = reader.read(&mut buf), if !read_paused => match read {
+                                       Ok(0) => shutdown_socket!("Connection closed", Disconnect::PeerDisconnected),
+                                       Ok(len) => {
+                                               prepare_read_write_call!();
+                                               let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]);
+                                               let mut us_lock = us.lock().unwrap();
+                                               match read_res {
+                                                       Ok(pause_read) => {
+                                                               if pause_read {
+                                                                       us_lock.read_paused = true;
+                                                               }
+                                                               Self::event_trigger(&mut us_lock);
+                                                       },
+                                                       Err(e) => shutdown_socket!(e, Disconnect::CloseConnection),
+                                               }
+                                               us_lock.block_disconnect_socket = false;
+                                       },
+                                       Err(e) => shutdown_socket!(e, Disconnect::PeerDisconnected),
+                               },
                        }
-                       Ok(())
-               }));
+               };
+               let writer_option = us.lock().unwrap().writer.take();
+               if let Some(mut writer) = writer_option {
+                       // If the socket is already closed, shutdown() will fail, so just ignore it.
+                       let _ = writer.shutdown().await;
+               }
+               if let Disconnect::PeerDisconnected = disconnect_type {
+                       peer_manager_ref.socket_disconnected(&our_descriptor);
+                       Self::event_trigger(&mut us.lock().unwrap());
+               }
        }
 
-       fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>, Arc<Mutex<Self>>) {
-               let (writer, reader) = tokio_codec::Framed::new(stream, tokio_codec::BytesCodec::new()).split();
-               let (send_sink, send_stream) = mpsc::channel(3);
-               tokio::spawn(writer.send_all(send_stream.map_err(|_| -> std::io::Error {
-                       unreachable!();
-               })).then(|_| {
-                       future::result(Ok(()))
-               }));
-               let us = Arc::new(Mutex::new(Self { writer: Some(send_sink), event_notify, pending_read: Vec::new(), read_blocker: None, read_paused: false, need_disconnect: true, id: ID_COUNTER.fetch_add(1, Ordering::AcqRel) }));
-
-               (reader, us)
+       fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
+               // We only ever need a channel of depth 1 here: if we returned a non-full write to the
+               // PeerManager, we will eventually get notified that there is room in the socket to write
+               // new bytes, which will generate an event. That event will be popped off the queue before
+               // we call write_buffer_space_avail, ensuring that we have room to push a new () if, during
+               // the write_buffer_space_avail() call, send_data() returns a non-full write.
+               let (write_avail, write_receiver) = mpsc::channel(1);
+               // Similarly here - our only goal is to make sure the reader wakes up at some point after
+               // we shove a value into the channel which comes after we've reset the read_paused bool to
+               // false.
+               let (read_waker, read_receiver) = mpsc::channel(1);
+               let (reader, writer) = io::split(stream);
+
+               (reader, write_receiver, read_receiver,
+               Arc::new(Mutex::new(Self {
+                       writer: Some(writer), event_notify, write_avail, read_waker, read_paused: false,
+                       block_disconnect_socket: false, rl_requested_disconnect: false,
+                       id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
+               })))
        }
+}
+
+/// Process incoming messages and feed outgoing messages on the provided socket generated by
+/// accepting an incoming connection.
+///
+/// The returned future will complete when the peer is disconnected and associated handling
+/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
+/// not need to poll the provided future in order to make progress.
+///
+/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
+pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future<Output=()> {
+       let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
+       #[cfg(debug_assertions)]
+       let last_us = Arc::clone(&us);
 
-       /// Process incoming messages and feed outgoing messages on the provided socket generated by
-       /// accepting an incoming connection (by scheduling futures with tokio::spawn).
-       ///
-       /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
-       /// ChannelManager and ChannelMonitor objects.
-       pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
-               let (reader, us) = Self::new(event_notify, stream);
+       let handle_opt = if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone())) {
+               Some(tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)))
+       } else {
+               // Note that we will skip socket_disconnected here, in accordance with the PeerManager
+               // requirements.
+               None
+       };
 
-               if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
-                       Self::schedule_read(peer_manager, us, reader);
+       async move {
+               if let Some(handle) = handle_opt {
+                       if let Err(e) = handle.await {
+                               assert!(e.is_cancelled());
+                       } else {
+                               // This is certainly not guaranteed to always be true - the read loop may exit
+                               // while there are still pending write wakers that need to be woken up after the
+                               // socket shutdown(). Still, as a check during testing, to make sure tokio doesn't
+                               // keep too many wakers around, this makes sense. The race should be rare (we do
+                               // some work after shutdown()) and an error would be a major memory leak.
+                               #[cfg(debug_assertions)]
+                               assert!(Arc::try_unwrap(last_us).is_ok());
+                       }
                }
        }
+}
 
-       /// Process incoming messages and feed outgoing messages on the provided socket generated by
-       /// making an outbound connection which is expected to be accepted by a peer with the given
-       /// public key (by scheduling futures with tokio::spawn).
-       ///
-       /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
-       /// ChannelManager and ChannelMonitor objects.
-       pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
-               let (reader, us) = Self::new(event_notify, stream);
-
-               if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
-                       if SocketDescriptor::new(us.clone(), peer_manager.clone()).send_data(&initial_send, true) == initial_send.len() {
-                               Self::schedule_read(peer_manager, us, reader);
+/// Process incoming messages and feed outgoing messages on the provided socket generated by
+/// making an outbound connection which is expected to be accepted by a peer with the given
+/// public key. The relevant processing is set to run free (via tokio::spawn).
+///
+/// The returned future will complete when the peer is disconnected and associated handling
+/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
+/// not need to poll the provided future in order to make progress.
+///
+/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
+pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> {
+       let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
+       #[cfg(debug_assertions)]
+       let last_us = Arc::clone(&us);
+
+       let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
+               Some(tokio::spawn(async move {
+                       // We should essentially always have enough room in a TCP socket buffer to send the
+                       // initial 10s of bytes. However, tokio running in single-threaded mode will always
+                       // fail writes and wake us back up later to write. Thus, we handle a single
+                       // std::task::Poll::Pending but still expect to write the full set of bytes at once
+                       // and use a relatively tight timeout.
+                       if let Ok(Ok(())) = tokio::time::timeout(Duration::from_millis(100), async {
+                               loop {
+                                       match SocketDescriptor::new(us.clone()).send_data(&initial_send, true) {
+                                               v if v == initial_send.len() => break Ok(()),
+                                               0 => {
+                                                       write_receiver.recv().await;
+                                                       // In theory we could check for if we've been instructed to disconnect
+                                                       // the peer here, but its OK to just skip it - we'll check for it in
+                                                       // schedule_read prior to any relevant calls into RL.
+                                               },
+                                               _ => {
+                                                       eprintln!("Failed to write first full message to socket!");
+                                                       peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
+                                                       break Err(());
+                                               }
+                                       }
+                               }
+                       }).await {
+                               Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver).await;
+                       }
+               }))
+       } else {
+               // Note that we will skip socket_disconnected here, in accordance with the PeerManager
+               // requirements.
+               None
+       };
+
+       async move {
+               if let Some(handle) = handle_opt {
+                       if let Err(e) = handle.await {
+                               assert!(e.is_cancelled());
                        } else {
-                               println!("Failed to write first full message to socket!");
+                               // This is certainly not guaranteed to always be true - the read loop may exit
+                               // while there are still pending write wakers that need to be woken up after the
+                               // socket shutdown(). Still, as a check during testing, to make sure tokio doesn't
+                               // keep too many wakers around, this makes sense. The race should be rare (we do
+                               // some work after shutdown()) and an error would be a major memory leak.
+                               #[cfg(debug_assertions)]
+                               assert!(Arc::try_unwrap(last_us).is_ok());
                        }
                }
        }
+}
 
-       /// Process incoming messages and feed outgoing messages on a new connection made to the given
-       /// socket address which is expected to be accepted by a peer with the given public key (by
-       /// scheduling futures with tokio::spawn).
-       ///
-       /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
-       /// ChannelManager and ChannelMonitor objects.
-       pub fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
-               let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
-                       future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
-               });
-               tokio::spawn(TcpStream::connect(&addr).select(connect_timeout)
-                       .and_then(move |stream| {
-                               Connection::setup_outbound(peer_manager, event_notify, their_node_id, stream.0);
-                               future::ok(())
-                       }).or_else(|_| {
-                               //TODO: return errors somehow
-                               future::ok(())
-                       }));
-       }
+/// Process incoming messages and feed outgoing messages on a new connection made to the given
+/// socket address which is expected to be accepted by a peer with the given public key (by
+/// scheduling futures with tokio::spawn).
+///
+/// Shorthand for TcpStream::connect(addr) with a timeout followed by setup_outbound().
+///
+/// Returns a future (as the fn is async) which needs to be polled to complete the connection and
+/// connection setup. That future then returns a future which will complete when the peer is
+/// disconnected and associated handling futures are freed, though, because all processing in said
+/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
+/// make progress.
+///
+/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
+pub async fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> {
+       if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), TcpStream::connect(&addr)).await {
+               Some(setup_outbound(peer_manager, event_notify, their_node_id, stream))
+       } else { None }
+}
+
+const SOCK_WAKER_VTABLE: task::RawWakerVTable =
+       task::RawWakerVTable::new(clone_socket_waker, wake_socket_waker, wake_socket_waker_by_ref, drop_socket_waker);
+
+fn clone_socket_waker(orig_ptr: *const ()) -> task::RawWaker {
+       write_avail_to_waker(orig_ptr as *const mpsc::Sender<()>)
+}
+// When waking, an error should be fine. Most likely we got two send_datas in a row, both of which
+// failed to fully write, but we only need to call write_buffer_space_avail() once. Otherwise, the
+// sending thread may have already gone away due to a socket close, in which case there's nothing
+// to wake up anyway.
+fn wake_socket_waker(orig_ptr: *const ()) {
+       let sender = unsafe { &mut *(orig_ptr as *mut mpsc::Sender<()>) };
+       let _ = sender.try_send(());
+       drop_socket_waker(orig_ptr);
+}
+fn wake_socket_waker_by_ref(orig_ptr: *const ()) {
+       let sender_ptr = orig_ptr as *const mpsc::Sender<()>;
+       let mut sender = unsafe { (*sender_ptr).clone() };
+       let _ = sender.try_send(());
+}
+fn drop_socket_waker(orig_ptr: *const ()) {
+       let _orig_box = unsafe { Box::from_raw(orig_ptr as *mut mpsc::Sender<()>) };
+       // _orig_box is now dropped
+}
+fn write_avail_to_waker(sender: *const mpsc::Sender<()>) -> task::RawWaker {
+       let new_box = Box::leak(Box::new(unsafe { (*sender).clone() }));
+       let new_ptr = new_box as *const mpsc::Sender<()>;
+       task::RawWaker::new(new_ptr as *const (), &SOCK_WAKER_VTABLE)
 }
 
-pub struct SocketDescriptor<CMH: ChannelMessageHandler + 'static> {
+/// The SocketDescriptor used to refer to sockets by a PeerHandler. This is pub only as it is a
+/// type in the template of PeerHandler.
+pub struct SocketDescriptor {
        conn: Arc<Mutex<Connection>>,
        id: u64,
-       peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>,
 }
-impl<CMH: ChannelMessageHandler> SocketDescriptor<CMH> {
-       fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>) -> Self {
+impl SocketDescriptor {
+       fn new(conn: Arc<Mutex<Connection>>) -> Self {
                let id = conn.lock().unwrap().id;
-               Self { conn, id, peer_manager }
+               Self { conn, id }
        }
 }
-impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescriptor<CMH> {
+impl peer_handler::SocketDescriptor for SocketDescriptor {
        fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
-               macro_rules! schedule_read {
-                       ($us_ref: expr) => {
-                               tokio::spawn(future::lazy(move || -> Result<(), ()> {
-                                       let mut read_data = Vec::new();
-                                       {
-                                               let mut us = $us_ref.conn.lock().unwrap();
-                                               mem::swap(&mut read_data, &mut us.pending_read);
-                                       }
-                                       if !read_data.is_empty() {
-                                               let mut us_clone = $us_ref.clone();
-                                               match $us_ref.peer_manager.read_event(&mut us_clone, read_data) {
-                                                       Ok(pause_read) => {
-                                                               if pause_read { return Ok(()); }
-                                                       },
-                                                       Err(_) => {
-                                                               //TODO: Not actually sure how to do this
-                                                               return Ok(());
-                                                       }
-                                               }
-                                       }
-                                       let mut us = $us_ref.conn.lock().unwrap();
-                                       if let Some(sender) = us.read_blocker.take() {
-                                               sender.send(Ok(())).unwrap();
-                                       }
-                                       us.read_paused = false;
-                                       if let Err(e) = us.event_notify.try_send(()) {
-                                               // Ignore full errors as we just need them to poll after this point, so if the user
-                                               // hasn't received the last send yet, it doesn't matter.
-                                               assert!(e.is_full());
-                                       }
-                                       Ok(())
-                               }));
-                       }
-               }
-
+               // To send data, we take a lock on our Connection to access the WriteHalf of the TcpStream,
+               // writing to it if there's room in the kernel buffer, or otherwise create a new Waker with
+               // a SocketDescriptor in it which can wake up the write_avail Sender, waking up the
+               // processing future which will call write_buffer_space_avail and we'll end up back here.
                let mut us = self.conn.lock().unwrap();
-               if resume_read {
-                       let us_ref = self.clone();
-                       schedule_read!(us_ref);
-               }
-               if data.is_empty() { return 0; }
                if us.writer.is_none() {
-                       us.read_paused = true;
+                       // The writer gets take()n when it is time to shut down, so just fast-return 0 here.
                        return 0;
                }
 
-               let mut bytes = bytes::BytesMut::with_capacity(data.len());
-               bytes.put(data);
-               let write_res = us.writer.as_mut().unwrap().start_send(bytes.freeze());
-               match write_res {
-                       Ok(res) => {
-                               match res {
-                                       AsyncSink::Ready => {
-                                               data.len()
-                                       },
-                                       AsyncSink::NotReady(_) => {
-                                               us.read_paused = true;
-                                               let us_ref = self.clone();
-                                               tokio::spawn(us.writer.take().unwrap().flush().then(move |writer_res| -> Result<(), ()> {
-                                                       if let Ok(writer) = writer_res {
-                                                               {
-                                                                       let mut us = us_ref.conn.lock().unwrap();
-                                                                       us.writer = Some(writer);
-                                                               }
-                                                               schedule_read!(us_ref);
-                                                       } // we'll fire the disconnect event on the socket reader end
-                                                       Ok(())
-                                               }));
-                                               0
-                                       }
-                               }
-                       },
-                       Err(_) => {
-                               // We'll fire the disconnected event on the socket reader end
-                               0
-                       },
+               if resume_read && us.read_paused {
+                       // The schedule_read future may go to lock up but end up getting woken up by there
+                       // being more room in the write buffer, dropping the other end of this Sender
+                       // before we get here, so we ignore any failures to wake it up.
+                       us.read_paused = false;
+                       let _ = us.read_waker.try_send(());
+               }
+               if data.is_empty() { return 0; }
+               let waker = unsafe { task::Waker::from_raw(write_avail_to_waker(&us.write_avail)) };
+               let mut ctx = task::Context::from_waker(&waker);
+               let mut written_len = 0;
+               loop {
+                       match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
+                               task::Poll::Ready(Ok(res)) => {
+                                       // The tokio docs *seem* to indicate this can't happen, and I certainly don't
+                                       // know how to handle it if it does (cause it should be a Poll::Pending
+                                       // instead):
+                                       assert_ne!(res, 0);
+                                       written_len += res;
+                                       if written_len == data.len() { return written_len; }
+                               },
+                               task::Poll::Ready(Err(e)) => {
+                                       // The tokio docs *seem* to indicate this can't happen, and I certainly don't
+                                       // know how to handle it if it does (cause it should be a Poll::Pending
+                                       // instead):
+                                       assert_ne!(e.kind(), io::ErrorKind::WouldBlock);
+                                       // Probably we've already been closed, just return what we have and let the
+                                       // read thread handle closing logic.
+                                       return written_len;
+                               },
+                               task::Poll::Pending => {
+                                       // We're queued up for a write event now, but we need to make sure we also
+                                       // pause read given we're now waiting on the remote end to ACK (and in
+                                       // accordance with the send_data() docs).
+                                       us.read_paused = true;
+                                       return written_len;
+                               },
+                       }
                }
        }
 
        fn disconnect_socket(&mut self) {
-               let mut us = self.conn.lock().unwrap();
-               us.need_disconnect = true;
-               us.read_paused = true;
+               {
+                       let mut us = self.conn.lock().unwrap();
+                       us.rl_requested_disconnect = true;
+                       us.read_paused = true;
+                       // Wake up the sending thread, assuming it is still alive
+                       let _ = us.write_avail.try_send(());
+                       // Happy-path return:
+                       if !us.block_disconnect_socket { return; }
+               }
+               while self.conn.lock().unwrap().block_disconnect_socket {
+                       thread::yield_now();
+               }
        }
 }
-impl<CMH: ChannelMessageHandler> Clone for SocketDescriptor<CMH> {
+impl Clone for SocketDescriptor {
        fn clone(&self) -> Self {
                Self {
                        conn: Arc::clone(&self.conn),
                        id: self.id,
-                       peer_manager: Arc::clone(&self.peer_manager),
                }
        }
 }
-impl<CMH: ChannelMessageHandler> Eq for SocketDescriptor<CMH> {}
-impl<CMH: ChannelMessageHandler> PartialEq for SocketDescriptor<CMH> {
+impl Eq for SocketDescriptor {}
+impl PartialEq for SocketDescriptor {
        fn eq(&self, o: &Self) -> bool {
                self.id == o.id
        }
 }
-impl<CMH: ChannelMessageHandler> Hash for SocketDescriptor<CMH> {
+impl Hash for SocketDescriptor {
        fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
                self.id.hash(state);
        }
 }
 
+#[cfg(test)]
+mod tests {
+       use lightning::ln::features::*;
+       use lightning::ln::msgs::*;
+       use lightning::ln::peer_handler::{MessageHandler, PeerManager};
+       use lightning::util::events::*;
+       use secp256k1::{Secp256k1, SecretKey, PublicKey};
+
+       use tokio::sync::mpsc;
+
+       use std::mem;
+       use std::sync::{Arc, Mutex};
+       use std::time::Duration;
+
+       pub struct TestLogger();
+       impl lightning::util::logger::Logger for TestLogger {
+               fn log(&self, record: &lightning::util::logger::Record) {
+                       println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
+               }
+       }
+
+       struct MsgHandler{
+               expected_pubkey: PublicKey,
+               pubkey_connected: mpsc::Sender<()>,
+               pubkey_disconnected: mpsc::Sender<()>,
+               msg_events: Mutex<Vec<MessageSendEvent>>,
+       }
+       impl RoutingMessageHandler for MsgHandler {
+               fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+               fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+               fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
+               fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { }
+               fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, ChannelUpdate, ChannelUpdate)> { Vec::new() }
+               fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
+               fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
+       }
+       impl ChannelMessageHandler for MsgHandler {
+               fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}
+               fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &AcceptChannel) {}
+               fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &FundingCreated) {}
+               fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &FundingSigned) {}
+               fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &FundingLocked) {}
+               fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &Shutdown) {}
+               fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &ClosingSigned) {}
+               fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateAddHTLC) {}
+               fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFulfillHTLC) {}
+               fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailHTLC) {}
+               fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailMalformedHTLC) {}
+               fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &CommitmentSigned) {}
+               fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &RevokeAndACK) {}
+               fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &UpdateFee) {}
+               fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {}
+               fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) {
+                       if *their_node_id == self.expected_pubkey {
+                               self.pubkey_disconnected.clone().try_send(()).unwrap();
+                       }
+               }
+               fn peer_connected(&self, their_node_id: &PublicKey, _msg: &Init) {
+                       if *their_node_id == self.expected_pubkey {
+                               self.pubkey_connected.clone().try_send(()).unwrap();
+                       }
+               }
+               fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &ChannelReestablish) {}
+               fn handle_error(&self, _their_node_id: &PublicKey, _msg: &ErrorMessage) {}
+       }
+       impl MessageSendEventsProvider for MsgHandler {
+               fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+                       let mut ret = Vec::new();
+                       mem::swap(&mut *self.msg_events.lock().unwrap(), &mut ret);
+                       ret
+               }
+       }
+
+       async fn do_basic_connection_test() {
+               let secp_ctx = Secp256k1::new();
+               let a_key = SecretKey::from_slice(&[1; 32]).unwrap();
+               let b_key = SecretKey::from_slice(&[1; 32]).unwrap();
+               let a_pub = PublicKey::from_secret_key(&secp_ctx, &a_key);
+               let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key);
+
+               let (a_connected_sender, mut a_connected) = mpsc::channel(1);
+               let (a_disconnected_sender, mut a_disconnected) = mpsc::channel(1);
+               let a_handler = Arc::new(MsgHandler {
+                       expected_pubkey: b_pub,
+                       pubkey_connected: a_connected_sender,
+                       pubkey_disconnected: a_disconnected_sender,
+                       msg_events: Mutex::new(Vec::new()),
+               });
+               let a_manager = Arc::new(PeerManager::new(MessageHandler {
+                       chan_handler: Arc::clone(&a_handler),
+                       route_handler: Arc::clone(&a_handler) as Arc<dyn RoutingMessageHandler>,
+               }, a_key.clone(), &[1; 32], Arc::new(TestLogger())));
+
+               let (b_connected_sender, mut b_connected) = mpsc::channel(1);
+               let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
+               let b_handler = Arc::new(MsgHandler {
+                       expected_pubkey: a_pub,
+                       pubkey_connected: b_connected_sender,
+                       pubkey_disconnected: b_disconnected_sender,
+                       msg_events: Mutex::new(Vec::new()),
+               });
+               let b_manager = Arc::new(PeerManager::new(MessageHandler {
+                       chan_handler: Arc::clone(&b_handler),
+                       route_handler: Arc::clone(&b_handler) as Arc<dyn RoutingMessageHandler>,
+               }, b_key.clone(), &[2; 32], Arc::new(TestLogger())));
+
+               // We bind on localhost, hoping the environment is properly configured with a local
+               // address. This may not always be the case in containers and the like, so if this test is
+               // failing for you check that you have a loopback interface and it is configured with
+               // 127.0.0.1.
+               let (conn_a, conn_b) = if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:9735") {
+                       (std::net::TcpStream::connect("127.0.0.1:9735").unwrap(), listener.accept().unwrap().0)
+               } else if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:9999") {
+                       (std::net::TcpStream::connect("127.0.0.1:9999").unwrap(), listener.accept().unwrap().0)
+               } else if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:46926") {
+                       (std::net::TcpStream::connect("127.0.0.1:46926").unwrap(), listener.accept().unwrap().0)
+               } else { panic!("Failed to bind to v4 localhost on common ports"); };
+
+               let (sender, _receiver) = mpsc::channel(2);
+               let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, tokio::net::TcpStream::from_std(conn_a).unwrap());
+               let fut_b = super::setup_inbound(b_manager, sender, tokio::net::TcpStream::from_std(conn_b).unwrap());
+
+               tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap();
+               tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap();
+
+               a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError {
+                       node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None }
+               });
+               assert!(a_disconnected.try_recv().is_err());
+               assert!(b_disconnected.try_recv().is_err());
+
+               a_manager.process_events();
+               tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv()).await.unwrap();
+               tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv()).await.unwrap();
+
+               fut_a.await;
+               fut_b.await;
+       }
+
+       #[tokio::test(threaded_scheduler)]
+       async fn basic_threaded_connection_test() {
+               do_basic_connection_test().await;
+       }
+       #[tokio::test]
+       async fn basic_unthreaded_connection_test() {
+               do_basic_connection_test().await;
+       }
+}
index 64f7e4f0b682d78db36cecd00d3cdaa0251cf387..81762923fda224f83874ff4413dc3e4d7605ef3c 100644 (file)
@@ -295,7 +295,7 @@ pub(super) struct Channel<ChanSigner: ChannelKeys> {
        holding_cell_update_fee: Option<u64>,
        next_local_htlc_id: u64,
        next_remote_htlc_id: u64,
-       channel_update_count: u32,
+       update_time_counter: u32,
        feerate_per_kw: u64,
 
        #[cfg(debug_assertions)]
@@ -433,10 +433,6 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                cmp::max(at_open_background_feerate * B_OUTPUT_PLUS_SPENDING_INPUT_WEIGHT / 1000, 546) //TODO
        }
 
-       fn derive_our_htlc_minimum_msat(_at_open_channel_feerate_per_kw: u64) -> u64 {
-               1000 // TODO
-       }
-
        // Constructors:
        pub fn new_outbound<K: Deref, F: Deref>(fee_estimator: &F, keys_provider: &K, their_node_id: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64, logger: Arc<Logger>, config: &UserConfig) -> Result<Channel<ChanSigner>, APIError>
        where K::Target: KeysInterface<ChanKeySigner = ChanSigner>,
@@ -490,7 +486,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        holding_cell_update_fee: None,
                        next_local_htlc_id: 0,
                        next_remote_htlc_id: 0,
-                       channel_update_count: 1,
+                       update_time_counter: 1,
 
                        resend_order: RAACommitmentOrder::CommitmentFirst,
 
@@ -519,7 +515,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        their_max_htlc_value_in_flight_msat: 0,
                        their_channel_reserve_satoshis: 0,
                        their_htlc_minimum_msat: 0,
-                       our_htlc_minimum_msat: Channel::<ChanSigner>::derive_our_htlc_minimum_msat(feerate),
+                       our_htlc_minimum_msat: if config.own_channel_config.our_htlc_minimum_msat == 0 { 1 } else { config.own_channel_config.our_htlc_minimum_msat },
                        their_to_self_delay: 0,
                        our_to_self_delay: config.own_channel_config.our_to_self_delay,
                        their_max_accepted_htlcs: 0,
@@ -714,7 +710,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        holding_cell_update_fee: None,
                        next_local_htlc_id: 0,
                        next_remote_htlc_id: 0,
-                       channel_update_count: 1,
+                       update_time_counter: 1,
 
                        resend_order: RAACommitmentOrder::CommitmentFirst,
 
@@ -744,7 +740,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        their_max_htlc_value_in_flight_msat: cmp::min(msg.max_htlc_value_in_flight_msat, msg.funding_satoshis * 1000),
                        their_channel_reserve_satoshis: msg.channel_reserve_satoshis,
                        their_htlc_minimum_msat: msg.htlc_minimum_msat,
-                       our_htlc_minimum_msat: Channel::<ChanSigner>::derive_our_htlc_minimum_msat(msg.feerate_per_kw as u64),
+                       our_htlc_minimum_msat: if config.own_channel_config.our_htlc_minimum_msat == 0 { 1 } else { config.own_channel_config.our_htlc_minimum_msat },
                        their_to_self_delay: msg.to_self_delay,
                        our_to_self_delay: config.own_channel_config.our_to_self_delay,
                        their_max_accepted_htlcs: msg.max_accepted_htlcs,
@@ -1586,7 +1582,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        self.channel_state |= ChannelState::TheirFundingLocked as u32;
                } else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::OurFundingLocked as u32) {
                        self.channel_state = ChannelState::ChannelFunded as u32 | (self.channel_state & MULTI_STATE_FLAGS);
-                       self.channel_update_count += 1;
+                       self.update_time_counter += 1;
                } else if (self.channel_state & (ChannelState::ChannelFunded as u32) != 0 &&
                                 // Note that funding_signed/funding_created will have decremented both by 1!
                                 self.cur_local_commitment_transaction_number == INITIAL_COMMITMENT_NUMBER - 1 &&
@@ -1656,6 +1652,9 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                if msg.amount_msat > self.channel_value_satoshis * 1000 {
                        return Err(ChannelError::Close("Remote side tried to send more than the total value of the channel"));
                }
+               if msg.amount_msat == 0 {
+                       return Err(ChannelError::Close("Remote side tried to send a 0-msat HTLC"));
+               }
                if msg.amount_msat < self.our_htlc_minimum_msat {
                        return Err(ChannelError::Close("Remote side tried to send less than our minimum HTLC value"));
                }
@@ -2480,7 +2479,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                }
                Channel::<ChanSigner>::check_remote_fee(fee_estimator, msg.feerate_per_kw)?;
                self.pending_update_fee = Some(msg.feerate_per_kw as u64);
-               self.channel_update_count += 1;
+               self.update_time_counter += 1;
                Ok(())
        }
 
@@ -2763,7 +2762,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                // From here on out, we may not fail!
 
                self.channel_state |= ChannelState::RemoteShutdownSent as u32;
-               self.channel_update_count += 1;
+               self.update_time_counter += 1;
 
                // We can't send our shutdown until we've committed all of our pending HTLCs, but the
                // remote side is unlikely to accept any new HTLCs, so we go ahead and "free" any holding
@@ -2793,7 +2792,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                };
 
                self.channel_state |= ChannelState::LocalShutdownSent as u32;
-               self.channel_update_count += 1;
+               self.update_time_counter += 1;
 
                Ok((our_shutdown, self.maybe_propose_first_closing_signed(fee_estimator), dropped_outbound_htlcs))
        }
@@ -2860,7 +2859,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        if last_fee == msg.fee_satoshis {
                                self.build_signed_closing_transaction(&mut closing_tx, &msg.signature, &our_sig);
                                self.channel_state = ChannelState::ShutdownComplete as u32;
-                               self.channel_update_count += 1;
+                               self.update_time_counter += 1;
                                return Ok((None, Some(closing_tx)));
                        }
                }
@@ -2910,7 +2909,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                self.build_signed_closing_transaction(&mut closing_tx, &msg.signature, &our_sig);
 
                self.channel_state = ChannelState::ShutdownComplete as u32;
-               self.channel_update_count += 1;
+               self.update_time_counter += 1;
 
                Ok((Some(msgs::ClosingSigned {
                        channel_id: self.channel_id,
@@ -3022,8 +3021,8 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
        }
 
        /// Allowed in any state (including after shutdown)
-       pub fn get_channel_update_count(&self) -> u32 {
-               self.channel_update_count
+       pub fn get_update_time_counter(&self) -> u32 {
+               self.update_time_counter
        }
 
        pub fn get_latest_monitor_update_id(&self) -> u64 {
@@ -3149,7 +3148,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                                                        panic!("Client called ChannelManager::funding_transaction_generated with bogus transaction!");
                                                }
                                                self.channel_state = ChannelState::ShutdownComplete as u32;
-                                               self.channel_update_count += 1;
+                                               self.update_time_counter += 1;
                                                return Err(msgs::ErrorMessage {
                                                        channel_id: self.channel_id(),
                                                        data: "funding tx had wrong script/value".to_owned()
@@ -3175,6 +3174,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                }
                if header.bitcoin_hash() != self.last_block_connected {
                        self.last_block_connected = header.bitcoin_hash();
+                       self.update_time_counter = cmp::max(self.update_time_counter, header.time);
                        if let Some(channel_monitor) = self.channel_monitor.as_mut() {
                                channel_monitor.last_block_hash = self.last_block_connected;
                        }
@@ -3185,7 +3185,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                                                true
                                        } else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::TheirFundingLocked as u32) {
                                                self.channel_state = ChannelState::ChannelFunded as u32 | (self.channel_state & MULTI_STATE_FLAGS);
-                                               self.channel_update_count += 1;
+                                               self.update_time_counter += 1;
                                                true
                                        } else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::OurFundingLocked as u32) {
                                                // We got a reorg but not enough to trigger a force close, just update
@@ -3492,6 +3492,11 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                if amount_msat > self.channel_value_satoshis * 1000 {
                        return Err(ChannelError::Ignore("Cannot send more than the total value of the channel"));
                }
+
+               if amount_msat == 0 {
+                       return Err(ChannelError::Ignore("Cannot send 0-msat HTLC"));
+               }
+
                if amount_msat < self.their_htlc_minimum_msat {
                        return Err(ChannelError::Ignore("Cannot send less than their minimum HTLC value"));
                }
@@ -3728,7 +3733,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                } else {
                        self.channel_state |= ChannelState::LocalShutdownSent as u32;
                }
-               self.channel_update_count += 1;
+               self.update_time_counter += 1;
 
                // Go ahead and drop holding cell updates as we'd rather fail payments than wait to send
                // our shutdown until we've committed all of the pending changes.
@@ -3777,7 +3782,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                }
 
                self.channel_state = ChannelState::ShutdownComplete as u32;
-               self.channel_update_count += 1;
+               self.update_time_counter += 1;
                if self.channel_monitor.is_some() {
                        (self.channel_monitor.as_mut().unwrap().get_latest_local_commitment_txn(), dropped_outbound_htlcs)
                } else {
@@ -3964,7 +3969,7 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
 
                self.next_local_htlc_id.write(writer)?;
                (self.next_remote_htlc_id - dropped_inbound_htlcs).write(writer)?;
-               self.channel_update_count.write(writer)?;
+               self.update_time_counter.write(writer)?;
                self.feerate_per_kw.write(writer)?;
 
                match self.last_sent_closing_fee {
@@ -4124,7 +4129,7 @@ impl<ChanSigner: ChannelKeys + Readable> ReadableArgs<Arc<Logger>> for Channel<C
 
                let next_local_htlc_id = Readable::read(reader)?;
                let next_remote_htlc_id = Readable::read(reader)?;
-               let channel_update_count = Readable::read(reader)?;
+               let update_time_counter = Readable::read(reader)?;
                let feerate_per_kw = Readable::read(reader)?;
 
                let last_sent_closing_fee = match <u8 as Readable>::read(reader)? {
@@ -4203,7 +4208,7 @@ impl<ChanSigner: ChannelKeys + Readable> ReadableArgs<Arc<Logger>> for Channel<C
                        holding_cell_update_fee,
                        next_local_htlc_id,
                        next_remote_htlc_id,
-                       channel_update_count,
+                       update_time_counter,
                        feerate_per_kw,
 
                        #[cfg(debug_assertions)]
index 5f7e903fd201c6da81c0eadf20dd61d6f98e2dd6..307b41d37a81e1e3719379a2c959119833e23fe7 100644 (file)
@@ -29,8 +29,8 @@ use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator};
 use chain::transaction::OutPoint;
 use ln::channel::{Channel, ChannelError};
 use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use ln::features::{InitFeatures, NodeFeatures};
 use ln::router::Route;
-use ln::features::InitFeatures;
 use ln::msgs;
 use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
@@ -368,6 +368,10 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
        channel_state: Mutex<ChannelHolder<ChanSigner>>,
        our_network_key: SecretKey,
 
+       /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
+       /// value increases strictly since we don't assume access to a time source.
+       last_node_announcement_serial: AtomicUsize,
+
        /// The bulk of our storage will eventually be here (channels and message queues and the like).
        /// If we are connected to a peer we always at least have an entry here, even if no channels
        /// are currently open with that peer.
@@ -665,6 +669,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                        }),
                        our_network_key: keys_manager.get_node_secret(),
 
+                       last_node_announcement_serial: AtomicUsize::new(0),
+
                        per_peer_state: RwLock::new(HashMap::new()),
 
                        pending_events: Mutex::new(Vec::new()),
@@ -1118,7 +1124,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                let unsigned = msgs::UnsignedChannelUpdate {
                        chain_hash: self.genesis_hash,
                        short_channel_id: short_channel_id,
-                       timestamp: chan.get_channel_update_count(),
+                       timestamp: chan.get_update_time_counter(),
                        flags: (!were_node_one) as u16 | ((!chan.is_live() as u16) << 1),
                        cltv_expiry_delta: CLTV_EXPIRY_DELTA,
                        htlc_minimum_msat: chan.get_our_htlc_minimum_msat(),
@@ -1334,6 +1340,57 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
                })
        }
 
+       #[allow(dead_code)]
+       // Messages of up to 64KB should never end up more than half full with addresses, as that would
+       // be absurd. We ensure this by checking that at least 500 (our stated public contract on when
+       // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
+       // message...
+       const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (msgs::NetAddress::MAX_LEN as u32 + 1) / 2;
+       #[deny(const_err)]
+       #[allow(dead_code)]
+       // ...by failing to compile if the number of addresses that would be half of a message is
+       // smaller than 500:
+       const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 500;
+
+       /// Generates a signed node_announcement from the given arguments and creates a
+       /// BroadcastNodeAnnouncement event. Note that such messages will be ignored unless peers have
+       /// seen a channel_announcement from us (ie unless we have public channels open).
+       ///
+       /// RGB is a node "color" and alias is a printable human-readable string to describe this node
+       /// to humans. They carry no in-protocol meaning.
+       ///
+       /// addresses represent the set (possibly empty) of socket addresses on which this node accepts
+       /// incoming connections. These will be broadcast to the network, publicly tying these
+       /// addresses together. If you wish to preserve user privacy, addresses should likely contain
+       /// only Tor Onion addresses.
+       ///
+       /// Panics if addresses is absurdly large (more than 500).
+       pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec<msgs::NetAddress>) {
+               let _ = self.total_consistency_lock.read().unwrap();
+
+               if addresses.len() > 500 {
+                       panic!("More than half the message size was taken up by public addresses!");
+               }
+
+               let announcement = msgs::UnsignedNodeAnnouncement {
+                       features: NodeFeatures::supported(),
+                       timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32,
+                       node_id: self.get_our_node_id(),
+                       rgb, alias, addresses,
+                       excess_address_data: Vec::new(),
+                       excess_data: Vec::new(),
+               };
+               let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]);
+
+               let mut channel_state = self.channel_state.lock().unwrap();
+               channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement {
+                       msg: msgs::NodeAnnouncement {
+                               signature: self.secp_ctx.sign(&msghash, &self.our_network_key),
+                               contents: announcement
+                       },
+               });
+       }
+
        /// Processes HTLCs which are pending waiting on random forward delay.
        ///
        /// Should only really ever be called in response to a PendingHTLCsForwardable event.
@@ -2719,6 +2776,18 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                }
                self.latest_block_height.store(height as usize, Ordering::Release);
                *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header_hash;
+               loop {
+                       // Update last_node_announcement_serial to be the max of its current value and the
+                       // block timestamp. This should keep us close to the current time without relying on
+                       // having an explicit local time source.
+                       // Just in case we end up in a race, we loop until we either successfully update
+                       // last_node_announcement_serial or decide we don't need to.
+                       let old_serial = self.last_node_announcement_serial.load(Ordering::Acquire);
+                       if old_serial >= header.time as usize { break; }
+                       if self.last_node_announcement_serial.compare_exchange(old_serial, header.time as usize, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
+                               break;
+                       }
+               }
        }
 
        /// We force-close the channel without letting our counterparty participate in the shutdown
@@ -2970,6 +3039,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                        &events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
+                                       &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
                                        &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
                                        &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != their_node_id,
                                        &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
@@ -3288,6 +3358,8 @@ impl<ChanSigner: ChannelKeys + Writeable, M: Deref, T: Deref, K: Deref, F: Deref
                        peer_state.latest_features.write(writer)?;
                }
 
+               (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
+
                Ok(())
        }
 }
@@ -3459,6 +3531,8 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                }
 
+               let last_node_announcement_serial: u32 = Readable::read(reader)?;
+
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: args.fee_estimator,
@@ -3478,6 +3552,8 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
                        }),
                        our_network_key: args.keys_manager.get_node_secret(),
 
+                       last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize),
+
                        per_peer_state: RwLock::new(per_peer_state),
 
                        pending_events: Mutex::new(Vec::new()),
index 205f4b011ac0fb32d62304f5bf1e3ceadbbf2ca1..514a95d27db9339edc8b78989a0477695ae102ab 100644 (file)
@@ -1024,7 +1024,7 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
                        for ev in events.iter() {
                                match *ev {
                                        OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                               writer.write_all(&[1; 1])?;
+                                               0u8.write(writer)?;
                                                htlc_update.0.write(writer)?;
                                                htlc_update.1.write(writer)?;
                                        },
@@ -1668,22 +1668,22 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
        }
 
        /// Attempts to claim a remote HTLC-Success/HTLC-Timeout's outputs using the revocation key
-       fn check_spend_remote_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32) -> Vec<ClaimRequest> {
-               //TODO: send back new outputs to guarantee pending_claim_request consistency
+       fn check_spend_remote_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32) -> (Vec<ClaimRequest>, Option<(Sha256dHash, Vec<TxOut>)>) {
+               let htlc_txid = tx.txid();
                if tx.input.len() != 1 || tx.output.len() != 1 || tx.input[0].witness.len() != 5 {
-                       return Vec::new()
+                       return (Vec::new(), None)
                }
 
                macro_rules! ignore_error {
                        ( $thing : expr ) => {
                                match $thing {
                                        Ok(a) => a,
-                                       Err(_) => return Vec::new()
+                                       Err(_) => return (Vec::new(), None)
                                }
                        };
                }
 
-               let secret = if let Some(secret) = self.get_secret(commitment_number) { secret } else { return Vec::new(); };
+               let secret = if let Some(secret) = self.get_secret(commitment_number) { secret } else { return (Vec::new(), None); };
                let per_commitment_key = ignore_error!(SecretKey::from_slice(&secret));
                let per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &per_commitment_key);
                let (revocation_pubkey, revocation_key) = match self.key_storage {
@@ -1694,16 +1694,15 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                        Storage::Watchtower { .. } => { unimplemented!() }
                };
                let delayed_key = match self.their_delayed_payment_base_key {
-                       None => return Vec::new(),
+                       None => return (Vec::new(), None),
                        Some(their_delayed_payment_base_key) => ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, &per_commitment_point, &their_delayed_payment_base_key)),
                };
                let redeemscript = chan_utils::get_revokeable_redeemscript(&revocation_pubkey, self.our_to_self_delay, &delayed_key);
-               let htlc_txid = tx.txid(); //TODO: This is gonna be a performance bottleneck for watchtowers!
 
                log_trace!(self, "Remote HTLC broadcast {}:{}", htlc_txid, 0);
                let witness_data = InputMaterial::Revoked { witness_script: redeemscript, pubkey: Some(revocation_pubkey), key: revocation_key, is_htlc: false, amount: tx.output[0].value };
                let claimable_outpoints = vec!(ClaimRequest { absolute_timelock: height + self.our_to_self_delay as u32, aggregable: true, outpoint: BitcoinOutPoint { txid: htlc_txid, vout: 0}, witness_data });
-               claimable_outpoints
+               (claimable_outpoints, Some((htlc_txid, tx.output.clone())))
        }
 
        fn broadcast_by_local_state(&self, local_tx: &LocalSignedTx, delayed_payment_base_key: &SecretKey) -> (Vec<Transaction>, Vec<SpendableOutputDescriptor>, Vec<TxOut>) {
@@ -2019,8 +2018,11 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                                        }
                                } else {
                                        if let Some(&(commitment_number, _)) = self.remote_commitment_txn_on_chain.get(&prevout.txid) {
-                                               let mut new_outpoints = self.check_spend_remote_htlc(&tx, commitment_number, height);
+                                               let (mut new_outpoints, new_outputs_option) = self.check_spend_remote_htlc(&tx, commitment_number, height);
                                                claimable_outpoints.append(&mut new_outpoints);
+                                               if let Some(new_outputs) = new_outputs_option {
+                                                       watch_outputs.push(new_outputs);
+                                               }
                                        }
                                }
                        }
index 990e0478c4d7b70a77d436af67f0888428a70e5d..18ea7bffbb8a5840ec7441e6171572630ea9573e 100644 (file)
@@ -150,11 +150,18 @@ impl NodeFeatures {
 
        /// Takes the flags that we know how to interpret in an init-context features that are also
        /// relevant in a node-context features and creates a node-context features from them.
+       /// Be sure to blank out features that are unknown to us.
        pub(crate) fn with_known_relevant_init_flags(init_ctx: &InitFeatures) -> Self {
                let mut flags = Vec::new();
-               if init_ctx.flags.len() > 0 {
-                       // Pull out data_loss_protect and upfront_shutdown_script (bits 0, 1, 4, and 5)
-                       flags.push(init_ctx.flags.last().unwrap() & 0b00110011);
+               for (i, feature_byte)in init_ctx.flags.iter().enumerate() {
+                       match i {
+                               // Blank out initial_routing_sync (feature bits 2/3), gossip_queries (6/7),
+                               // gossip_queries_ex (10/11), option_static_remotekey (12/13), and
+                               // payment_secret (14/15)
+                               0 => flags.push(feature_byte & 0b00110011),
+                               1 => flags.push(feature_byte & 0b00000011),
+                               _ => (),
+                       }
                }
                Self { flags, mark: PhantomData, }
        }
@@ -296,7 +303,7 @@ impl<T: sealed::Context> Readable for Features<T> {
 
 #[cfg(test)]
 mod tests {
-       use super::{ChannelFeatures, InitFeatures, NodeFeatures};
+       use super::{ChannelFeatures, InitFeatures, NodeFeatures, Features};
 
        #[test]
        fn sanity_test_our_features() {
@@ -330,4 +337,26 @@ mod tests {
                features.clear_require_unknown_bits();
                assert!(!features.requires_unknown_bits());
        }
+
+       #[test]
+       fn test_node_with_known_relevant_init_flags() {
+               // Create an InitFeatures with initial_routing_sync supported.
+               let mut init_features = InitFeatures::supported();
+               init_features.set_initial_routing_sync();
+
+               // Attempt to pull out non-node-context feature flags from these InitFeatures.
+               let res = NodeFeatures::with_known_relevant_init_flags(&init_features);
+
+               {
+                       // Check that the flags are as expected: optional_data_loss_protect,
+                       // option_upfront_shutdown_script, and var_onion_optin set.
+                       assert_eq!(res.flags[0], 0b00100010);
+                       assert_eq!(res.flags[1], 0b00000010);
+                       assert_eq!(res.flags.len(), 2);
+               }
+
+               // Check that the initial_routing_sync feature was correctly blanked out.
+               let new_features: InitFeatures = Features::from_le_bytes(res.flags);
+               assert!(!new_features.initial_routing_sync());
+       }
 }
index 489647dcfa094764883d9ee2adfb106226afbf4c..bd8394d14729ec77bc9c91e129b112b2f5199ffe 100644 (file)
@@ -394,10 +394,33 @@ pub fn create_announced_chan_between_nodes<'a, 'b, 'c, 'd>(nodes: &'a Vec<Node<'
 
 pub fn create_announced_chan_between_nodes_with_value<'a, 'b, 'c, 'd>(nodes: &'a Vec<Node<'b, 'c, 'd>>, a: usize, b: usize, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
        let chan_announcement = create_chan_between_nodes_with_value(&nodes[a], &nodes[b], channel_value, push_msat, a_flags, b_flags);
+
+       nodes[a].node.broadcast_node_announcement([0, 0, 0], [0; 32], Vec::new());
+       let a_events = nodes[a].node.get_and_clear_pending_msg_events();
+       assert_eq!(a_events.len(), 1);
+       let a_node_announcement = match a_events[0] {
+               MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+                       (*msg).clone()
+               },
+               _ => panic!("Unexpected event"),
+       };
+
+       nodes[b].node.broadcast_node_announcement([1, 1, 1], [1; 32], Vec::new());
+       let b_events = nodes[b].node.get_and_clear_pending_msg_events();
+       assert_eq!(b_events.len(), 1);
+       let b_node_announcement = match b_events[0] {
+               MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+                       (*msg).clone()
+               },
+               _ => panic!("Unexpected event"),
+       };
+
        for node in nodes {
                assert!(node.router.handle_channel_announcement(&chan_announcement.0).unwrap());
                node.router.handle_channel_update(&chan_announcement.1).unwrap();
                node.router.handle_channel_update(&chan_announcement.2).unwrap();
+               node.router.handle_node_announcement(&a_node_announcement).unwrap();
+               node.router.handle_node_announcement(&b_node_announcement).unwrap();
        }
        (chan_announcement.1, chan_announcement.2, chan_announcement.3, chan_announcement.4)
 }
@@ -982,6 +1005,7 @@ pub fn create_node_chanmgrs<'a, 'b>(node_count: usize, cfgs: &'a Vec<NodeCfg<'b>
                let mut default_config = UserConfig::default();
                default_config.channel_options.announced_channel = true;
                default_config.peer_channel_config_limits.force_announced_channel_preference = false;
+               default_config.own_channel_config.our_htlc_minimum_msat = 1000; // sanitization being done by the sender, to exerce receiver logic we need to lift of limit
                let node = ChannelManager::new(Network::Testnet, cfgs[i].fee_estimator, &cfgs[i].chan_monitor, cfgs[i].tx_broadcaster, cfgs[i].logger.clone(), &cfgs[i].keys_manager, if node_config[i].is_some() { node_config[i].clone().unwrap() } else { default_config }, 0).unwrap();
                chanmgrs.push(node);
        }
@@ -1074,9 +1098,9 @@ pub fn test_txn_broadcast<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, chan: &(msgs::Cha
 /// HTLC transaction.
 pub fn test_revoked_htlc_claim_txn_broadcast<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, revoked_tx: Transaction, commitment_revoked_tx: Transaction)  {
        let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
-       // We should issue a 2nd transaction if one htlc is dropped from initial claiming tx
-       // but sometimes not as feerate is too-low
-       if node_txn.len() != 1 && node_txn.len() != 2 { assert!(false); }
+       // We may issue multiple claiming transaction on revoked outputs due to block rescan
+       // for revoked htlc outputs
+       if node_txn.len() != 1 && node_txn.len() != 2 && node_txn.len() != 3 { assert!(false); }
        node_txn.retain(|tx| {
                if tx.input.len() == 1 && tx.input[0].previous_output.txid == revoked_tx.txid() {
                        check_spends!(tx, revoked_tx);
index a41ae11af2c2d19cb95060ef72d230b1e1cc4ca8..bd4244091c7a21ccae4e405627d51e889011821b 100644 (file)
@@ -2127,8 +2127,10 @@ fn test_justice_tx() {
                test_txn_broadcast(&nodes[1], &chan_5, None, HTLCType::NONE);
 
                nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+               // Verify broadcast of revoked HTLC-timeout
                let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);
                header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+               // Broadcast revoked HTLC-timeout on node 1
                nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[1].clone()] }, 1);
                test_revoked_htlc_claim_txn_broadcast(&nodes[1], node_txn[1].clone(), revoked_local_txn[0].clone());
        }
@@ -4182,9 +4184,14 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_timeout_tx() {
        check_closed_broadcast!(nodes[1], false);
 
        let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
-       assert_eq!(node_txn.len(), 4 ); // ChannelMonitor: justice tx on revoked commitment, justice tx on revoked HTLC-timeout, adjusted justice tx, ChannelManager: local commitment tx
+       assert_eq!(node_txn.len(), 4); // ChannelMonitor: justice tx on revoked commitment, justice tx on revoked HTLC-timeout, adjusted justice tx, ChannelManager: local commitment tx
+       assert_eq!(node_txn[0].input.len(), 2);
+       check_spends!(node_txn[0], revoked_local_txn[0]);
+       check_spends!(node_txn[1], chan_1.3);
        assert_eq!(node_txn[2].input.len(), 1);
        check_spends!(node_txn[2], revoked_htlc_txn[0]);
+       assert_eq!(node_txn[3].input.len(), 1);
+       check_spends!(node_txn[3], revoked_local_txn[0]);
 
        // Check B's ChannelMonitor was able to generate the right spendable output descriptor
        let spend_txn = check_spendable_outputs!(nodes[1], 1);
@@ -4233,7 +4240,7 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_success_tx() {
 
        // Check A's ChannelMonitor was able to generate the right spendable output descriptor
        let spend_txn = check_spendable_outputs!(nodes[0], 1);
-       assert_eq!(spend_txn.len(), 4);
+       assert_eq!(spend_txn.len(), 5); // Duplicated SpendableOutput due to block rescan after revoked htlc output tracking
        assert_eq!(spend_txn[0], spend_txn[2]);
        check_spends!(spend_txn[0], revoked_local_txn[0]); // spending to_remote output from revoked local tx
        check_spends!(spend_txn[1], node_txn[0]); // spending justice tx output from revoked local tx htlc received output
@@ -5486,7 +5493,6 @@ fn bolt2_open_channel_sending_node_checks_part2() {
 
 #[test]
 fn test_update_add_htlc_bolt2_sender_value_below_minimum_msat() {
-       //BOLT2 Requirement: MUST offer amount_msat greater than 0.
        //BOLT2 Requirement: MUST NOT offer amount_msat below the receiving node's htlc_minimum_msat (same validation check catches both of these)
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
@@ -5496,7 +5502,7 @@ fn test_update_add_htlc_bolt2_sender_value_below_minimum_msat() {
        let mut route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &[], 100000, TEST_FINAL_CLTV).unwrap();
        let (_, our_payment_hash) = get_payment_preimage_hash!(nodes[0]);
 
-       route.hops[0].fee_msat = 0;
+       route.hops[0].fee_msat = 100;
 
        let err = nodes[0].node.send_payment(route, our_payment_hash);
 
@@ -5509,6 +5515,51 @@ fn test_update_add_htlc_bolt2_sender_value_below_minimum_msat() {
        nodes[0].logger.assert_log("lightning::ln::channelmanager".to_string(), "Cannot send less than their minimum HTLC value".to_string(), 1);
 }
 
+#[test]
+fn test_update_add_htlc_bolt2_sender_zero_value_msat() {
+       //BOLT2 Requirement: MUST offer amount_msat greater than 0.
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+       let _chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 95000000, InitFeatures::supported(), InitFeatures::supported());
+       let mut route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &[], 100000, TEST_FINAL_CLTV).unwrap();
+       let (_, our_payment_hash) = get_payment_preimage_hash!(nodes[0]);
+
+       route.hops[0].fee_msat = 0;
+
+       let err = nodes[0].node.send_payment(route, our_payment_hash);
+
+       if let Err(APIError::ChannelUnavailable{err}) = err {
+               assert_eq!(err, "Cannot send 0-msat HTLC");
+       } else {
+               assert!(false);
+       }
+       assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+       nodes[0].logger.assert_log("lightning::ln::channelmanager".to_string(), "Cannot send 0-msat HTLC".to_string(), 1);
+}
+
+#[test]
+fn test_update_add_htlc_bolt2_receiver_zero_value_msat() {
+       //BOLT2 Requirement: MUST offer amount_msat greater than 0.
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+       let _chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 95000000, InitFeatures::supported(), InitFeatures::supported());
+       let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &[], 100000, TEST_FINAL_CLTV).unwrap();
+       let (_, our_payment_hash) = get_payment_preimage_hash!(nodes[0]);
+
+       nodes[0].node.send_payment(route, our_payment_hash).unwrap();
+       check_added_monitors!(nodes[0], 1);
+       let mut updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id());
+       updates.update_add_htlcs[0].amount_msat = 0;
+
+       nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]);
+       nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Remote side tried to send a 0-msat HTLC".to_string(), 1);
+       check_closed_broadcast!(nodes[1], true).unwrap();
+}
+
 #[test]
 fn test_update_add_htlc_bolt2_sender_cltv_expiry_too_high() {
        //BOLT 2 Requirement: MUST set cltv_expiry less than 500000000.
@@ -6949,7 +7000,7 @@ fn test_bump_penalty_txn_on_revoked_htlcs() {
 
                assert_eq!(node_txn[0].input.len(), 2);
                check_spends!(node_txn[0], revoked_htlc_txn[0], revoked_htlc_txn[1]);
-               //// Verify bumped tx is different and 25% bump heuristic
+               // Verify bumped tx is different and 25% bump heuristic
                assert_ne!(first, node_txn[0].txid());
                let fee_2 = revoked_htlc_txn[0].output[0].value + revoked_htlc_txn[1].output[0].value - node_txn[0].output[0].value;
                let feerate_2 = fee_2 * 1000 / node_txn[0].get_weight() as u64;
@@ -6964,7 +7015,13 @@ fn test_bump_penalty_txn_on_revoked_htlcs() {
        connect_blocks(&nodes[0].block_notifier, 20, 145, true, header_145.bitcoin_hash());
        {
                let mut node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
-               assert_eq!(node_txn.len(), 1); //TODO: fix check_spend_remote_htlc lack of watch output
+               // We verify than no new transaction has been broadcast because previously
+               // we were buggy on this exact behavior by not tracking for monitoring remote HTLC outputs (see #411)
+               // which means we wouldn't see a spend of them by a justice tx and bumped justice tx
+               // were generated forever instead of safe cleaning after confirmation and ANTI_REORG_SAFE_DELAY blocks.
+               // Enforce spending of revoked htlc output by claiming transaction remove request as expected and dry
+               // up bumped justice generation.
+               assert_eq!(node_txn.len(), 0);
                node_txn.clear();
        }
        check_closed_broadcast!(nodes[0], false);
@@ -7271,3 +7328,21 @@ fn test_override_channel_config() {
        assert_eq!(res.channel_flags, 0);
        assert_eq!(res.to_self_delay, 200);
 }
+
+#[test]
+fn test_override_0msat_htlc_minimum() {
+       let mut zero_config = UserConfig::default();
+       zero_config.own_channel_config.our_htlc_minimum_msat = 0;
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, Some(zero_config.clone())]);
+       let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+       nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 16_000_000, 12_000_000, 42, Some(zero_config)).unwrap();
+       let res = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+       assert_eq!(res.htlc_minimum_msat, 1);
+
+       nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), InitFeatures::supported(), &res);
+       let res = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+       assert_eq!(res.htlc_minimum_msat, 1);
+}
index 8d23ac9a8e575436f2234014edd91ad53485e8da..1ad798fd4bfaf2417d20fdeb4030dd876d7dd77c 100644 (file)
@@ -47,8 +47,6 @@ pub enum DecodeError {
        InvalidValue,
        /// Buffer too short
        ShortRead,
-       /// node_announcement included more than one address of a given type!
-       ExtraAddressesPerType,
        /// A length descriptor in the packet didn't describe the later data correctly
        BadLengthDescriptor,
        /// Error from std::io
@@ -304,6 +302,9 @@ impl NetAddress {
                        &NetAddress::OnionV3 { .. } => { 37 },
                }
        }
+
+       /// The maximum length of any address descriptor, not including the 1-byte type
+       pub(crate) const MAX_LEN: u16 = 37;
 }
 
 impl Writeable for NetAddress {
@@ -599,10 +600,11 @@ pub trait RoutingMessageHandler : Send + Sync {
        fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate);
        /// Gets a subset of the channel announcements and updates required to dump our routing table
        /// to a remote node, starting at the short_channel_id indicated by starting_point and
-       /// including batch_amount entries.
+       /// including the batch_amount entries immediately higher in numerical value than starting_point.
        fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, ChannelUpdate, ChannelUpdate)>;
        /// Gets a subset of the node announcements required to dump our routing table to a remote node,
-       /// starting at the node *after* the provided publickey and including batch_amount entries.
+       /// starting at the node *after* the provided publickey and including batch_amount entries
+       /// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
        /// If None is provided for starting_point, we start at the first node.
        fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
        /// Returns whether a full sync should be requested from a peer.
@@ -677,7 +679,6 @@ impl Error for DecodeError {
                        DecodeError::UnknownRequiredFeature => "Unknown required feature preventing decode",
                        DecodeError::InvalidValue => "Nonsense bytes didn't map to the type they were interpreted as",
                        DecodeError::ShortRead => "Packet extended beyond the provided bytes",
-                       DecodeError::ExtraAddressesPerType => "More than one address of a single type",
                        DecodeError::BadLengthDescriptor => "A length descriptor in the packet didn't describe the later data correctly",
                        DecodeError::Io(ref e) => e.description(),
                }
@@ -1209,8 +1210,7 @@ impl Writeable for UnsignedNodeAnnouncement {
                self.alias.write(w)?;
 
                let mut addrs_to_encode = self.addresses.clone();
-               addrs_to_encode.sort_unstable_by(|a, b| { a.get_id().cmp(&b.get_id()) });
-               addrs_to_encode.dedup_by(|a, b| { a.get_id() == b.get_id() });
+               addrs_to_encode.sort_by(|a, b| { a.get_id().cmp(&b.get_id()) });
                let mut addr_len = 0;
                for addr in &addrs_to_encode {
                        addr_len += 1 + addr.len();
@@ -1235,7 +1235,8 @@ impl Readable for UnsignedNodeAnnouncement {
                let alias: [u8; 32] = Readable::read(r)?;
 
                let addr_len: u16 = Readable::read(r)?;
-               let mut addresses: Vec<NetAddress> = Vec::with_capacity(4);
+               let mut addresses: Vec<NetAddress> = Vec::new();
+               let mut highest_addr_type = 0;
                let mut addr_readpos = 0;
                let mut excess = false;
                let mut excess_byte = 0;
@@ -1243,28 +1244,11 @@ impl Readable for UnsignedNodeAnnouncement {
                        if addr_len <= addr_readpos { break; }
                        match Readable::read(r) {
                                Ok(Ok(addr)) => {
-                                       match addr {
-                                               NetAddress::IPv4 { .. } => {
-                                                       if addresses.len() > 0 {
-                                                               return Err(DecodeError::ExtraAddressesPerType);
-                                                       }
-                                               },
-                                               NetAddress::IPv6 { .. } => {
-                                                       if addresses.len() > 1 || (addresses.len() == 1 && addresses[0].get_id() != 1) {
-                                                               return Err(DecodeError::ExtraAddressesPerType);
-                                                       }
-                                               },
-                                               NetAddress::OnionV2 { .. } => {
-                                                       if addresses.len() > 2 || (addresses.len() > 0 && addresses.last().unwrap().get_id() > 2) {
-                                                               return Err(DecodeError::ExtraAddressesPerType);
-                                                       }
-                                               },
-                                               NetAddress::OnionV3 { .. } => {
-                                                       if addresses.len() > 3 || (addresses.len() > 0 && addresses.last().unwrap().get_id() > 3) {
-                                                               return Err(DecodeError::ExtraAddressesPerType);
-                                                       }
-                                               },
+                                       if addr.get_id() < highest_addr_type {
+                                               // Addresses must be sorted in increasing order
+                                               return Err(DecodeError::InvalidValue);
                                        }
+                                       highest_addr_type = addr.get_id();
                                        if addr_len < addr_readpos + 1 + addr.len() {
                                                return Err(DecodeError::BadLengthDescriptor);
                                        }
@@ -1311,7 +1295,7 @@ impl Readable for UnsignedNodeAnnouncement {
 
 impl_writeable_len_match!(NodeAnnouncement, {
                { NodeAnnouncement { contents: UnsignedNodeAnnouncement { ref features, ref addresses, ref excess_address_data, ref excess_data, ..}, .. },
-                       64 + 76 + features.byte_count() + addresses.len()*38 + excess_address_data.len() + excess_data.len() }
+                       64 + 76 + features.byte_count() + addresses.len()*(NetAddress::MAX_LEN as usize + 1) + excess_address_data.len() + excess_data.len() }
        }, {
        signature,
        contents
index e456b7164b30a1e2f776dc273db29fbaf110c34e..3f21d4c1f64b4a4a73b708c9e924863960384e50 100644 (file)
@@ -482,6 +482,7 @@ impl OnchainTxHandler {
                where B::Target: BroadcasterInterface,
                      F::Target: FeeEstimator
        {
+               log_trace!(self, "Block at height {} connected with {} claim requests", height, claimable_outpoints.len());
                let mut new_claims = Vec::new();
                let mut aggregated_claim = HashMap::new();
                let mut aggregated_soonest = ::std::u32::MAX;
@@ -573,9 +574,11 @@ impl OnchainTxHandler {
                                                if set_equality {
                                                        clean_claim_request_after_safety_delay!();
                                                } else { // If false, generate new claim request with update outpoint set
+                                                       let mut at_least_one_drop = false;
                                                        for input in tx.input.iter() {
                                                                if let Some(input_material) = claim_material.per_input_material.remove(&input.previous_output) {
                                                                        claimed_outputs_material.push((input.previous_output, input_material));
+                                                                       at_least_one_drop = true;
                                                                }
                                                                // If there are no outpoints left to claim in this request, drop it entirely after ANTI_REORG_DELAY.
                                                                if claim_material.per_input_material.is_empty() {
@@ -583,7 +586,9 @@ impl OnchainTxHandler {
                                                                }
                                                        }
                                                        //TODO: recompute soonest_timelock to avoid wasting a bit on fees
-                                                       bump_candidates.insert(first_claim_txid_height.0.clone());
+                                                       if at_least_one_drop {
+                                                               bump_candidates.insert(first_claim_txid_height.0.clone());
+                                                       }
                                                }
                                                break; //No need to iterate further, either tx is our or their
                                        } else {
@@ -634,6 +639,7 @@ impl OnchainTxHandler {
                }
 
                // Build, bump and rebroadcast tx accordingly
+               log_trace!(self, "Bumping {} candidates", bump_candidates.len());
                for first_claim_txid in bump_candidates.iter() {
                        if let Some((new_timer, new_feerate)) = {
                                if let Some(claim_material) = self.pending_claim_requests.get(first_claim_txid) {
index 60df3d80196698234146400260411ef5f4a70b2e..12cf937bc0f07785fbb58d93db58355dbbf0dce9 100644 (file)
@@ -133,13 +133,22 @@ impl Peer {
        /// announcements/updates for the given channel_id then we will send it when we get to that
        /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
        /// sent the old versions, we should send the update, and so return true here.
-       fn should_forward_channel(&self, channel_id: u64)->bool{
+       fn should_forward_channel_announcement(&self, channel_id: u64)->bool{
                match self.sync_status {
                        InitSyncTracker::NoSyncRequested => true,
                        InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
                        InitSyncTracker::NodesSyncing(_) => true,
                }
        }
+
+       /// Similar to the above, but for node announcements indexed by node_id.
+       fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
+               match self.sync_status {
+                       InitSyncTracker::NoSyncRequested => true,
+                       InitSyncTracker::ChannelsSyncing(_) => false,
+                       InitSyncTracker::NodesSyncing(pk) => pk < node_id,
+               }
+       }
 }
 
 struct PeerHolder<Descriptor: SocketDescriptor> {
@@ -436,7 +445,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
        /// on this file descriptor has resume_read set (preventing DoS issues in the send buffer).
        ///
        /// Panics if the descriptor was not previously registered in a new_*_connection event.
-       pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
+       pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
                match self.do_read_event(peer_descriptor, data) {
                        Ok(res) => Ok(res),
                        Err(e) => {
@@ -446,7 +455,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                }
        }
 
-       fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
+       fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
                let pause_read = {
                        let mut peers_lock = self.peers.lock().unwrap();
                        let peers = &mut *peers_lock;
@@ -586,10 +595,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                                                                log_debug!(self, "Deserialization failed due to shortness of message");
                                                                                                                return Err(PeerHandleError { no_connection_possible: false });
                                                                                                        }
-                                                                                                       msgs::DecodeError::ExtraAddressesPerType => {
-                                                                                                               log_debug!(self, "Error decoding message, ignoring due to lnd spec incompatibility. See https://github.com/lightningnetwork/lnd/issues/1407");
-                                                                                                               continue;
-                                                                                                       }
                                                                                                        msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }),
                                                                                                        msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }),
                                                                                                }
@@ -958,7 +963,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
 
                                                        for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
                                                                if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_channel(msg.contents.short_channel_id) {
+                                                                               !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                                        continue
                                                                }
                                                                match peer.their_node_id {
@@ -975,6 +980,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                        }
                                                }
                                        },
+                                       MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+                                               log_trace!(self, "Handling BroadcastNodeAnnouncement event in peer_handler");
+                                               if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
+                                                       let encoded_msg = encode_msg!(msg);
+
+                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                                               !peer.should_forward_node_announcement(msg.contents.node_id) {
+                                                                       continue
+                                                               }
+                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
+                                                       }
+                                               }
+                                       },
                                        MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                                log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
@@ -982,7 +1002,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
 
                                                        for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
                                                                if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_channel(msg.contents.short_channel_id)  {
+                                                                               !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                                        continue
                                                                }
                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
@@ -1208,9 +1228,9 @@ mod tests {
                let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
                let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap();
                peer_a.new_inbound_connection(fd_a.clone()).unwrap();
-               assert_eq!(peer_a.read_event(&mut fd_a, initial_data).unwrap(), false);
-               assert_eq!(peer_b.read_event(&mut fd_b, fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
-               assert_eq!(peer_a.read_event(&mut fd_a, fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
        }
 
        #[test]
index 2b23ae56ddf9a1514f10f7edbd7571500982a449..39e8e6114d330ecb3ba22605de23ad8e677da1cc 100644 (file)
@@ -156,7 +156,10 @@ struct NodeInfo {
        lowest_inbound_channel_fee_proportional_millionths: u32,
 
        features: NodeFeatures,
-       last_update: u32,
+       /// Unlike for channels, we may have a NodeInfo entry before having received a node_update.
+       /// Thus, we have to be able to capture "no update has been received", which we do with an
+       /// Option here.
+       last_update: Option<u32>,
        rgb: [u8; 3],
        alias: [u8; 32],
        addresses: Vec<NetAddress>,
@@ -167,7 +170,7 @@ struct NodeInfo {
 
 impl std::fmt::Display for NodeInfo {
        fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
-               write!(f, "features: {}, last_update: {}, lowest_inbound_channel_fee_base_msat: {}, lowest_inbound_channel_fee_proportional_millionths: {}, channels: {:?}", log_bytes!(self.features.encode()), self.last_update, self.lowest_inbound_channel_fee_base_msat, self.lowest_inbound_channel_fee_proportional_millionths, &self.channels[..])?;
+               write!(f, "features: {}, last_update: {:?}, lowest_inbound_channel_fee_base_msat: {}, lowest_inbound_channel_fee_proportional_millionths: {}, channels: {:?}", log_bytes!(self.features.encode()), self.last_update, self.lowest_inbound_channel_fee_base_msat, self.lowest_inbound_channel_fee_proportional_millionths, &self.channels[..])?;
                Ok(())
        }
 }
@@ -418,12 +421,15 @@ impl RoutingMessageHandler for Router {
                match network.nodes.get_mut(&msg.contents.node_id) {
                        None => Err(LightningError{err: "No existing channels for node_announcement", action: ErrorAction::IgnoreError}),
                        Some(node) => {
-                               if node.last_update >= msg.contents.timestamp {
-                                       return Err(LightningError{err: "Update older than last processed update", action: ErrorAction::IgnoreError});
+                               match node.last_update {
+                                       Some(last_update) => if last_update >= msg.contents.timestamp {
+                                               return Err(LightningError{err: "Update older than last processed update", action: ErrorAction::IgnoreError});
+                                       },
+                                       None => {},
                                }
 
                                node.features = msg.contents.features.clone();
-                               node.last_update = msg.contents.timestamp;
+                               node.last_update = Some(msg.contents.timestamp);
                                node.rgb = msg.contents.rgb;
                                node.alias = msg.contents.alias;
                                node.addresses = msg.contents.addresses.clone();
@@ -539,7 +545,7 @@ impl RoutingMessageHandler for Router {
                                                        lowest_inbound_channel_fee_base_msat: u32::max_value(),
                                                        lowest_inbound_channel_fee_proportional_millionths: u32::max_value(),
                                                        features: NodeFeatures::empty(),
-                                                       last_update: 0,
+                                                       last_update: None,
                                                        rgb: [0; 3],
                                                        alias: [0; 32],
                                                        addresses: Vec::new(),
@@ -752,7 +758,7 @@ impl Router {
                        lowest_inbound_channel_fee_base_msat: u32::max_value(),
                        lowest_inbound_channel_fee_proportional_millionths: u32::max_value(),
                        features: NodeFeatures::empty(),
-                       last_update: 0,
+                       last_update: None,
                        rgb: [0; 3],
                        alias: [0; 32],
                        addresses: Vec::new(),
@@ -1175,7 +1181,7 @@ mod tests {
                                lowest_inbound_channel_fee_base_msat: 100,
                                lowest_inbound_channel_fee_proportional_millionths: 0,
                                features: NodeFeatures::from_le_bytes(id_to_feature_flags!(1)),
-                               last_update: 1,
+                               last_update: Some(1),
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
@@ -1209,7 +1215,7 @@ mod tests {
                                lowest_inbound_channel_fee_base_msat: 0,
                                lowest_inbound_channel_fee_proportional_millionths: 0,
                                features: NodeFeatures::from_le_bytes(id_to_feature_flags!(2)),
-                               last_update: 1,
+                               last_update: Some(1),
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
@@ -1243,7 +1249,7 @@ mod tests {
                                lowest_inbound_channel_fee_base_msat: 0,
                                lowest_inbound_channel_fee_proportional_millionths: 0,
                                features: NodeFeatures::from_le_bytes(id_to_feature_flags!(8)),
-                               last_update: 1,
+                               last_update: Some(1),
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
@@ -1283,7 +1289,7 @@ mod tests {
                                lowest_inbound_channel_fee_base_msat: 0,
                                lowest_inbound_channel_fee_proportional_millionths: 0,
                                features: NodeFeatures::from_le_bytes(id_to_feature_flags!(3)),
-                               last_update: 1,
+                               last_update: Some(1),
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
@@ -1363,7 +1369,7 @@ mod tests {
                                lowest_inbound_channel_fee_base_msat: 0,
                                lowest_inbound_channel_fee_proportional_millionths: 0,
                                features: NodeFeatures::from_le_bytes(id_to_feature_flags!(4)),
-                               last_update: 1,
+                               last_update: Some(1),
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
@@ -1397,7 +1403,7 @@ mod tests {
                                lowest_inbound_channel_fee_base_msat: 0,
                                lowest_inbound_channel_fee_proportional_millionths: 0,
                                features: NodeFeatures::from_le_bytes(id_to_feature_flags!(5)),
-                               last_update: 1,
+                               last_update: Some(1),
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
@@ -1454,7 +1460,7 @@ mod tests {
                                lowest_inbound_channel_fee_base_msat: 0,
                                lowest_inbound_channel_fee_proportional_millionths: 0,
                                features: NodeFeatures::from_le_bytes(id_to_feature_flags!(6)),
-                               last_update: 1,
+                               last_update: Some(1),
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
index dfd11fc7e571760fca83d1364f679e986dc04b60..15a218060e6ffc52b4843272c3d6fa64113408b2 100644 (file)
@@ -401,4 +401,64 @@ mod tests {
                let message = Message::Unknown(MessageType(43));
                assert!(!message.type_id().is_even());
        }
+
+       #[test]
+       fn read_lnd_init_msg() {
+               // Taken from lnd v0.9.0-beta.
+               let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 162, 161];
+               check_init_msg(buffer);
+       }
+
+       #[test]
+       fn read_clightning_init_msg() {
+               // Taken from c-lightning v0.8.0.
+               let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 170, 162, 1, 32, 6, 34, 110, 70, 17, 26, 11, 89, 202, 175, 18, 96, 67, 235, 91, 191, 40, 195, 79, 58, 94, 51, 42, 31, 199, 178, 183, 60, 241, 136, 145, 15];
+               check_init_msg(buffer);
+       }
+
+       fn check_init_msg(buffer: Vec<u8>) {
+               let mut reader = ::std::io::Cursor::new(buffer);
+               let decoded_msg = read(&mut reader).unwrap();
+               match decoded_msg {
+                       Message::Init(msgs::Init { features }) => {
+                               assert!(features.supports_variable_length_onion());
+                               assert!(features.supports_upfront_shutdown_script());
+                               assert!(features.supports_unknown_bits());
+                               assert!(!features.requires_unknown_bits());
+                               assert!(!features.initial_routing_sync());
+                       },
+                       _ => panic!("Expected init message, found message type: {}", decoded_msg.type_id())
+               }
+       }
+
+       #[test]
+       fn read_lnd_node_announcement() {
+               // Taken from lnd v0.9.0-beta.
+               let buffer = vec![1, 1, 91, 164, 146, 213, 213, 165, 21, 227, 102, 33, 105, 179, 214, 21, 221, 175, 228, 93, 57, 177, 191, 127, 107, 229, 31, 50, 21, 81, 179, 71, 39, 18, 35, 2, 89, 224, 110, 123, 66, 39, 148, 246, 177, 85, 12, 19, 70, 226, 173, 132, 156, 26, 122, 146, 71, 213, 247, 48, 93, 190, 185, 177, 12, 172, 0, 3, 2, 162, 161, 94, 103, 195, 37, 2, 37, 242, 97, 140, 2, 111, 69, 85, 39, 118, 30, 221, 99, 254, 120, 49, 103, 22, 170, 227, 111, 172, 164, 160, 49, 68, 138, 116, 16, 22, 206, 107, 51, 153, 255, 97, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 1, 172, 21, 0, 2, 38, 7];
+               let mut reader = ::std::io::Cursor::new(buffer);
+               let decoded_msg = read(&mut reader).unwrap();
+               match decoded_msg {
+                       Message::NodeAnnouncement(msgs::NodeAnnouncement { contents: msgs::UnsignedNodeAnnouncement { features, ..}, ..}) => {
+                               assert!(features.supports_variable_length_onion());
+                               assert!(features.supports_upfront_shutdown_script());
+                               assert!(features.supports_unknown_bits());
+                               assert!(!features.requires_unknown_bits());
+                       },
+                       _ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id())
+               }
+       }
+
+       #[test]
+       fn read_lnd_chan_announcement() {
+               // Taken from lnd v0.9.0-beta.
+               let buffer = vec![1, 0, 82, 238, 153, 33, 128, 87, 215, 2, 28, 241, 140, 250, 98, 255, 56, 5, 79, 240, 214, 231, 172, 35, 240, 171, 44, 9, 78, 91, 8, 193, 102, 5, 17, 178, 142, 106, 180, 183, 46, 38, 217, 212, 25, 236, 69, 47, 92, 217, 181, 221, 161, 205, 121, 201, 99, 38, 158, 216, 186, 193, 230, 86, 222, 6, 206, 67, 22, 255, 137, 212, 141, 161, 62, 134, 76, 48, 241, 54, 50, 167, 187, 247, 73, 27, 74, 1, 129, 185, 197, 153, 38, 90, 255, 138, 39, 161, 102, 172, 213, 74, 107, 88, 150, 90, 0, 49, 104, 7, 182, 184, 194, 219, 181, 172, 8, 245, 65, 226, 19, 228, 101, 145, 25, 159, 52, 31, 58, 93, 53, 59, 218, 91, 37, 84, 103, 17, 74, 133, 33, 35, 2, 203, 101, 73, 19, 94, 175, 122, 46, 224, 47, 168, 128, 128, 25, 26, 25, 214, 52, 247, 43, 241, 117, 52, 206, 94, 135, 156, 52, 164, 143, 234, 58, 185, 50, 185, 140, 198, 174, 71, 65, 18, 105, 70, 131, 172, 137, 0, 164, 51, 215, 143, 117, 119, 217, 241, 197, 177, 227, 227, 170, 199, 114, 7, 218, 12, 107, 30, 191, 236, 203, 21, 61, 242, 48, 192, 90, 233, 200, 199, 111, 162, 68, 234, 54, 219, 1, 233, 66, 5, 82, 74, 84, 211, 95, 199, 245, 202, 89, 223, 102, 124, 62, 166, 253, 253, 90, 180, 118, 21, 61, 110, 37, 5, 96, 167, 0, 0, 6, 34, 110, 70, 17, 26, 11, 89, 202, 175, 18, 96, 67, 235, 91, 191, 40, 195, 79, 58, 94, 51, 42, 31, 199, 178, 183, 60, 241, 136, 145, 15, 0, 2, 65, 0, 0, 1, 0, 0, 2, 37, 242, 97, 140, 2, 111, 69, 85, 39, 118, 30, 221, 99, 254, 120, 49, 103, 22, 170, 227, 111, 172, 164, 160, 49, 68, 138, 116, 16, 22, 206, 107, 3, 54, 61, 144, 88, 171, 247, 136, 208, 99, 9, 135, 37, 201, 178, 253, 136, 0, 185, 235, 68, 160, 106, 110, 12, 46, 21, 125, 204, 18, 75, 234, 16, 3, 42, 171, 28, 52, 224, 11, 30, 30, 253, 156, 148, 175, 203, 121, 250, 111, 122, 195, 84, 122, 77, 183, 56, 135, 101, 88, 41, 60, 191, 99, 232, 85, 2, 36, 17, 156, 11, 8, 12, 189, 177, 68, 88, 28, 15, 207, 21, 179, 151, 56, 226, 158, 148, 3, 120, 113, 177, 243, 184, 17, 173, 37, 46, 222, 16];
+               let mut reader = ::std::io::Cursor::new(buffer);
+               let decoded_msg = read(&mut reader).unwrap();
+               match decoded_msg {
+                       Message::ChannelAnnouncement(msgs::ChannelAnnouncement { contents: msgs::UnsignedChannelAnnouncement { features, ..}, ..}) => {
+                               assert!(!features.requires_unknown_bits());
+                       },
+                       _ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id())
+               }
+       }
 }
index 3c4ab77c16bc1d921d6057c21bb620fe410dce68..c76747cbbf8e1de83ab1ac381ae09c8c266de5d3 100644 (file)
@@ -51,6 +51,14 @@ pub struct ChannelHandshakeConfig {
        /// Default value: BREAKDOWN_TIMEOUT (currently 144), we enforce it as a minimum at channel
        /// opening so you can tweak config to ask for more security, not less.
        pub our_to_self_delay: u16,
+       /// Set to the smallest value HTLC we will accept to process.
+       ///
+       /// This value is sent to our counterparty on channel-open and we close the channel any time
+       /// our counterparty misbehaves by sending us an HTLC with a value smaller than this.
+       ///
+       /// Default value: 1. If the value is less than 1, it is ignored and set to 1, as is required
+       /// by the protocol.
+       pub our_htlc_minimum_msat: u64,
 }
 
 impl Default for ChannelHandshakeConfig {
@@ -58,6 +66,7 @@ impl Default for ChannelHandshakeConfig {
                ChannelHandshakeConfig {
                        minimum_depth: 6,
                        our_to_self_delay: BREAKDOWN_TIMEOUT,
+                       our_htlc_minimum_msat: 1,
                }
        }
 }
index 5fb70f65417c31ca45ccbb0b87ad8791a9e0fb3c..420d2fefad31ed1b20044be9aea70864a57cabb5 100644 (file)
@@ -278,12 +278,23 @@ pub enum MessageSendEvent {
        },
        /// Used to indicate that a channel_announcement and channel_update should be broadcast to all
        /// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
+       ///
+       /// Note that after doing so, you very likely (unless you did so very recently) want to call
+       /// ChannelManager::broadcast_node_announcement to trigger a BroadcastNodeAnnouncement event.
+       /// This ensures that any nodes which see our channel_announcement also have a relevant
+       /// node_announcement, including relevant feature flags which may be important for routing
+       /// through or to us.
        BroadcastChannelAnnouncement {
                /// The channel_announcement which should be sent.
                msg: msgs::ChannelAnnouncement,
                /// The followup channel_update which should be sent.
                update_msg: msgs::ChannelUpdate,
        },
+       /// Used to indicate that a node_announcement should be broadcast to all peers.
+       BroadcastNodeAnnouncement {
+               /// The node_announcement which should be sent.
+               msg: msgs::NodeAnnouncement,
+       },
        /// Used to indicate that a channel_update should be broadcast to all peers.
        BroadcastChannelUpdate {
                /// The channel_update which should be sent.