//! servicing [`ChannelMonitor`] updates from the client.
use bitcoin::blockdata::block::BlockHeader;
-use bitcoin::hash_types::Txid;
-
-use chain;
-use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
-use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
-use chain::transaction::{OutPoint, TransactionData};
-use chain::keysinterface::Sign;
-use util::atomic_counter::AtomicCounter;
-use util::logger::Logger;
-use util::errors::APIError;
-use util::events;
-use util::events::EventHandler;
-use ln::channelmanager::ChannelDetails;
-
-use prelude::*;
-use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
+use bitcoin::hash_types::{Txid, BlockHash};
+
+use crate::chain;
+use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
+use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
+use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
+use crate::chain::transaction::{OutPoint, TransactionData};
+use crate::chain::keysinterface::Sign;
+use crate::util::atomic_counter::AtomicCounter;
+use crate::util::logger::Logger;
+use crate::util::errors::APIError;
+use crate::util::events;
+use crate::util::events::{Event, EventHandler};
+use crate::ln::channelmanager::ChannelDetails;
+
+use crate::prelude::*;
+use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use bitcoin::secp256k1::PublicKey;
///
/// Each method can return three possible values:
/// * If persistence (including any relevant `fsync()` calls) happens immediately, the
-/// implementation should return `Ok(())`, indicating normal channel operation should continue.
+/// implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal
+/// channel operation should continue.
/// * If persistence happens asynchronously, implementations should first ensure the
/// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return
-/// `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the
-/// background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be
-/// called with the corresponding [`MonitorUpdateId`].
+/// [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background.
+/// Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with
+/// the corresponding [`MonitorUpdateId`].
///
/// Note that unlike the direct [`chain::Watch`] interface,
/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
///
/// * If persistence fails for some reason, implementations should return
-/// `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be
+/// [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be
/// closed without broadcasting the latest state. See
-/// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details.
+/// [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details.
pub trait Persist<ChannelSigner: Sign> {
/// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
/// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup.
/// and the stored channel data). Note that you **must** persist every new monitor to disk.
///
/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
- /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
+ /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
///
/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
- /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+ /// and [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
/// [`Writeable::write`]: crate::util::ser::Writeable::write
- fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+ fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
/// update.
/// whereas updates are small and `O(1)`.
///
/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
- /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
+ /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
///
/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
/// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
- /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+ /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
- fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+ fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
}
struct MonitorHolder<ChannelSigner: Sign> {
/// The full set of pending monitor updates for this Channel.
///
/// Note that this lock must be held during updates to prevent a race where we call
- /// update_persisted_channel, the user returns a TemporaryFailure, and then calls
- /// channel_monitor_updated immediately, racing our insertion of the pending update into the
- /// contained Vec.
+ /// update_persisted_channel, the user returns a
+ /// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
+ /// immediately, racing our insertion of the pending update into the contained Vec.
///
/// Beyond the synchronization of updates themselves, we cannot handle user events until after
/// any chain updates have been stored on disk. Thus, we scan this list when returning updates
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
// If there are not ChainSync persists awaiting completion, go ahead and
// set last_chain_persist_height here - we wouldn't want the first
- // TemporaryFailure to always immediately be considered "overly delayed".
+ // InProgress to always immediately be considered "overly delayed".
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
}
}
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
- Ok(()) =>
+ ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
- Err(ChannelMonitorUpdateErr::PermanentFailure) => {
+ ChannelMonitorUpdateStatus::PermanentFailure => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
},
- Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
+ ChannelMonitorUpdateStatus::InProgress => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
}
+ #[cfg(not(c_bindings))]
+ /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
+ pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<MonitorUpdateId>> {
+ self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
+ (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
+ }).collect()
+ }
+
+ #[cfg(c_bindings)]
+ /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
+ pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec<MonitorUpdateId>)> {
+ self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
+ (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
+ }).collect()
+ }
+
+
#[cfg(test)]
pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
}
/// Indicates the persistence of a [`ChannelMonitor`] has completed after
- /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
+ /// [`ChannelMonitorUpdateStatus::InProgress`] was returned from an update operation.
///
/// Thus, the anticipated use is, at a high level:
/// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
/// update to disk and begins updating any remote (e.g. watchtower/backup) copies,
- /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
+ /// returning [`ChannelMonitorUpdateStatus::InProgress`],
/// 2) once all remote copies are updated, you call this function with the
/// `completed_update_id` that completed, and once all pending updates have completed the
/// channel will be re-enabled.
if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) {
// If there are still monitor updates pending (or an old monitor update
// finished after a later one perm-failed), we cannot yet construct an
- // UpdateCompleted event.
+ // Completed event.
return Ok(());
}
- self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
+ self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
}], monitor_data.monitor.get_counterparty_node_id()));
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
let monitors = self.monitors.read().unwrap();
let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
- self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
+ self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
funding_txo,
monitor_update_id,
}], counterparty_node_id));
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
- use util::events::EventsProvider;
+ use crate::util::events::EventsProvider;
let events = core::cell::RefCell::new(Vec::new());
- let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+ let event_handler = |event: events::Event| events.borrow_mut().push(event);
self.process_pending_events(&event_handler);
events.into_inner()
}
+
+ /// Processes any events asynchronously in the order they were generated since the last call
+ /// using the given event handler.
+ ///
+ /// See the trait-level documentation of [`EventsProvider`] for requirements.
+ ///
+ /// [`EventsProvider`]: crate::util::events::EventsProvider
+ pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+ &self, handler: H
+ ) {
+ let mut pending_events = Vec::new();
+ for monitor_state in self.monitors.read().unwrap().values() {
+ pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
+ }
+ for event in pending_events {
+ handler(event).await;
+ }
+ }
}
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
});
}
- fn get_relevant_txids(&self) -> Vec<Txid> {
+ fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
let mut txids = Vec::new();
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
///
/// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor`
/// monitors lock.
- fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
+ fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus {
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(funding_outpoint) {
hash_map::Entry::Occupied(_) => {
log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
- return Err(ChannelMonitorUpdateErr::PermanentFailure)},
+ return ChannelMonitorUpdateStatus::PermanentFailure
+ },
hash_map::Entry::Vacant(e) => e,
};
log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_id = MonitorUpdateId::from_new_monitor(&monitor);
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
- if persist_res.is_err() {
- log_error!(self.logger, "Failed to persist new ChannelMonitor for channel {}: {:?}", log_funding_info!(monitor), persist_res);
- } else {
- log_trace!(self.logger, "Finished persisting new ChannelMonitor for channel {}", log_funding_info!(monitor));
- }
- if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) {
- return persist_res;
- } else if persist_res.is_err() {
- pending_monitor_updates.push(update_id);
+ match persist_res {
+ ChannelMonitorUpdateStatus::InProgress => {
+ log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
+ pending_monitor_updates.push(update_id);
+ },
+ ChannelMonitorUpdateStatus::PermanentFailure => {
+ log_error!(self.logger, "Persistence of new ChannelMonitor for channel {} failed", log_funding_info!(monitor));
+ return persist_res;
+ },
+ ChannelMonitorUpdateStatus::Completed => {
+ log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
+ }
}
if let Some(ref chain_source) = self.chain_source {
monitor.load_outputs_to_watch(chain_source);
/// Note that we persist the given `ChannelMonitor` update while holding the
/// `ChainMonitor` monitors lock.
- fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
+ fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
// Update the monitor that watches the channel referred to by the given outpoint.
let monitors = self.monitors.read().unwrap();
match monitors.get(&funding_txo) {
#[cfg(any(test, fuzzing))]
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
#[cfg(not(any(test, fuzzing)))]
- Err(ChannelMonitorUpdateErr::PermanentFailure)
+ ChannelMonitorUpdateStatus::PermanentFailure
},
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
let update_id = MonitorUpdateId::from_monitor_update(&update);
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
- if let Err(e) = persist_res {
- if e == ChannelMonitorUpdateErr::TemporaryFailure {
+ match persist_res {
+ ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);
- } else {
+ log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
+ },
+ ChannelMonitorUpdateStatus::PermanentFailure => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
- }
- log_error!(self.logger, "Failed to persist ChannelMonitor update for channel {}: {:?}", log_funding_info!(monitor), e);
- } else {
- log_trace!(self.logger, "Finished persisting ChannelMonitor update for channel {}", log_funding_info!(monitor));
+ log_error!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} failed", log_funding_info!(monitor));
+ },
+ ChannelMonitorUpdateStatus::Completed => {
+ log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
+ },
}
if update_res.is_err() {
- Err(ChannelMonitorUpdateErr::PermanentFailure)
+ ChannelMonitorUpdateStatus::PermanentFailure
} else if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
- Err(ChannelMonitorUpdateErr::PermanentFailure)
+ ChannelMonitorUpdateStatus::PermanentFailure
} else {
persist_res
}
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
+ #[cfg(not(anchors))]
+ /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+ ///
+ /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+ /// order to handle these events.
+ ///
+ /// [`SpendableOutputs`]: events::Event::SpendableOutputs
+ fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+ let mut pending_events = Vec::new();
+ for monitor_state in self.monitors.read().unwrap().values() {
+ pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
+ }
+ for event in pending_events {
+ handler.handle_event(event);
+ }
+ }
+ #[cfg(anchors)]
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
///
+ /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
+ /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
+ /// within each channel. As the confirmation of a commitment transaction may be critical to the
+ /// safety of funds, this method must be invoked frequently, ideally once for every chain tip
+ /// update (block connected or disconnected).
+ ///
/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
/// order to handle these events.
///
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
+ /// [`BumpTransaction`]: events::Event::BumpTransaction
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
let mut pending_events = Vec::new();
for monitor_state in self.monitors.read().unwrap().values() {
pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
}
- for event in pending_events.drain(..) {
- handler.handle_event(&event);
+ for event in pending_events {
+ handler.handle_event(event);
}
}
}
#[cfg(test)]
mod tests {
- use bitcoin::BlockHeader;
- use ::{check_added_monitors, check_closed_broadcast, check_closed_event};
- use ::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
- use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
- use chain::{ChannelMonitorUpdateErr, Confirm, Watch};
- use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
- use ln::channelmanager::PaymentSendFailure;
- use ln::features::InitFeatures;
- use ln::functional_test_utils::*;
- use ln::msgs::ChannelMessageHandler;
- use util::errors::APIError;
- use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
+ use bitcoin::{BlockHeader, TxMerkleNode};
+ use bitcoin::hashes::Hash;
+ use crate::{check_added_monitors, check_closed_broadcast, check_closed_event};
+ use crate::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
+ use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
+ use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
+ use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
+ use crate::ln::channelmanager::{self, PaymentSendFailure, PaymentId};
+ use crate::ln::functional_test_utils::*;
+ use crate::ln::msgs::ChannelMessageHandler;
+ use crate::util::errors::APIError;
+ use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
#[test]
fn test_async_ooo_offchain_updates() {
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
- create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+ create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
// Route two payments to be claimed at the same time.
let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
let (payment_preimage_2, payment_hash_2, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
- chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+ chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
- chanmon_cfgs[1].persister.set_update_ret(Ok(()));
+ chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
assert_eq!(persistences.len(), 1);
// Note that updates is a HashMap so the ordering here is actually random. This shouldn't
// fail either way but if it fails intermittently it's depending on the ordering of updates.
let mut update_iter = updates.iter();
- nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
+ let next_update = update_iter.next().unwrap().clone();
+ // Should contain next_update when pending updates listed.
+ #[cfg(not(c_bindings))]
+ assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
+ .unwrap().contains(&next_update));
+ #[cfg(c_bindings)]
+ assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
+ .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
+ nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap();
+ // Should not contain the previously pending next_update when pending updates listed.
+ #[cfg(not(c_bindings))]
+ assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
+ .unwrap().contains(&next_update));
+ #[cfg(c_bindings)]
+ assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
+ .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let channel = create_announced_chan_between_nodes(
- &nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+ &nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
// Get a route for later and rebalance the channel somewhat
send_payment(&nodes[0], &[&nodes[1]], 10_000_000);
// Temp-fail the block connection which will hold the channel-closed event
chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
- chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+ chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
// Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The
// channel is now closed, but the ChannelManager doesn't know that yet.
let new_header = BlockHeader {
version: 2, time: 0, bits: 0, nonce: 0,
prev_blockhash: nodes[0].best_block_info().0,
- merkle_root: Default::default() };
+ merkle_root: TxMerkleNode::all_zeros() };
nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header,
&[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1);
assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
// If the ChannelManager tries to update the channel, however, the ChainMonitor will pass
// the update through to the ChannelMonitor which will refuse it (as the channel is closed).
- chanmon_cfgs[0].persister.set_update_ret(Ok(()));
- unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)),
+ chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
+ unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret), PaymentId(second_payment_hash.0)),
true, APIError::ChannelUnavailable { ref err },
assert!(err.contains("ChannelMonitor storage failure")));
check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
let latest_header = BlockHeader {
version: 2, time: 0, bits: 0, nonce: 0,
prev_blockhash: nodes[0].best_block_info().0,
- merkle_root: Default::default() };
+ merkle_root: TxMerkleNode::all_zeros() };
nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS);
} else {
let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone();
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
- create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+ create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
- chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::PermanentFailure));
+ chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
connect_blocks(&nodes[0], 1);
// Before processing events, the ChannelManager will still think the Channel is open and