]> git.bitcoin.ninja Git - rust-lightning/commitdiff
[net-tokio] Call PeerManager::process_events without blocking reads
authorMatt Corallo <git@bluematt.me>
Wed, 6 Oct 2021 04:29:19 +0000 (04:29 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 10 May 2022 23:40:20 +0000 (23:40 +0000)
Unlike very ancient versions of lightning-net-tokio, this does not
rely on a single global process_events future, but instead has one
per connection. This could still cause significant contention, so
we'll ensure only two process_events calls can exist at once in
the next few commits.

lightning-net-tokio/src/lib.rs

index a9fd861bc846e440fd8ff54ab31deedf622aa8e6..cee7c5c1b982882244cdb33a5d82d7abb0c8eadc 100644 (file)
@@ -121,11 +121,28 @@ struct Connection {
        id: u64,
 }
 impl Connection {
+       async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
+                       CMH: ChannelMessageHandler + 'static + Send + Sync,
+                       RMH: RoutingMessageHandler + 'static + Send + Sync,
+                       L: Logger + 'static + ?Sized + Send + Sync,
+                       UMH: CustomMessageHandler + 'static + Send + Sync {
+               loop {
+                       if event_receiver.recv().await.is_none() {
+                               return;
+                       }
+                       peer_manager.process_events();
+               }
+       }
+
        async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
-                       CMH: ChannelMessageHandler + 'static,
-                       RMH: RoutingMessageHandler + 'static,
-                       L: Logger + 'static + ?Sized,
-                       UMH: CustomMessageHandler + 'static {
+                       CMH: ChannelMessageHandler + 'static + Send + Sync,
+                       RMH: RoutingMessageHandler + 'static + Send + Sync,
+                       L: Logger + 'static + ?Sized + Send + Sync,
+                       UMH: CustomMessageHandler + 'static + Send + Sync {
+               // Create a waker to wake up poll_event_process, above
+               let (event_waker, event_receiver) = mpsc::channel(1);
+               tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
+
                // 8KB is nice and big but also should never cause any issues with stack overflowing.
                let mut buf = [0; 8192];
 
@@ -176,7 +193,7 @@ impl Connection {
                                        Err(_) => break Disconnect::PeerDisconnected,
                                },
                        }
-                       peer_manager.process_events();
+                       let _ = event_waker.try_send(());
                };
                let writer_option = us.lock().unwrap().writer.take();
                if let Some(mut writer) = writer_option {