From b1550524cfe2689e3743ddfc0a78527d26de8613 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 6 Oct 2021 04:29:19 +0000 Subject: [PATCH] [net-tokio] Call PeerManager::process_events without blocking reads Unlike very ancient versions of lightning-net-tokio, this does not rely on a single global process_events future, but instead has one per connection. This could still cause significant contention, so we'll ensure only two process_events calls can exist at once in the next few commits. --- lightning-net-tokio/src/lib.rs | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index a9fd861b..cee7c5c1 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -121,11 +121,28 @@ struct Connection { id: u64, } impl Connection { + async fn poll_event_process(peer_manager: Arc, Arc, Arc, Arc>>, mut event_receiver: mpsc::Receiver<()>) where + CMH: ChannelMessageHandler + 'static + Send + Sync, + RMH: RoutingMessageHandler + 'static + Send + Sync, + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { + loop { + if event_receiver.recv().await.is_none() { + return; + } + peer_manager.process_events(); + } + } + async fn schedule_read(peer_manager: Arc, Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where - CMH: ChannelMessageHandler + 'static, - RMH: RoutingMessageHandler + 'static, - L: Logger + 'static + ?Sized, - UMH: CustomMessageHandler + 'static { + CMH: ChannelMessageHandler + 'static + Send + Sync, + RMH: RoutingMessageHandler + 'static + Send + Sync, + L: Logger + 'static + ?Sized + Send + Sync, + UMH: CustomMessageHandler + 'static + Send + Sync { + // Create a waker to wake up poll_event_process, above + let (event_waker, event_receiver) = mpsc::channel(1); + tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver)); + // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -176,7 +193,7 @@ impl Connection { Err(_) => break Disconnect::PeerDisconnected, }, } - peer_manager.process_events(); + let _ = event_waker.try_send(()); }; let writer_option = us.lock().unwrap().writer.take(); if let Some(mut writer) = writer_option { -- 2.30.2