//TODO: We should actually be testing at least much more than 64k...
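// Note: 253 sat-per-1000-weight is ~1 sat/vbyte (the common relay floor), so the fuzzed fee
// estimate is floored there rather than ever returning something below it.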
match self.input.get_slice(2) {
Some(slice) => cmp::max(slice_to_be16(slice) as u64, 253),
- None => 0
+ None => 253
}
}
}
3 => {
let peer_id = get_slice!(1)[0];
if !peers.borrow()[peer_id as usize] { return; }
- match loss_detector.handler.read_event(&mut Peer{id: peer_id, peers_connected: &peers}, get_slice!(get_slice!(1)[0]).to_vec()) {
+ match loss_detector.handler.read_event(&mut Peer{id: peer_id, peers_connected: &peers}, get_slice!(get_slice!(1)[0])) {
Ok(res) => assert!(!res),
Err(_) => { peers.borrow_mut()[peer_id as usize] = false; }
}
msgs::DecodeError::UnknownVersion => return,
msgs::DecodeError::UnknownRequiredFeature => return,
msgs::DecodeError::InvalidValue => return,
- msgs::DecodeError::ExtraAddressesPerType => return,
msgs::DecodeError::BadLengthDescriptor => return,
msgs::DecodeError::ShortRead => panic!("We picked the length..."),
msgs::DecodeError::Io(e) => panic!(format!("{}", e)),
[package]
name = "lightning-net-tokio"
-version = "0.0.2"
+version = "0.0.3"
authors = ["Matt Corallo"]
license = "Apache-2.0"
+edition = "2018"
description = """
Implementation of the rust-lightning network stack using Tokio.
-For Rust-Lightning clients which wish to make direct connections to Lightning P2P nodes, this is a simple alternative to implementing the nerequired network stack, especially for those already using Tokio.
+For Rust-Lightning clients which wish to make direct connections to Lightning P2P nodes, this is a simple alternative to implementing the required network stack, especially for those already using Tokio.
"""
[dependencies]
bitcoin_hashes = "0.7"
lightning = { version = "0.0.10", path = "../lightning" }
secp256k1 = "0.15"
-tokio-codec = "0.1"
-futures = "0.1"
-tokio = "0.1"
-bytes = "0.4"
+tokio = { version = ">=0.2.12", features = [ "io-util", "macros", "rt-core", "sync", "tcp", "time" ] }
+
+[dev-dependencies]
+tokio = { version = ">=0.2.12", features = [ "io-util", "macros", "rt-core", "rt-threaded", "sync", "tcp", "time" ] }
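+# Note: tokio 0.2's threaded scheduler (used by the threaded test variant) is gated behind the
+# rt-threaded feature, hence the extra feature in dev-dependencies.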
-extern crate bytes;
-extern crate tokio;
-extern crate tokio_codec;
-extern crate futures;
-extern crate lightning;
-extern crate secp256k1;
-
-use bytes::BufMut;
-
-use futures::future;
-use futures::future::Future;
-use futures::{AsyncSink, Stream, Sink};
-use futures::sync::mpsc;
+//! A socket handling library for those running in Tokio environments who wish to use
+//! rust-lightning with native TcpStreams.
+//!
+//! Designed to be as simple as possible, the high-level usage is almost as simple as "hand over a
+//! TcpStream and a reference to a PeerManager and the rest is handled", except for the
+//! [Event](../lightning/util/events/enum.Event.html) handling mechanism, see below.
+//!
+//! The PeerHandler, due to the fire-and-forget nature of this logic, must be an Arc, and must use
+//! the SocketDescriptor provided here as the PeerHandler's SocketDescriptor.
+//!
+//! Three methods are exposed to register a new connection for handling in tokio::spawn calls, see
+//! their individual docs for more. All three take a
+//! [mpsc::Sender<()>](../tokio/sync/mpsc/struct.Sender.html), into which a `()` is sent every
+//! time something occurs which may result in lightning [Events](../lightning/util/events/enum.Event.html).
+//! The call site should, thus, look something like this:
+//! ```
+//! use tokio::sync::mpsc;
+//! use tokio::net::TcpStream;
+//! use secp256k1::key::PublicKey;
+//! use lightning::util::events::EventsProvider;
+//! use std::net::SocketAddr;
+//! use std::sync::Arc;
+//!
+//! // Define concrete types for our high-level objects:
+//! type TxBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface;
+//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
+//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor<lightning::chain::transaction::OutPoint, lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>>;
+//! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChannelMonitor, TxBroadcaster, FeeEstimator>;
+//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChannelMonitor, TxBroadcaster, FeeEstimator>;
+//!
+//! // Connect to node with pubkey their_node_id at addr:
+//! async fn connect_to_node(peer_manager: PeerManager, channel_monitor: Arc<ChannelMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
+//! let (sender, mut receiver) = mpsc::channel(2);
+//! lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await;
+//! loop {
+//! receiver.recv().await;
+//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
+//! // Handle the event!
+//! }
+//! for _event in channel_monitor.get_and_clear_pending_events().drain(..) {
+//! // Handle the event!
+//! }
+//! }
+//! }
+//!
+//! // Begin reading from a newly accepted socket and talk to the peer:
+//! async fn accept_socket(peer_manager: PeerManager, channel_monitor: Arc<ChannelMonitor>, channel_manager: ChannelManager, socket: TcpStream) {
+//! let (sender, mut receiver) = mpsc::channel(2);
+//! lightning_net_tokio::setup_inbound(peer_manager, sender, socket);
+//! loop {
+//! receiver.recv().await;
+//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
+//! // Handle the event!
+//! }
+//! for _event in channel_monitor.get_and_clear_pending_events().drain(..) {
+//! // Handle the event!
+//! }
+//! }
+//! }
+//! ```
use secp256k1::key::PublicKey;
-use tokio::timer::Delay;
use tokio::net::TcpStream;
+use tokio::{io, time};
+use tokio::sync::mpsc;
+use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
use lightning::ln::msgs::ChannelMessageHandler;
-use std::mem;
+use std::{task, thread};
use std::net::SocketAddr;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::atomic::{AtomicU64, Ordering};
-use std::time::{Duration, Instant};
-use std::vec::Vec;
+use std::time::Duration;
use std::hash::Hash;
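// Each Connection gets a process-unique id from this counter; SocketDescriptor's Eq/Hash
// (required by PeerManager) are implemented over that id.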
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
-/// A connection to a remote peer. Can be constructed either as a remote connection using
-/// Connection::setup_outbound o
-pub struct Connection {
- writer: Option<mpsc::Sender<bytes::Bytes>>,
+/// Connection contains all our internal state for a connection - we hold a reference to the
+/// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
+/// read future (which is returned by schedule_read).
+struct Connection {
+ writer: Option<io::WriteHalf<TcpStream>>,
event_notify: mpsc::Sender<()>,
- pending_read: Vec<u8>,
- read_blocker: Option<futures::sync::oneshot::Sender<Result<(), ()>>>,
+ // Because our PeerManager is templated by user-provided types, and we can't (as far as I can
+ // tell) have a const RawWakerVTable built out of templated functions, we need some indirection
+ // between being woken up with write-ready and calling PeerManager::write_buffer_space_avail.
+ // This provides that indirection, with a Sender which gets handed to the PeerManager Arc on
+ // the schedule_read stack.
+ //
+ // An alternative (likely more efficient) approach would involve creating a RawWakerVTable at
+ // runtime with functions templated by the Arc<PeerManager> type, calling
+ // write_buffer_space_avail directly from tokio's write wake, however doing so would require
+ // more unsafe voodoo than I really feel like writing.
+ write_avail: mpsc::Sender<()>,
+ // When we are told by rust-lightning to pause read (because we have writes backing up), we do
+ // so by setting read_paused. At that point, the read task will stop reading bytes from the
+ // socket. To wake it up (without otherwise changing its state), we can push a value into this
+ // Sender.
+ read_waker: mpsc::Sender<()>,
+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
+ // are sure we won't call any more read/write PeerManager functions with the same connection.
+ // This is set to true while we're inside such a call (having first checked, with the
+ // top-level mutex held, that no disconnect was requested) and back to false once we can return.
+ block_disconnect_socket: bool,
read_paused: bool,
- need_disconnect: bool,
+ rl_requested_disconnect: bool,
id: u64,
}
impl Connection {
- fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
- let us_ref = us.clone();
- let us_close_ref = us.clone();
+ fn event_trigger(us: &mut MutexGuard<Self>) {
+ match us.event_notify.try_send(()) {
+ Ok(_) => {},
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ // Ignore full errors as we just need the user to poll after this point, so if they
+ // haven't received the last send yet, it doesn't matter.
+ },
+ _ => panic!()
+ }
+ }
+ async fn schedule_read<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) {
let peer_manager_ref = peer_manager.clone();
- tokio::spawn(reader.for_each(move |b| {
- let pending_read = b.to_vec();
- {
- let mut lock = us_ref.lock().unwrap();
- assert!(lock.pending_read.is_empty());
- if lock.read_paused {
- lock.pending_read = pending_read;
- let (sender, blocker) = futures::sync::oneshot::channel();
- lock.read_blocker = Some(sender);
- return future::Either::A(blocker.then(|_| { Ok(()) }));
- }
- }
- //TODO: There's a race where we don't meet the requirements of socket_disconnected if its
- //called right here, after we release the us_ref lock in the scope above, but before we
- //call read_event!
- match peer_manager.read_event(&mut SocketDescriptor::new(us_ref.clone(), peer_manager.clone()), pending_read) {
- Ok(pause_read) => {
- if pause_read {
- let mut lock = us_ref.lock().unwrap();
- lock.read_paused = true;
- }
- },
- Err(e) => {
- us_ref.lock().unwrap().need_disconnect = false;
- return future::Either::B(future::result(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))));
- }
+ // 8KB is nice and big but also should never cause any issues with stack overflowing.
+ let mut buf = [0; 8192];
+
+ let mut our_descriptor = SocketDescriptor::new(us.clone());
+ // An enum describing why we did/are disconnecting:
+ enum Disconnect {
+ // Rust-Lightning told us to disconnect, either by returning an Err or by calling
+ // SocketDescriptor::disconnect_socket.
+ // In this case, we do not call peer_manager.socket_disconnected() as Rust-Lightning
+ // already knows we're disconnected.
+ CloseConnection,
+ // The connection was disconnected for some other reason, ie because the socket was
+ // closed.
+ // In this case, we do need to call peer_manager.socket_disconnected() to inform
+ // Rust-Lightning that the socket is gone.
+ PeerDisconnected
+ };
+ let disconnect_type = loop {
+ macro_rules! shutdown_socket {
+ ($err: expr, $need_disconnect: expr) => { {
+ println!("Disconnecting peer due to {}!", $err);
+ break $need_disconnect;
+ } }
}
- if let Err(e) = us_ref.lock().unwrap().event_notify.try_send(()) {
- // Ignore full errors as we just need them to poll after this point, so if the user
- // hasn't received the last send yet, it doesn't matter.
- assert!(e.is_full());
+ macro_rules! prepare_read_write_call {
+ () => { {
+ let mut us_lock = us.lock().unwrap();
+ if us_lock.rl_requested_disconnect {
+ shutdown_socket!("disconnect_socket() call from RL", Disconnect::CloseConnection);
+ }
+ us_lock.block_disconnect_socket = true;
+ } }
}
- future::Either::B(future::result(Ok(())))
- }).then(move |_| {
- if us_close_ref.lock().unwrap().need_disconnect {
- peer_manager_ref.socket_disconnected(&SocketDescriptor::new(us_close_ref, peer_manager_ref.clone()));
- println!("Peer disconnected!");
- } else {
- println!("We disconnected peer!");
+ let read_paused = us.lock().unwrap().read_paused;
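// Wait on three things at once: a write-ready notification (write_avail), a wakeup to
// resume reading (read_waker), or, only while reads aren't paused, new bytes on the socket.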
+ tokio::select! {
+ v = write_avail_receiver.recv() => {
assert!(v.is_some()); // We can't have dropped the sending end, it's in the us Arc!
+ prepare_read_write_call!();
+ if let Err(e) = peer_manager.write_buffer_space_avail(&mut our_descriptor) {
+ shutdown_socket!(e, Disconnect::CloseConnection);
+ }
+ us.lock().unwrap().block_disconnect_socket = false;
+ },
+ _ = read_wake_receiver.recv() => {},
+ read = reader.read(&mut buf), if !read_paused => match read {
+ Ok(0) => shutdown_socket!("Connection closed", Disconnect::PeerDisconnected),
+ Ok(len) => {
+ prepare_read_write_call!();
+ let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]);
+ let mut us_lock = us.lock().unwrap();
+ match read_res {
+ Ok(pause_read) => {
+ if pause_read {
+ us_lock.read_paused = true;
+ }
+ Self::event_trigger(&mut us_lock);
+ },
+ Err(e) => shutdown_socket!(e, Disconnect::CloseConnection),
+ }
+ us_lock.block_disconnect_socket = false;
+ },
+ Err(e) => shutdown_socket!(e, Disconnect::PeerDisconnected),
+ },
}
- Ok(())
- }));
+ };
+ let writer_option = us.lock().unwrap().writer.take();
+ if let Some(mut writer) = writer_option {
+ // If the socket is already closed, shutdown() will fail, so just ignore it.
+ let _ = writer.shutdown().await;
+ }
+ if let Disconnect::PeerDisconnected = disconnect_type {
+ peer_manager_ref.socket_disconnected(&our_descriptor);
+ Self::event_trigger(&mut us.lock().unwrap());
+ }
}
- fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>, Arc<Mutex<Self>>) {
- let (writer, reader) = tokio_codec::Framed::new(stream, tokio_codec::BytesCodec::new()).split();
- let (send_sink, send_stream) = mpsc::channel(3);
- tokio::spawn(writer.send_all(send_stream.map_err(|_| -> std::io::Error {
- unreachable!();
- })).then(|_| {
- future::result(Ok(()))
- }));
- let us = Arc::new(Mutex::new(Self { writer: Some(send_sink), event_notify, pending_read: Vec::new(), read_blocker: None, read_paused: false, need_disconnect: true, id: ID_COUNTER.fetch_add(1, Ordering::AcqRel) }));
-
- (reader, us)
+ fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
+ // We only ever need a channel of depth 1 here: if we returned a non-full write to the
+ // PeerManager, we will eventually get notified that there is room in the socket to write
+ // new bytes, which will generate an event. That event will be popped off the queue before
+ // we call write_buffer_space_avail, ensuring that we have room to push a new () if, during
+ // the write_buffer_space_avail() call, send_data() returns a non-full write.
+ let (write_avail, write_receiver) = mpsc::channel(1);
+ // Similarly here - our only goal is to make sure the reader wakes up at some point after
+ // we shove a value into the channel which comes after we've reset the read_paused bool to
+ // false.
+ let (read_waker, read_receiver) = mpsc::channel(1);
+ let (reader, writer) = io::split(stream);
+
+ (reader, write_receiver, read_receiver,
+ Arc::new(Mutex::new(Self {
+ writer: Some(writer), event_notify, write_avail, read_waker, read_paused: false,
+ block_disconnect_socket: false, rl_requested_disconnect: false,
+ id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
+ })))
}
+}
+
+/// Process incoming messages and feed outgoing messages on the provided socket generated by
+/// accepting an incoming connection.
+///
+/// The returned future will complete when the peer is disconnected and associated handling
+/// futures are freed. Because all processing futures are spawned with tokio::spawn, you do not
+/// need to poll the provided future in order to make progress.
+///
+/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
+pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future<Output=()> {
+ let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
+ #[cfg(debug_assertions)]
+ let last_us = Arc::clone(&us);
- /// Process incoming messages and feed outgoing messages on the provided socket generated by
- /// accepting an incoming connection (by scheduling futures with tokio::spawn).
- ///
- /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
- /// ChannelManager and ChannelMonitor objects.
- pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
- let (reader, us) = Self::new(event_notify, stream);
+ let handle_opt = if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone())) {
+ Some(tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)))
+ } else {
+ // Note that we will skip socket_disconnected here, in accordance with the PeerManager
+ // requirements.
+ None
+ };
- if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
- Self::schedule_read(peer_manager, us, reader);
+ async move {
+ if let Some(handle) = handle_opt {
+ if let Err(e) = handle.await {
+ assert!(e.is_cancelled());
+ } else {
+ // This is certainly not guaranteed to always be true - the read loop may exit
+ // while there are still pending write wakers that need to be woken up after the
+ // socket shutdown(). Still, as a check during testing, to make sure tokio doesn't
+ // keep too many wakers around, this makes sense. The race should be rare (we do
+ // some work after shutdown()) and an error would be a major memory leak.
+ #[cfg(debug_assertions)]
+ assert!(Arc::try_unwrap(last_us).is_ok());
+ }
}
}
+}
- /// Process incoming messages and feed outgoing messages on the provided socket generated by
- /// making an outbound connection which is expected to be accepted by a peer with the given
- /// public key (by scheduling futures with tokio::spawn).
- ///
- /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
- /// ChannelManager and ChannelMonitor objects.
- pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
- let (reader, us) = Self::new(event_notify, stream);
-
- if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
- if SocketDescriptor::new(us.clone(), peer_manager.clone()).send_data(&initial_send, true) == initial_send.len() {
- Self::schedule_read(peer_manager, us, reader);
+/// Process incoming messages and feed outgoing messages on the provided socket generated by
+/// making an outbound connection which is expected to be accepted by a peer with the given
+/// public key. The relevant processing is set to run free (via tokio::spawn).
+///
+/// The returned future will complete when the peer is disconnected and associated handling
+/// futures are freed. Because all processing futures are spawned with tokio::spawn, you do not
+/// need to poll the provided future in order to make progress.
+///
+/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
+pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> {
+ let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
+ #[cfg(debug_assertions)]
+ let last_us = Arc::clone(&us);
+
+ let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
+ Some(tokio::spawn(async move {
+ // We should essentially always have enough room in a TCP socket buffer to send the
+ // initial tens of bytes. However, tokio running in single-threaded mode will always
+ // fail writes and wake us back up later to write. Thus, we handle a single
+ // std::task::Poll::Pending but still expect to write the full set of bytes at once
+ // and use a relatively tight timeout.
+ if let Ok(Ok(())) = tokio::time::timeout(Duration::from_millis(100), async {
+ loop {
+ match SocketDescriptor::new(us.clone()).send_data(&initial_send, true) {
+ v if v == initial_send.len() => break Ok(()),
+ 0 => {
+ write_receiver.recv().await;
+ // In theory we could check for if we've been instructed to disconnect
+ // the peer here, but it's OK to just skip it - we'll check for it in
+ // schedule_read prior to any relevant calls into RL.
+ },
+ _ => {
+ eprintln!("Failed to write first full message to socket!");
+ peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
+ break Err(());
+ }
+ }
+ }
+ }).await {
+ Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver).await;
+ }
+ }))
+ } else {
+ // Note that we will skip socket_disconnected here, in accordance with the PeerManager
+ // requirements.
+ None
+ };
+
+ async move {
+ if let Some(handle) = handle_opt {
+ if let Err(e) = handle.await {
+ assert!(e.is_cancelled());
} else {
- println!("Failed to write first full message to socket!");
+ // This is certainly not guaranteed to always be true - the read loop may exit
+ // while there are still pending write wakers that need to be woken up after the
+ // socket shutdown(). Still, as a check during testing, to make sure tokio doesn't
+ // keep too many wakers around, this makes sense. The race should be rare (we do
+ // some work after shutdown()) and an error would be a major memory leak.
+ #[cfg(debug_assertions)]
+ assert!(Arc::try_unwrap(last_us).is_ok());
}
}
}
+}
- /// Process incoming messages and feed outgoing messages on a new connection made to the given
- /// socket address which is expected to be accepted by a peer with the given public key (by
- /// scheduling futures with tokio::spawn).
- ///
- /// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
- /// ChannelManager and ChannelMonitor objects.
- pub fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
- let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
- future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
- });
- tokio::spawn(TcpStream::connect(&addr).select(connect_timeout)
- .and_then(move |stream| {
- Connection::setup_outbound(peer_manager, event_notify, their_node_id, stream.0);
- future::ok(())
- }).or_else(|_| {
- //TODO: return errors somehow
- future::ok(())
- }));
- }
+/// Process incoming messages and feed outgoing messages on a new connection made to the given
+/// socket address which is expected to be accepted by a peer with the given public key (by
+/// scheduling futures with tokio::spawn).
+///
+/// Shorthand for TcpStream::connect(addr) with a timeout followed by setup_outbound().
+///
+/// Returns a future (as the fn is async) which needs to be polled to complete the connection and
+/// connection setup. That future then returns a future which will complete when the peer is
+/// disconnected and associated handling futures are freed. Because all processing in those
+/// futures is spawned with tokio::spawn, you do not need to poll the second future in order to
+/// make progress.
+///
+/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
+pub async fn connect_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> {
+ if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), TcpStream::connect(&addr)).await {
+ Some(setup_outbound(peer_manager, event_notify, their_node_id, stream))
+ } else { None }
+}
+
+const SOCK_WAKER_VTABLE: task::RawWakerVTable =
+ task::RawWakerVTable::new(clone_socket_waker, wake_socket_waker, wake_socket_waker_by_ref, drop_socket_waker);
+
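+// Ownership contract for the vtable above: each RawWaker's data pointer owns a boxed
+// mpsc::Sender<()>. clone allocates a fresh boxed clone, wake consumes (and frees) the box,
+// wake_by_ref clones the Sender without consuming it, and drop frees the box.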
+fn clone_socket_waker(orig_ptr: *const ()) -> task::RawWaker {
+ write_avail_to_waker(orig_ptr as *const mpsc::Sender<()>)
+}
+// When waking, an error should be fine. Most likely we got two send_datas in a row, both of which
+// failed to fully write, but we only need to call write_buffer_space_avail() once. Otherwise, the
+// sending thread may have already gone away due to a socket close, in which case there's nothing
+// to wake up anyway.
+fn wake_socket_waker(orig_ptr: *const ()) {
+ let sender = unsafe { &mut *(orig_ptr as *mut mpsc::Sender<()>) };
+ let _ = sender.try_send(());
+ drop_socket_waker(orig_ptr);
+}
+fn wake_socket_waker_by_ref(orig_ptr: *const ()) {
+ let sender_ptr = orig_ptr as *const mpsc::Sender<()>;
+ let mut sender = unsafe { (*sender_ptr).clone() };
+ let _ = sender.try_send(());
+}
+fn drop_socket_waker(orig_ptr: *const ()) {
+ let _orig_box = unsafe { Box::from_raw(orig_ptr as *mut mpsc::Sender<()>) };
+ // _orig_box is now dropped
+}
+fn write_avail_to_waker(sender: *const mpsc::Sender<()>) -> task::RawWaker {
+ let new_box = Box::leak(Box::new(unsafe { (*sender).clone() }));
+ let new_ptr = new_box as *const mpsc::Sender<()>;
+ task::RawWaker::new(new_ptr as *const (), &SOCK_WAKER_VTABLE)
}
-pub struct SocketDescriptor<CMH: ChannelMessageHandler + 'static> {
+/// The SocketDescriptor used to refer to sockets by a PeerHandler. This is pub only as it is a
+/// type in the template of PeerHandler.
+pub struct SocketDescriptor {
conn: Arc<Mutex<Connection>>,
id: u64,
- peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>,
}
-impl<CMH: ChannelMessageHandler> SocketDescriptor<CMH> {
- fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor<CMH>, Arc<CMH>>>) -> Self {
+impl SocketDescriptor {
+ fn new(conn: Arc<Mutex<Connection>>) -> Self {
let id = conn.lock().unwrap().id;
- Self { conn, id, peer_manager }
+ Self { conn, id }
}
}
-impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescriptor<CMH> {
+impl peer_handler::SocketDescriptor for SocketDescriptor {
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
- macro_rules! schedule_read {
- ($us_ref: expr) => {
- tokio::spawn(future::lazy(move || -> Result<(), ()> {
- let mut read_data = Vec::new();
- {
- let mut us = $us_ref.conn.lock().unwrap();
- mem::swap(&mut read_data, &mut us.pending_read);
- }
- if !read_data.is_empty() {
- let mut us_clone = $us_ref.clone();
- match $us_ref.peer_manager.read_event(&mut us_clone, read_data) {
- Ok(pause_read) => {
- if pause_read { return Ok(()); }
- },
- Err(_) => {
- //TODO: Not actually sure how to do this
- return Ok(());
- }
- }
- }
- let mut us = $us_ref.conn.lock().unwrap();
- if let Some(sender) = us.read_blocker.take() {
- sender.send(Ok(())).unwrap();
- }
- us.read_paused = false;
- if let Err(e) = us.event_notify.try_send(()) {
- // Ignore full errors as we just need them to poll after this point, so if the user
- // hasn't received the last send yet, it doesn't matter.
- assert!(e.is_full());
- }
- Ok(())
- }));
- }
- }
-
+ // To send data, we take a lock on our Connection to access the WriteHalf of the TcpStream,
+ // writing to it if there's room in the kernel buffer, or otherwise creating a new Waker
+ // wrapping a clone of the write_avail Sender. When tokio wakes that Waker, a () lands in
+ // the channel, waking the processing future, which calls write_buffer_space_avail and we
+ // end up back here.
let mut us = self.conn.lock().unwrap();
- if resume_read {
- let us_ref = self.clone();
- schedule_read!(us_ref);
- }
- if data.is_empty() { return 0; }
if us.writer.is_none() {
- us.read_paused = true;
+ // The writer gets take()n when it is time to shut down, so just fast-return 0 here.
return 0;
}
- let mut bytes = bytes::BytesMut::with_capacity(data.len());
- bytes.put(data);
- let write_res = us.writer.as_mut().unwrap().start_send(bytes.freeze());
- match write_res {
- Ok(res) => {
- match res {
- AsyncSink::Ready => {
- data.len()
- },
- AsyncSink::NotReady(_) => {
- us.read_paused = true;
- let us_ref = self.clone();
- tokio::spawn(us.writer.take().unwrap().flush().then(move |writer_res| -> Result<(), ()> {
- if let Ok(writer) = writer_res {
- {
- let mut us = us_ref.conn.lock().unwrap();
- us.writer = Some(writer);
- }
- schedule_read!(us_ref);
- } // we'll fire the disconnect event on the socket reader end
- Ok(())
- }));
- 0
- }
- }
- },
- Err(_) => {
- // We'll fire the disconnected event on the socket reader end
- 0
- },
+ if resume_read && us.read_paused {
+ // The schedule_read future may have already exited, dropping the other end of this
+ // Sender before we get here (e.g. after being woken by more room in the write
+ // buffer), so we ignore any failures to wake it up.
+ us.read_paused = false;
+ let _ = us.read_waker.try_send(());
+ }
+ if data.is_empty() { return 0; }
+ let waker = unsafe { task::Waker::from_raw(write_avail_to_waker(&us.write_avail)) };
+ let mut ctx = task::Context::from_waker(&waker);
+ let mut written_len = 0;
+ loop {
+ match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
+ task::Poll::Ready(Ok(res)) => {
+ // The tokio docs *seem* to indicate this can't happen, and I certainly don't
+ // know how to handle it if it does (since it should be a Poll::Pending
+ // instead):
+ assert_ne!(res, 0);
+ written_len += res;
+ if written_len == data.len() { return written_len; }
+ },
+ task::Poll::Ready(Err(e)) => {
+ // The tokio docs *seem* to indicate this can't happen, and I certainly don't
+ // know how to handle it if it does (since it should be a Poll::Pending
+ // instead):
+ assert_ne!(e.kind(), io::ErrorKind::WouldBlock);
+ // Probably we've already been closed, just return what we have and let the
+ // read thread handle closing logic.
+ return written_len;
+ },
+ task::Poll::Pending => {
+ // We're queued up for a write event now, but we need to make sure we also
+ // pause read given we're now waiting on the remote end to ACK (and in
+ // accordance with the send_data() docs).
+ us.read_paused = true;
+ return written_len;
+ },
+ }
}
}
fn disconnect_socket(&mut self) {
- let mut us = self.conn.lock().unwrap();
- us.need_disconnect = true;
- us.read_paused = true;
+ {
+ let mut us = self.conn.lock().unwrap();
+ us.rl_requested_disconnect = true;
+ us.read_paused = true;
+ // Wake up the sending thread, assuming it is still alive
+ let _ = us.write_avail.try_send(());
+ // Happy-path return:
+ if !us.block_disconnect_socket { return; }
+ }
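+ // The read task is mid-call into rust-lightning; spin until it completes the call and
+ // clears block_disconnect_socket (it checks rl_requested_disconnect before making any
+ // further read/write calls).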
+ while self.conn.lock().unwrap().block_disconnect_socket {
+ thread::yield_now();
+ }
}
}
-impl<CMH: ChannelMessageHandler> Clone for SocketDescriptor<CMH> {
+impl Clone for SocketDescriptor {
fn clone(&self) -> Self {
Self {
conn: Arc::clone(&self.conn),
id: self.id,
- peer_manager: Arc::clone(&self.peer_manager),
}
}
}
-impl<CMH: ChannelMessageHandler> Eq for SocketDescriptor<CMH> {}
-impl<CMH: ChannelMessageHandler> PartialEq for SocketDescriptor<CMH> {
+impl Eq for SocketDescriptor {}
+impl PartialEq for SocketDescriptor {
fn eq(&self, o: &Self) -> bool {
self.id == o.id
}
}
-impl<CMH: ChannelMessageHandler> Hash for SocketDescriptor<CMH> {
+impl Hash for SocketDescriptor {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
+#[cfg(test)]
+mod tests {
+ use lightning::ln::features::*;
+ use lightning::ln::msgs::*;
+ use lightning::ln::peer_handler::{MessageHandler, PeerManager};
+ use lightning::util::events::*;
+ use secp256k1::{Secp256k1, SecretKey, PublicKey};
+
+ use tokio::sync::mpsc;
+
+ use std::mem;
+ use std::sync::{Arc, Mutex};
+ use std::time::Duration;
+
+ pub struct TestLogger();
+ impl lightning::util::logger::Logger for TestLogger {
+ fn log(&self, record: &lightning::util::logger::Record) {
+ println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
+ }
+ }
+
+ struct MsgHandler{
+ expected_pubkey: PublicKey,
+ pubkey_connected: mpsc::Sender<()>,
+ pubkey_disconnected: mpsc::Sender<()>,
+ msg_events: Mutex<Vec<MessageSendEvent>>,
+ }
+ impl RoutingMessageHandler for MsgHandler {
+ fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { }
+ fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, ChannelUpdate, ChannelUpdate)> { Vec::new() }
+ fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
+ fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
+ }
+ impl ChannelMessageHandler for MsgHandler {
+ fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}
+ fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &AcceptChannel) {}
+ fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &FundingCreated) {}
+ fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &FundingSigned) {}
+ fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &FundingLocked) {}
+ fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &Shutdown) {}
+ fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &ClosingSigned) {}
+ fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateAddHTLC) {}
+ fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFulfillHTLC) {}
+ fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailHTLC) {}
+ fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailMalformedHTLC) {}
+ fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &CommitmentSigned) {}
+ fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &RevokeAndACK) {}
+ fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &UpdateFee) {}
+ fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {}
+ fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) {
+ if *their_node_id == self.expected_pubkey {
+ self.pubkey_disconnected.clone().try_send(()).unwrap();
+ }
+ }
+ fn peer_connected(&self, their_node_id: &PublicKey, _msg: &Init) {
+ if *their_node_id == self.expected_pubkey {
+ self.pubkey_connected.clone().try_send(()).unwrap();
+ }
+ }
+ fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &ChannelReestablish) {}
+ fn handle_error(&self, _their_node_id: &PublicKey, _msg: &ErrorMessage) {}
+ }
+ impl MessageSendEventsProvider for MsgHandler {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+ let mut ret = Vec::new();
+ mem::swap(&mut *self.msg_events.lock().unwrap(), &mut ret);
+ ret
+ }
+ }
+
+ async fn do_basic_connection_test() {
+ let secp_ctx = Secp256k1::new();
+ let a_key = SecretKey::from_slice(&[1; 32]).unwrap();
+ let b_key = SecretKey::from_slice(&[2; 32]).unwrap();
+ let a_pub = PublicKey::from_secret_key(&secp_ctx, &a_key);
+ let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key);
+
+ let (a_connected_sender, mut a_connected) = mpsc::channel(1);
+ let (a_disconnected_sender, mut a_disconnected) = mpsc::channel(1);
+ let a_handler = Arc::new(MsgHandler {
+ expected_pubkey: b_pub,
+ pubkey_connected: a_connected_sender,
+ pubkey_disconnected: a_disconnected_sender,
+ msg_events: Mutex::new(Vec::new()),
+ });
+ let a_manager = Arc::new(PeerManager::new(MessageHandler {
+ chan_handler: Arc::clone(&a_handler),
+ route_handler: Arc::clone(&a_handler) as Arc<dyn RoutingMessageHandler>,
+ }, a_key.clone(), &[1; 32], Arc::new(TestLogger())));
+
+ let (b_connected_sender, mut b_connected) = mpsc::channel(1);
+ let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
+ let b_handler = Arc::new(MsgHandler {
+ expected_pubkey: a_pub,
+ pubkey_connected: b_connected_sender,
+ pubkey_disconnected: b_disconnected_sender,
+ msg_events: Mutex::new(Vec::new()),
+ });
+ let b_manager = Arc::new(PeerManager::new(MessageHandler {
+ chan_handler: Arc::clone(&b_handler),
+ route_handler: Arc::clone(&b_handler) as Arc<dyn RoutingMessageHandler>,
+ }, b_key.clone(), &[2; 32], Arc::new(TestLogger())));
+
+ // We bind on localhost, hoping the environment is properly configured with a local
+ // address. This may not always be the case in containers and the like, so if this test is
+ // failing for you check that you have a loopback interface and it is configured with
+ // 127.0.0.1.
+ let (conn_a, conn_b) = if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:9735") {
+ (std::net::TcpStream::connect("127.0.0.1:9735").unwrap(), listener.accept().unwrap().0)
+ } else if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:9999") {
+ (std::net::TcpStream::connect("127.0.0.1:9999").unwrap(), listener.accept().unwrap().0)
+ } else if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:46926") {
+ (std::net::TcpStream::connect("127.0.0.1:46926").unwrap(), listener.accept().unwrap().0)
+ } else { panic!("Failed to bind to v4 localhost on common ports"); };
+
+ let (sender, _receiver) = mpsc::channel(2);
+ let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, tokio::net::TcpStream::from_std(conn_a).unwrap());
+ let fut_b = super::setup_inbound(b_manager, sender, tokio::net::TcpStream::from_std(conn_b).unwrap());
+
+ tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap();
+ tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap();
+
+ a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError {
+ node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None }
+ });
+ assert!(a_disconnected.try_recv().is_err());
+ assert!(b_disconnected.try_recv().is_err());
+
+ a_manager.process_events();
+ tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv()).await.unwrap();
+ tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv()).await.unwrap();
+
+ fut_a.await;
+ fut_b.await;
+ }
+
+ #[tokio::test(threaded_scheduler)]
+ async fn basic_threaded_connection_test() {
+ do_basic_connection_test().await;
+ }
+ #[tokio::test]
+ async fn basic_unthreaded_connection_test() {
+ do_basic_connection_test().await;
+ }
+}
holding_cell_update_fee: Option<u64>,
next_local_htlc_id: u64,
next_remote_htlc_id: u64,
- channel_update_count: u32,
+ update_time_counter: u32,
feerate_per_kw: u64,
#[cfg(debug_assertions)]
cmp::max(at_open_background_feerate * B_OUTPUT_PLUS_SPENDING_INPUT_WEIGHT / 1000, 546) //TODO
}
- fn derive_our_htlc_minimum_msat(_at_open_channel_feerate_per_kw: u64) -> u64 {
- 1000 // TODO
- }
-
// Constructors:
pub fn new_outbound<K: Deref, F: Deref>(fee_estimator: &F, keys_provider: &K, their_node_id: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64, logger: Arc<Logger>, config: &UserConfig) -> Result<Channel<ChanSigner>, APIError>
where K::Target: KeysInterface<ChanKeySigner = ChanSigner>,
holding_cell_update_fee: None,
next_local_htlc_id: 0,
next_remote_htlc_id: 0,
- channel_update_count: 1,
+ update_time_counter: 1,
resend_order: RAACommitmentOrder::CommitmentFirst,
their_max_htlc_value_in_flight_msat: 0,
their_channel_reserve_satoshis: 0,
their_htlc_minimum_msat: 0,
- our_htlc_minimum_msat: Channel::<ChanSigner>::derive_our_htlc_minimum_msat(feerate),
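+ // A configured value of 0 is floored to 1 msat, as 0-msat HTLCs are rejected outright
+ // (see the new 0-msat checks below).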
+ our_htlc_minimum_msat: if config.own_channel_config.our_htlc_minimum_msat == 0 { 1 } else { config.own_channel_config.our_htlc_minimum_msat },
their_to_self_delay: 0,
our_to_self_delay: config.own_channel_config.our_to_self_delay,
their_max_accepted_htlcs: 0,
holding_cell_update_fee: None,
next_local_htlc_id: 0,
next_remote_htlc_id: 0,
- channel_update_count: 1,
+ update_time_counter: 1,
resend_order: RAACommitmentOrder::CommitmentFirst,
their_max_htlc_value_in_flight_msat: cmp::min(msg.max_htlc_value_in_flight_msat, msg.funding_satoshis * 1000),
their_channel_reserve_satoshis: msg.channel_reserve_satoshis,
their_htlc_minimum_msat: msg.htlc_minimum_msat,
- our_htlc_minimum_msat: Channel::<ChanSigner>::derive_our_htlc_minimum_msat(msg.feerate_per_kw as u64),
+ our_htlc_minimum_msat: if config.own_channel_config.our_htlc_minimum_msat == 0 { 1 } else { config.own_channel_config.our_htlc_minimum_msat },
their_to_self_delay: msg.to_self_delay,
our_to_self_delay: config.own_channel_config.our_to_self_delay,
their_max_accepted_htlcs: msg.max_accepted_htlcs,
self.channel_state |= ChannelState::TheirFundingLocked as u32;
} else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::OurFundingLocked as u32) {
self.channel_state = ChannelState::ChannelFunded as u32 | (self.channel_state & MULTI_STATE_FLAGS);
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
} else if (self.channel_state & (ChannelState::ChannelFunded as u32) != 0 &&
// Note that funding_signed/funding_created will have decremented both by 1!
self.cur_local_commitment_transaction_number == INITIAL_COMMITMENT_NUMBER - 1 &&
if msg.amount_msat > self.channel_value_satoshis * 1000 {
return Err(ChannelError::Close("Remote side tried to send more than the total value of the channel"));
}
+ if msg.amount_msat == 0 {
+ return Err(ChannelError::Close("Remote side tried to send a 0-msat HTLC"));
+ }
if msg.amount_msat < self.our_htlc_minimum_msat {
return Err(ChannelError::Close("Remote side tried to send less than our minimum HTLC value"));
}
}
Channel::<ChanSigner>::check_remote_fee(fee_estimator, msg.feerate_per_kw)?;
self.pending_update_fee = Some(msg.feerate_per_kw as u64);
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
Ok(())
}
// From here on out, we may not fail!
self.channel_state |= ChannelState::RemoteShutdownSent as u32;
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
// We can't send our shutdown until we've committed all of our pending HTLCs, but the
// remote side is unlikely to accept any new HTLCs, so we go ahead and "free" any holding
};
self.channel_state |= ChannelState::LocalShutdownSent as u32;
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
Ok((our_shutdown, self.maybe_propose_first_closing_signed(fee_estimator), dropped_outbound_htlcs))
}
if last_fee == msg.fee_satoshis {
self.build_signed_closing_transaction(&mut closing_tx, &msg.signature, &our_sig);
self.channel_state = ChannelState::ShutdownComplete as u32;
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
return Ok((None, Some(closing_tx)));
}
}
self.build_signed_closing_transaction(&mut closing_tx, &msg.signature, &our_sig);
self.channel_state = ChannelState::ShutdownComplete as u32;
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
Ok((Some(msgs::ClosingSigned {
channel_id: self.channel_id,
}
/// Allowed in any state (including after shutdown)
- pub fn get_channel_update_count(&self) -> u32 {
- self.channel_update_count
+ pub fn get_update_time_counter(&self) -> u32 {
+ self.update_time_counter
}
pub fn get_latest_monitor_update_id(&self) -> u64 {
panic!("Client called ChannelManager::funding_transaction_generated with bogus transaction!");
}
self.channel_state = ChannelState::ShutdownComplete as u32;
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
return Err(msgs::ErrorMessage {
channel_id: self.channel_id(),
data: "funding tx had wrong script/value".to_owned()
}
if header.bitcoin_hash() != self.last_block_connected {
self.last_block_connected = header.bitcoin_hash();
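+ // Pull update_time_counter up to the block time so the "timestamp" we place in
+ // channel_update messages tracks real time.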
+ self.update_time_counter = cmp::max(self.update_time_counter, header.time);
if let Some(channel_monitor) = self.channel_monitor.as_mut() {
channel_monitor.last_block_hash = self.last_block_connected;
}
true
} else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::TheirFundingLocked as u32) {
self.channel_state = ChannelState::ChannelFunded as u32 | (self.channel_state & MULTI_STATE_FLAGS);
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
true
} else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::OurFundingLocked as u32) {
// We got a reorg but not enough to trigger a force close, just update
if amount_msat > self.channel_value_satoshis * 1000 {
return Err(ChannelError::Ignore("Cannot send more than the total value of the channel"));
}
+
+ if amount_msat == 0 {
+ return Err(ChannelError::Ignore("Cannot send 0-msat HTLC"));
+ }
+
if amount_msat < self.their_htlc_minimum_msat {
return Err(ChannelError::Ignore("Cannot send less than their minimum HTLC value"));
}
} else {
self.channel_state |= ChannelState::LocalShutdownSent as u32;
}
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
// Go ahead and drop holding cell updates as we'd rather fail payments than wait to send
// our shutdown until we've committed all of the pending changes.
}
self.channel_state = ChannelState::ShutdownComplete as u32;
- self.channel_update_count += 1;
+ self.update_time_counter += 1;
if self.channel_monitor.is_some() {
(self.channel_monitor.as_mut().unwrap().get_latest_local_commitment_txn(), dropped_outbound_htlcs)
} else {
self.next_local_htlc_id.write(writer)?;
(self.next_remote_htlc_id - dropped_inbound_htlcs).write(writer)?;
- self.channel_update_count.write(writer)?;
+ self.update_time_counter.write(writer)?;
self.feerate_per_kw.write(writer)?;
match self.last_sent_closing_fee {
let next_local_htlc_id = Readable::read(reader)?;
let next_remote_htlc_id = Readable::read(reader)?;
- let channel_update_count = Readable::read(reader)?;
+ let update_time_counter = Readable::read(reader)?;
let feerate_per_kw = Readable::read(reader)?;
let last_sent_closing_fee = match <u8 as Readable>::read(reader)? {
holding_cell_update_fee,
next_local_htlc_id,
next_remote_htlc_id,
- channel_update_count,
+ update_time_counter,
feerate_per_kw,
#[cfg(debug_assertions)]
use chain::transaction::OutPoint;
use ln::channel::{Channel, ChannelError};
use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use ln::features::{InitFeatures, NodeFeatures};
use ln::router::Route;
-use ln::features::InitFeatures;
use ln::msgs;
use ln::onion_utils;
use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
channel_state: Mutex<ChannelHolder<ChanSigner>>,
our_network_key: SecretKey,
+ /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
+ /// value increases strictly since we don't assume access to a time source.
+ last_node_announcement_serial: AtomicUsize,
+
/// The bulk of our storage will eventually be here (channels and message queues and the like).
/// If we are connected to a peer we always at least have an entry here, even if no channels
/// are currently open with that peer.
}),
our_network_key: keys_manager.get_node_secret(),
+ last_node_announcement_serial: AtomicUsize::new(0),
+
per_peer_state: RwLock::new(HashMap::new()),
pending_events: Mutex::new(Vec::new()),
let unsigned = msgs::UnsignedChannelUpdate {
chain_hash: self.genesis_hash,
short_channel_id: short_channel_id,
- timestamp: chan.get_channel_update_count(),
+ timestamp: chan.get_update_time_counter(),
flags: (!were_node_one) as u16 | ((!chan.is_live() as u16) << 1),
cltv_expiry_delta: CLTV_EXPIRY_DELTA,
htlc_minimum_msat: chan.get_our_htlc_minimum_msat(),
})
}
+ #[allow(dead_code)]
+ // Messages of up to 64KB should never end up more than half full with addresses, as that would
+ // be absurd. We ensure this by checking that at least 500 (our stated public contract on when
+ // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
+ // message...
+ const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (msgs::NetAddress::MAX_LEN as u32 + 1) / 2;
+ #[deny(const_err)]
+ #[allow(dead_code)]
+ // ...by failing to compile if the number of addresses that would be half of a message is
+ // smaller than 500:
+ const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 500;
+
+ /// Generates a signed node_announcement from the given arguments and creates a
+ /// BroadcastNodeAnnouncement event. Note that such messages will be ignored unless peers have
+ /// seen a channel_announcement from us (ie unless we have public channels open).
+ ///
+ /// RGB is a node "color" and alias is a printable human-readable string to describe this node
+ /// to humans. They carry no in-protocol meaning.
+ ///
+ /// addresses represent the set (possibly empty) of socket addresses on which this node accepts
+ /// incoming connections. These will be broadcast to the network, publicly tying these
+ /// addresses together. If you wish to preserve user privacy, addresses should likely contain
+ /// only Tor Onion addresses.
+ ///
+ /// Panics if addresses is absurdly large (more than 500).
+ pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec<msgs::NetAddress>) {
+ let _ = self.total_consistency_lock.read().unwrap();
+
+ if addresses.len() > 500 {
+ panic!("More than half the message size was taken up by public addresses!");
+ }
+
+ let announcement = msgs::UnsignedNodeAnnouncement {
+ features: NodeFeatures::supported(),
+ timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32,
+ node_id: self.get_our_node_id(),
+ rgb, alias, addresses,
+ excess_address_data: Vec::new(),
+ excess_data: Vec::new(),
+ };
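+ // Per BOLT 7, the node_announcement signature covers the double-SHA256 of the
+ // serialized announcement contents.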
+ let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]);
+
+ let mut channel_state = self.channel_state.lock().unwrap();
+ channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement {
+ msg: msgs::NodeAnnouncement {
+ signature: self.secp_ctx.sign(&msghash, &self.our_network_key),
+ contents: announcement
+ },
+ });
+ }
+
/// Processes HTLCs which are pending waiting on random forward delay.
///
/// Should only really ever be called in response to a PendingHTLCsForwardable event.
}
self.latest_block_height.store(height as usize, Ordering::Release);
*self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header_hash;
+ loop {
+ // Update last_node_announcement_serial to be the max of its current value and the
+ // block timestamp. This should keep us close to the current time without relying on
+ // having an explicit local time source.
+ // Just in case we end up in a race, we loop until we either successfully update
+ // last_node_announcement_serial or decide we don't need to.
+ let old_serial = self.last_node_announcement_serial.load(Ordering::Acquire);
+ if old_serial >= header.time as usize { break; }
+ if self.last_node_announcement_serial.compare_exchange(old_serial, header.time as usize, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
+ break;
+ }
+ }
}
/// We force-close the channel without letting our counterparty participate in the shutdown
&events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != their_node_id,
&events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != their_node_id,
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
+ &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
&events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != their_node_id,
&events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
peer_state.latest_features.write(writer)?;
}
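+ // Persist the announcement timestamp counter so it continues to increase strictly
+ // across restarts.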
+ (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
+
Ok(())
}
}
per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
}
+ let last_node_announcement_serial: u32 = Readable::read(reader)?;
+
let channel_manager = ChannelManager {
genesis_hash,
fee_estimator: args.fee_estimator,
}),
our_network_key: args.keys_manager.get_node_secret(),
+ last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize),
+
per_peer_state: RwLock::new(per_peer_state),
pending_events: Mutex::new(Vec::new()),
for ev in events.iter() {
match *ev {
OnchainEvent::HTLCUpdate { ref htlc_update } => {
- writer.write_all(&[1; 1])?;
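+ // (The deserializer appears to match on a 0 type byte for HTLCUpdate; writing [1; 1]
+ // was a write/read mismatch.)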
+ 0u8.write(writer)?;
htlc_update.0.write(writer)?;
htlc_update.1.write(writer)?;
},
}
/// Attempts to claim a remote HTLC-Success/HTLC-Timeout's outputs using the revocation key
- fn check_spend_remote_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32) -> Vec<ClaimRequest> {
- //TODO: send back new outputs to guarantee pending_claim_request consistency
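+ // On the claim path, also return the HTLC tx's txid and outputs so the caller can add
+ // them to watch_outputs, keeping pending_claim_request consistent.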
+ fn check_spend_remote_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32) -> (Vec<ClaimRequest>, Option<(Sha256dHash, Vec<TxOut>)>) {
+ let htlc_txid = tx.txid();
if tx.input.len() != 1 || tx.output.len() != 1 || tx.input[0].witness.len() != 5 {
- return Vec::new()
+ return (Vec::new(), None)
}
macro_rules! ignore_error {
( $thing : expr ) => {
match $thing {
Ok(a) => a,
- Err(_) => return Vec::new()
+ Err(_) => return (Vec::new(), None)
}
};
}
- let secret = if let Some(secret) = self.get_secret(commitment_number) { secret } else { return Vec::new(); };
+ let secret = if let Some(secret) = self.get_secret(commitment_number) { secret } else { return (Vec::new(), None); };
let per_commitment_key = ignore_error!(SecretKey::from_slice(&secret));
let per_commitment_point = PublicKey::from_secret_key(&self.secp_ctx, &per_commitment_key);
let (revocation_pubkey, revocation_key) = match self.key_storage {
Storage::Watchtower { .. } => { unimplemented!() }
};
let delayed_key = match self.their_delayed_payment_base_key {
- None => return Vec::new(),
+ None => return (Vec::new(), None),
Some(their_delayed_payment_base_key) => ignore_error!(chan_utils::derive_public_key(&self.secp_ctx, &per_commitment_point, &their_delayed_payment_base_key)),
};
let redeemscript = chan_utils::get_revokeable_redeemscript(&revocation_pubkey, self.our_to_self_delay, &delayed_key);
- let htlc_txid = tx.txid(); //TODO: This is gonna be a performance bottleneck for watchtowers!
log_trace!(self, "Remote HTLC broadcast {}:{}", htlc_txid, 0);
let witness_data = InputMaterial::Revoked { witness_script: redeemscript, pubkey: Some(revocation_pubkey), key: revocation_key, is_htlc: false, amount: tx.output[0].value };
let claimable_outpoints = vec!(ClaimRequest { absolute_timelock: height + self.our_to_self_delay as u32, aggregable: true, outpoint: BitcoinOutPoint { txid: htlc_txid, vout: 0}, witness_data });
- claimable_outpoints
+ (claimable_outpoints, Some((htlc_txid, tx.output.clone())))
}
fn broadcast_by_local_state(&self, local_tx: &LocalSignedTx, delayed_payment_base_key: &SecretKey) -> (Vec<Transaction>, Vec<SpendableOutputDescriptor>, Vec<TxOut>) {
}
} else {
if let Some(&(commitment_number, _)) = self.remote_commitment_txn_on_chain.get(&prevout.txid) {
- let mut new_outpoints = self.check_spend_remote_htlc(&tx, commitment_number, height);
+ let (mut new_outpoints, new_outputs_option) = self.check_spend_remote_htlc(&tx, commitment_number, height);
claimable_outpoints.append(&mut new_outpoints);
+ if let Some(new_outputs) = new_outputs_option {
+ watch_outputs.push(new_outputs);
+ }
}
}
}
/// Takes the flags that we know how to interpret in an init-context features that are also
/// relevant in a node-context features and creates a node-context features from them.
+ /// Be sure to blank out features that are unknown to us.
pub(crate) fn with_known_relevant_init_flags(init_ctx: &InitFeatures) -> Self {
let mut flags = Vec::new();
- if init_ctx.flags.len() > 0 {
- // Pull out data_loss_protect and upfront_shutdown_script (bits 0, 1, 4, and 5)
- flags.push(init_ctx.flags.last().unwrap() & 0b00110011);
+ for (i, feature_byte) in init_ctx.flags.iter().enumerate() {
+ match i {
+ // Blank out initial_routing_sync (feature bits 2/3), gossip_queries (6/7),
+ // gossip_queries_ex (10/11), option_static_remotekey (12/13), and
+ // payment_secret (14/15)
+ 0 => flags.push(feature_byte & 0b00110011),
+ 1 => flags.push(feature_byte & 0b00000011),
+ _ => (),
+ }
}
Self { flags, mark: PhantomData, }
}
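// A worked example of the masking above, with an assumed input byte: a first feature
// byte of 0b00101010 (data_loss_protect bit 1, initial_routing_sync bit 3,
// upfront_shutdown_script bit 5) becomes 0b00101010 & 0b00110011 = 0b00100010,
// keeping bits 1 and 5 while blanking the node-context-irrelevant bit 3.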
#[cfg(test)]
mod tests {
- use super::{ChannelFeatures, InitFeatures, NodeFeatures};
+ use super::{ChannelFeatures, InitFeatures, NodeFeatures, Features};
#[test]
fn sanity_test_our_features() {
features.clear_require_unknown_bits();
assert!(!features.requires_unknown_bits());
}
+
+ #[test]
+ fn test_node_with_known_relevant_init_flags() {
+ // Create an InitFeatures with initial_routing_sync supported.
+ let mut init_features = InitFeatures::supported();
+ init_features.set_initial_routing_sync();
+
+ // Attempt to pull out non-node-context feature flags from these InitFeatures.
+ let res = NodeFeatures::with_known_relevant_init_flags(&init_features);
+
+ {
+ // Check that the flags are as expected: optional_data_loss_protect,
+ // option_upfront_shutdown_script, and var_onion_optin set.
+ assert_eq!(res.flags[0], 0b00100010);
+ assert_eq!(res.flags[1], 0b00000010);
+ assert_eq!(res.flags.len(), 2);
+ }
+
+ // Check that the initial_routing_sync feature was correctly blanked out.
+ let new_features: InitFeatures = Features::from_le_bytes(res.flags);
+ assert!(!new_features.initial_routing_sync());
+ }
}
pub fn create_announced_chan_between_nodes_with_value<'a, 'b, 'c, 'd>(nodes: &'a Vec<Node<'b, 'c, 'd>>, a: usize, b: usize, channel_value: u64, push_msat: u64, a_flags: InitFeatures, b_flags: InitFeatures) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) {
let chan_announcement = create_chan_between_nodes_with_value(&nodes[a], &nodes[b], channel_value, push_msat, a_flags, b_flags);
+
+ nodes[a].node.broadcast_node_announcement([0, 0, 0], [0; 32], Vec::new());
+ let a_events = nodes[a].node.get_and_clear_pending_msg_events();
+ assert_eq!(a_events.len(), 1);
+ let a_node_announcement = match a_events[0] {
+ MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+ (*msg).clone()
+ },
+ _ => panic!("Unexpected event"),
+ };
+
+ nodes[b].node.broadcast_node_announcement([1, 1, 1], [1; 32], Vec::new());
+ let b_events = nodes[b].node.get_and_clear_pending_msg_events();
+ assert_eq!(b_events.len(), 1);
+ let b_node_announcement = match b_events[0] {
+ MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+ (*msg).clone()
+ },
+ _ => panic!("Unexpected event"),
+ };
+
for node in nodes {
assert!(node.router.handle_channel_announcement(&chan_announcement.0).unwrap());
node.router.handle_channel_update(&chan_announcement.1).unwrap();
node.router.handle_channel_update(&chan_announcement.2).unwrap();
+ node.router.handle_node_announcement(&a_node_announcement).unwrap();
+ node.router.handle_node_announcement(&b_node_announcement).unwrap();
}
(chan_announcement.1, chan_announcement.2, chan_announcement.3, chan_announcement.4)
}
let mut default_config = UserConfig::default();
default_config.channel_options.announced_channel = true;
default_config.peer_channel_config_limits.force_announced_channel_preference = false;
+ default_config.own_channel_config.our_htlc_minimum_msat = 1000; // sanitization is done by the sender; to exercise the receiver's logic we need to lift the limit
let node = ChannelManager::new(Network::Testnet, cfgs[i].fee_estimator, &cfgs[i].chan_monitor, cfgs[i].tx_broadcaster, cfgs[i].logger.clone(), &cfgs[i].keys_manager, if node_config[i].is_some() { node_config[i].clone().unwrap() } else { default_config }, 0).unwrap();
chanmgrs.push(node);
}
/// HTLC transaction.
pub fn test_revoked_htlc_claim_txn_broadcast<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, revoked_tx: Transaction, commitment_revoked_tx: Transaction) {
let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
- // We should issue a 2nd transaction if one htlc is dropped from initial claiming tx
- // but sometimes not as feerate is too-low
- if node_txn.len() != 1 && node_txn.len() != 2 { assert!(false); }
+ // We may issue multiple claiming transactions on revoked outputs due to block rescans
+ // of revoked HTLC outputs
+ if node_txn.len() != 1 && node_txn.len() != 2 && node_txn.len() != 3 { assert!(false); }
node_txn.retain(|tx| {
if tx.input.len() == 1 && tx.input[0].previous_output.txid == revoked_tx.txid() {
check_spends!(tx, revoked_tx);
test_txn_broadcast(&nodes[1], &chan_5, None, HTLCType::NONE);
nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+ // Verify broadcast of revoked HTLC-timeout
let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+ // Broadcast revoked HTLC-timeout on node 1
nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[1].clone()] }, 1);
test_revoked_htlc_claim_txn_broadcast(&nodes[1], node_txn[1].clone(), revoked_local_txn[0].clone());
}
check_closed_broadcast!(nodes[1], false);
let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
- assert_eq!(node_txn.len(), 4 ); // ChannelMonitor: justice tx on revoked commitment, justice tx on revoked HTLC-timeout, adjusted justice tx, ChannelManager: local commitment tx
+ assert_eq!(node_txn.len(), 4); // ChannelMonitor: justice tx on revoked commitment, justice tx on revoked HTLC-timeout, adjusted justice tx, ChannelManager: local commitment tx
+ assert_eq!(node_txn[0].input.len(), 2);
+ check_spends!(node_txn[0], revoked_local_txn[0]);
+ check_spends!(node_txn[1], chan_1.3);
assert_eq!(node_txn[2].input.len(), 1);
check_spends!(node_txn[2], revoked_htlc_txn[0]);
+ assert_eq!(node_txn[3].input.len(), 1);
+ check_spends!(node_txn[3], revoked_local_txn[0]);
// Check B's ChannelMonitor was able to generate the right spendable output descriptor
let spend_txn = check_spendable_outputs!(nodes[1], 1);
// Check A's ChannelMonitor was able to generate the right spendable output descriptor
let spend_txn = check_spendable_outputs!(nodes[0], 1);
- assert_eq!(spend_txn.len(), 4);
+ assert_eq!(spend_txn.len(), 5); // Duplicated SpendableOutput due to the block rescan triggered by revoked HTLC output tracking
assert_eq!(spend_txn[0], spend_txn[2]);
check_spends!(spend_txn[0], revoked_local_txn[0]); // spending to_remote output from revoked local tx
check_spends!(spend_txn[1], node_txn[0]); // spending justice tx output from revoked local tx htlc received output
#[test]
fn test_update_add_htlc_bolt2_sender_value_below_minimum_msat() {
- //BOLT2 Requirement: MUST offer amount_msat greater than 0.
//BOLT2 Requirement: MUST NOT offer amount_msat below the receiving node's htlc_minimum_msat (same validation check catches both of these)
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let mut route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &[], 100000, TEST_FINAL_CLTV).unwrap();
let (_, our_payment_hash) = get_payment_preimage_hash!(nodes[0]);
- route.hops[0].fee_msat = 0;
+ route.hops[0].fee_msat = 100;
let err = nodes[0].node.send_payment(route, our_payment_hash);
nodes[0].logger.assert_log("lightning::ln::channelmanager".to_string(), "Cannot send less than their minimum HTLC value".to_string(), 1);
}
+#[test]
+fn test_update_add_htlc_bolt2_sender_zero_value_msat() {
+ //BOLT2 Requirement: MUST offer amount_msat greater than 0.
+ let chanmon_cfgs = create_chanmon_cfgs(2);
+ let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+ let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+ let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+ let _chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 95000000, InitFeatures::supported(), InitFeatures::supported());
+ let mut route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &[], 100000, TEST_FINAL_CLTV).unwrap();
+ let (_, our_payment_hash) = get_payment_preimage_hash!(nodes[0]);
+
+ route.hops[0].fee_msat = 0;
+
+ let err = nodes[0].node.send_payment(route, our_payment_hash);
+
+ if let Err(APIError::ChannelUnavailable{err}) = err {
+ assert_eq!(err, "Cannot send 0-msat HTLC");
+ } else {
+ assert!(false);
+ }
+ assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+ nodes[0].logger.assert_log("lightning::ln::channelmanager".to_string(), "Cannot send 0-msat HTLC".to_string(), 1);
+}
+
+#[test]
+fn test_update_add_htlc_bolt2_receiver_zero_value_msat() {
+ //BOLT2 Requirement: MUST offer amount_msat greater than 0.
+ let chanmon_cfgs = create_chanmon_cfgs(2);
+ let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+ let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+ let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+ let _chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 95000000, InitFeatures::supported(), InitFeatures::supported());
+ let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &[], 100000, TEST_FINAL_CLTV).unwrap();
+ let (_, our_payment_hash) = get_payment_preimage_hash!(nodes[0]);
+
+ nodes[0].node.send_payment(route, our_payment_hash).unwrap();
+ check_added_monitors!(nodes[0], 1);
+ let mut updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id());
+ updates.update_add_htlcs[0].amount_msat = 0;
+
+ nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]);
+ nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Remote side tried to send a 0-msat HTLC".to_string(), 1);
+ check_closed_broadcast!(nodes[1], true).unwrap();
+}
+
#[test]
fn test_update_add_htlc_bolt2_sender_cltv_expiry_too_high() {
//BOLT 2 Requirement: MUST set cltv_expiry less than 500000000.
assert_eq!(node_txn[0].input.len(), 2);
check_spends!(node_txn[0], revoked_htlc_txn[0], revoked_htlc_txn[1]);
- //// Verify bumped tx is different and 25% bump heuristic
+ // Verify the bumped tx is different and satisfies the 25% bump heuristic
assert_ne!(first, node_txn[0].txid());
let fee_2 = revoked_htlc_txn[0].output[0].value + revoked_htlc_txn[1].output[0].value - node_txn[0].output[0].value;
let feerate_2 = fee_2 * 1000 / node_txn[0].get_weight() as u64;
connect_blocks(&nodes[0].block_notifier, 20, 145, true, header_145.bitcoin_hash());
{
let mut node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
- assert_eq!(node_txn.len(), 1); //TODO: fix check_spend_remote_htlc lack of watch output
+ // We verify that no new transaction has been broadcast. Previously we were buggy
+ // on this exact behavior: we failed to track remote HTLC outputs for monitoring (see #411),
+ // so we would never see them spent by a justice tx, and bumped justice txn were
+ // generated forever instead of being cleaned up after confirmation plus ANTI_REORG_DELAY blocks.
+ // Spending the revoked HTLC output now removes the claim request as expected and dries
+ // up bumped justice generation.
+ assert_eq!(node_txn.len(), 0);
node_txn.clear();
}
check_closed_broadcast!(nodes[0], false);
assert_eq!(res.channel_flags, 0);
assert_eq!(res.to_self_delay, 200);
}
+
+#[test]
+fn test_override_0msat_htlc_minimum() {
+ let mut zero_config = UserConfig::default();
+ zero_config.own_channel_config.our_htlc_minimum_msat = 0;
+ let chanmon_cfgs = create_chanmon_cfgs(2);
+ let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+ let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, Some(zero_config.clone())]);
+ let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+ nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 16_000_000, 12_000_000, 42, Some(zero_config)).unwrap();
+ let res = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+ assert_eq!(res.htlc_minimum_msat, 1);
+
+ nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), InitFeatures::supported(), &res);
+ let res = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+ assert_eq!(res.htlc_minimum_msat, 1);
+}
InvalidValue,
/// Buffer too short
ShortRead,
- /// node_announcement included more than one address of a given type!
- ExtraAddressesPerType,
/// A length descriptor in the packet didn't describe the later data correctly
BadLengthDescriptor,
/// Error from std::io
&NetAddress::OnionV3 { .. } => { 37 },
}
}
+
+ /// The maximum length of any address descriptor, not including the 1-byte type
+ pub(crate) const MAX_LEN: u16 = 37;
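+ // (37 bytes corresponds to the OnionV3 arm above: a 32-byte ed25519 pubkey, 2-byte
+ // checksum, 1-byte version, and a 2-byte port.)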
}
impl Writeable for NetAddress {
fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate);
/// Gets a subset of the channel announcements and updates required to dump our routing table
/// to a remote node, starting at the short_channel_id indicated by starting_point and
- /// including batch_amount entries.
+ /// including the batch_amount entries immediately higher in numerical value than starting_point.
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, ChannelUpdate, ChannelUpdate)>;
/// Gets a subset of the node announcements required to dump our routing table to a remote node,
- /// starting at the node *after* the provided publickey and including batch_amount entries.
+ /// starting at the node *after* the provided publickey and including batch_amount entries
+ /// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
/// If None is provided for starting_point, we start at the first node.
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
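// A minimal sketch of paging through the channel dump with the method above; the
// `router` binding and batch size are assumptions, not part of this patch:
//
//     let mut starting_point = 0u64;
//     loop {
//         let batch = router.get_next_channel_announcements(starting_point, 8);
//         if batch.is_empty() { break; }
//         // The next page begins just above the highest short_channel_id returned.
//         starting_point = batch.last().unwrap().0.contents.short_channel_id + 1;
//         // ... encode and send the announcements/updates to the peer here ...
//     }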
/// Returns whether a full sync should be requested from a peer.
DecodeError::UnknownRequiredFeature => "Unknown required feature preventing decode",
DecodeError::InvalidValue => "Nonsense bytes didn't map to the type they were interpreted as",
DecodeError::ShortRead => "Packet extended beyond the provided bytes",
- DecodeError::ExtraAddressesPerType => "More than one address of a single type",
DecodeError::BadLengthDescriptor => "A length descriptor in the packet didn't describe the later data correctly",
DecodeError::Io(ref e) => e.description(),
}
self.alias.write(w)?;
let mut addrs_to_encode = self.addresses.clone();
- addrs_to_encode.sort_unstable_by(|a, b| { a.get_id().cmp(&b.get_id()) });
- addrs_to_encode.dedup_by(|a, b| { a.get_id() == b.get_id() });
+ addrs_to_encode.sort_by(|a, b| { a.get_id().cmp(&b.get_id()) });
let mut addr_len = 0;
for addr in &addrs_to_encode {
addr_len += 1 + addr.len();
let alias: [u8; 32] = Readable::read(r)?;
let addr_len: u16 = Readable::read(r)?;
- let mut addresses: Vec<NetAddress> = Vec::with_capacity(4);
+ let mut addresses: Vec<NetAddress> = Vec::new();
+ let mut highest_addr_type = 0;
let mut addr_readpos = 0;
let mut excess = false;
let mut excess_byte = 0;
if addr_len <= addr_readpos { break; }
match Readable::read(r) {
Ok(Ok(addr)) => {
- match addr {
- NetAddress::IPv4 { .. } => {
- if addresses.len() > 0 {
- return Err(DecodeError::ExtraAddressesPerType);
- }
- },
- NetAddress::IPv6 { .. } => {
- if addresses.len() > 1 || (addresses.len() == 1 && addresses[0].get_id() != 1) {
- return Err(DecodeError::ExtraAddressesPerType);
- }
- },
- NetAddress::OnionV2 { .. } => {
- if addresses.len() > 2 || (addresses.len() > 0 && addresses.last().unwrap().get_id() > 2) {
- return Err(DecodeError::ExtraAddressesPerType);
- }
- },
- NetAddress::OnionV3 { .. } => {
- if addresses.len() > 3 || (addresses.len() > 0 && addresses.last().unwrap().get_id() > 3) {
- return Err(DecodeError::ExtraAddressesPerType);
- }
- },
+ if addr.get_id() < highest_addr_type {
+ // Addresses must be sorted by type id in non-decreasing order
+ return Err(DecodeError::InvalidValue);
}
+ highest_addr_type = addr.get_id();
if addr_len < addr_readpos + 1 + addr.len() {
return Err(DecodeError::BadLengthDescriptor);
}
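// Under the new rule, duplicate address types are tolerated: a list typed
// [IPv4, IPv4, OnionV2] decodes fine, while [OnionV2, IPv4] now fails with
// DecodeError::InvalidValue since type ids may never decrease.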
impl_writeable_len_match!(NodeAnnouncement, {
{ NodeAnnouncement { contents: UnsignedNodeAnnouncement { ref features, ref addresses, ref excess_address_data, ref excess_data, ..}, .. },
- 64 + 76 + features.byte_count() + addresses.len()*38 + excess_address_data.len() + excess_data.len() }
+ 64 + 76 + features.byte_count() + addresses.len()*(NetAddress::MAX_LEN as usize + 1) + excess_address_data.len() + excess_data.len() }
}, {
signature,
contents
where B::Target: BroadcasterInterface,
F::Target: FeeEstimator
{
+ log_trace!(self, "Block at height {} connected with {} claim requests", height, claimable_outpoints.len());
let mut new_claims = Vec::new();
let mut aggregated_claim = HashMap::new();
let mut aggregated_soonest = ::std::u32::MAX;
if set_equality {
clean_claim_request_after_safety_delay!();
} else { // If false, generate new claim request with updated outpoint set
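+ // Track whether the conflicting tx removed any of our registered outpoints from this
+ // claim; only in that case does the remaining claim need fee-bumped regeneration
+ // (see the bump_candidates insertion below).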
+ let mut at_least_one_drop = false;
for input in tx.input.iter() {
if let Some(input_material) = claim_material.per_input_material.remove(&input.previous_output) {
claimed_outputs_material.push((input.previous_output, input_material));
+ at_least_one_drop = true;
}
// If there are no outpoints left to claim in this request, drop it entirely after ANTI_REORG_DELAY.
if claim_material.per_input_material.is_empty() {
}
}
//TODO: recompute soonest_timelock to avoid wasting a bit on fees
- bump_candidates.insert(first_claim_txid_height.0.clone());
+ if at_least_one_drop {
+ bump_candidates.insert(first_claim_txid_height.0.clone());
+ }
}
break; // No need to iterate further, either the tx is ours or theirs
} else {
}
// Build, bump and rebroadcast tx accordingly
+ log_trace!(self, "Bumping {} candidates", bump_candidates.len());
for first_claim_txid in bump_candidates.iter() {
if let Some((new_timer, new_feerate)) = {
if let Some(claim_material) = self.pending_claim_requests.get(first_claim_txid) {
/// announcements/updates for the given channel_id then we will send it when we get to that
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
/// sent the old versions, we should send the update, and so return true here.
- fn should_forward_channel(&self, channel_id: u64)->bool{
+ fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
match self.sync_status {
InitSyncTracker::NoSyncRequested => true,
InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
InitSyncTracker::NodesSyncing(_) => true,
}
}
+
+ /// Similar to the above, but for node announcements indexed by node_id.
+ fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
+ match self.sync_status {
+ InitSyncTracker::NoSyncRequested => true,
+ InitSyncTracker::ChannelsSyncing(_) => false,
+ InitSyncTracker::NodesSyncing(pk) => pk < node_id,
+ }
+ }
}
struct PeerHolder<Descriptor: SocketDescriptor> {
/// on this file descriptor has resume_read set (preventing DoS issues in the send buffer).
///
/// Panics if the descriptor was not previously registered in a new_*_connection event.
- pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
+ pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
match self.do_read_event(peer_descriptor, data) {
Ok(res) => Ok(res),
Err(e) => {
}
}
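// Both methods now take &[u8] rather than an owned Vec<u8>, so callers can hand over a
// view into their existing read buffer without allocating per read_event call.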
- fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
+ fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
let pause_read = {
let mut peers_lock = self.peers.lock().unwrap();
let peers = &mut *peers_lock;
log_debug!(self, "Deserialization failed due to shortness of message");
return Err(PeerHandleError { no_connection_possible: false });
}
- msgs::DecodeError::ExtraAddressesPerType => {
- log_debug!(self, "Error decoding message, ignoring due to lnd spec incompatibility. See https://github.com/lightningnetwork/lnd/issues/1407");
- continue;
- }
msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }),
msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }),
}
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
- !peer.should_forward_channel(msg.contents.short_channel_id) {
+ !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
match peer.their_node_id {
}
}
},
+ MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+ log_trace!(self, "Handling BroadcastNodeAnnouncement event in peer_handler");
+ if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
+ let encoded_msg = encode_msg!(msg);
+
+ for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+ if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+ !peer.should_forward_node_announcement(msg.contents.node_id) {
+ continue
+ }
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
+ }
+ }
+ },
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
- !peer.should_forward_channel(msg.contents.short_channel_id) {
+ !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap();
peer_a.new_inbound_connection(fd_a.clone()).unwrap();
- assert_eq!(peer_a.read_event(&mut fd_a, initial_data).unwrap(), false);
- assert_eq!(peer_b.read_event(&mut fd_b, fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
- assert_eq!(peer_a.read_event(&mut fd_a, fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+ assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+ assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+ assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
}
#[test]
lowest_inbound_channel_fee_proportional_millionths: u32,
features: NodeFeatures,
- last_update: u32,
+ /// Unlike for channels, we may have a NodeInfo entry before having received a
+ /// node_announcement. Thus, we have to be able to capture "no update has been received",
+ /// which we do with an Option here.
+ last_update: Option<u32>,
rgb: [u8; 3],
alias: [u8; 32],
addresses: Vec<NetAddress>,
impl std::fmt::Display for NodeInfo {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
- write!(f, "features: {}, last_update: {}, lowest_inbound_channel_fee_base_msat: {}, lowest_inbound_channel_fee_proportional_millionths: {}, channels: {:?}", log_bytes!(self.features.encode()), self.last_update, self.lowest_inbound_channel_fee_base_msat, self.lowest_inbound_channel_fee_proportional_millionths, &self.channels[..])?;
+ write!(f, "features: {}, last_update: {:?}, lowest_inbound_channel_fee_base_msat: {}, lowest_inbound_channel_fee_proportional_millionths: {}, channels: {:?}", log_bytes!(self.features.encode()), self.last_update, self.lowest_inbound_channel_fee_base_msat, self.lowest_inbound_channel_fee_proportional_millionths, &self.channels[..])?;
Ok(())
}
}
match network.nodes.get_mut(&msg.contents.node_id) {
None => Err(LightningError{err: "No existing channels for node_announcement", action: ErrorAction::IgnoreError}),
Some(node) => {
- if node.last_update >= msg.contents.timestamp {
- return Err(LightningError{err: "Update older than last processed update", action: ErrorAction::IgnoreError});
+ match node.last_update {
+ Some(last_update) => if last_update >= msg.contents.timestamp {
+ return Err(LightningError{err: "Update older than last processed update", action: ErrorAction::IgnoreError});
+ },
+ None => {},
}
node.features = msg.contents.features.clone();
- node.last_update = msg.contents.timestamp;
+ node.last_update = Some(msg.contents.timestamp);
node.rgb = msg.contents.rgb;
node.alias = msg.contents.alias;
node.addresses = msg.contents.addresses.clone();
lowest_inbound_channel_fee_base_msat: u32::max_value(),
lowest_inbound_channel_fee_proportional_millionths: u32::max_value(),
features: NodeFeatures::empty(),
- last_update: 0,
+ last_update: None,
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
lowest_inbound_channel_fee_base_msat: u32::max_value(),
lowest_inbound_channel_fee_proportional_millionths: u32::max_value(),
features: NodeFeatures::empty(),
- last_update: 0,
+ last_update: None,
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
lowest_inbound_channel_fee_base_msat: 100,
lowest_inbound_channel_fee_proportional_millionths: 0,
features: NodeFeatures::from_le_bytes(id_to_feature_flags!(1)),
- last_update: 1,
+ last_update: Some(1),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
lowest_inbound_channel_fee_base_msat: 0,
lowest_inbound_channel_fee_proportional_millionths: 0,
features: NodeFeatures::from_le_bytes(id_to_feature_flags!(2)),
- last_update: 1,
+ last_update: Some(1),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
lowest_inbound_channel_fee_base_msat: 0,
lowest_inbound_channel_fee_proportional_millionths: 0,
features: NodeFeatures::from_le_bytes(id_to_feature_flags!(8)),
- last_update: 1,
+ last_update: Some(1),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
lowest_inbound_channel_fee_base_msat: 0,
lowest_inbound_channel_fee_proportional_millionths: 0,
features: NodeFeatures::from_le_bytes(id_to_feature_flags!(3)),
- last_update: 1,
+ last_update: Some(1),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
lowest_inbound_channel_fee_base_msat: 0,
lowest_inbound_channel_fee_proportional_millionths: 0,
features: NodeFeatures::from_le_bytes(id_to_feature_flags!(4)),
- last_update: 1,
+ last_update: Some(1),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
lowest_inbound_channel_fee_base_msat: 0,
lowest_inbound_channel_fee_proportional_millionths: 0,
features: NodeFeatures::from_le_bytes(id_to_feature_flags!(5)),
- last_update: 1,
+ last_update: Some(1),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
lowest_inbound_channel_fee_base_msat: 0,
lowest_inbound_channel_fee_proportional_millionths: 0,
features: NodeFeatures::from_le_bytes(id_to_feature_flags!(6)),
- last_update: 1,
+ last_update: Some(1),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
let message = Message::Unknown(MessageType(43));
assert!(!message.type_id().is_even());
}
+
+ #[test]
+ fn read_lnd_init_msg() {
+ // Taken from lnd v0.9.0-beta.
+ let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 162, 161];
+ check_init_msg(buffer);
+ }
+
+ #[test]
+ fn read_clightning_init_msg() {
+ // Taken from c-lightning v0.8.0.
+ let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 170, 162, 1, 32, 6, 34, 110, 70, 17, 26, 11, 89, 202, 175, 18, 96, 67, 235, 91, 191, 40, 195, 79, 58, 94, 51, 42, 31, 199, 178, 183, 60, 241, 136, 145, 15];
+ check_init_msg(buffer);
+ }
+
+ fn check_init_msg(buffer: Vec<u8>) {
+ let mut reader = ::std::io::Cursor::new(buffer);
+ let decoded_msg = read(&mut reader).unwrap();
+ match decoded_msg {
+ Message::Init(msgs::Init { features }) => {
+ assert!(features.supports_variable_length_onion());
+ assert!(features.supports_upfront_shutdown_script());
+ assert!(features.supports_unknown_bits());
+ assert!(!features.requires_unknown_bits());
+ assert!(!features.initial_routing_sync());
+ },
+ _ => panic!("Expected init message, found message type: {}", decoded_msg.type_id())
+ }
+ }
+
+ #[test]
+ fn read_lnd_node_announcement() {
+ // Taken from lnd v0.9.0-beta.
+ let buffer = vec![1, 1, 91, 164, 146, 213, 213, 165, 21, 227, 102, 33, 105, 179, 214, 21, 221, 175, 228, 93, 57, 177, 191, 127, 107, 229, 31, 50, 21, 81, 179, 71, 39, 18, 35, 2, 89, 224, 110, 123, 66, 39, 148, 246, 177, 85, 12, 19, 70, 226, 173, 132, 156, 26, 122, 146, 71, 213, 247, 48, 93, 190, 185, 177, 12, 172, 0, 3, 2, 162, 161, 94, 103, 195, 37, 2, 37, 242, 97, 140, 2, 111, 69, 85, 39, 118, 30, 221, 99, 254, 120, 49, 103, 22, 170, 227, 111, 172, 164, 160, 49, 68, 138, 116, 16, 22, 206, 107, 51, 153, 255, 97, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 1, 172, 21, 0, 2, 38, 7];
+ let mut reader = ::std::io::Cursor::new(buffer);
+ let decoded_msg = read(&mut reader).unwrap();
+ match decoded_msg {
+ Message::NodeAnnouncement(msgs::NodeAnnouncement { contents: msgs::UnsignedNodeAnnouncement { features, ..}, ..}) => {
+ assert!(features.supports_variable_length_onion());
+ assert!(features.supports_upfront_shutdown_script());
+ assert!(features.supports_unknown_bits());
+ assert!(!features.requires_unknown_bits());
+ },
+ _ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id())
+ }
+ }
+
+ #[test]
+ fn read_lnd_chan_announcement() {
+ // Taken from lnd v0.9.0-beta.
+ let buffer = vec![1, 0, 82, 238, 153, 33, 128, 87, 215, 2, 28, 241, 140, 250, 98, 255, 56, 5, 79, 240, 214, 231, 172, 35, 240, 171, 44, 9, 78, 91, 8, 193, 102, 5, 17, 178, 142, 106, 180, 183, 46, 38, 217, 212, 25, 236, 69, 47, 92, 217, 181, 221, 161, 205, 121, 201, 99, 38, 158, 216, 186, 193, 230, 86, 222, 6, 206, 67, 22, 255, 137, 212, 141, 161, 62, 134, 76, 48, 241, 54, 50, 167, 187, 247, 73, 27, 74, 1, 129, 185, 197, 153, 38, 90, 255, 138, 39, 161, 102, 172, 213, 74, 107, 88, 150, 90, 0, 49, 104, 7, 182, 184, 194, 219, 181, 172, 8, 245, 65, 226, 19, 228, 101, 145, 25, 159, 52, 31, 58, 93, 53, 59, 218, 91, 37, 84, 103, 17, 74, 133, 33, 35, 2, 203, 101, 73, 19, 94, 175, 122, 46, 224, 47, 168, 128, 128, 25, 26, 25, 214, 52, 247, 43, 241, 117, 52, 206, 94, 135, 156, 52, 164, 143, 234, 58, 185, 50, 185, 140, 198, 174, 71, 65, 18, 105, 70, 131, 172, 137, 0, 164, 51, 215, 143, 117, 119, 217, 241, 197, 177, 227, 227, 170, 199, 114, 7, 218, 12, 107, 30, 191, 236, 203, 21, 61, 242, 48, 192, 90, 233, 200, 199, 111, 162, 68, 234, 54, 219, 1, 233, 66, 5, 82, 74, 84, 211, 95, 199, 245, 202, 89, 223, 102, 124, 62, 166, 253, 253, 90, 180, 118, 21, 61, 110, 37, 5, 96, 167, 0, 0, 6, 34, 110, 70, 17, 26, 11, 89, 202, 175, 18, 96, 67, 235, 91, 191, 40, 195, 79, 58, 94, 51, 42, 31, 199, 178, 183, 60, 241, 136, 145, 15, 0, 2, 65, 0, 0, 1, 0, 0, 2, 37, 242, 97, 140, 2, 111, 69, 85, 39, 118, 30, 221, 99, 254, 120, 49, 103, 22, 170, 227, 111, 172, 164, 160, 49, 68, 138, 116, 16, 22, 206, 107, 3, 54, 61, 144, 88, 171, 247, 136, 208, 99, 9, 135, 37, 201, 178, 253, 136, 0, 185, 235, 68, 160, 106, 110, 12, 46, 21, 125, 204, 18, 75, 234, 16, 3, 42, 171, 28, 52, 224, 11, 30, 30, 253, 156, 148, 175, 203, 121, 250, 111, 122, 195, 84, 122, 77, 183, 56, 135, 101, 88, 41, 60, 191, 99, 232, 85, 2, 36, 17, 156, 11, 8, 12, 189, 177, 68, 88, 28, 15, 207, 21, 179, 151, 56, 226, 158, 148, 3, 120, 113, 177, 243, 184, 17, 173, 37, 46, 222, 16];
+ let mut reader = ::std::io::Cursor::new(buffer);
+ let decoded_msg = read(&mut reader).unwrap();
+ match decoded_msg {
+ Message::ChannelAnnouncement(msgs::ChannelAnnouncement { contents: msgs::UnsignedChannelAnnouncement { features, ..}, ..}) => {
+ assert!(!features.requires_unknown_bits());
+ },
+ _ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id())
+ }
+ }
}
/// Default value: BREAKDOWN_TIMEOUT (currently 144); we enforce it as a minimum at channel
/// opening so you can tweak config to ask for more security, not less.
pub our_to_self_delay: u16,
+ /// Set to the smallest value HTLC we will accept and process.
+ ///
+ /// This value is sent to our counterparty on channel-open and we close the channel any time
+ /// our counterparty misbehaves by sending us an HTLC with a value smaller than this.
+ ///
+ /// Default value: 1. If the value is less than 1, it is ignored and set to 1, as is required
+ /// by the protocol.
+ pub our_htlc_minimum_msat: u64,
}
impl Default for ChannelHandshakeConfig {
ChannelHandshakeConfig {
minimum_depth: 6,
our_to_self_delay: BREAKDOWN_TIMEOUT,
+ our_htlc_minimum_msat: 1,
}
}
}
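// A minimal sketch of overriding the new knob, with names as used in the tests above:
//
//     let mut config = UserConfig::default();
//     config.own_channel_config.our_htlc_minimum_msat = 1000;
//
// A value of 0 is accepted but bumped to 1 at channel-open, as documented above.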
},
/// Used to indicate that a channel_announcement and channel_update should be broadcast to all
/// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
+ ///
+ /// Note that after doing so, you very likely (unless you did so very recently) want to call
+ /// ChannelManager::broadcast_node_announcement to trigger a BroadcastNodeAnnouncement event.
+ /// This ensures that any nodes which see our channel_announcement also have a relevant
+ /// node_announcement, including our feature flags which may be important for routing
+ /// through or to us.
BroadcastChannelAnnouncement {
/// The channel_announcement which should be sent.
msg: msgs::ChannelAnnouncement,
/// The followup channel_update which should be sent.
update_msg: msgs::ChannelUpdate,
},
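// The follow-up recommended above looks like the call used in the test utilities
// earlier in this patch (RGB color, alias, and empty address list as placeholders;
// `channel_manager` is an assumed ChannelManager binding):
//
//     channel_manager.broadcast_node_announcement([0, 0, 0], [0; 32], Vec::new());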
+ /// Used to indicate that a node_announcement should be broadcast to all peers.
+ BroadcastNodeAnnouncement {
+ /// The node_announcement which should be sent.
+ msg: msgs::NodeAnnouncement,
+ },
/// Used to indicate that a channel_update should be broadcast to all peers.
BroadcastChannelUpdate {
/// The channel_update which should be sent.