From 5dea30faee9ff4e83eea13d025ae023c5b400426 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Sat, 1 Jun 2019 07:02:31 -0400
Subject: [PATCH] Add subcrate that implements network socket handling with Tokio

This is still pretty raw and is mostly just a straight move from
rust-lightning-bitcoinrpc, but the first step is to get it there; the
second is to actually make it safe to use.
---
 .travis.yml          |   1 +
 net-tokio/Cargo.toml |  19 +++
 net-tokio/src/lib.rs | 270 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 290 insertions(+)
 create mode 100644 net-tokio/Cargo.toml
 create mode 100644 net-tokio/src/lib.rs

diff --git a/.travis.yml b/.travis.yml
index efdff130b..38ddd94d1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,3 +14,4 @@ script:
   - cargo build --verbose
   - cargo test --verbose
   - if [ "$(rustup show | grep default | grep 1.29.2)" != "" ]; then cd fuzz && cargo test --verbose && ./travis-fuzz.sh; fi
+  - if [ "$(rustup show | grep default | grep stable)" != "" ]; then cd net-tokio && cargo build --verbose; fi
diff --git a/net-tokio/Cargo.toml b/net-tokio/Cargo.toml
new file mode 100644
index 000000000..a6fc047ad
--- /dev/null
+++ b/net-tokio/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "lightning-net-tokio"
+version = "0.0.1"
+authors = ["Matt Corallo"]
+license = "Apache-2.0"
+description = """
+Implementation of the rust-lightning network stack using Tokio.
+For Rust-Lightning clients which wish to make direct connections to Lightning P2P nodes, this is a simple alternative to implementing the required network stack, especially for those already using Tokio.
+"""
+
+[dependencies]
+bitcoin = "0.18"
+bitcoin_hashes = "0.3"
+lightning = { version = "0.0.8", path = "../" }
+secp256k1 = "0.12"
+tokio-codec = "0.1"
+futures = "0.1"
+tokio = "0.1"
+bytes = "0.4"
diff --git a/net-tokio/src/lib.rs b/net-tokio/src/lib.rs
new file mode 100644
index 000000000..54752fdb3
--- /dev/null
+++ b/net-tokio/src/lib.rs
@@ -0,0 +1,270 @@
+extern crate bytes;
+extern crate tokio;
+extern crate tokio_codec;
+extern crate futures;
+extern crate lightning;
+extern crate secp256k1;
+
+use bytes::BufMut;
+
+use futures::future;
+use futures::future::Future;
+use futures::{AsyncSink, Stream, Sink};
+use futures::sync::mpsc;
+
+use secp256k1::key::PublicKey;
+
+use tokio::timer::Delay;
+use tokio::net::TcpStream;
+
+use lightning::ln::peer_handler;
+use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
+
+use std::mem;
+use std::net::SocketAddr;
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant};
+use std::vec::Vec;
+use std::hash::Hash;
+
+static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
+
+/// A connection to a remote peer. Can be constructed either as a remote connection using
+/// Connection::setup_outbound or by accepting an incoming connection and handling it with
+/// Connection::setup_inbound.
+pub struct Connection {
+	writer: Option<mpsc::Sender<bytes::Bytes>>,
+	event_notify: mpsc::Sender<()>,
+	pending_read: Vec<u8>,
+	read_blocker: Option<futures::sync::oneshot::Sender<Result<(), ()>>>,
+	read_paused: bool,
+	need_disconnect: bool,
+	id: u64,
+}
+impl Connection {
+	fn schedule_read(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, us: Arc<Mutex<Self>>, reader: futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>) {
+		let us_ref = us.clone();
+		let us_close_ref = us.clone();
+		let peer_manager_ref = peer_manager.clone();
+		tokio::spawn(reader.for_each(move |b| {
+			let pending_read = b.to_vec();
+			{
+				let mut lock = us_ref.lock().unwrap();
+				assert!(lock.pending_read.is_empty());
+				if lock.read_paused {
+					lock.pending_read = pending_read;
+					let (sender, blocker) = futures::sync::oneshot::channel();
+					lock.read_blocker = Some(sender);
+					return future::Either::A(blocker.then(|_| { Ok(()) }));
+				}
+			}
+			//TODO: There's a race where we don't meet the requirements of disconnect_socket if it's
+			//called right here, after we release the us_ref lock in the scope above, but before we
+			//call read_event!
+			match peer_manager.read_event(&mut SocketDescriptor::new(us_ref.clone(), peer_manager.clone()), pending_read) {
+				Ok(pause_read) => {
+					if pause_read {
+						let mut lock = us_ref.lock().unwrap();
+						lock.read_paused = true;
+					}
+				},
+				Err(e) => {
+					us_ref.lock().unwrap().need_disconnect = false;
+					return future::Either::B(future::result(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))));
+				}
+			}
+
+			if let Err(e) = us_ref.lock().unwrap().event_notify.try_send(()) {
+				// Ignore full errors as we just need them to poll after this point, so if the user
+				// hasn't received the last send yet, it doesn't matter.
+				assert!(e.is_full());
+			}
+
+			future::Either::B(future::result(Ok(())))
+		}).then(move |_| {
+			if us_close_ref.lock().unwrap().need_disconnect {
+				peer_manager_ref.disconnect_event(&SocketDescriptor::new(us_close_ref, peer_manager_ref.clone()));
+				println!("Peer disconnected!");
+			} else {
+				println!("We disconnected peer!");
+			}
+			Ok(())
+		}));
+	}
+
+	fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (futures::stream::SplitStream<tokio_codec::Framed<TcpStream, tokio_codec::BytesCodec>>, Arc<Mutex<Self>>) {
+		let (writer, reader) = tokio_codec::Framed::new(stream, tokio_codec::BytesCodec::new()).split();
+		let (send_sink, send_stream) = mpsc::channel(3);
+		tokio::spawn(writer.send_all(send_stream.map_err(|_| -> std::io::Error {
+			unreachable!();
+		})).then(|_| {
+			future::result(Ok(()))
+		}));
+		let us = Arc::new(Mutex::new(Self { writer: Some(send_sink), event_notify, pending_read: Vec::new(), read_blocker: None, read_paused: false, need_disconnect: true, id: ID_COUNTER.fetch_add(1, Ordering::AcqRel) }));
+
+		(reader, us)
+	}
+
+	/// Process incoming messages and feed outgoing messages on the provided socket generated by
+	/// accepting an incoming connection (by scheduling futures with tokio::spawn).
+	///
+	/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
+	/// ChannelManager and ChannelMonitor objects.
+	pub fn setup_inbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, stream: TcpStream) {
+		let (reader, us) = Self::new(event_notify, stream);
+
+		if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone(), peer_manager.clone())) {
+			Self::schedule_read(peer_manager, us, reader);
+		}
+	}
+
+	/// Process incoming messages and feed outgoing messages on the provided socket generated by
+	/// making an outbound connection which is expected to be accepted by a peer with the given
+	/// public key (by scheduling futures with tokio::spawn).
+	///
+	/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
+	/// ChannelManager and ChannelMonitor objects.
+	pub fn setup_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) {
+		let (reader, us) = Self::new(event_notify, stream);
+
+		if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone(), peer_manager.clone())) {
+			if SocketDescriptor::new(us.clone(), peer_manager.clone()).send_data(&initial_send, 0, true) == initial_send.len() {
+				Self::schedule_read(peer_manager, us, reader);
+			} else {
+				println!("Failed to write first full message to socket!");
+			}
+		}
+	}
+
+	/// Process incoming messages and feed outgoing messages on a new connection made to the given
+	/// socket address which is expected to be accepted by a peer with the given public key (by
+	/// scheduling futures with tokio::spawn).
+	///
+	/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
+	/// ChannelManager and ChannelMonitor objects.
+	pub fn connect_outbound(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) {
+		let connect_timeout = Delay::new(Instant::now() + Duration::from_secs(10)).then(|_| {
+			future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
+		});
+		tokio::spawn(TcpStream::connect(&addr).select(connect_timeout)
+			.and_then(move |stream| {
+				Connection::setup_outbound(peer_manager, event_notify, their_node_id, stream.0);
+				future::ok(())
+			}).or_else(|_| {
+				//TODO: return errors somehow
+				future::ok(())
+			}));
+	}
+}
+
+#[derive(Clone)]
+pub struct SocketDescriptor {
+	conn: Arc<Mutex<Connection>>,
+	id: u64,
+	peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>,
+}
+impl SocketDescriptor {
+	fn new(conn: Arc<Mutex<Connection>>, peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor>>) -> Self {
+		let id = conn.lock().unwrap().id;
+		Self { conn, id, peer_manager }
+	}
+}
+impl peer_handler::SocketDescriptor for SocketDescriptor {
+	fn send_data(&mut self, data: &Vec<u8>, write_offset: usize, resume_read: bool) -> usize {
+		macro_rules! schedule_read {
+			($us_ref: expr) => {
+				tokio::spawn(future::lazy(move || -> Result<(), ()> {
+					let mut read_data = Vec::new();
+					{
+						let mut us = $us_ref.conn.lock().unwrap();
+						mem::swap(&mut read_data, &mut us.pending_read);
+					}
+					if !read_data.is_empty() {
+						let mut us_clone = $us_ref.clone();
+						match $us_ref.peer_manager.read_event(&mut us_clone, read_data) {
+							Ok(pause_read) => {
+								if pause_read { return Ok(()); }
+							},
+							Err(_) => {
+								//TODO: Not actually sure how to do this
+								return Ok(());
+							}
+						}
+					}
+					let mut us = $us_ref.conn.lock().unwrap();
+					if let Some(sender) = us.read_blocker.take() {
+						sender.send(Ok(())).unwrap();
+					}
+					us.read_paused = false;
+					if let Err(e) = us.event_notify.try_send(()) {
+						// Ignore full errors as we just need them to poll after this point, so if the user
+						// hasn't received the last send yet, it doesn't matter.
+						assert!(e.is_full());
+					}
+					Ok(())
+				}));
+			}
+		}
+
+		let mut us = self.conn.lock().unwrap();
+		if resume_read {
+			let us_ref = self.clone();
+			schedule_read!(us_ref);
+		}
+		if data.len() == write_offset { return 0; }
+		if us.writer.is_none() {
+			us.read_paused = true;
+			return 0;
+		}
+
+		let mut bytes = bytes::BytesMut::with_capacity(data.len() - write_offset);
+		bytes.put(&data[write_offset..]);
+		let write_res = us.writer.as_mut().unwrap().start_send(bytes.freeze());
+		match write_res {
+			Ok(res) => {
+				match res {
+					AsyncSink::Ready => {
+						data.len() - write_offset
+					},
+					AsyncSink::NotReady(_) => {
+						us.read_paused = true;
+						let us_ref = self.clone();
+						tokio::spawn(us.writer.take().unwrap().flush().then(move |writer_res| -> Result<(), ()> {
+							if let Ok(writer) = writer_res {
+								{
+									let mut us = us_ref.conn.lock().unwrap();
+									us.writer = Some(writer);
+								}
+								schedule_read!(us_ref);
+							} // we'll fire the disconnect event on the socket reader end
+							Ok(())
+						}));
+						0
+					}
+				}
+			},
+			Err(_) => {
+				// We'll fire the disconnected event on the socket reader end
+				0
+			},
+		}
+	}
+
+	fn disconnect_socket(&mut self) {
+		let mut us = self.conn.lock().unwrap();
+		us.need_disconnect = true;
+		us.read_paused = true;
+	}
+}
+impl Eq for SocketDescriptor {}
+impl PartialEq for SocketDescriptor {
+	fn eq(&self, o: &Self) -> bool {
+		self.id == o.id
+	}
+}
+impl Hash for SocketDescriptor {
+	fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+		self.id.hash(state);
+	}
+}
-- 
2.39.5
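
Usage note (not part of the patch): a minimal sketch of how a client might wire this
crate up, assuming tokio 0.1 / futures 0.1 as in the patch. `peer_manager` (an
Arc<peer_handler::PeerManager<SocketDescriptor>>), `their_node_id`, `addr`, and
`handle_pending_events()` are hypothetical names standing in for application code that
is not part of this patch; only the calls into lightning_net_tokio come from the code above.

    // Channel whose receive end we poll, as the doc comments on setup_inbound/
    // setup_outbound/connect_outbound suggest.
    let (event_notify, event_receive) = futures::sync::mpsc::channel::<()>(2);

    tokio::run(futures::future::lazy(move || {
        // Kick off an outbound connection; the socket futures are spawned internally
        // via tokio::spawn.
        lightning_net_tokio::Connection::connect_outbound(
            peer_manager.clone(), event_notify, their_node_id, addr);

        // Each notification means get_and_clear_pending_events() should be called on
        // the ChannelManager and ChannelMonitor objects.
        event_receive.for_each(move |()| {
            handle_pending_events(); // hypothetical application callback
            Ok(())
        })
    }));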