Refactor EventsProvider to take an EventHandler
authorJeffrey Czyz <jkczyz@gmail.com>
Wed, 12 May 2021 07:34:30 +0000 (00:34 -0700)
committerJeffrey Czyz <jkczyz@gmail.com>
Mon, 24 May 2021 21:16:16 +0000 (14:16 -0700)
12 files changed:
fuzz/src/chanmon_consistency.rs
fuzz/src/full_stack.rs
lightning-background-processor/src/lib.rs
lightning-net-tokio/src/lib.rs
lightning/src/chain/chainmonitor.rs
lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/onion_route_tests.rs
lightning/src/ln/reorg_tests.rs
lightning/src/util/events.rs

index 732ccd27e88e11da7de2c8f0128d5cf2493e6759..1b255791f7cdeecaba8a2c94cd0f1805142df1a2 100644 (file)
@@ -44,7 +44,7 @@ use lightning::util::errors::APIError;
 use lightning::util::events;
 use lightning::util::logger::Logger;
 use lightning::util::config::UserConfig;
-use lightning::util::events::{EventsProvider, MessageSendEventsProvider};
+use lightning::util::events::MessageSendEventsProvider;
 use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use lightning::routing::router::{Route, RouteHop};
 
index f31ec7f89694abb366c4f305ae9e66bfd59edc23..52b37a10f23bac6ad4cd8330ea47f7346cc7be84 100644 (file)
@@ -39,7 +39,7 @@ use lightning::ln::msgs::DecodeError;
 use lightning::routing::router::get_route;
 use lightning::routing::network_graph::NetGraphMsgHandler;
 use lightning::util::config::UserConfig;
-use lightning::util::events::{EventsProvider,Event};
+use lightning::util::events::Event;
 use lightning::util::enforcing_trait_impls::EnforcingSigner;
 use lightning::util::logger::Logger;
 use lightning::util::ser::Readable;
index de9aa286a623959a41c5d8d2f0a244283c24e353..9273b358a9109ad29ac649fc35a47866c0046d1e 100644 (file)
@@ -173,7 +173,7 @@ mod tests {
        use lightning::ln::msgs::ChannelMessageHandler;
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
        use lightning::util::config::UserConfig;
-       use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
+       use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
        use lightning::util::logger::Logger;
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
index 6e2a0c22eef397b18433aefa7a575cd2084ecc73..8ff186c401f9c540b10321eb9733e5cf7d84c7f7 100644 (file)
@@ -26,7 +26,7 @@
 //! use tokio::sync::mpsc;
 //! use std::net::TcpStream;
 //! use bitcoin::secp256k1::key::PublicKey;
-//! use lightning::util::events::EventsProvider;
+//! use lightning::util::events::{Event, EventHandler, EventsProvider};
 //! use std::net::SocketAddr;
 //! use std::sync::Arc;
 //!
 //!     lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await;
 //!     loop {
 //!         receiver.recv().await;
-//!         for _event in channel_manager.get_and_clear_pending_events().drain(..) {
-//!             // Handle the event!
-//!         }
-//!         for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
-//!             // Handle the event!
-//!         }
+//!         channel_manager.process_pending_events(&|event| {
+//!            // Handle the event!
+//!         });
+//!         chain_monitor.process_pending_events(&|event| {
+//!            // Handle the event!
+//!         });
 //!     }
 //! }
 //!
 //!     lightning_net_tokio::setup_inbound(peer_manager, sender, socket);
 //!     loop {
 //!         receiver.recv().await;
-//!         for _event in channel_manager.get_and_clear_pending_events().drain(..) {
-//!             // Handle the event!
-//!         }
-//!         for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
-//!             // Handle the event!
-//!         }
+//!         channel_manager.process_pending_events(&|event| {
+//!            // Handle the event!
+//!         });
+//!         chain_monitor.process_pending_events(&|event| {
+//!            // Handle the event!
+//!         });
 //!     }
 //! }
 //! ```
index a9b570d523d69d420e7b9bb31c3f638eb7d6df23..8e6ddc76c4911e2d79343e58a199453535e08d76 100644 (file)
@@ -35,7 +35,7 @@ use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::logger::Logger;
 use util::events;
-use util::events::Event;
+use util::events::EventHandler;
 
 use std::collections::{HashMap, hash_map};
 use std::sync::RwLock;
@@ -139,6 +139,15 @@ where C::Target: chain::Filter,
                        persister,
                }
        }
+
+       #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+       pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+               use util::events::EventsProvider;
+               let events = std::cell::RefCell::new(Vec::new());
+               let event_handler = |event| events.borrow_mut().push(event);
+               self.process_pending_events(&event_handler);
+               events.into_inner()
+       }
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
@@ -306,12 +315,20 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
              L::Target: Logger,
              P::Target: channelmonitor::Persist<ChannelSigner>,
 {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
+       /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+       ///
+       /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+       /// order to handle these events.
+       ///
+       /// [`SpendableOutputs`]: events::Event::SpendableOutputs
+       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
                let mut pending_events = Vec::new();
                for monitor in self.monitors.read().unwrap().values() {
                        pending_events.append(&mut monitor.get_and_clear_pending_events());
                }
-               pending_events
+               for event in pending_events.drain(..) {
+                       handler.handle_event(event);
+               }
        }
 }
 
