X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-net-tokio%2Fsrc%2Flib.rs;h=80f7e055283ebbe5bfdc274d089057b14eb435b9;hb=32abba7201f4045d31bc7c7df43683028989b4aa;hp=36384380fb14a458b31e132446ae374ffaf0b51d;hpb=9c3f3e76e58586dd0d985ea9f58193e4a757512b;p=rust-lightning diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 36384380..80f7e055 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -24,7 +24,7 @@ //! The call site should, thus, look something like this: //! ``` //! use tokio::sync::mpsc; -//! use tokio::net::TcpStream; +//! use std::net::TcpStream; //! use bitcoin::secp256k1::key::PublicKey; //! use lightning::util::events::EventsProvider; //! use std::net::SocketAddr; @@ -86,6 +86,7 @@ use lightning::util::logger::Logger; use std::{task, thread}; use std::net::SocketAddr; +use std::net::TcpStream as StdTcpStream; use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -156,7 +157,7 @@ impl Connection { // In this case, we do need to call peer_manager.socket_disconnected() to inform // Rust-Lightning that the socket is gone. PeerDisconnected - }; + } let disconnect_type = loop { macro_rules! shutdown_socket { ($err: expr, $need_disconnect: expr) => { { @@ -218,7 +219,7 @@ impl Connection { } } - fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { + fn new(event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { // We only ever need a channel of depth 1 here: if we returned a non-full write to the // PeerManager, we will eventually get notified that there is room in the socket to write // new bytes, which will generate an event. That event will be popped off the queue before @@ -229,7 +230,8 @@ impl Connection { // we shove a value into the channel which comes after we've reset the read_paused bool to // false. let (read_waker, read_receiver) = mpsc::channel(1); - let (reader, writer) = io::split(stream); + stream.set_nonblocking(true).unwrap(); + let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap()); (reader, write_receiver, read_receiver, Arc::new(Mutex::new(Self { @@ -248,7 +250,7 @@ impl Connection { /// not need to poll the provided future in order to make progress. /// /// See the module-level documentation for how to handle the event_notify mpsc::Sender. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { @@ -290,7 +292,7 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { @@ -366,7 +368,7 @@ pub async fn connect_outbound(peer_manager: Arc; - let mut sender = unsafe { (*sender_ptr).clone() }; + let sender = unsafe { (*sender_ptr).clone() }; let _ = sender.try_send(()); } fn drop_socket_waker(orig_ptr: *const ()) { @@ -512,6 +514,7 @@ mod tests { use tokio::sync::mpsc; use std::mem; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -526,6 +529,7 @@ mod tests { expected_pubkey: PublicKey, pubkey_connected: mpsc::Sender<()>, pubkey_disconnected: mpsc::Sender<()>, + disconnected_flag: AtomicBool, msg_events: Mutex>, } impl RoutingMessageHandler for MsgHandler { @@ -535,7 +539,11 @@ mod tests { fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { } fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { Vec::new() } fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } - fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false } + fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { } + fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } } impl ChannelMessageHandler for MsgHandler { fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {} @@ -555,6 +563,7 @@ mod tests { fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {} fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) { if *their_node_id == self.expected_pubkey { + self.disconnected_flag.store(true, Ordering::SeqCst); self.pubkey_disconnected.clone().try_send(()).unwrap(); } } @@ -587,6 +596,7 @@ mod tests { expected_pubkey: b_pub, pubkey_connected: a_connected_sender, pubkey_disconnected: a_disconnected_sender, + disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); let a_manager = Arc::new(PeerManager::new(MessageHandler { @@ -600,6 +610,7 @@ mod tests { expected_pubkey: a_pub, pubkey_connected: b_connected_sender, pubkey_disconnected: b_disconnected_sender, + disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); let b_manager = Arc::new(PeerManager::new(MessageHandler { @@ -620,8 +631,8 @@ mod tests { } else { panic!("Failed to bind to v4 localhost on common ports"); }; let (sender, _receiver) = mpsc::channel(2); - let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, tokio::net::TcpStream::from_std(conn_a).unwrap()); - let fut_b = super::setup_inbound(b_manager, sender, tokio::net::TcpStream::from_std(conn_b).unwrap()); + let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, conn_a); + let fut_b = super::setup_inbound(b_manager, sender, conn_b); tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap(); @@ -629,18 +640,20 @@ mod tests { a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError { node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None } }); - assert!(a_disconnected.try_recv().is_err()); - assert!(b_disconnected.try_recv().is_err()); + assert!(!a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(!b_handler.disconnected_flag.load(Ordering::SeqCst)); a_manager.process_events(); tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv()).await.unwrap(); + assert!(a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(b_handler.disconnected_flag.load(Ordering::SeqCst)); fut_a.await; fut_b.await; } - #[tokio::test(threaded_scheduler)] + #[tokio::test(flavor = "multi_thread")] async fn basic_threaded_connection_test() { do_basic_connection_test().await; }