//! servicing [`ChannelMonitor`] updates from the client.
use bitcoin::blockdata::block::BlockHeader;
-use bitcoin::hash_types::Txid;
-
-use chain;
-use chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
-use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
-use chain::transaction::{OutPoint, TransactionData};
-use chain::keysinterface::Sign;
-use util::atomic_counter::AtomicCounter;
-use util::logger::Logger;
-use util::errors::APIError;
-use util::events;
-use util::events::EventHandler;
-use ln::channelmanager::ChannelDetails;
-
-use prelude::*;
-use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
+use bitcoin::hash_types::{Txid, BlockHash};
+
+use crate::chain;
+use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
+use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
+use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
+use crate::chain::transaction::{OutPoint, TransactionData};
+use crate::chain::keysinterface::Sign;
+use crate::util::atomic_counter::AtomicCounter;
+use crate::util::logger::Logger;
+use crate::util::errors::APIError;
+use crate::util::events;
+use crate::util::events::{Event, EventHandler};
+use crate::ln::channelmanager::ChannelDetails;
+
+use crate::prelude::*;
+use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use bitcoin::secp256k1::PublicKey;
self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
}
+ #[cfg(not(c_bindings))]
+ /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
+ pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<MonitorUpdateId>> {
+ self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
+ (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
+ }).collect()
+ }
+
+ #[cfg(c_bindings)]
+ /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
+ pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec<MonitorUpdateId>)> {
+ self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
+ (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
+ }).collect()
+ }
+
+
#[cfg(test)]
pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
- use util::events::EventsProvider;
+ use crate::util::events::EventsProvider;
let events = core::cell::RefCell::new(Vec::new());
- let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+ let event_handler = |event: events::Event| events.borrow_mut().push(event);
self.process_pending_events(&event_handler);
events.into_inner()
}
+
+ /// Processes any events asynchronously in the order they were generated since the last call
+ /// using the given event handler.
+ ///
+ /// See the trait-level documentation of [`EventsProvider`] for requirements.
+ ///
+ /// [`EventsProvider`]: crate::util::events::EventsProvider
+ pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+ &self, handler: H
+ ) {
+ let mut pending_events = Vec::new();
+ for monitor_state in self.monitors.read().unwrap().values() {
+ pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
+ }
+ for event in pending_events {
+ handler(event).await;
+ }
+ }
}
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
});
}
- fn get_relevant_txids(&self) -> Vec<Txid> {
+ fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
let mut txids = Vec::new();
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
for monitor_state in self.monitors.read().unwrap().values() {
pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
}
- for event in pending_events.drain(..) {
- handler.handle_event(&event);
+ for event in pending_events {
+ handler.handle_event(event);
}
}
#[cfg(anchors)]
for monitor_state in self.monitors.read().unwrap().values() {
pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
}
- for event in pending_events.drain(..) {
- handler.handle_event(&event);
+ for event in pending_events {
+ handler.handle_event(event);
}
}
}
mod tests {
use bitcoin::{BlockHeader, TxMerkleNode};
use bitcoin::hashes::Hash;
- use ::{check_added_monitors, check_closed_broadcast, check_closed_event};
- use ::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
- use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
- use chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
- use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
- use ln::channelmanager::{self, PaymentSendFailure};
- use ln::functional_test_utils::*;
- use ln::msgs::ChannelMessageHandler;
- use util::errors::APIError;
- use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
+ use crate::{check_added_monitors, check_closed_broadcast, check_closed_event};
+ use crate::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
+ use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
+ use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
+ use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
+ use crate::ln::channelmanager::{self, PaymentSendFailure, PaymentId};
+ use crate::ln::functional_test_utils::*;
+ use crate::ln::msgs::ChannelMessageHandler;
+ use crate::util::errors::APIError;
+ use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
#[test]
fn test_async_ooo_offchain_updates() {
// Note that updates is a HashMap so the ordering here is actually random. This shouldn't
// fail either way but if it fails intermittently it's depending on the ordering of updates.
let mut update_iter = updates.iter();
- nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
+ let next_update = update_iter.next().unwrap().clone();
+ // Should contain next_update when pending updates listed.
+ #[cfg(not(c_bindings))]
+ assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
+ .unwrap().contains(&next_update));
+ #[cfg(c_bindings)]
+ assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
+ .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
+ nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap();
+ // Should not contain the previously pending next_update when pending updates listed.
+ #[cfg(not(c_bindings))]
+ assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
+ .unwrap().contains(&next_update));
+ #[cfg(c_bindings)]
+ assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
+ .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
// If the ChannelManager tries to update the channel, however, the ChainMonitor will pass
// the update through to the ChannelMonitor which will refuse it (as the channel is closed).
chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
- unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)),
+ unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret), PaymentId(second_payment_hash.0)),
true, APIError::ChannelUnavailable { ref err },
assert!(err.contains("ChannelMonitor storage failure")));
check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update