diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs
index 5f5fece0..62c036b9 100644
--- a/lightning-net-tokio/src/lib.rs
+++ b/lightning-net-tokio/src/lib.rs
@@ -34,7 +34,7 @@
 //! type Logger = dyn lightning::util::logger::Logger + Send + Sync;
 //! type ChainAccess = dyn lightning::chain::Access + Send + Sync;
 //! type ChainFilter = dyn lightning::chain::Filter + Send + Sync;
-//! type DataPersister = dyn lightning::chain::channelmonitor::Persist + Send + Sync;
+//! type DataPersister = dyn lightning::chain::chainmonitor::Persist + Send + Sync;
 //! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>;
 //! type ChannelManager = Arc>;
 //! type PeerManager = Arc>;
@@ -43,13 +43,12 @@
 //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
 //! 	lightning_net_tokio::connect_outbound(peer_manager, their_node_id, addr).await;
 //! 	loop {
-//! 		channel_manager.await_persistable_update();
-//! 		channel_manager.process_pending_events(&|event| {
-//! 			// Handle the event!
-//! 		});
-//! 		chain_monitor.process_pending_events(&|event| {
+//! 		let event_handler = |event: &Event| {
 //! 			// Handle the event!
-//! 		});
+//! 		};
+//! 		channel_manager.await_persistable_update();
+//! 		channel_manager.process_pending_events(&event_handler);
+//! 		chain_monitor.process_pending_events(&event_handler);
 //! 	}
 //! }
 //!
@@ -57,13 +56,12 @@
 //! async fn accept_socket(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, socket: TcpStream) {
 //! 	lightning_net_tokio::setup_inbound(peer_manager, socket);
 //! 	loop {
-//! 		channel_manager.await_persistable_update();
-//! 		channel_manager.process_pending_events(&|event| {
+//! 		let event_handler = |event: &Event| {
 //! 			// Handle the event!
-//! 		});
-//! 		chain_monitor.process_pending_events(&|event| {
-//! 			// Handle the event!
-//! 		});
+//! 		};
+//! 		channel_manager.await_persistable_update();
+//! 		channel_manager.process_pending_events(&event_handler);
+//! 		chain_monitor.process_pending_events(&event_handler);
 //! 	}
 //! }
 //! ```
@@ -80,6 +78,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
 
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
+use lightning::ln::peer_handler::CustomMessageHandler;
 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
 use lightning::util::logger::Logger;
 
@@ -119,10 +118,11 @@ struct Connection {
 	id: u64,
 }
 impl Connection {
-	async fn schedule_read(peer_manager: Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+	async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
 			CMH: ChannelMessageHandler + 'static,
 			RMH: RoutingMessageHandler + 'static,
-			L: Logger + 'static + ?Sized {
+			L: Logger + 'static + ?Sized,
+			UMH: CustomMessageHandler + 'static {
 		// 8KB is nice and big but also should never cause any issues with stack overflowing.
 		let mut buf = [0; 8192];
 
@@ -141,30 +141,23 @@ impl Connection {
 			PeerDisconnected
 		}
 		let disconnect_type = loop {
-			macro_rules! shutdown_socket {
-				($err: expr, $need_disconnect: expr) => { {
-					println!("Disconnecting peer due to {}!", $err);
-					break $need_disconnect;
-				} }
-			}
-
 			let read_paused = {
 				let us_lock = us.lock().unwrap();
 				if us_lock.rl_requested_disconnect {
-					shutdown_socket!("disconnect_socket() call from RL", Disconnect::CloseConnection);
+					break Disconnect::CloseConnection;
 				}
 				us_lock.read_paused
 			};
 			tokio::select! {
 				v = write_avail_receiver.recv() => {
 					assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
-					if let Err(e) = peer_manager.write_buffer_space_avail(&mut our_descriptor) {
-						shutdown_socket!(e, Disconnect::CloseConnection);
+					if let Err(_) = peer_manager.write_buffer_space_avail(&mut our_descriptor) {
+						break Disconnect::CloseConnection;
 					}
 				},
 				_ = read_wake_receiver.recv() => {},
 				read = reader.read(&mut buf), if !read_paused => match read {
-					Ok(0) => shutdown_socket!("Connection closed", Disconnect::PeerDisconnected),
+					Ok(0) => break Disconnect::PeerDisconnected,
 					Ok(len) => {
 						let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]);
 						let mut us_lock = us.lock().unwrap();
@@ -174,10 +167,10 @@ impl Connection {
 									us_lock.read_paused = true;
 								}
 							},
-							Err(e) => shutdown_socket!(e, Disconnect::CloseConnection),
+							Err(_) => break Disconnect::CloseConnection,
 						}
 					},
-					Err(e) => shutdown_socket!(e, Disconnect::PeerDisconnected),
+					Err(_) => break Disconnect::PeerDisconnected,
 				},
 			}
 			peer_manager.process_events();
@@ -222,10 +215,11 @@ impl Connection {
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
 /// not need to poll the provided future in order to make progress.
-pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where
+pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where
 		CMH: ChannelMessageHandler + 'static + Send + Sync,
 		RMH: RoutingMessageHandler + 'static + Send + Sync,
-		L: Logger + 'static + ?Sized + Send + Sync {
+		L: Logger + 'static + ?Sized + Send + Sync,
+		UMH: CustomMessageHandler + 'static + Send + Sync {
 	let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
 	#[cfg(debug_assertions)]
 	let last_us = Arc::clone(&us);
@@ -262,10 +256,11 @@ pub fn setup_inbound(peer_manager: Arc
-pub fn setup_outbound(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where
+pub fn setup_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where
 		CMH: ChannelMessageHandler + 'static + Send + Sync,
 		RMH: RoutingMessageHandler + 'static + Send + Sync,
-		L: Logger + 'static + ?Sized + Send + Sync {
+		L: Logger + 'static + ?Sized + Send + Sync,
+		UMH: CustomMessageHandler + 'static + Send + Sync {
 	let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
 	#[cfg(debug_assertions)]
 	let last_us = Arc::clone(&us);
@@ -332,10 +327,11 @@ pub fn setup_outbound(peer_manager: Arc
-pub async fn connect_outbound(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where
+pub async fn connect_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where
 		CMH: ChannelMessageHandler + 'static + Send + Sync,
 		RMH: RoutingMessageHandler + 'static + Send + Sync,
-		L: Logger + 'static + ?Sized + Send + Sync {
+		L: Logger + 'static + ?Sized + Send + Sync,
+		UMH: CustomMessageHandler + 'static + Send + Sync {
 	if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
 		Some(setup_outbound(peer_manager, their_node_id, stream))
 	} else { None }
@@ -496,7 +492,6 @@ mod tests {
 		fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result { Ok(false) }
 		fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result { Ok(false) }
 		fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result { Ok(false) }
-		fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { }
 		fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { Vec::new() }
 		fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() }
 		fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
@@ -563,7 +558,7 @@ mod tests {
 		let a_manager = Arc::new(PeerManager::new(MessageHandler {
 			chan_handler: Arc::clone(&a_handler),
 			route_handler: Arc::clone(&a_handler),
-		}, a_key.clone(), &[1; 32], Arc::new(TestLogger())));
+		}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
 
 		let (b_connected_sender, mut b_connected) = mpsc::channel(1);
 		let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
@@ -577,7 +572,7 @@ mod tests {
 		let b_manager = Arc::new(PeerManager::new(MessageHandler {
 			chan_handler: Arc::clone(&b_handler),
 			route_handler: Arc::clone(&b_handler),
-		}, b_key.clone(), &[2; 32], Arc::new(TestLogger())));
+		}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
 
 		// We bind on localhost, hoping the environment is properly configured with a local
 		// address. This may not always be the case in containers and the like, so if this test is
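Taken together, the hunks above thread a fourth handler type through lightning-net-tokio: setup_inbound, setup_outbound and connect_outbound now expect a PeerManager that is also parameterized over a UMH: CustomMessageHandler, and PeerManager::new takes that handler as a new trailing argument (the test hunks pass IgnoringMessageHandler). Below is a minimal sketch of what a caller might look like after this change; the build_peer_manager helper, its parameter list and the exact ordering of the PeerManager generics are illustrative assumptions, not code from this diff:

```rust
// Sketch only, assuming the 0.0.14-era API shown in this diff.
use std::sync::Arc;

use bitcoin::secp256k1::SecretKey;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, PeerManager};
use lightning::util::logger::Logger;
use lightning_net_tokio::SocketDescriptor;

// Build a PeerManager whose custom-message slot is filled by
// IgnoringMessageHandler, i.e. a node with no custom wire messages.
fn build_peer_manager<CMH, RMH, L>(
	chan_handler: Arc<CMH>,
	route_handler: Arc<RMH>,
	node_secret: SecretKey,
	ephemeral_bytes: [u8; 32],
	logger: Arc<L>,
) -> Arc<PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<IgnoringMessageHandler>>> where
		CMH: ChannelMessageHandler + 'static + Send + Sync,
		RMH: RoutingMessageHandler + 'static + Send + Sync,
		L: Logger + 'static + ?Sized + Send + Sync {
	Arc::new(PeerManager::new(
		MessageHandler { chan_handler, route_handler },
		node_secret,
		&ephemeral_bytes,
		logger,
		// The new argument introduced by this change: a CustomMessageHandler.
		// IgnoringMessageHandler silently drops any custom messages a peer sends.
		Arc::new(IgnoringMessageHandler {}),
	))
}
```

The returned Arc can then be handed directly to connect_outbound or setup_inbound above; callers that do not use custom messages only need to append the IgnoringMessageHandler argument, the connection-setup calls themselves are unchanged.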