From 7c465d69dcd81d540e6b21ceb46531f22e8414f0 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 12 May 2021 00:34:30 -0700 Subject: [PATCH] Refactor EventsProvider to take an EventHandler --- fuzz/src/chanmon_consistency.rs | 2 +- fuzz/src/full_stack.rs | 2 +- lightning-background-processor/src/lib.rs | 2 +- lightning-net-tokio/src/lib.rs | 26 +++---- lightning/src/chain/chainmonitor.rs | 24 +++++- lightning/src/ln/chanmon_update_fail_tests.rs | 2 +- lightning/src/ln/channelmanager.rs | 76 ++++++++++++++----- lightning/src/ln/functional_test_utils.rs | 2 +- lightning/src/ln/functional_tests.rs | 2 +- lightning/src/ln/onion_route_tests.rs | 2 +- lightning/src/ln/reorg_tests.rs | 2 +- lightning/src/util/events.rs | 46 ++++++++++- 12 files changed, 139 insertions(+), 49 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 732ccd27..1b255791 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -44,7 +44,7 @@ use lightning::util::errors::APIError; use lightning::util::events; use lightning::util::logger::Logger; use lightning::util::config::UserConfig; -use lightning::util::events::{EventsProvider, MessageSendEventsProvider}; +use lightning::util::events::MessageSendEventsProvider; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use lightning::routing::router::{Route, RouteHop}; diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index f31ec7f8..52b37a10 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -39,7 +39,7 @@ use lightning::ln::msgs::DecodeError; use lightning::routing::router::get_route; use lightning::routing::network_graph::NetGraphMsgHandler; use lightning::util::config::UserConfig; -use lightning::util::events::{EventsProvider,Event}; +use lightning::util::events::Event; use lightning::util::enforcing_trait_impls::EnforcingSigner; use lightning::util::logger::Logger; use lightning::util::ser::Readable; diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index de9aa286..9273b358 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -173,7 +173,7 @@ mod tests { use lightning::ln::msgs::ChannelMessageHandler; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; use lightning::util::config::UserConfig; - use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent}; + use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use lightning::util::test_utils; diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 6e2a0c22..8ff186c4 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -26,7 +26,7 @@ //! use tokio::sync::mpsc; //! use std::net::TcpStream; //! use bitcoin::secp256k1::key::PublicKey; -//! use lightning::util::events::EventsProvider; +//! use lightning::util::events::{Event, EventHandler, EventsProvider}; //! use std::net::SocketAddr; //! use std::sync::Arc; //! @@ -47,12 +47,12 @@ //! lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await; //! loop { //! receiver.recv().await; -//! for _event in channel_manager.get_and_clear_pending_events().drain(..) { -//! // Handle the event! -//! } -//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) { -//! // Handle the event! -//! } +//! channel_manager.process_pending_events(&|event| { +//! // Handle the event! +//! }); +//! chain_monitor.process_pending_events(&|event| { +//! // Handle the event! +//! }); //! } //! } //! @@ -62,12 +62,12 @@ //! lightning_net_tokio::setup_inbound(peer_manager, sender, socket); //! loop { //! receiver.recv().await; -//! for _event in channel_manager.get_and_clear_pending_events().drain(..) { -//! // Handle the event! -//! } -//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) { -//! // Handle the event! -//! } +//! channel_manager.process_pending_events(&|event| { +//! // Handle the event! +//! }); +//! chain_monitor.process_pending_events(&|event| { +//! // Handle the event! +//! }); //! } //! } //! ``` diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index a9b570d5..8e6ddc76 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -35,7 +35,7 @@ use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::logger::Logger; use util::events; -use util::events::Event; +use util::events::EventHandler; use std::collections::{HashMap, hash_map}; use std::sync::RwLock; @@ -139,6 +139,15 @@ where C::Target: chain::Filter, persister, } } + + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] + pub fn get_and_clear_pending_events(&self) -> Vec { + use util::events::EventsProvider; + let events = std::cell::RefCell::new(Vec::new()); + let event_handler = |event| events.borrow_mut().push(event); + self.process_pending_events(&event_handler); + events.into_inner() + } } impl @@ -306,12 +315,20 @@ impl even L::Target: Logger, P::Target: channelmonitor::Persist, { - fn get_and_clear_pending_events(&self) -> Vec { + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. + /// + /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in + /// order to handle these events. + /// + /// [`SpendableOutputs`]: events::Event::SpendableOutputs + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { let mut pending_events = Vec::new(); for monitor in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor.get_and_clear_pending_events()); } - pending_events + for event in pending_events.drain(..) { + handler.handle_event(event); + } } } @@ -320,7 +337,6 @@ mod tests { use ::{check_added_monitors, get_local_commitment_txn}; use ln::features::InitFeatures; use ln::functional_test_utils::*; - use util::events::EventsProvider; use util::events::MessageSendEventsProvider; use util::test_utils::{OnRegisterOutput, TxOutReference}; diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 79bbfadc..2e758aa4 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -27,7 +27,7 @@ use ln::msgs::{ChannelMessageHandler, ErrorAction, RoutingMessageHandler}; use routing::router::get_route; use util::config::UserConfig; use util::enforcing_trait_impls::EnforcingSigner; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::ser::{ReadableArgs, Writeable}; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 190fb2bc..34a3786c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -54,7 +54,7 @@ use ln::onion_utils; use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField}; use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner}; use util::config::UserConfig; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use util::{byte_utils, events}; use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer}; use util::chacha20::{ChaCha20, ChaChaReader}; @@ -1860,6 +1860,8 @@ impl ChannelMana /// Note that this includes RBF or similar transaction replacement strategies - lightning does /// not currently support replacing a funding transaction on an existing channel. Instead, /// create a new channel with a conflicting funding transaction. + /// + /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); @@ -3449,11 +3451,13 @@ impl ChannelMana } } - /// Process pending events from the `chain::Watch`. - fn process_pending_monitor_events(&self) { + /// Process pending events from the `chain::Watch`, returning whether any events were processed. + fn process_pending_monitor_events(&self) -> bool { let mut failed_channels = Vec::new(); - { - for monitor_event in self.chain_monitor.release_pending_monitor_events() { + let has_pending_monitor_events = { + let pending_monitor_events = self.chain_monitor.release_pending_monitor_events(); + let has_pending_monitor_events = !pending_monitor_events.is_empty(); + for monitor_event in pending_monitor_events { match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { if let Some(preimage) = htlc_update.payment_preimage { @@ -3490,11 +3494,14 @@ impl ChannelMana }, } } - } + has_pending_monitor_events + }; for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } + + has_pending_monitor_events } /// Check the holding cell in each channel and free any pending HTLCs in them if possible. @@ -3670,6 +3677,14 @@ impl ChannelMana pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result { self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id) } + + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] + pub fn get_and_clear_pending_events(&self) -> Vec { + let events = std::cell::RefCell::new(Vec::new()); + let event_handler = |event| events.borrow_mut().push(event); + self.process_pending_events(&event_handler); + events.into_inner() + } } impl MessageSendEventsProvider for ChannelManager @@ -3694,21 +3709,42 @@ impl MessageSend } impl EventsProvider for ChannelManager - where M::Target: chain::Watch, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, +where + M::Target: chain::Watch, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + L::Target: Logger, { - fn get_and_clear_pending_events(&self) -> Vec { - //TODO: This behavior should be documented. It's non-intuitive that we query - // ChannelMonitors when clearing other events. - self.process_pending_monitor_events(); + /// Processes events that must be periodically handled. + /// + /// An [`EventHandler`] may safely call back to the provider in order to handle an event. + /// However, it must not call [`Writeable::write`] as doing so would result in a deadlock. + /// + /// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared + /// when processed, an [`EventHandler`] must be able to handle previously seen events when + /// restarting from an old state. + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + let mut result = NotifyOption::SkipPersist; - let mut ret = Vec::new(); - let mut pending_events = self.pending_events.lock().unwrap(); - mem::swap(&mut ret, &mut *pending_events); - ret + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + + let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } + + for event in pending_events.drain(..) { + handler.handle_event(event); + } + + result + }); } } @@ -4956,7 +4992,7 @@ pub mod bench { use routing::router::get_route; use util::test_utils; use util::config::UserConfig; - use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; + use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index c1500508..bf1a24d3 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -23,7 +23,7 @@ use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler}; use util::enforcing_trait_impls::EnforcingSigner; use util::test_utils; use util::test_utils::TestChainMonitor; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::config::UserConfig; use util::ser::{ReadableArgs, Writeable, Readable}; diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 9f1769ff..5fa93e18 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -29,7 +29,7 @@ use ln::msgs; use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler,HTLCFailChannelUpdate, ErrorAction}; use util::enforcing_trait_impls::EnforcingSigner; use util::{byte_utils, test_utils}; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::ser::{Writeable, ReadableArgs}; use util::config::UserConfig; diff --git a/lightning/src/ln/onion_route_tests.rs b/lightning/src/ln/onion_route_tests.rs index e3184fc6..bab78e7a 100644 --- a/lightning/src/ln/onion_route_tests.rs +++ b/lightning/src/ln/onion_route_tests.rs @@ -20,7 +20,7 @@ use ln::features::{InitFeatures, InvoiceFeatures}; use ln::msgs; use ln::msgs::{ChannelMessageHandler, HTLCFailChannelUpdate, OptionalField}; use util::test_utils; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::ser::{Writeable, Writer}; use util::config::UserConfig; diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index 6906e724..f81c42a3 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -15,7 +15,7 @@ use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs}; use ln::features::InitFeatures; use ln::msgs::{ChannelMessageHandler, ErrorAction, HTLCFailChannelUpdate}; use util::enforcing_trait_impls::EnforcingSigner; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::test_utils; use util::ser::{ReadableArgs, Writeable}; diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index b7b8bf31..013e3c4e 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -24,6 +24,7 @@ use bitcoin::blockdata::script::Script; use bitcoin::secp256k1::key::PublicKey; use core::time::Duration; +use std::ops::Deref; /// An Event which you should probably take some action in response to. /// @@ -376,9 +377,46 @@ pub trait MessageSendEventsProvider { fn get_and_clear_pending_msg_events(&self) -> Vec; } -/// A trait indicating an object may generate events +/// A trait indicating an object may generate events. +/// +/// Events are processed by passing an [`EventHandler`] to [`process_pending_events`]. +/// +/// # Requirements +/// +/// See [`process_pending_events`] for requirements around event processing. +/// +/// When using this trait, [`process_pending_events`] will call [`handle_event`] for each pending +/// event since the last invocation. The handler must either act upon the event immediately +/// or preserve it for later handling. +/// +/// Note, handlers may call back into the provider and thus deadlocking must be avoided. Be sure to +/// consult the provider's documentation on the implication of processing events and how a handler +/// may safely use the provider (e.g., see [`ChannelManager::process_pending_events`] and +/// [`ChainMonitor::process_pending_events`]). +/// +/// [`process_pending_events`]: Self::process_pending_events +/// [`handle_event`]: EventHandler::handle_event +/// [`ChannelManager::process_pending_events`]: crate::ln::channelmanager::ChannelManager#method.process_pending_events +/// [`ChainMonitor::process_pending_events`]: crate::chain::chainmonitor::ChainMonitor#method.process_pending_events pub trait EventsProvider { - /// Gets the list of pending events which were generated by previous actions, clearing the list - /// in the process. - fn get_and_clear_pending_events(&self) -> Vec; + /// Processes any events generated since the last call using the given event handler. + /// + /// Subsequent calls must only process new events. However, handlers must be capable of handling + /// duplicate events across process restarts. This may occur if the provider was recovered from + /// an old state (i.e., it hadn't been successfully persisted after processing pending events). + fn process_pending_events(&self, handler: H) where H::Target: EventHandler; +} + +/// A trait implemented for objects handling events from [`EventsProvider`]. +pub trait EventHandler { + /// Handles the given [`Event`]. + /// + /// See [`EventsProvider`] for details that must be considered when implementing this method. + fn handle_event(&self, event: Event); +} + +impl EventHandler for F where F: Fn(Event) { + fn handle_event(&self, event: Event) { + self(event) + } } -- 2.30.2