From 4500270488e6ed918c5f6e07310eb4a384eb6e21 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 7 Oct 2021 18:51:49 +0000 Subject: [PATCH] Move ChannelManager::monitor_updated to a MonitorEvent In the next commit we'll need ChainMonitor to "see" when a monitor persistence completes, which means `monitor_updated` needs to move to `ChainMonitor`. The simplest way to then communicate that information to `ChannelManager` is via `MonitorEvet`s, which seems to line up ok, even if they're now constructed by multiple different places. --- fuzz/src/chanmon_consistency.rs | 24 ++++++---- lightning/src/chain/chainmonitor.rs | 29 +++++++++++- lightning/src/chain/channelmonitor.rs | 32 +++++++++++-- lightning/src/chain/mod.rs | 25 ++++++---- lightning/src/ln/chanmon_update_fail_tests.rs | 46 +++++++++---------- lightning/src/ln/channelmanager.rs | 33 +++++-------- 6 files changed, 123 insertions(+), 66 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index f3b952ff..63e179ac 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -855,22 +855,26 @@ pub fn do_test(data: &[u8], out: Out) { 0x08 => { if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { - nodes[0].channel_monitor_updated(&chan_1_funding, *id); + monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + nodes[0].process_monitor_events(); } }, 0x09 => { if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { - nodes[1].channel_monitor_updated(&chan_1_funding, *id); + monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + nodes[1].process_monitor_events(); } }, 0x0a => { if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { - nodes[1].channel_monitor_updated(&chan_2_funding, *id); + monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + nodes[1].process_monitor_events(); } }, 0x0b => { if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { - nodes[2].channel_monitor_updated(&chan_2_funding, *id); + monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + nodes[2].process_monitor_events(); } }, @@ -1077,16 +1081,20 @@ pub fn do_test(data: &[u8], out: Out) { *monitor_c.persister.update_ret.lock().unwrap() = Ok(()); if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) { - nodes[0].channel_monitor_updated(&chan_1_funding, *id); + monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + nodes[0].process_monitor_events(); } if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) { - nodes[1].channel_monitor_updated(&chan_1_funding, *id); + monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id); + nodes[1].process_monitor_events(); } if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) { - nodes[1].channel_monitor_updated(&chan_2_funding, *id); + monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + nodes[1].process_monitor_events(); } if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) { - nodes[2].channel_monitor_updated(&chan_2_funding, *id); + monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id); + nodes[2].process_monitor_events(); } // Next, make sure peers are all connected to each other diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index f1ce0f79..9e92264b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -38,7 +38,7 @@ use util::events::EventHandler; use ln::channelmanager::ChannelDetails; use prelude::*; -use sync::{RwLock, RwLockReadGuard}; +use sync::{RwLock, RwLockReadGuard, Mutex}; use core::ops::Deref; /// `Persist` defines behavior for persisting channel monitors: this could mean @@ -134,6 +134,7 @@ pub struct ChainMonitor>, } impl ChainMonitor @@ -207,6 +208,7 @@ where C::Target: chain::Filter, logger, fee_estimator: feeest, persister, + pending_monitor_events: Mutex::new(Vec::new()), } } @@ -262,6 +264,29 @@ where C::Target: chain::Filter, self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor } + /// Indicates the persistence of a [`ChannelMonitor`] has completed after + /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation. + /// + /// All ChannelMonitor updates up to and including highest_applied_update_id must have been + /// fully committed in every copy of the given channels' ChannelMonitors. + /// + /// Note that there is no effect to calling with a highest_applied_update_id other than the + /// current latest ChannelMonitorUpdate and one call to this function after multiple + /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field + /// exists largely only to prevent races between this and concurrent update_monitor calls. + /// + /// Thus, the anticipated use is, at a high level: + /// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the + /// update to disk and begins updating any remote (e.g. watchtower/backup) copies, + /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`], + /// 2) once all remote copies are updated, you call this function with the update_id that + /// completed, and once it is the latest the Channel will be re-enabled. + pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) { + self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted { + funding_txo, monitor_update_id: highest_applied_update_id + }); + } + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { use util::events::EventsProvider; @@ -431,7 +456,7 @@ where C::Target: chain::Filter, } fn release_pending_monitor_events(&self) -> Vec { - let mut pending_monitor_events = Vec::new(); + let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 4c7da902..a7506acb 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -131,8 +131,29 @@ pub enum MonitorEvent { /// A monitor event that the Channel's commitment transaction was confirmed. CommitmentTxConfirmed(OutPoint), + + /// Indicates a [`ChannelMonitor`] update has completed. See + /// [`ChannelMonitorUpdateErr::TemporaryFailure`] for more information on how this is used. + /// + /// [`ChannelMonitorUpdateErr::TemporaryFailure`]: super::ChannelMonitorUpdateErr::TemporaryFailure + UpdateCompleted { + /// The funding outpoint of the [`ChannelMonitor`] that was updated + funding_txo: OutPoint, + /// The Update ID from [`ChannelMonitorUpdate::update_id`] which was applied or + /// [`ChannelMonitor::get_latest_update_id`]. + /// + /// Note that this should only be set to a given update's ID if all previous updates for the + /// same [`ChannelMonitor`] have been applied and persisted. + monitor_update_id: u64, + }, } -impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, ; +impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, + // Note that UpdateCompleted is currently never serialized to disk as it is generated only in ChainMonitor + (0, UpdateCompleted) => { + (0, funding_txo, required), + (2, monitor_update_id, required), + }, +; (2, HTLCEvent), (4, CommitmentTxConfirmed), ); @@ -854,14 +875,19 @@ impl Writeable for ChannelMonitorImpl { writer.write_all(&payment_preimage.0[..])?; } - writer.write_all(&byte_utils::be64_to_array(self.pending_monitor_events.len() as u64))?; + writer.write_all(&(self.pending_monitor_events.iter().filter(|ev| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::CommitmentTxConfirmed(_) => true, + _ => false, + }).count() as u64).to_be_bytes())?; for event in self.pending_monitor_events.iter() { match event { MonitorEvent::HTLCEvent(upd) => { 0u8.write(writer)?; upd.write(writer)?; }, - MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)? + MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?, + _ => {}, // Covered in the TLV writes below } } diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index f22d152e..55755073 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -182,9 +182,10 @@ pub enum ChannelMonitorUpdateErr { /// our state failed, but is expected to succeed at some point in the future). /// /// Such a failure will "freeze" a channel, preventing us from revoking old states or - /// submitting new commitment transactions to the counterparty. Once the update(s) which failed - /// have been successfully applied, ChannelManager::channel_monitor_updated can be used to - /// restore the channel to an operational state. + /// submitting new commitment transactions to the counterparty. Once the update(s) that failed + /// have been successfully applied, a [`MonitorEvent::UpdateCompleted`] event should be returned + /// via [`Watch::release_pending_monitor_events`] which will then restore the channel to an + /// operational state. /// /// Note that a given ChannelManager will *never* re-generate a given ChannelMonitorUpdate. If /// you return a TemporaryFailure you must ensure that it is written to disk safely before @@ -198,13 +199,14 @@ pub enum ChannelMonitorUpdateErr { /// the channel which would invalidate previous ChannelMonitors are not made when a channel has /// been "frozen". /// - /// Note that even if updates made after TemporaryFailure succeed you must still call - /// channel_monitor_updated to ensure you have the latest monitor and re-enable normal channel - /// operation. + /// Note that even if updates made after TemporaryFailure succeed you must still provide a + /// [`MonitorEvent::UpdateCompleted`] to ensure you have the latest monitor and re-enable + /// normal channel operation. Note that this is normally generated through a call to + /// [`ChainMonitor::channel_monitor_updated`]. /// - /// Note that the update being processed here will not be replayed for you when you call - /// ChannelManager::channel_monitor_updated, so you must store the update itself along - /// with the persisted ChannelMonitor on your own local disk prior to returning a + /// Note that the update being processed here will not be replayed for you when you return a + /// [`MonitorEvent::UpdateCompleted`] event via [`Watch::release_pending_monitor_events`], so + /// you must store the update itself on your own local disk prior to returning a /// TemporaryFailure. You may, of course, employ a journaling approach, storing only the /// ChannelMonitorUpdate on disk without updating the monitor itself, replaying the journal at /// reload-time. @@ -212,6 +214,8 @@ pub enum ChannelMonitorUpdateErr { /// For deployments where a copy of ChannelMonitors and other local state are backed up in a /// remote location (with local copies persisted immediately), it is anticipated that all /// updates will return TemporaryFailure until the remote copies could be updated. + /// + /// [`ChainMonitor::channel_monitor_updated`]: chainmonitor::ChainMonitor::channel_monitor_updated TemporaryFailure, /// Used to indicate no further channel monitor updates will be allowed (eg we've moved on to a /// different watchtower and cannot update with all watchtowers that were previously informed @@ -280,6 +284,9 @@ pub trait Watch { /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. + /// + /// For details on asynchronous [`ChannelMonitor`] updating and returning + /// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`]. fn release_pending_monitor_events(&self) -> Vec; } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 6d62f8db..819c71f4 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -177,7 +177,7 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) { chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events(); @@ -330,7 +330,7 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) { // Now fix monitor updating... chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); macro_rules! disconnect_reconnect_peers { () => { { @@ -628,7 +628,7 @@ fn test_monitor_update_fail_cs() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let responses = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(responses.len(), 2); @@ -662,7 +662,7 @@ fn test_monitor_update_fail_cs() { chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let final_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()); @@ -722,7 +722,7 @@ fn test_monitor_update_fail_no_rebroadcast() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); @@ -783,7 +783,7 @@ fn test_monitor_update_raa_while_paused() { chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let as_update_raa = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id()); @@ -909,7 +909,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) { // update_add update. chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); check_added_monitors!(nodes[1], 1); @@ -1148,7 +1148,7 @@ fn test_monitor_update_fail_reestablish() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1231,7 +1231,7 @@ fn raa_no_response_awaiting_raa_state() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); // nodes[1] should be AwaitingRAA here! check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1348,7 +1348,7 @@ fn claim_while_disconnected_monitor_update_fail() { // receiving the commitment update from A, and the resulting commitment dances. chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_msgs = nodes[1].node.get_and_clear_pending_msg_events(); @@ -1457,7 +1457,7 @@ fn monitor_failed_no_reestablish_response() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1550,7 +1550,7 @@ fn first_message_on_recv_ordering() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); expect_pending_htlcs_forwardable!(nodes[1]); @@ -1634,7 +1634,7 @@ fn test_monitor_update_fail_claim() { // Now restore monitor updating on the 0<->1 channel and claim the funds on B. let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1729,7 +1729,7 @@ fn test_monitor_update_on_pending_forwards() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); @@ -1790,7 +1790,7 @@ fn monitor_update_claim_fail_no_response() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -1849,7 +1849,7 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[0], 0); let events = nodes[0].node.get_and_clear_pending_events(); @@ -1881,7 +1881,7 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf: chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); check_added_monitors!(nodes[1], 0); let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first { @@ -1966,7 +1966,7 @@ fn test_path_paused_mpp() { // And check that, after we successfully update the monitor for chan_2 we can pass the second // HTLC along to nodes[3] and claim the whole payment back to nodes[0]. let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 200_000, payment_hash.clone(), Some(payment_secret), events.pop().unwrap(), true, None); @@ -2302,7 +2302,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { // not occur prior to #756). chanmon_cfgs[0].persister.set_update_ret(Ok(())); let (funding_txo, mon_id) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&funding_txo, mon_id); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, mon_id); // New outbound messages should be generated immediately upon a call to // get_and_clear_pending_msg_events (but not before). @@ -2500,14 +2500,14 @@ fn test_temporary_error_during_shutdown() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id())); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update); nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id())); let (_, closing_signed_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id()); @@ -2596,10 +2596,10 @@ fn double_temp_error() { chanmon_cfgs[1].persister.set_update_ret(Ok(())); let (_, latest_update_2) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); - nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_1); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_1); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors!(nodes[1], 0); - nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_2); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_2); // Complete the first HTLC. let events = nodes[1].node.get_and_clear_pending_msg_events(); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 77faac08..72f0bebd 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3385,27 +3385,7 @@ impl ChannelMana self.our_network_pubkey.clone() } - /// Restores a single, given channel to normal operation after a - /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update - /// operation. - /// - /// All ChannelMonitor updates up to and including highest_applied_update_id must have been - /// fully committed in every copy of the given channels' ChannelMonitors. - /// - /// Note that there is no effect to calling with a highest_applied_update_id other than the - /// current latest ChannelMonitorUpdate and one call to this function after multiple - /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field - /// exists largely only to prevent races between this and concurrent update_monitor calls. - /// - /// Thus, the anticipated use is, at a high level: - /// 1) You register a chain::Watch with this ChannelManager, - /// 2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of - /// said ChannelMonitors as it can, returning ChannelMonitorUpdateErr::TemporaryFailures - /// any time it cannot do so instantly, - /// 3) update(s) are applied to each remote copy of a ChannelMonitor, - /// 4) once all remote copies are updated, you call this function with the update_id that - /// completed, and once it is the latest the Channel will be re-enabled. - pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) { + fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let chan_restoration_res; @@ -4135,6 +4115,9 @@ impl ChannelMana }); } }, + MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => { + self.channel_monitor_updated(&funding_txo, monitor_update_id); + }, } } @@ -4145,6 +4128,14 @@ impl ChannelMana has_pending_monitor_events } + /// In chanmon_consistency_target, we'd like to be able to restore monitor updating without + /// handling all pending events (i.e. not PendingHTLCsForwardable). Thus, we expose monitor + /// update events as a separate process method here. + #[cfg(feature = "fuzztarget")] + pub fn process_monitor_events(&self) { + self.process_pending_monitor_events(); + } + /// Check the holding cell in each channel and free any pending HTLCs in them if possible. /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor /// update was applied. -- 2.30.2