Move ChannelManager::monitor_updated to a MonitorEvent
author Matt Corallo <git@bluematt.me>
Thu, 7 Oct 2021 18:51:49 +0000 (18:51 +0000)
committer Matt Corallo <git@bluematt.me>
Tue, 19 Oct 2021 23:49:04 +0000 (23:49 +0000)
In the next commit we'll need ChainMonitor to "see" when a monitor
persistence completes, which means `monitor_updated` needs to move
to `ChainMonitor`. The simplest way to then communicate that
information to `ChannelManager` is via `MonitorEvent`s, which lines
up reasonably well, even though they are now constructed in multiple
different places.
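
At a high level, the resulting flow is: `ChainMonitor` queues a
`MonitorEvent::UpdateCompleted` when told that persistence finished, and
`ChannelManager` later drains and reacts to it. A minimal standalone model of
that queue-and-drain pattern (stand-in types for illustration, not the real
lightning API):

    use std::sync::Mutex;

    // Stand-in for the real MonitorEvent; `u64` replaces `OutPoint` here.
    #[derive(Debug)]
    enum MonitorEvent {
        UpdateCompleted { funding_txo: u64, monitor_update_id: u64 },
    }

    #[derive(Default)]
    struct ChainMonitor {
        pending_monitor_events: Mutex<Vec<MonitorEvent>>,
    }

    impl ChainMonitor {
        // Called by the user once an async monitor persistence completes.
        fn channel_monitor_updated(&self, funding_txo: u64, highest_applied_update_id: u64) {
            self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
                funding_txo, monitor_update_id: highest_applied_update_id,
            });
        }
        // Called (via Watch::release_pending_monitor_events) when the
        // ChannelManager processes pending monitor events.
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
            self.pending_monitor_events.lock().unwrap().split_off(0)
        }
    }

    fn main() {
        let chain_monitor = ChainMonitor::default();
        chain_monitor.channel_monitor_updated(42, 7);
        for event in chain_monitor.release_pending_monitor_events() {
            match event {
                // The ChannelManager reacts by restoring the channel to an
                // operational state; its channel_monitor_updated is now private.
                MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } =>
                    println!("re-enable channel {} at update {}", funding_txo, monitor_update_id),
            }
        }
    }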

fuzz/src/chanmon_consistency.rs
lightning/src/chain/chainmonitor.rs
lightning/src/chain/channelmonitor.rs
lightning/src/chain/mod.rs
lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channelmanager.rs

index f3b952ff9dba74b10f921be769d06c355747a63e..63e179ac469308f1159f9e427df9ef9b57fc9740 100644
@@ -855,22 +855,26 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
 
                        0x08 => {
                                if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       nodes[0].channel_monitor_updated(&chan_1_funding, *id);
+                                       monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
+                                       nodes[0].process_monitor_events();
                                }
                        },
                        0x09 => {
                                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       nodes[1].channel_monitor_updated(&chan_1_funding, *id);
+                                       monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
+                                       nodes[1].process_monitor_events();
                                }
                        },
                        0x0a => {
                                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       nodes[1].channel_monitor_updated(&chan_2_funding, *id);
+                                       monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
+                                       nodes[1].process_monitor_events();
                                }
                        },
                        0x0b => {
                                if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       nodes[2].channel_monitor_updated(&chan_2_funding, *id);
+                                       monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
+                                       nodes[2].process_monitor_events();
                                }
                        },
 
@@ -1077,16 +1081,20 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                                *monitor_c.persister.update_ret.lock().unwrap() = Ok(());
 
                                if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       nodes[0].channel_monitor_updated(&chan_1_funding, *id);
+                                       monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
+                                       nodes[0].process_monitor_events();
                                }
                                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                                       nodes[1].channel_monitor_updated(&chan_1_funding, *id);
+                                       monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
+                                       nodes[1].process_monitor_events();
                                }
                                if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       nodes[1].channel_monitor_updated(&chan_2_funding, *id);
+                                       monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
+                                       nodes[1].process_monitor_events();
                                }
                                if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                                       nodes[2].channel_monitor_updated(&chan_2_funding, *id);
+                                       monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
+                                       nodes[2].process_monitor_events();
                                }
 
                                // Next, make sure peers are all connected to each other
index f1ce0f79ae9cd99860283ec72c00df49430c87a3..9e92264b0425dffe07049d9acaf4c80e75df20bc 100644
@@ -38,7 +38,7 @@ use util::events::EventHandler;
 use ln::channelmanager::ChannelDetails;
 
 use prelude::*;
-use sync::{RwLock, RwLockReadGuard};
+use sync::{RwLock, RwLockReadGuard, Mutex};
 use core::ops::Deref;
 
 /// `Persist` defines behavior for persisting channel monitors: this could mean
@@ -134,6 +134,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
        logger: L,
        fee_estimator: F,
        persister: P,
+       pending_monitor_events: Mutex<Vec<MonitorEvent>>,
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -207,6 +208,7 @@ where C::Target: chain::Filter,
                        logger,
                        fee_estimator: feeest,
                        persister,
+                       pending_monitor_events: Mutex::new(Vec::new()),
                }
        }
 
@@ -262,6 +264,29 @@ where C::Target: chain::Filter,
                self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
        }
 
+       /// Indicates the persistence of a [`ChannelMonitor`] has completed after
+       /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
+       ///
+       /// All ChannelMonitor updates up to and including highest_applied_update_id must have been
+       /// fully committed in every copy of the given channel's ChannelMonitors.
+       ///
+       /// Note that calling with a highest_applied_update_id other than the current latest
+       /// ChannelMonitorUpdate has no effect, and a single call to this function after multiple
+       /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
+       /// exists largely only to prevent races between this and concurrent update_monitor calls.
+       ///
+       /// Thus, the anticipated use is, at a high level:
+       ///  1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
+       ///     update to disk and begins updating any remote (e.g. watchtower/backup) copies,
+       ///     returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
+       ///  2) once all remote copies are updated, you call this function with the update_id that
+       ///     completed, and once it is the latest, the Channel will be re-enabled.
+       pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) {
+               self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+                       funding_txo, monitor_update_id: highest_applied_update_id
+               });
+       }
+
        #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
                use util::events::EventsProvider;
@@ -431,7 +456,7 @@ where C::Target: chain::Filter,
        }
 
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
-               let mut pending_monitor_events = Vec::new();
+               let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
                for monitor_state in self.monitors.read().unwrap().values() {
                        pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
                }
index 4c7da902be94455589b354f39358775d7d8cd31f..a7506acb4dc5ba6bb8ed8db88731b9f51fa6e57c 100644
@@ -131,8 +131,29 @@ pub enum MonitorEvent {
 
        /// A monitor event that the Channel's commitment transaction was confirmed.
        CommitmentTxConfirmed(OutPoint),
+
+       /// Indicates a [`ChannelMonitor`] update has completed. See
+       /// [`ChannelMonitorUpdateErr::TemporaryFailure`] for more information on how this is used.
+       ///
+       /// [`ChannelMonitorUpdateErr::TemporaryFailure`]: super::ChannelMonitorUpdateErr::TemporaryFailure
+       UpdateCompleted {
+               /// The funding outpoint of the [`ChannelMonitor`] that was updated.
+               funding_txo: OutPoint,
+               /// The Update ID from [`ChannelMonitorUpdate::update_id`] which was applied, or
+               /// [`ChannelMonitor::get_latest_update_id`].
+               ///
+               /// Note that this should only be set to a given update's ID if all previous updates for the
+               /// same [`ChannelMonitor`] have been applied and persisted.
+               monitor_update_id: u64,
+       },
 }
-impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, ;
+impl_writeable_tlv_based_enum_upgradable!(MonitorEvent,
+       // Note that UpdateCompleted is currently never serialized to disk as it is generated only in ChainMonitor
+       (0, UpdateCompleted) => {
+               (0, funding_txo, required),
+               (2, monitor_update_id, required),
+       },
+;
        (2, HTLCEvent),
        (4, CommitmentTxConfirmed),
 );
@@ -854,14 +875,19 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
                        writer.write_all(&payment_preimage.0[..])?;
                }
 
-               writer.write_all(&byte_utils::be64_to_array(self.pending_monitor_events.len() as u64))?;
+               writer.write_all(&(self.pending_monitor_events.iter().filter(|ev| match ev {
+                       MonitorEvent::HTLCEvent(_) => true,
+                       MonitorEvent::CommitmentTxConfirmed(_) => true,
+                       _ => false,
+               }).count() as u64).to_be_bytes())?;
                for event in self.pending_monitor_events.iter() {
                        match event {
                                MonitorEvent::HTLCEvent(upd) => {
                                        0u8.write(writer)?;
                                        upd.write(writer)?;
                                },
-                               MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?
+                               MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?,
+                               _ => {}, // Covered in the TLV writes below
                        }
                }
 
index f22d152ec4c7b141ef87050d64ebd84f14561426..55755073bb1a7546d39ce65fd2fd8c9d0e9b4af5 100644
@@ -182,9 +182,10 @@ pub enum ChannelMonitorUpdateErr {
        /// our state failed, but is expected to succeed at some point in the future).
        ///
        /// Such a failure will "freeze" a channel, preventing us from revoking old states or
-       /// submitting new commitment transactions to the counterparty. Once the update(s) which failed
-       /// have been successfully applied, ChannelManager::channel_monitor_updated can be used to
-       /// restore the channel to an operational state.
+       /// submitting new commitment transactions to the counterparty. Once the update(s) that failed
+       /// have been successfully applied, a [`MonitorEvent::UpdateCompleted`] event should be returned
+       /// via [`Watch::release_pending_monitor_events`] which will then restore the channel to an
+       /// operational state.
        ///
        /// Note that a given ChannelManager will *never* re-generate a given ChannelMonitorUpdate. If
        /// you return a TemporaryFailure you must ensure that it is written to disk safely before
@@ -198,13 +199,14 @@ pub enum ChannelMonitorUpdateErr {
        /// the channel which would invalidate previous ChannelMonitors are not made when a channel has
        /// been "frozen".
        ///
-       /// Note that even if updates made after TemporaryFailure succeed you must still call
-       /// channel_monitor_updated to ensure you have the latest monitor and re-enable normal channel
-       /// operation.
+       /// Note that even if updates made after TemporaryFailure succeed, you must still provide a
+       /// [`MonitorEvent::UpdateCompleted`] to ensure you have the latest monitor and re-enable
+       /// normal channel operation. Note that this is normally generated through a call to
+       /// [`ChainMonitor::channel_monitor_updated`].
        ///
-       /// Note that the update being processed here will not be replayed for you when you call
-       /// ChannelManager::channel_monitor_updated, so you must store the update itself along
-       /// with the persisted ChannelMonitor on your own local disk prior to returning a
+       /// Note that the update being processed here will not be replayed for you when you return a
+       /// [`MonitorEvent::UpdateCompleted`] event via [`Watch::release_pending_monitor_events`], so
+       /// you must store the update itself on your own local disk prior to returning a
        /// TemporaryFailure. You may, of course, employ a journaling approach, storing only the
        /// ChannelMonitorUpdate on disk without updating the monitor itself, replaying the journal at
        /// reload-time.
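
For the journaling approach mentioned above, a hypothetical sketch (the file
layout, record format, and function names are illustrative only, not an API
this crate provides): append each serialized ChannelMonitorUpdate to a
per-channel log, and replay the log against the last fully-persisted
ChannelMonitor at reload-time:

    use std::convert::TryInto;
    use std::fs::{File, OpenOptions};
    use std::io::{Read, Write};

    // Append one serialized ChannelMonitorUpdate to the channel's journal,
    // length-prefixed so the journal can be replayed record by record.
    fn journal_update(log_path: &str, serialized_update: &[u8]) -> std::io::Result<()> {
        let mut log = OpenOptions::new().create(true).append(true).open(log_path)?;
        log.write_all(&(serialized_update.len() as u64).to_be_bytes())?;
        log.write_all(serialized_update)
    }

    // At reload, read back every journaled update (in order) so each can be
    // re-applied to the last fully-persisted ChannelMonitor.
    fn replay_journal(log_path: &str) -> std::io::Result<Vec<Vec<u8>>> {
        let mut bytes = Vec::new();
        File::open(log_path)?.read_to_end(&mut bytes)?;
        let (mut updates, mut i) = (Vec::new(), 0usize);
        while i + 8 <= bytes.len() {
            let len = u64::from_be_bytes(bytes[i..i + 8].try_into().unwrap()) as usize;
            i += 8;
            // A truncated trailing record indicates a crash mid-write; stop there.
            if i + len > bytes.len() { break; }
            updates.push(bytes[i..i + len].to_vec());
            i += len;
        }
        Ok(updates)
    }
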
@@ -212,6 +214,8 @@ pub enum ChannelMonitorUpdateErr {
        /// For deployments where a copy of ChannelMonitors and other local state are backed up in a
        /// remote location (with local copies persisted immediately), it is anticipated that all
        /// updates will return TemporaryFailure until the remote copies could be updated.
+       ///
+       /// [`ChainMonitor::channel_monitor_updated`]: chainmonitor::ChainMonitor::channel_monitor_updated
        TemporaryFailure,
        /// Used to indicate no further channel monitor updates will be allowed (eg we've moved on to a
        /// different watchtower and cannot update with all watchtowers that were previously informed
@@ -280,6 +284,9 @@ pub trait Watch<ChannelSigner: Sign> {
 
        /// Returns any monitor events since the last call. Subsequent calls must only return new
        /// events.
+       ///
+       /// For details on asynchronous [`ChannelMonitor`] updating and returning
+       /// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`].
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
 }
 
index 6d62f8db92b28d173700084c1645d90f5119f373..819c71f4fdb902d7a51d625e49d4f4da23efec90 100644
@@ -177,7 +177,7 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) {
 
        chanmon_cfgs[0].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[0], 0);
 
        let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events();
@@ -330,7 +330,7 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) {
        // Now fix monitor updating...
        chanmon_cfgs[0].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[0], 0);
 
        macro_rules! disconnect_reconnect_peers { () => { {
@@ -628,7 +628,7 @@ fn test_monitor_update_fail_cs() {
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
        let responses = nodes[1].node.get_and_clear_pending_msg_events();
        assert_eq!(responses.len(), 2);
@@ -662,7 +662,7 @@ fn test_monitor_update_fail_cs() {
 
        chanmon_cfgs[0].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[0], 0);
 
        let final_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id());
@@ -722,7 +722,7 @@ fn test_monitor_update_fail_no_rebroadcast() {
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
        check_added_monitors!(nodes[1], 0);
        expect_pending_htlcs_forwardable!(nodes[1]);
@@ -783,7 +783,7 @@ fn test_monitor_update_raa_while_paused() {
 
        chanmon_cfgs[0].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[0], 0);
 
        let as_update_raa = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
@@ -909,7 +909,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) {
        // update_add update.
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
        expect_pending_htlcs_forwardable!(nodes[1]);
        check_added_monitors!(nodes[1], 1);
@@ -1148,7 +1148,7 @@ fn test_monitor_update_fail_reestablish() {
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
 
        updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1231,7 +1231,7 @@ fn raa_no_response_awaiting_raa_state() {
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        // nodes[1] should be AwaitingRAA here!
        check_added_monitors!(nodes[1], 0);
        let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1348,7 +1348,7 @@ fn claim_while_disconnected_monitor_update_fail() {
        // receiving the commitment update from A, and the resulting commitment dances.
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
 
        let bs_msgs = nodes[1].node.get_and_clear_pending_msg_events();
@@ -1457,7 +1457,7 @@ fn monitor_failed_no_reestablish_response() {
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
        let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
 
@@ -1550,7 +1550,7 @@ fn first_message_on_recv_ordering() {
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
 
        expect_pending_htlcs_forwardable!(nodes[1]);
@@ -1634,7 +1634,7 @@ fn test_monitor_update_fail_claim() {
 
        // Now restore monitor updating on the 0<->1 channel and claim the funds on B.
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
 
        let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1729,7 +1729,7 @@ fn test_monitor_update_on_pending_forwards() {
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
 
        let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1790,7 +1790,7 @@ fn monitor_update_claim_fail_no_response() {
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
@@ -1849,7 +1849,7 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf:
        assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
        chanmon_cfgs[0].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[0], 0);
 
        let events = nodes[0].node.get_and_clear_pending_events();
@@ -1881,7 +1881,7 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf:
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        check_added_monitors!(nodes[1], 0);
 
        let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first {
@@ -1966,7 +1966,7 @@ fn test_path_paused_mpp() {
        // And check that, after we successfully update the monitor for chan_2 we can pass the second
        // HTLC along to nodes[3] and claim the whole payment back to nodes[0].
        let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone();
-       nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        let mut events = nodes[0].node.get_and_clear_pending_msg_events();
        assert_eq!(events.len(), 1);
        pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 200_000, payment_hash.clone(), Some(payment_secret), events.pop().unwrap(), true, None);
@@ -2302,7 +2302,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
        // not occur prior to #756).
        chanmon_cfgs[0].persister.set_update_ret(Ok(()));
        let (funding_txo, mon_id) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
-       nodes[0].node.channel_monitor_updated(&funding_txo, mon_id);
+       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, mon_id);
 
        // New outbound messages should be generated immediately upon a call to
        // get_and_clear_pending_msg_events (but not before).
@@ -2500,14 +2500,14 @@ fn test_temporary_error_during_shutdown() {
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 
        let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
        nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id()));
 
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
        let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 
        nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id()));
        let (_, closing_signed_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
@@ -2596,10 +2596,10 @@ fn double_temp_error() {
        chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 
        let (_, latest_update_2) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-       nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_1);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_1);
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
        check_added_monitors!(nodes[1], 0);
-       nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_2);
+       nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_2);
 
        // Complete the first HTLC.
        let events = nodes[1].node.get_and_clear_pending_msg_events();
index 77faac0898fdc61b93b2cd688973a4a33be0649a..72f0bebdf1d3c1eb7cf9e505e75daaa286612884 100644
@@ -3385,27 +3385,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.our_network_pubkey.clone()
        }
 
-       /// Restores a single, given channel to normal operation after a
-       /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
-       /// operation.
-       ///
-       /// All ChannelMonitor updates up to and including highest_applied_update_id must have been
-       /// fully committed in every copy of the given channels' ChannelMonitors.
-       ///
-       /// Note that there is no effect to calling with a highest_applied_update_id other than the
-       /// current latest ChannelMonitorUpdate and one call to this function after multiple
-       /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
-       /// exists largely only to prevent races between this and concurrent update_monitor calls.
-       ///
-       /// Thus, the anticipated use is, at a high level:
-       ///  1) You register a chain::Watch with this ChannelManager,
-       ///  2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of
-       ///     said ChannelMonitors as it can, returning ChannelMonitorUpdateErr::TemporaryFailures
-       ///     any time it cannot do so instantly,
-       ///  3) update(s) are applied to each remote copy of a ChannelMonitor,
-       ///  4) once all remote copies are updated, you call this function with the update_id that
-       ///     completed, and once it is the latest the Channel will be re-enabled.
-       pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
+       fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let chan_restoration_res;
@@ -4135,6 +4115,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                });
                                        }
                                },
+                               MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
+                                       self.channel_monitor_updated(&funding_txo, monitor_update_id);
+                               },
                        }
                }
 
@@ -4145,6 +4128,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                has_pending_monitor_events
        }
 
+       /// In chanmon_consistency_target, we'd like to be able to restore monitor updating without
+       /// handling all pending events (i.e. without also handling PendingHTLCsForwardable). Thus,
+       /// we expose monitor update events as a separate process method here.
+       #[cfg(feature = "fuzztarget")]
+       pub fn process_monitor_events(&self) {
+               self.process_pending_monitor_events();
+       }
+
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
        /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
        /// update was applied.