@@ -320,7 +337,6 @@ mod tests {
        use ::{check_added_monitors, get_local_commitment_txn};
        use ln::features::InitFeatures;
        use ln::functional_test_utils::*;
-       use util::events::EventsProvider;
        use util::events::MessageSendEventsProvider;
        use util::test_utils::{OnRegisterOutput, TxOutReference};
 
index 79bbfadc7687372e63d9c738f45c55b18c8aa64d..2e758aa48c82a0eecf1a9761741cf37e81ecabc6 100644 (file)
@@ -27,7 +27,7 @@ use ln::msgs::{ChannelMessageHandler, ErrorAction, RoutingMessageHandler};
 use routing::router::get_route;
 use util::config::UserConfig;
 use util::enforcing_trait_impls::EnforcingSigner;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 use util::errors::APIError;
 use util::ser::{ReadableArgs, Writeable};
 
index 190fb2bc041181179b7fe33a150ca4f4aeefb6c9..34a3786c21da4f7aa98c82372fbe90d70dcae376 100644 (file)
@@ -54,7 +54,7 @@ use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField};
 use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner};
 use util::config::UserConfig;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
 use util::{byte_utils, events};
 use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
 use util::chacha20::{ChaCha20, ChaChaReader};
@@ -1860,6 +1860,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// Note that this includes RBF or similar transaction replacement strategies - lightning does
        /// not currently support replacing a funding transaction on an existing channel. Instead,
        /// create a new channel with a conflicting funding transaction.
+       ///
+       /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady
        pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
@@ -3449,11 +3451,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
-       /// Process pending events from the `chain::Watch`.
-       fn process_pending_monitor_events(&self) {
+       /// Process pending events from the `chain::Watch`, returning whether any events were processed.
+       fn process_pending_monitor_events(&self) -> bool {
                let mut failed_channels = Vec::new();
-               {
-                       for monitor_event in self.chain_monitor.release_pending_monitor_events() {
+               let has_pending_monitor_events = {
+                       let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
+                       let has_pending_monitor_events = !pending_monitor_events.is_empty();
+                       for monitor_event in pending_monitor_events {
                                match monitor_event {
                                        MonitorEvent::HTLCEvent(htlc_update) => {
                                                if let Some(preimage) = htlc_update.payment_preimage {
@@ -3490,11 +3494,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        },
                                }
                        }
-               }
+                       has_pending_monitor_events
+               };
 
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
+
+               has_pending_monitor_events
        }
 
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
@@ -3670,6 +3677,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result<PaymentSecret, APIError> {
                self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id)
        }
+
+       #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+       pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+               let events = std::cell::RefCell::new(Vec::new());
+               let event_handler = |event| events.borrow_mut().push(event);
+               self.process_pending_events(&event_handler);
+               events.into_inner()
+       }
 }
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -3694,21 +3709,42 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSend
 }
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> EventsProvider for ChannelManager<Signer, M, T, K, F, L>
-       where M::Target: chain::Watch<Signer>,
-        T::Target: BroadcasterInterface,
-        K::Target: KeysInterface<Signer = Signer>,
-        F::Target: FeeEstimator,
-                               L::Target: Logger,
+where
+       M::Target: chain::Watch<Signer>,
+       T::Target: BroadcasterInterface,
+       K::Target: KeysInterface<Signer = Signer>,
+       F::Target: FeeEstimator,
+       L::Target: Logger,
 {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
-               //TODO: This behavior should be documented. It's non-intuitive that we query
-               // ChannelMonitors when clearing other events.
-               self.process_pending_monitor_events();
+       /// Processes events that must be periodically handled.
+       ///
+       /// An [`EventHandler`] may safely call back to the provider in order to handle an event.
+       /// However, it must not call [`Writeable::write`] as doing so would result in a deadlock.
+       ///
+       /// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared
+       /// when processed, an [`EventHandler`] must be able to handle previously seen events when
+       /// restarting from an old state.
+       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       let mut result = NotifyOption::SkipPersist;
 
-               let mut ret = Vec::new();
-               let mut pending_events = self.pending_events.lock().unwrap();
-               mem::swap(&mut ret, &mut *pending_events);
-               ret
+                       // TODO: This behavior should be documented. It's unintuitive that we query
+                       // ChannelMonitors when clearing other events.
+                       if self.process_pending_monitor_events() {
+                               result = NotifyOption::DoPersist;
+                       }
+
+                       let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+                       if !pending_events.is_empty() {
+                               result = NotifyOption::DoPersist;
+                       }
+
+                       for event in pending_events.drain(..) {
+                               handler.handle_event(event);
+                       }
+
+                       result
+               });
        }
 }
 
