From 89ad05954891276dfb8524be904af78a8ec7ee82 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 7 Oct 2021 23:59:47 +0000 Subject: [PATCH] Use an opaque type to describe monitor updates in Persist In the next commit, we'll be originating monitor updates both from the ChainMonitor and from the ChannelManager, making simple sequential update IDs impossible. Further, the existing async monitor update API was somewhat hard to work with - instead of being able to generate monitor_updated callbacks whenever a persistence process finishes, you had to ensure you only did so at least once all previous updates had also been persisted. Here we eat the complexity for the user by moving to an opaque type for monitor updates, tracking which updates are in-flight for the user and only generating monitor-persisted events once all pending updates have been committed. --- fuzz/src/chanmon_consistency.rs | 18 +- fuzz/src/utils/test_persister.rs | 5 +- lightning-persister/src/lib.rs | 12 +- lightning/src/chain/chainmonitor.rs | 169 ++++++++++++++---- lightning/src/ln/chanmon_update_fail_tests.rs | 92 +++++----- lightning/src/util/test_utils.rs | 13 +- 6 files changed, 206 insertions(+), 103 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 63e179ac4..28bab5a28 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -855,25 +855,25 @@ pub fn do_test(data: &[u8], out: Out) { 0x08 => { if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { - monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id); nodes[0].process_monitor_events(); } }, 0x09 => { if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { - monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id); nodes[1].process_monitor_events(); } }, 0x0a => { if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { - monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id); nodes[1].process_monitor_events(); } }, 0x0b => { if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { - monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id); nodes[2].process_monitor_events(); } }, @@ -1075,25 +1075,25 @@ pub fn do_test(data: &[u8], out: Out) { // Test that no channel is in a stuck state where neither party can send funds even // after we resolve all pending events. // First make sure there are no pending monitor updates, resetting the error state - // and calling channel_monitor_updated for each monitor. + // and calling force_channel_monitor_updated for each monitor. 
*monitor_a.persister.update_ret.lock().unwrap() = Ok(()); *monitor_b.persister.update_ret.lock().unwrap() = Ok(()); *monitor_c.persister.update_ret.lock().unwrap() = Ok(()); if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { - monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id); nodes[0].process_monitor_events(); } if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { - monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id); nodes[1].process_monitor_events(); } if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { - monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id); nodes[1].process_monitor_events(); } if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { - monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id); nodes[2].process_monitor_events(); } diff --git a/fuzz/src/utils/test_persister.rs b/fuzz/src/utils/test_persister.rs index f02f8587a..4c18d8261 100644 --- a/fuzz/src/utils/test_persister.rs +++ b/fuzz/src/utils/test_persister.rs @@ -1,5 +1,6 @@ use lightning::chain; use lightning::chain::{chainmonitor, channelmonitor}; +use lightning::chain::chainmonitor::MonitorUpdateId; use lightning::chain::transaction::OutPoint; use lightning::util::enforcing_trait_impls::EnforcingSigner; @@ -9,11 +10,11 @@ pub struct TestPersister { pub update_ret: Mutex<Result<(), chain::ChannelMonitorUpdateErr>>, } impl chainmonitor::Persist<EnforcingSigner> for TestPersister { - fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { self.update_ret.lock().unwrap().clone() } - fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { self.update_ret.lock().unwrap().clone() } } diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 88b103c71..4ba6d96ea 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -159,13 +159,13 @@ impl FilesystemPersister { } impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister { - fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); util::write_to_file(self.path_to_monitor_data(), filename, monitor) .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) } - fn update_persisted_channel(&self, funding_txo: OutPoint, _update:
&ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); util::write_to_file(self.path_to_monitor_data(), filename, monitor) .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) @@ -296,6 +296,8 @@ mod tests { nodes[1].node.force_close_channel(&chan.2).unwrap(); check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed); let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); + let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap(); + let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap(); // Set the persister's directory to read-only, which should result in // returning a permanent failure when we then attempt to persist a @@ -309,7 +311,7 @@ mod tests { txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), index: 0 }; - match persister.persist_new_channel(test_txo, &added_monitors[0].1) { + match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) { Err(ChannelMonitorUpdateErr::PermanentFailure) => {}, _ => panic!("unexpected result from persisting new channel") } @@ -333,6 +335,8 @@ mod tests { nodes[1].node.force_close_channel(&chan.2).unwrap(); check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed); let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); + let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap(); + let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap(); // Create the persister with an invalid directory name and test that the // channel fails to open because the directories fail to be created. There @@ -344,7 +348,7 @@ mod tests { txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), index: 0 }; - match persister.persist_new_channel(test_txo, &added_monitors[0].1) { + match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) { Err(ChannelMonitorUpdateErr::PermanentFailure) => {}, _ => panic!("unexpected result from persisting new channel") } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 9e92264b0..323598f20 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -33,44 +33,75 @@ use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, Monit use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::logger::Logger; +use util::errors::APIError; use util::events; use util::events::EventHandler; use ln::channelmanager::ChannelDetails; use prelude::*; -use sync::{RwLock, RwLockReadGuard, Mutex}; +use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::ops::Deref; +#[derive(Clone, Copy, Hash, PartialEq, Eq)] +enum UpdateOrigin { + OffChain(u64), +} + +/// An opaque identifier describing a specific [`Persist`] method call.
+#[derive(Clone, Copy, Hash, PartialEq, Eq)] +pub struct MonitorUpdateId { + contents: UpdateOrigin, +} + +impl MonitorUpdateId { + pub(crate) fn from_monitor_update(update: &ChannelMonitorUpdate) -> Self { + Self { contents: UpdateOrigin::OffChain(update.update_id) } + } + pub(crate) fn from_new_monitor<ChannelSigner: Sign>(monitor: &ChannelMonitor<ChannelSigner>) -> Self { + Self { contents: UpdateOrigin::OffChain(monitor.get_latest_update_id()) } + } +} + /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// -/// Note that for every new monitor, you **must** persist the new `ChannelMonitor` -/// to disk/backups. And, on every update, you **must** persist either the -/// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk -/// of situations such as revoking a transaction, then crashing before this -/// revocation can be persisted, then unintentionally broadcasting a revoked -/// transaction and losing money. This is a risk because previous channel states -/// are toxic, so it's important that whatever channel state is persisted is -/// kept up-to-date. +/// Each method can return three possible values: +/// * If persistence (including any relevant `fsync()` calls) happens immediately, the +/// implementation should return `Ok(())`, indicating normal channel operation should continue. +/// * If persistence happens asynchronously, implementations should first ensure the +/// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return +/// `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the +/// background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be +/// called with the corresponding [`MonitorUpdateId`]. +/// +/// Note that unlike the direct [`chain::Watch`] interface, +/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. +/// +/// * If persistence fails for some reason, implementations should return +/// `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be +/// closed without broadcasting the latest state. See +/// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details. pub trait Persist<ChannelSigner: Sign> { - /// Persist a new channel's data. The data can be stored any way you want, but - /// the identifier provided by Rust-Lightning is the channel's outpoint (and - /// it is up to you to maintain a correct mapping between the outpoint and the - /// stored channel data). Note that you **must** persist every new monitor to - /// disk. See the `Persist` trait documentation for more details. + /// Persist a new channel's data. The data can be stored any way you want, but the identifier + /// provided by LDK is the channel's outpoint (and it is up to you to maintain a correct + /// mapping between the outpoint and the stored channel data). Note that you **must** persist + /// every new monitor to disk. + /// + /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`], + /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`]. /// /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor` /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
/// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>; + fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>; - /// Update one channel's data. The provided [`ChannelMonitor`] has already - /// applied the given update. + /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given + /// update. /// - /// Note that on every update, you **must** persist either the - /// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See - /// the `Persist` trait documentation for more details. + /// Note that on every update, you **must** persist either the [`ChannelMonitorUpdate`] or the + /// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more + /// details. /// /// If an implementer chooses to persist the updates only, they need to make /// sure that all the updates are applied to the `ChannelMonitors` *before* @@ -84,16 +115,33 @@ pub trait Persist<ChannelSigner: Sign> { /// them in batches. The size of each monitor grows `O(number of state updates)` /// whereas updates are small and `O(1)`. /// + /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`], + /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`]. + /// /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`, /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and /// [`ChannelMonitorUpdateErr`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>; + fn update_persisted_channel(&self, channel_id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>; } struct MonitorHolder<ChannelSigner: Sign> { monitor: ChannelMonitor<ChannelSigner>, + /// The full set of pending monitor updates for this Channel. + /// + /// Note that this lock must be held during updates to prevent a race where we call + /// update_persisted_channel, the user returns a TemporaryFailure, and then calls + /// channel_monitor_updated immediately, racing our insertion of the pending update into the + /// contained Vec. + pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>, +} + +impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> { + fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool { + pending_monitor_updates_lock.iter().any(|update_id| + if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false }) + } } /// A read-only reference to a current ChannelMonitor. @@ -267,23 +315,61 @@ where C::Target: chain::Filter, /// Indicates the persistence of a [`ChannelMonitor`] has completed after /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation. /// - /// All ChannelMonitor updates up to and including highest_applied_update_id must have been - /// fully committed in every copy of the given channels' ChannelMonitors. - /// - /// Note that there is no effect to calling with a highest_applied_update_id other than the /// current latest ChannelMonitorUpdate and one call to this function after multiple /// ChannelMonitorUpdateErr::TemporaryFailures is fine.
The highest_applied_update_id field - /// exists largely only to prevent races between this and concurrent update_monitor calls. - /// /// Thus, the anticipated use is, at a high level: /// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the /// update to disk and begins updating any remote (e.g. watchtower/backup) copies, /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`], - /// 2) once all remote copies are updated, you call this function with the update_id that - /// completed, and once it is the latest the Channel will be re-enabled. - pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) { + /// 2) once all remote copies are updated, you call this function with the + /// `completed_update_id` that completed, and once all pending updates have completed the + /// channel will be re-enabled. + // Note that we re-enable only after `UpdateOrigin::OffChain` updates complete, we don't + // care about `UpdateOrigin::ChainSync` updates for the channel state being updated. We + // only care about `UpdateOrigin::ChainSync` for returning `MonitorEvent`s. + /// + /// Returns an [`APIError::APIMisuseError`] if `funding_txo` does not match any currently + /// registered [`ChannelMonitor`]s. + pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: MonitorUpdateId) -> Result<(), APIError> { + let monitors = self.monitors.read().unwrap(); + let monitor_data = if let Some(mon) = monitors.get(&funding_txo) { mon } else { + return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching funding outpoint {:?} found", funding_txo) }); + }; + let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap(); + pending_monitor_updates.retain(|update_id| *update_id != completed_update_id); + + match completed_update_id { + MonitorUpdateId { .. } => { + // Note that we only check for `UpdateOrigin::OffChain` failures here - if + // we're being told that a `UpdateOrigin::OffChain` monitor update completed, + // we only care about ensuring we don't tell the `ChannelManager` to restore + // the channel to normal operation until all `UpdateOrigin::OffChain` updates + // complete. + // If there's some `UpdateOrigin::ChainSync` update still pending that's okay + // - we can still update our channel state, just as long as we don't return + // `MonitorEvent`s from the monitor back to the `ChannelManager` until they + // complete. + let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates); + if monitor_is_pending_updates { + // If there are still monitor updates pending, we cannot yet construct an + // UpdateCompleted event. + return Ok(()); + } + self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted { + funding_txo, + monitor_update_id: monitor_data.monitor.get_latest_update_id(), + }); + } + } + Ok(()) + } + + /// This wrapper avoids having to update some of our tests for now as they assume the direct + /// chain::Watch API wherein we mark a monitor fully-updated by just calling + /// channel_monitor_updated once with the highest ID. 
+ #[cfg(any(test, feature = "fuzztarget"))] + pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) { self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted { - funding_txo, monitor_update_id: highest_applied_update_id + funding_txo, + monitor_update_id, }); } @@ -397,12 +483,16 @@ where C::Target: chain::Filter, return Err(ChannelMonitorUpdateErr::PermanentFailure)}, hash_map::Entry::Vacant(e) => e, }; - let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor); + let update_id = MonitorUpdateId::from_new_monitor(&monitor); + let mut pending_monitor_updates = Vec::new(); + let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id); if persist_res.is_err() { log_error!(self.logger, "Failed to persist new channel data: {:?}", persist_res); } if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) { return persist_res; + } else if persist_res.is_err() { + pending_monitor_updates.push(update_id); } { let funding_txo = monitor.get_funding_txo(); @@ -412,7 +502,7 @@ where C::Target: chain::Filter, monitor.load_outputs_to_watch(chain_source); } } - entry.insert(MonitorHolder { monitor }); + entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates) }); persist_res } @@ -442,8 +532,13 @@ where C::Target: chain::Filter, } // Even if updating the monitor returns an error, the monitor's state will // still be changed. So, persist the updated monitor despite the error. - let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor); - if let Err(ref e) = persist_res { + let update_id = MonitorUpdateId::from_monitor_update(&update); + let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor, update_id); + if let Err(e) = persist_res { + if e == ChannelMonitorUpdateErr::TemporaryFailure { + pending_monitor_updates.push(update_id); + } log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e); } if update_res.is_err() { diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 819c71f4f..0b1c16d35 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -176,8 +176,8 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) { } chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events(); @@ -329,8 +329,8 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) { // Now fix monitor updating... 
chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); macro_rules! disconnect_reconnect_peers { () => { { @@ -627,8 +627,8 @@ fn test_monitor_update_fail_cs() { assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let responses = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(responses.len(), 2); @@ -661,8 +661,8 @@ fn test_monitor_update_fail_cs() { } chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let final_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()); @@ -721,8 +721,8 @@ fn test_monitor_update_fail_no_rebroadcast() { check_added_monitors!(nodes[1], 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); @@ -782,8 +782,8 @@ fn test_monitor_update_raa_while_paused() { check_added_monitors!(nodes[0], 1); chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let as_update_raa = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id()); @@ -908,8 
+908,8 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) { // Restore monitor updating, ensuring we immediately get a fail-back update and a // update_add update. chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); check_added_monitors!(nodes[1], 1); @@ -1147,8 +1147,8 @@ fn test_monitor_update_fail_reestablish() { .contents.flags & 2, 0); // The "disabled" bit should be unset as we just reconnected chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1230,8 +1230,8 @@ fn raa_no_response_awaiting_raa_state() { check_added_monitors!(nodes[1], 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); // nodes[1] should be AwaitingRAA here! check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1347,8 +1347,8 @@ fn claim_while_disconnected_monitor_update_fail() { // Now un-fail the monitor, which will result in B sending its original commitment update, // receiving the commitment update from A, and the resulting commitment dances. 
chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_msgs = nodes[1].node.get_and_clear_pending_msg_events(); @@ -1456,8 +1456,8 @@ fn monitor_failed_no_reestablish_response() { let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id()); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1549,8 +1549,8 @@ fn first_message_on_recv_ordering() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Previous monitor update failure prevented generation of RAA".to_string(), 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); @@ -1633,8 +1633,8 @@ fn test_monitor_update_fail_claim() { commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false, true); // Now restore monitor updating on the 0<->1 channel and claim the funds on B. 
- let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1728,8 +1728,8 @@ fn test_monitor_update_on_pending_forwards() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Failed to update ChannelMonitor".to_string(), 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1789,8 +1789,8 @@ fn monitor_update_claim_fail_no_response() { nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Temporary failure claiming HTLC, treating as success: Failed to update ChannelMonitor".to_string(), 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -1848,8 +1848,8 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: check_added_monitors!(nodes[0], 1); assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let events = nodes[0].node.get_and_clear_pending_events(); @@ -1880,8 +1880,8 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: } chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + 
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first { @@ -1965,8 +1965,8 @@ fn test_path_paused_mpp() { // And check that, after we successfully update the monitor for chan_2 we can pass the second // HTLC along to nodes[3] and claim the whole payment back to nodes[0]. - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 200_000, payment_hash.clone(), Some(payment_secret), events.pop().unwrap(), true, None); @@ -2301,8 +2301,8 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { // If we finish updating the monitor, we should free the holding cell right away (this did // not occur prior to #756). chanmon_cfgs[0].persister.set_update_ret(Ok(())); - let (funding_txo, mon_id) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, mon_id); + let (funding_txo, mon_id, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_txo, mon_id); // New outbound messages should be generated immediately upon a call to // get_and_clear_pending_msg_events (but not before). 
@@ -2499,15 +2499,15 @@ fn test_temporary_error_during_shutdown() { chanmon_cfgs[0].persister.set_update_ret(Ok(())); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id())); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); + let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update); nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id())); let (_, closing_signed_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id()); @@ -2586,7 +2586,7 @@ fn double_temp_error() { // `claim_funds` results in a ChannelMonitorUpdate. assert!(nodes[1].node.claim_funds(payment_preimage_1)); check_added_monitors!(nodes[1], 1); - let (funding_tx, latest_update_1) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + let (funding_tx, latest_update_1, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure)); // Previously, this would've panicked due to a double-call to `Channel::monitor_update_failed`, @@ -2595,11 +2595,11 @@ fn double_temp_error() { check_added_monitors!(nodes[1], 1); chanmon_cfgs[1].persister.set_update_ret(Ok(())); - let (_, latest_update_2) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_1); + let (_, latest_update_2, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_1); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 0); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_2); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(funding_tx, latest_update_2); // Complete the first HTLC. 
let events = nodes[1].node.get_and_clear_pending_msg_events(); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 15eaa7d46..f0e453f9b 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -12,6 +12,7 @@ use chain::WatchedOutput; use chain::chaininterface; use chain::chaininterface::ConfirmationTarget; use chain::chainmonitor; +use chain::chainmonitor::MonitorUpdateId; use chain::channelmonitor; use chain::channelmonitor::MonitorEvent; use chain::transaction::OutPoint; @@ -88,7 +89,7 @@ impl keysinterface::KeysInterface for OnlyReadsKeysInterface { pub struct TestChainMonitor<'a> { pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor<EnforcingSigner>)>>, - pub latest_monitor_update_id: Mutex<HashMap<[u8; 32], (OutPoint, u64)>>, + pub latest_monitor_update_id: Mutex<HashMap<[u8; 32], (OutPoint, u64, MonitorUpdateId)>>, pub chain_monitor: chainmonitor::ChainMonitor>, pub keys_manager: &'a TestKeysInterface, /// If this is set to Some(), the next update_channel call (not watch_channel) must be a @@ -116,7 +117,8 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::read( &mut io::Cursor::new(&w.0), self.keys_manager).unwrap().1; assert!(new_monitor == monitor); - self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), (funding_txo, monitor.get_latest_update_id())); + self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), + (funding_txo, monitor.get_latest_update_id(), MonitorUpdateId::from_new_monitor(&monitor))); self.added_monitors.lock().unwrap().push((funding_txo, monitor)); self.chain_monitor.watch_channel(funding_txo, new_monitor) } @@ -136,7 +138,8 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> { } else { panic!(); } } - self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), (funding_txo, update.update_id)); + self.latest_monitor_update_id.lock().unwrap().insert(funding_txo.to_channel_id(), + (funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(&update))); let update_res = self.chain_monitor.update_channel(funding_txo, update); // At every point where we get a monitor update, we should be able to send a useful monitor // to a watchtower and disk... @@ -179,7 +182,7 @@ impl TestPersister { } } impl chainmonitor::Persist<EnforcingSigner> for TestPersister { - fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let ret = self.update_ret.lock().unwrap().clone(); if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() { *self.update_ret.lock().unwrap() = next_ret; @@ -187,7 +190,7 @@ impl chainmonitor::Persist<EnforcingSigner> for TestPersister ret } - fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> { + fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { let ret = self.update_ret.lock().unwrap().clone(); if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() { *self.update_ret.lock().unwrap() = next_ret; -- 2.39.5
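As a rough sketch of the calling pattern the new API expects from an asynchronous `Persist` implementation (the `AsyncPersister` type and its `pending` queue below are hypothetical names, error handling is elided, and per the updated trait docs a real implementation should also write the monitor durably to local disk before returning `TemporaryFailure`):

use std::sync::Mutex;

use lightning::chain;
use lightning::chain::chainmonitor::{MonitorUpdateId, Persist};
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
use lightning::chain::keysinterface::Sign;
use lightning::chain::transaction::OutPoint;
use lightning::util::ser::Writeable;

/// Hypothetical persister which queues serialized monitors for a background
/// writer (e.g. a remote backup) and reports completion later.
pub struct AsyncPersister {
	/// Entries the background writer still has to flush: the funding outpoint,
	/// the opaque id identifying the `Persist` call, and the serialized monitor.
	pub pending: Mutex<Vec<(OutPoint, MonitorUpdateId, Vec<u8>)>>,
}

impl<ChannelSigner: Sign> Persist<ChannelSigner> for AsyncPersister {
	fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
		// Queue the full monitor for the background writer and tell LDK the
		// persistence identified by `update_id` is still in flight.
		self.pending.lock().unwrap().push((channel_id, update_id, data.encode()));
		Err(chain::ChannelMonitorUpdateErr::TemporaryFailure)
	}

	fn update_persisted_channel(&self, channel_id: OutPoint, _update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
		// Re-serializing the whole monitor keeps the sketch short; an
		// update-only store would queue `_update` instead.
		self.pending.lock().unwrap().push((channel_id, update_id, data.encode()));
		Err(chain::ChannelMonitorUpdateErr::TemporaryFailure)
	}
}

Once the background writer has durably stored an entry, the user calls `chain_monitor.channel_monitor_updated(funding_txo, update_id)` for exactly that entry. Unlike the old `highest_applied_update_id` scheme, this must be done once per `Persist` call that returned `TemporaryFailure`, and the `ChainMonitor` only generates `MonitorEvent::UpdateCompleted` once every pending off-chain update for the channel has completed.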