/// claims which are awaiting confirmation.
///
/// Includes the balances from each [`ChannelMonitor`] *except* those included in
- /// `ignored_channels`, allowing you to filter out balances from channels which are still open
- /// (and whose balance should likely be pulled from the [`ChannelDetails`]).
+ /// `ignored_channels`.
///
/// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for
/// inclusion in the return value.
self.event_notifier.notify();
}
- #[cfg(any(test, fuzzing, feature = "_test_utils"))]
+ #[cfg(any(test, feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use crate::events::EventsProvider;
let events = core::cell::RefCell::new(Vec::new());
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
&self, handler: H
) {
- let mut pending_events = Vec::new();
- for monitor_state in self.monitors.read().unwrap().values() {
- pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
- }
- for event in pending_events {
- handler(event).await;
+ // Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
+ // crazy dance to process a monitor's events then only remove them once we've done so.
+ let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
+ for funding_txo in mons_to_process {
+ let mut ev;
+ super::channelmonitor::process_events_body!(
+ self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
}
}
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
- #[cfg(not(anchors))]
- /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
- ///
- /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
- /// order to handle these events.
- ///
- /// [`SpendableOutputs`]: events::Event::SpendableOutputs
- fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
- let mut pending_events = Vec::new();
- for monitor_state in self.monitors.read().unwrap().values() {
- pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
- }
- for event in pending_events {
- handler.handle_event(event);
- }
- }
- #[cfg(anchors)]
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
///
/// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
/// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
/// within each channel. As the confirmation of a commitment transaction may be critical to the
- /// safety of funds, this method must be invoked frequently, ideally once for every chain tip
- /// update (block connected or disconnected).
+ /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an
+ /// environment with spotty connections, like on mobile.
///
/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
/// order to handle these events.
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
/// [`BumpTransaction`]: events::Event::BumpTransaction
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
- let mut pending_events = Vec::new();
for monitor_state in self.monitors.read().unwrap().values() {
- pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
- }
- for event in pending_events {
- handler.handle_event(event);
+ monitor_state.monitor.process_pending_events(&handler);
}
}
}
#[cfg(test)]
mod tests {
- use bitcoin::{BlockHeader, TxMerkleNode};
- use bitcoin::hashes::Hash;
use crate::{check_added_monitors, check_closed_broadcast, check_closed_event};
- use crate::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
+ use crate::{expect_payment_claimed, expect_payment_path_successful, get_event_msg};
use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
- expect_payment_sent_without_paths!(nodes[0], payment_preimage_1);
+ expect_payment_sent(&nodes[0], payment_preimage_1, None, false, false);
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &updates.commitment_signed);
check_added_monitors!(nodes[0], 1);
let (as_first_raa, as_first_update) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_second_updates.update_fulfill_htlcs[0]);
- expect_payment_sent_without_paths!(nodes[0], payment_preimage_2);
+ expect_payment_sent(&nodes[0], payment_preimage_2, None, false, false);
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_second_updates.commitment_signed);
check_added_monitors!(nodes[0], 1);
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_first_raa);
// Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The
// channel is now closed, but the ChannelManager doesn't know that yet.
- let new_header = BlockHeader {
- version: 2, time: 0, bits: 0, nonce: 0,
- prev_blockhash: nodes[0].best_block_info().0,
- merkle_root: TxMerkleNode::all_zeros() };
+ let new_header = create_dummy_header(nodes[0].best_block_info().0, 0);
nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header,
&[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1);
assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
assert!(err.contains("ChannelMonitor storage failure")));
check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
check_closed_broadcast!(nodes[0], true);
- check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
+ check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() },
+ [nodes[1].node.get_our_node_id()], 100000);
// However, as the ChainMonitor is still waiting for the original persistence to complete,
// it won't yet release the MonitorEvents.
if block_timeout {
// After three blocks, pending MontiorEvents should be released either way.
- let latest_header = BlockHeader {
- version: 2, time: 0, bits: 0, nonce: 0,
- prev_blockhash: nodes[0].best_block_info().0,
- merkle_root: TxMerkleNode::all_zeros() };
+ let latest_header = create_dummy_header(nodes[0].best_block_info().0, 0);
nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS);
} else {
let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone();
}
}
- expect_payment_sent!(nodes[0], payment_preimage);
+ expect_payment_sent(&nodes[0], payment_preimage, None, true, false);
}
#[test]
// ... however once we get events once, the channel will close, creating a channel-closed
// ChannelMonitorUpdate.
check_closed_broadcast!(nodes[0], true);
- check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() });
+ check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() },
+ [nodes[1].node.get_our_node_id()], 100000);
check_added_monitors!(nodes[0], 1);
}
}