X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-net-tokio%2Fsrc%2Flib.rs;h=13a4c0d934fe264fa5be936d440951c5c5bee1dd;hb=59925c1fa0509cda696825d334237655a346f294;hp=d2ee100281eed222fd0a172ab227606318e8aaca;hpb=d523b6e42b5b0fde07a89770906e0193ea987538;p=rust-lightning diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index d2ee1002..13a4c0d9 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -43,13 +43,12 @@ //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { //! lightning_net_tokio::connect_outbound(peer_manager, their_node_id, addr).await; //! loop { -//! channel_manager.await_persistable_update(); -//! channel_manager.process_pending_events(&|event| { -//! // Handle the event! -//! }); -//! chain_monitor.process_pending_events(&|event| { +//! let event_handler = |event: &Event| { //! // Handle the event! -//! }); +//! }; +//! channel_manager.await_persistable_update(); +//! channel_manager.process_pending_events(&event_handler); +//! chain_monitor.process_pending_events(&event_handler); //! } //! } //! @@ -57,13 +56,12 @@ //! async fn accept_socket(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, socket: TcpStream) { //! lightning_net_tokio::setup_inbound(peer_manager, socket); //! loop { -//! channel_manager.await_persistable_update(); -//! channel_manager.process_pending_events(&|event| { -//! // Handle the event! -//! }); -//! chain_monitor.process_pending_events(&|event| { +//! let event_handler = |event: &Event| { //! // Handle the event! -//! }); +//! }; +//! channel_manager.await_persistable_update(); +//! channel_manager.process_pending_events(&event_handler); +//! chain_monitor.process_pending_events(&event_handler); //! } //! } //! ``` @@ -80,6 +78,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; +use lightning::ln::peer_handler::CustomMessageHandler; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::util::logger::Logger; @@ -119,10 +118,11 @@ struct Connection { id: u64, } impl Connection { - async fn schedule_read(peer_manager: Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, - L: Logger + 'static + ?Sized { + L: Logger + 'static + ?Sized, + UMH: CustomMessageHandler + 'static { // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -215,10 +215,11 @@ impl Connection { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { let (reader, write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -255,10 +256,11 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -325,10 +327,11 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { Some(setup_outbound(peer_manager, their_node_id, stream)) } else { None } @@ -489,7 +492,6 @@ mod tests { fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result { Ok(false) } fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result { Ok(false) } fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result { Ok(false) } - fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { } fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { Vec::new() } fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { } @@ -556,7 +558,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), - }, a_key.clone(), &[1; 32], Arc::new(TestLogger()))); + }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1); @@ -570,7 +572,7 @@ mod tests { let b_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), - }, b_key.clone(), &[2; 32], Arc::new(TestLogger()))); + }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}))); // We bind on localhost, hoping the environment is properly configured with a local // address. This may not always be the case in containers and the like, so if this test is