X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-net-tokio%2Fsrc%2Flib.rs;h=f15f271e42cbe07e3ff41f5c9f5a5bc76e0f806a;hb=94bb0c9128c2635ea08ec089f79fab04880dedd0;hp=e84ee76229fecb2a3090c72ed1865102f119eb74;hpb=6cd6816cd7efce593ef487d07adfff66cee0daa2;p=rust-lightning diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index e84ee762..f15f271e 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -24,7 +24,7 @@ //! The call site should, thus, look something like this: //! ``` //! use tokio::sync::mpsc; -//! use tokio::net::TcpStream; +//! use std::net::TcpStream; //! use bitcoin::secp256k1::key::PublicKey; //! use lightning::util::events::EventsProvider; //! use std::net::SocketAddr; @@ -36,7 +36,8 @@ //! type Logger = dyn lightning::util::logger::Logger; //! type ChainAccess = dyn lightning::chain::Access; //! type ChainFilter = dyn lightning::chain::Filter; -//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc>; +//! type DataPersister = dyn lightning::chain::channelmonitor::Persist; +//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; //! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; //! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; //! @@ -85,6 +86,7 @@ use lightning::util::logger::Logger; use std::{task, thread}; use std::net::SocketAddr; +use std::net::TcpStream as StdTcpStream; use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -155,7 +157,7 @@ impl Connection { // In this case, we do need to call peer_manager.socket_disconnected() to inform // Rust-Lightning that the socket is gone. PeerDisconnected - }; + } let disconnect_type = loop { macro_rules! shutdown_socket { ($err: expr, $need_disconnect: expr) => { { @@ -217,7 +219,7 @@ impl Connection { } } - fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { + fn new(event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { // We only ever need a channel of depth 1 here: if we returned a non-full write to the // PeerManager, we will eventually get notified that there is room in the socket to write // new bytes, which will generate an event. That event will be popped off the queue before @@ -228,7 +230,8 @@ impl Connection { // we shove a value into the channel which comes after we've reset the read_paused bool to // false. let (read_waker, read_receiver) = mpsc::channel(1); - let (reader, writer) = io::split(stream); + stream.set_nonblocking(true).unwrap(); + let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap()); (reader, write_receiver, read_receiver, Arc::new(Mutex::new(Self { @@ -247,7 +250,7 @@ impl Connection { /// not need to poll the provided future in order to make progress. /// /// See the module-level documentation for how to handle the event_notify mpsc::Sender. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { @@ -289,7 +292,7 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { @@ -365,7 +368,7 @@ pub async fn connect_outbound(peer_manager: Arc; - let mut sender = unsafe { (*sender_ptr).clone() }; + let sender = unsafe { (*sender_ptr).clone() }; let _ = sender.try_send(()); } fn drop_socket_waker(orig_ptr: *const ()) { @@ -511,6 +514,7 @@ mod tests { use tokio::sync::mpsc; use std::mem; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -525,6 +529,7 @@ mod tests { expected_pubkey: PublicKey, pubkey_connected: mpsc::Sender<()>, pubkey_disconnected: mpsc::Sender<()>, + disconnected_flag: AtomicBool, msg_events: Mutex>, } impl RoutingMessageHandler for MsgHandler { @@ -534,7 +539,11 @@ mod tests { fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { } fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { Vec::new() } fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } - fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false } + fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { } + fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } } impl ChannelMessageHandler for MsgHandler { fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {} @@ -554,6 +563,7 @@ mod tests { fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {} fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) { if *their_node_id == self.expected_pubkey { + self.disconnected_flag.store(true, Ordering::SeqCst); self.pubkey_disconnected.clone().try_send(()).unwrap(); } } @@ -586,6 +596,7 @@ mod tests { expected_pubkey: b_pub, pubkey_connected: a_connected_sender, pubkey_disconnected: a_disconnected_sender, + disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); let a_manager = Arc::new(PeerManager::new(MessageHandler { @@ -599,6 +610,7 @@ mod tests { expected_pubkey: a_pub, pubkey_connected: b_connected_sender, pubkey_disconnected: b_disconnected_sender, + disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); let b_manager = Arc::new(PeerManager::new(MessageHandler { @@ -619,8 +631,8 @@ mod tests { } else { panic!("Failed to bind to v4 localhost on common ports"); }; let (sender, _receiver) = mpsc::channel(2); - let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, tokio::net::TcpStream::from_std(conn_a).unwrap()); - let fut_b = super::setup_inbound(b_manager, sender, tokio::net::TcpStream::from_std(conn_b).unwrap()); + let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, conn_a); + let fut_b = super::setup_inbound(b_manager, sender, conn_b); tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap(); @@ -628,18 +640,20 @@ mod tests { a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError { node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None } }); - assert!(a_disconnected.try_recv().is_err()); - assert!(b_disconnected.try_recv().is_err()); + assert!(!a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(!b_handler.disconnected_flag.load(Ordering::SeqCst)); a_manager.process_events(); tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv()).await.unwrap(); + assert!(a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(b_handler.disconnected_flag.load(Ordering::SeqCst)); fut_a.await; fut_b.await; } - #[tokio::test(threaded_scheduler)] + #[tokio::test(flavor = "multi_thread")] async fn basic_threaded_connection_test() { do_basic_connection_test().await; }