use lightning::util::events;
use lightning::util::logger::Logger;
use lightning::util::config::UserConfig;
-use lightning::util::events::{EventsProvider, MessageSendEventsProvider};
+use lightning::util::events::MessageSendEventsProvider;
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use lightning::routing::router::{Route, RouteHop};
use lightning::routing::router::get_route;
use lightning::routing::network_graph::NetGraphMsgHandler;
use lightning::util::config::UserConfig;
-use lightning::util::events::{EventsProvider,Event};
+use lightning::util::events::Event;
use lightning::util::enforcing_trait_impls::EnforcingSigner;
use lightning::util::logger::Logger;
use lightning::util::ser::Readable;
use lightning::ln::msgs::ChannelMessageHandler;
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
use lightning::util::config::UserConfig;
- use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
+ use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use lightning::util::test_utils;
//! use tokio::sync::mpsc;
//! use std::net::TcpStream;
//! use bitcoin::secp256k1::key::PublicKey;
-//! use lightning::util::events::EventsProvider;
+//! use lightning::util::events::{Event, EventHandler, EventsProvider};
//! use std::net::SocketAddr;
//! use std::sync::Arc;
//!
//! lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await;
//! loop {
//! receiver.recv().await;
-//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
-//! // Handle the event!
-//! }
-//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
-//! // Handle the event!
-//! }
+//! channel_manager.process_pending_events(&|event| {
+//! // Handle the event!
+//! });
+//! chain_monitor.process_pending_events(&|event| {
+//! // Handle the event!
+//! });
//! }
//! }
//!
//! lightning_net_tokio::setup_inbound(peer_manager, sender, socket);
//! loop {
//! receiver.recv().await;
-//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
-//! // Handle the event!
-//! }
-//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
-//! // Handle the event!
-//! }
+//! channel_manager.process_pending_events(&|event| {
+//! // Handle the event!
+//! });
+//! chain_monitor.process_pending_events(&|event| {
+//! // Handle the event!
+//! });
//! }
//! }
//! ```
use chain::keysinterface::Sign;
use util::logger::Logger;
use util::events;
-use util::events::Event;
+use util::events::EventHandler;
use std::collections::{HashMap, hash_map};
use std::sync::RwLock;
persister,
}
}
+
+ #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+ pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+ use util::events::EventsProvider;
+ let events = std::cell::RefCell::new(Vec::new());
+ let event_handler = |event| events.borrow_mut().push(event);
+ self.process_pending_events(&event_handler);
+ events.into_inner()
+ }
}
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
{
- fn get_and_clear_pending_events(&self) -> Vec<Event> {
+ /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+ ///
+ /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+ /// order to handle these events.
+ ///
+ /// [`SpendableOutputs`]: events::Event::SpendableOutputs
+ fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
let mut pending_events = Vec::new();
for monitor in self.monitors.read().unwrap().values() {
pending_events.append(&mut monitor.get_and_clear_pending_events());
}
- pending_events
+ for event in pending_events.drain(..) {
+ handler.handle_event(event);
+ }
}
}
use ::{check_added_monitors, get_local_commitment_txn};
use ln::features::InitFeatures;
use ln::functional_test_utils::*;
- use util::events::EventsProvider;
use util::events::MessageSendEventsProvider;
use util::test_utils::{OnRegisterOutput, TxOutReference};
use routing::router::get_route;
use util::config::UserConfig;
use util::enforcing_trait_impls::EnforcingSigner;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use util::errors::APIError;
use util::ser::{ReadableArgs, Writeable};
use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField};
use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner};
use util::config::UserConfig;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
use util::{byte_utils, events};
use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
use util::chacha20::{ChaCha20, ChaChaReader};
/// Note that this includes RBF or similar transaction replacement strategies - lightning does
/// not currently support replacing a funding transaction on an existing channel. Instead,
/// create a new channel with a conflicting funding transaction.
+ ///
+ /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady
pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
}
}
- /// Process pending events from the `chain::Watch`.
- fn process_pending_monitor_events(&self) {
+ /// Process pending events from the `chain::Watch`, returning whether any events were processed.
+ fn process_pending_monitor_events(&self) -> bool {
let mut failed_channels = Vec::new();
- {
- for monitor_event in self.chain_monitor.release_pending_monitor_events() {
+ let has_pending_monitor_events = {
+ let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
+ let has_pending_monitor_events = !pending_monitor_events.is_empty();
+ for monitor_event in pending_monitor_events {
match monitor_event {
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
},
}
}
- }
+ has_pending_monitor_events
+ };
for failure in failed_channels.drain(..) {
self.finish_force_close_channel(failure);
}
+
+ has_pending_monitor_events
}
/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result<PaymentSecret, APIError> {
self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id)
}
+
+ #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+ pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+ let events = std::cell::RefCell::new(Vec::new());
+ let event_handler = |event| events.borrow_mut().push(event);
+ self.process_pending_events(&event_handler);
+ events.into_inner()
+ }
}
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
}
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> EventsProvider for ChannelManager<Signer, M, T, K, F, L>
- where M::Target: chain::Watch<Signer>,
- T::Target: BroadcasterInterface,
- K::Target: KeysInterface<Signer = Signer>,
- F::Target: FeeEstimator,
- L::Target: Logger,
+where
+ M::Target: chain::Watch<Signer>,
+ T::Target: BroadcasterInterface,
+ K::Target: KeysInterface<Signer = Signer>,
+ F::Target: FeeEstimator,
+ L::Target: Logger,
{
- fn get_and_clear_pending_events(&self) -> Vec<Event> {
- //TODO: This behavior should be documented. It's non-intuitive that we query
- // ChannelMonitors when clearing other events.
- self.process_pending_monitor_events();
+ /// Processes events that must be periodically handled.
+ ///
+ /// An [`EventHandler`] may safely call back to the provider in order to handle an event.
+ /// However, it must not call [`Writeable::write`] as doing so would result in a deadlock.
+ ///
+ /// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared
+ /// when processed, an [`EventHandler`] must be able to handle previously seen events when
+ /// restarting from an old state.
+ fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+ PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+ let mut result = NotifyOption::SkipPersist;
- let mut ret = Vec::new();
- let mut pending_events = self.pending_events.lock().unwrap();
- mem::swap(&mut ret, &mut *pending_events);
- ret
+ // TODO: This behavior should be documented. It's unintuitive that we query
+ // ChannelMonitors when clearing other events.
+ if self.process_pending_monitor_events() {
+ result = NotifyOption::DoPersist;
+ }
+
+ let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+ if !pending_events.is_empty() {
+ result = NotifyOption::DoPersist;
+ }
+
+ for event in pending_events.drain(..) {
+ handler.handle_event(event);
+ }
+
+ result
+ });
}
}
use routing::router::get_route;
use util::test_utils;
use util::config::UserConfig;
- use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+ use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use bitcoin::hashes::Hash;
use bitcoin::hashes::sha256::Hash as Sha256;
use util::enforcing_trait_impls::EnforcingSigner;
use util::test_utils;
use util::test_utils::TestChainMonitor;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use util::errors::APIError;
use util::config::UserConfig;
use util::ser::{ReadableArgs, Writeable, Readable};
use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler,HTLCFailChannelUpdate, ErrorAction};
use util::enforcing_trait_impls::EnforcingSigner;
use util::{byte_utils, test_utils};
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use util::errors::APIError;
use util::ser::{Writeable, ReadableArgs};
use util::config::UserConfig;
use ln::msgs;
use ln::msgs::{ChannelMessageHandler, HTLCFailChannelUpdate, OptionalField};
use util::test_utils;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use util::ser::{Writeable, Writer};
use util::config::UserConfig;
use ln::features::InitFeatures;
use ln::msgs::{ChannelMessageHandler, ErrorAction, HTLCFailChannelUpdate};
use util::enforcing_trait_impls::EnforcingSigner;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use util::test_utils;
use util::ser::{ReadableArgs, Writeable};
use bitcoin::secp256k1::key::PublicKey;
use core::time::Duration;
+use std::ops::Deref;
/// An Event which you should probably take some action in response to.
///
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
}
-/// A trait indicating an object may generate events
+/// A trait indicating an object may generate events.
+///
+/// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].
+///
+/// # Requirements
+///
+/// See [`process_pending_events`] for requirements around event processing.
+///
+/// When using this trait, [`process_pending_events`] will call [`handle_event`] for each pending
+/// event since the last invocation. The handler must either act upon the event immediately
+/// or preserve it for later handling.
+///
+/// Note, handlers may call back into the provider and thus deadlocking must be avoided. Be sure to
+/// consult the provider's documentation on the implication of processing events and how a handler
+/// may safely use the provider (e.g., see [`ChannelManager::process_pending_events`] and
+/// [`ChainMonitor::process_pending_events`]).
+///
+/// [`process_pending_events`]: Self::process_pending_events
+/// [`handle_event`]: EventHandler::handle_event
+/// [`ChannelManager::process_pending_events`]: crate::ln::channelmanager::ChannelManager#method.process_pending_events
+/// [`ChainMonitor::process_pending_events`]: crate::chain::chainmonitor::ChainMonitor#method.process_pending_events
pub trait EventsProvider {
- /// Gets the list of pending events which were generated by previous actions, clearing the list
- /// in the process.
- fn get_and_clear_pending_events(&self) -> Vec<Event>;
+ /// Processes any events generated since the last call using the given event handler.
+ ///
+ /// Subsequent calls must only process new events. However, handlers must be capable of handling
+ /// duplicate events across process restarts. This may occur if the provider was recovered from
+ /// an old state (i.e., it hadn't been successfully persisted after processing pending events).
+ fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
+}
+
+/// A trait implemented for objects handling events from [`EventsProvider`].
+pub trait EventHandler {
+ /// Handles the given [`Event`].
+ ///
+ /// See [`EventsProvider`] for details that must be considered when implementing this method.
+ fn handle_event(&self, event: Event);
+}
+
+impl<F> EventHandler for F where F: Fn(Event) {
+ fn handle_event(&self, event: Event) {
+ self(event)
+ }
}