@@ -4956,7 +4992,7 @@ pub mod bench {
        use routing::router::get_route;
        use util::test_utils;
        use util::config::UserConfig;
-       use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+       use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;
index c15005089e51f533f6a4a7e334d0b5dd61a9c329..bf1a24d36852b2450af13218ee3025a7b2bf78e8 100644 (file)
@@ -23,7 +23,7 @@ use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
 use util::enforcing_trait_impls::EnforcingSigner;
 use util::test_utils;
 use util::test_utils::TestChainMonitor;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 use util::errors::APIError;
 use util::config::UserConfig;
 use util::ser::{ReadableArgs, Writeable, Readable};
index 9f1769ff513c242f3f3f72c20e9683b53c4be203..5fa93e18ae506514bf2be0474cab87f74dd825b7 100644 (file)
@@ -29,7 +29,7 @@ use ln::msgs;
 use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler,HTLCFailChannelUpdate, ErrorAction};
 use util::enforcing_trait_impls::EnforcingSigner;
 use util::{byte_utils, test_utils};
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 use util::errors::APIError;
 use util::ser::{Writeable, ReadableArgs};
 use util::config::UserConfig;
index e3184fc65bb711f824636df7dc020f3342f79a04..bab78e7a1c75504c94452401a77732a1956a9a7f 100644 (file)
@@ -20,7 +20,7 @@ use ln::features::{InitFeatures, InvoiceFeatures};
 use ln::msgs;
 use ln::msgs::{ChannelMessageHandler, HTLCFailChannelUpdate, OptionalField};
 use util::test_utils;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 use util::ser::{Writeable, Writer};
 use util::config::UserConfig;
 
index 6906e724d26daef663e1eb53f794bbfb09b9c145..f81c42a36f710a1515f092bc7f000a1d79085f03 100644 (file)
@@ -15,7 +15,7 @@ use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs};
 use ln::features::InitFeatures;
 use ln::msgs::{ChannelMessageHandler, ErrorAction, HTLCFailChannelUpdate};
 use util::enforcing_trait_impls::EnforcingSigner;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 use util::test_utils;
 use util::ser::{ReadableArgs, Writeable};
 
index b7b8bf3137b8ea44d6d667842c3b2473b7a8790e..013e3c4e611bbdfb130ca85094911499e65a6da5 100644 (file)
@@ -24,6 +24,7 @@ use bitcoin::blockdata::script::Script;
 use bitcoin::secp256k1::key::PublicKey;
 
 use core::time::Duration;
+use std::ops::Deref;
 
 /// An Event which you should probably take some action in response to.
 ///
@@ -376,9 +377,46 @@ pub trait MessageSendEventsProvider {
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
 }
 
-/// A trait indicating an object may generate events
+/// A trait indicating an object may generate events.
+///
+/// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].
+///
+/// # Requirements
+///
+/// See [`process_pending_events`] for requirements around event processing.
+///
+/// When using this trait, [`process_pending_events`] will call [`handle_event`] for each pending
+/// event since the last invocation. The handler must either act upon the event immediately
+/// or preserve it for later handling.
+///
+/// Note, handlers may call back into the provider and thus deadlocking must be avoided. Be sure to
+/// consult the provider's documentation on the implication of processing events and how a handler
+/// may safely use the provider (e.g., see [`ChannelManager::process_pending_events`] and
+/// [`ChainMonitor::process_pending_events`]).
+///
+/// [`process_pending_events`]: Self::process_pending_events
+/// [`handle_event`]: EventHandler::handle_event
+/// [`ChannelManager::process_pending_events`]: crate::ln::channelmanager::ChannelManager#method.process_pending_events
+/// [`ChainMonitor::process_pending_events`]: crate::chain::chainmonitor::ChainMonitor#method.process_pending_events
 pub trait EventsProvider {
-       /// Gets the list of pending events which were generated by previous actions, clearing the list
-       /// in the process.
-       fn get_and_clear_pending_events(&self) -> Vec<Event>;
+       /// Processes any events generated since the last call using the given event handler.
+       ///
+       /// Subsequent calls must only process new events. However, handlers must be capable of handling
+       /// duplicate events across process restarts. This may occur if the provider was recovered from
+       /// an old state (i.e., it hadn't been successfully persisted after processing pending events).
+       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
+}
+
+/// A trait implemented for objects handling events from [`EventsProvider`].
+pub trait EventHandler {
+       /// Handles the given [`Event`].
+       ///
+       /// See [`EventsProvider`] for details that must be considered when implementing this method.
+       fn handle_event(&self, event: Event);
+}
+
+impl<F> EventHandler for F where F: Fn(Event) {
+       fn handle_event(&self, event: Event) {
+               self(event)
+       }
 }