Refactor EventsProvider to take an EventHandler
[rust-lightning] / lightning-net-tokio / src / lib.rs
index 051341ee98cb230b16114ee743a87a80c749e5cd..8ff186c401f9c540b10321eb9733e5cf7d84c7f7 100644 (file)
 //! use tokio::sync::mpsc;
 //! use std::net::TcpStream;
 //! use bitcoin::secp256k1::key::PublicKey;
-//! use lightning::util::events::EventsProvider;
+//! use lightning::util::events::{Event, EventHandler, EventsProvider};
 //! use std::net::SocketAddr;
 //! use std::sync::Arc;
 //!
 //! // Define concrete types for our high-level objects:
-//! type TxBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface;
-//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
-//! type Logger = dyn lightning::util::logger::Logger;
-//! type ChainAccess = dyn lightning::chain::Access;
-//! type ChainFilter = dyn lightning::chain::Filter;
-//! type DataPersister = dyn lightning::chain::channelmonitor::Persist<lightning::chain::keysinterface::InMemorySigner>;
+//! type TxBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
+//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
+//! type Logger = dyn lightning::util::logger::Logger + Send + Sync;
+//! type ChainAccess = dyn lightning::chain::Access + Send + Sync;
+//! type ChainFilter = dyn lightning::chain::Filter + Send + Sync;
+//! type DataPersister = dyn lightning::chain::channelmonitor::Persist<lightning::chain::keysinterface::InMemorySigner> + Send + Sync;
 //! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
-//! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>;
-//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>;
+//! type ChannelManager = Arc<lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>>;
+//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>>;
 //!
 //! // Connect to node with pubkey their_node_id at addr:
 //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
 //!     lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await;
 //!     loop {
 //!         receiver.recv().await;
-//!         for _event in channel_manager.get_and_clear_pending_events().drain(..) {
-//!             // Handle the event!
-//!         }
-//!         for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
-//!             // Handle the event!
-//!         }
+//!         channel_manager.process_pending_events(&|event| {
+//!            // Handle the event!
+//!         });
+//!         chain_monitor.process_pending_events(&|event| {
+//!            // Handle the event!
+//!         });
 //!     }
 //! }
 //!
 //!     lightning_net_tokio::setup_inbound(peer_manager, sender, socket);
 //!     loop {
 //!         receiver.recv().await;
-//!         for _event in channel_manager.get_and_clear_pending_events().drain(..) {
-//!             // Handle the event!
-//!         }
-//!         for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
-//!             // Handle the event!
-//!         }
+//!         channel_manager.process_pending_events(&|event| {
+//!            // Handle the event!
+//!         });
+//!         chain_monitor.process_pending_events(&|event| {
+//!            // Handle the event!
+//!         });
 //!     }
 //! }
 //! ```
 
+#![deny(broken_intra_doc_links)]
+#![deny(missing_docs)]
+
 use bitcoin::secp256k1::key::PublicKey;
 
 use tokio::net::TcpStream;
@@ -251,9 +254,9 @@ impl Connection {
 ///
 /// See the module-level documentation for how to handle the event_notify mpsc::Sender.
 pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
-               CMH: ChannelMessageHandler + 'static,
-               RMH: RoutingMessageHandler + 'static,
-               L: Logger + 'static + ?Sized {
+               CMH: ChannelMessageHandler + 'static + Send + Sync,
+               RMH: RoutingMessageHandler + 'static + Send + Sync,
+               L: Logger + 'static + ?Sized + Send + Sync {
        let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
        #[cfg(debug_assertions)]
        let last_us = Arc::clone(&us);
@@ -293,9 +296,9 @@ pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<So
 ///
 /// See the module-level documentation for how to handle the event_notify mpsc::Sender.
 pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
-               CMH: ChannelMessageHandler + 'static,
-               RMH: RoutingMessageHandler + 'static,
-               L: Logger + 'static + ?Sized {
+               CMH: ChannelMessageHandler + 'static + Send + Sync,
+               RMH: RoutingMessageHandler + 'static + Send + Sync,
+               L: Logger + 'static + ?Sized + Send + Sync {
        let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
        #[cfg(debug_assertions)]
        let last_us = Arc::clone(&us);
@@ -365,9 +368,9 @@ pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<S
 ///
 /// See the module-level documentation for how to handle the event_notify mpsc::Sender.
 pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
-               CMH: ChannelMessageHandler + 'static,
-               RMH: RoutingMessageHandler + 'static,
-               L: Logger + 'static + ?Sized {
+               CMH: ChannelMessageHandler + 'static + Send + Sync,
+               RMH: RoutingMessageHandler + 'static + Send + Sync,
+               L: Logger + 'static + ?Sized + Send + Sync {
        if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
                Some(setup_outbound(peer_manager, event_notify, their_node_id, stream))
        } else { None }
@@ -561,6 +564,7 @@ mod tests {
                fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &RevokeAndACK) {}
                fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &UpdateFee) {}
                fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {}
+               fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &ChannelUpdate) {}
                fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) {
                        if *their_node_id == self.expected_pubkey {
                                self.disconnected_flag.store(true, Ordering::SeqCst);