X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-net-tokio%2Fsrc%2Flib.rs;h=68a25a75ef39fa33813f55343aab8407b20813fd;hb=7a586455c10d835a425fa1de8ec447acbf9c67f3;hp=5f5fece0d2ed34e447e2905f9f3bd6d3f42ce102;hpb=9d04227a66469a6e9569b9bff82d89f42121b0bd;p=rust-lightning diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 5f5fece0..68a25a75 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -37,7 +37,7 @@ //! type DataPersister = dyn lightning::chain::channelmonitor::Persist + Send + Sync; //! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; //! type ChannelManager = Arc>; -//! type PeerManager = Arc>; +//! type PeerManager = Arc>>; //! //! // Connect to node with pubkey their_node_id at addr: //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { @@ -80,6 +80,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; +use lightning::ln::peer_handler::UnknownMessageHandler; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::util::logger::Logger; @@ -119,10 +120,11 @@ struct Connection { id: u64, } impl Connection { - async fn schedule_read(peer_manager: Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, - L: Logger + 'static + ?Sized { + L: Logger + 'static + ?Sized, + UMH: UnknownMessageHandler + 'static { // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -222,10 +224,11 @@ impl Connection { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { let (reader, write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -262,10 +265,11 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -332,10 +336,11 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc, Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, - L: Logger + 'static + ?Sized + Send + Sync { + L: Logger + 'static + ?Sized + Send + Sync, + UMH: UnknownMessageHandler + 'static + Send + Sync { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { Some(setup_outbound(peer_manager, their_node_id, stream)) } else { None } @@ -467,7 +472,7 @@ impl Hash for SocketDescriptor { mod tests { use lightning::ln::features::*; use lightning::ln::msgs::*; - use lightning::ln::peer_handler::{MessageHandler, PeerManager}; + use lightning::ln::peer_handler::{MessageHandler, PeerManager, IgnoringUnknownMessageHandler}; use lightning::util::events::*; use bitcoin::secp256k1::{Secp256k1, SecretKey, PublicKey}; @@ -563,7 +568,7 @@ mod tests { let a_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), - }, a_key.clone(), &[1; 32], Arc::new(TestLogger()))); + }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(IgnoringUnknownMessageHandler {}))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1); @@ -577,7 +582,7 @@ mod tests { let b_manager = Arc::new(PeerManager::new(MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), - }, b_key.clone(), &[2; 32], Arc::new(TestLogger()))); + }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(IgnoringUnknownMessageHandler {}))); // We bind on localhost, hoping the environment is properly configured with a local // address. This may not always be the case in containers and the like, so if this test is