X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-net-tokio%2Fsrc%2Flib.rs;h=f7e42b6634147d405e2cfebe1278801940e0a771;hb=1fd6c6fb9f7e58e8c0cf6539e7a9451e57a2b6fd;hp=7082916fcdba75b733a364008a9c18c12e6d8ca4;hpb=65920818db58880f6576fd50c3ea5df273912978;p=rust-lightning diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 7082916f..f7e42b66 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -120,11 +120,28 @@ struct Connection { id: u64, } impl Connection { + async fn poll_event_process(peer_manager: Arc, Arc, Arc, Arc>>, mut event_receiver: mpsc::Receiver<()>) where + CMH: ChannelMessageHandler + 'static + Send + Sync, + RMH: RoutingMessageHandler + 'static + Send + Sync, + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { + loop { + if event_receiver.recv().await.is_none() { + return; + } + peer_manager.process_events(); + } + } + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where - CMH: ChannelMessageHandler + 'static, - RMH: RoutingMessageHandler + 'static, - L: Logger + 'static + ?Sized, - UMH: CustomMessageHandler + 'static { + CMH: ChannelMessageHandler + 'static + Send + Sync, + RMH: RoutingMessageHandler + 'static + Send + Sync, + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { + // Create a waker to wake up poll_event_process, above + let (event_waker, event_receiver) = mpsc::channel(1); + tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver)); + // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -175,7 +192,14 @@ impl Connection { Err(_) => break Disconnect::PeerDisconnected, }, } - peer_manager.process_events(); + let _ = event_waker.try_send(()); + + // At this point we've processed a message or two, and reset the ping timer for this + // peer, at least in the "are we still receiving messages" context, if we don't give up + // our timeslice to another task we may just spin on this peer, starving other peers + // and eventually disconnecting them for ping timeouts. Instead, we explicitly yield + // here. + tokio::task::yield_now().await; }; let writer_option = us.lock().unwrap().writer.take(); if let Some(mut writer) = writer_option { @@ -443,6 +467,9 @@ impl peer_handler::SocketDescriptor for SocketDescriptor { // pause read given we're now waiting on the remote end to ACK (and in // accordance with the send_data() docs). us.read_paused = true; + // Further, to avoid any current pending read causing a `read_event` call, wake + // up the read_waker and restart its loop. + let _ = us.read_waker.try_send(()); return written_len; }, }