Persist `ChannelMonitor`s after new blocks are connected
authorMatt Corallo <git@bluematt.me>
Wed, 13 Oct 2021 20:05:48 +0000 (20:05 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 20 Oct 2021 00:06:17 +0000 (00:06 +0000)
This resolves several user complaints (and issues in the sample
node) where startup is substantially delayed as we're always
waiting for the chain data to sync.

Further, in an upcoming PR, we'll be reloading pending payments
from ChannelMonitors on restart, at which point we'll need the
change here which avoids handling events until after the user
has confirmed the `ChannelMonitor` has been persisted to disk.
It will avoid a race where we
 * send a payment/HTLC (persisting the monitor to disk with the
   HTLC pending),
 * force-close the channel, removing the channel entry from the
   ChannelManager entirely,
 * persist the ChannelManager,
 * connect a block which contains a fulfill of the HTLC, generating
   a claim event,
 * handle the claim event while the `ChannelMonitor` is being
   persisted,
 * persist the ChannelManager (before the CHannelMonitor is
   persisted fully),
 * restart, reloading the HTLC as a pending payment in the
   ChannelManager, which now has no references to it except from
   the ChannelMonitor which still has the pending HTLC,
 * replay the block connection, generating a duplicate PaymentSent
   event.

fuzz/src/utils/test_persister.rs
lightning-persister/src/lib.rs
lightning/src/chain/chainmonitor.rs
lightning/src/chain/channelmonitor.rs
lightning/src/chain/mod.rs
lightning/src/ln/channelmanager.rs
lightning/src/util/test_utils.rs

index 4c18d8261399ba2dce4189dc80dfe43010eea3fb..7ca1ff96d05ae5f0311a9702836800c01f012b65 100644 (file)
@@ -14,7 +14,7 @@ impl chainmonitor::Persist<EnforcingSigner> for TestPersister {
                self.update_ret.lock().unwrap().clone()
        }
 
-       fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
+       fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &Option<channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
                self.update_ret.lock().unwrap().clone()
        }
 }
index 4ba6d96ea3f5d2aa1970fb1e622e0c2df14792d5..eba0692485a551403b0ae40dc4e74909a2c1bb71 100644 (file)
@@ -159,13 +159,18 @@ impl FilesystemPersister {
 }
 
 impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister {
+       // TODO: We really need a way for the persister to inform the user that its time to crash/shut
+       // down once these start returning failure.
+       // A PermanentFailure implies we need to shut down since we're force-closing channels without
+       // even broadcasting!
+
        fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
                let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
                util::write_to_file(self.path_to_monitor_data(), filename, monitor)
                        .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
        }
 
-       fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
+       fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
                let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
                util::write_to_file(self.path_to_monitor_data(), filename, monitor)
                        .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
index 323598f2061c32829b1c13e42c50bde30fee2432..d99d6708a3c71e209dc03b229a5b36e49d3ddf97 100644 (file)
@@ -32,6 +32,7 @@ use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
+use util::atomic_counter::AtomicCounter;
 use util::logger::Logger;
 use util::errors::APIError;
 use util::events;
@@ -41,10 +42,19 @@ use ln::channelmanager::ChannelDetails;
 use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
+use core::sync::atomic::{AtomicBool, Ordering};
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
+/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
+/// entirely opaque.
 enum UpdateOrigin {
+       /// An update that was generated by the `ChannelManager` (via our `chain::Watch`
+       /// implementation). This corresponds to an actual [`ChannelMonitorUpdate::update_id`] field
+       /// and [`ChannelMonitor::get_latest_update_id`].
        OffChain(u64),
+       /// An update that was generated during blockchain processing. The ID here is specific to the
+       /// generating [`ChainMonitor`] and does *not* correspond to any on-disk IDs.
+       ChainSync(u64),
 }
 
 /// An opaque identifier describing a specific [`Persist`] method call.
@@ -103,6 +113,12 @@ pub trait Persist<ChannelSigner: Sign> {
        /// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more
        /// details.
        ///
+       /// During blockchain synchronization operations, this may be called with no
+       /// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
+       /// Note that after the full [`ChannelMonitor`] is persisted any previous
+       /// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be
+       /// applied to the persisted [`ChannelMonitor`] as they were already applied.
+       ///
        /// If an implementer chooses to persist the updates only, they need to make
        /// sure that all the updates are applied to the `ChannelMonitors` *before*
        /// the set of channel monitors is given to the `ChannelManager`
@@ -123,7 +139,7 @@ pub trait Persist<ChannelSigner: Sign> {
        /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
        ///
        /// [`Writeable::write`]: crate::util::ser::Writeable::write
-       fn update_persisted_channel(&self, channel_id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+       fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
 }
 
 struct MonitorHolder<ChannelSigner: Sign> {
@@ -134,7 +150,24 @@ struct MonitorHolder<ChannelSigner: Sign> {
        /// update_persisted_channel, the user returns a TemporaryFailure, and then calls
        /// channel_monitor_updated immediately, racing our insertion of the pending update into the
        /// contained Vec.
+       ///
+       /// Beyond the synchronization of updates themselves, we cannot handle user events until after
+       /// any chain updates have been stored on disk. Thus, we scan this list when returning updates
+       /// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still
+       /// being persisted fully to disk after a chain update.
+       ///
+       /// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
+       /// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
+       /// the pending payment entry, and then reloading before the monitor is persisted, resulting in
+       /// the ChannelManager re-adding the same payment entry, before the same block is replayed,
+       /// resulting in a duplicate PaymentSent event.
        pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
+       /// When the user returns a PermanentFailure error from an update_persisted_channel call during
+       /// block processing, we inform the ChannelManager that the channel should be closed
+       /// asynchronously. In order to ensure no further changes happen before the ChannelManager has
+       /// processed the closure event, we set this to true and return PermanentFailure for any other
+       /// chain::Watch events.
+       channel_perm_failed: AtomicBool,
 }
 
 impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
@@ -142,6 +175,10 @@ impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
                pending_monitor_updates_lock.iter().any(|update_id|
                        if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false })
        }
+       fn has_pending_chainsync_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
+               pending_monitor_updates_lock.iter().any(|update_id|
+                       if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false })
+       }
 }
 
 /// A read-only reference to a current ChannelMonitor.
@@ -177,11 +214,17 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
         P::Target: Persist<ChannelSigner>,
 {
        monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
+       /// When we generate a [`MonitorUpdateId`] for a chain-event monitor persistence, we need a
+       /// unique ID, which we calculate by simply getting the next value from this counter. Note that
+       /// the ID is never persisted so it's ok that they reset on restart.
+       sync_persistence_id: AtomicCounter,
        chain_source: Option<C>,
        broadcaster: T,
        logger: L,
        fee_estimator: F,
        persister: P,
+       /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
+       /// from the user and not from a [`ChannelMonitor`].
        pending_monitor_events: Mutex<Vec<MonitorEvent>>,
 }
 
@@ -206,26 +249,50 @@ where C::Target: chain::Filter,
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
                let mut dependent_txdata = Vec::new();
-               let monitor_states = self.monitors.read().unwrap();
-               for monitor_state in monitor_states.values() {
-                       let mut txn_outputs = process(&monitor_state.monitor, txdata);
+               {
+                       let monitor_states = self.monitors.write().unwrap();
+                       for (funding_outpoint, monitor_state) in monitor_states.iter() {
+                               let monitor = &monitor_state.monitor;
+                               let mut txn_outputs;
+                               {
+                                       txn_outputs = process(monitor, txdata);
+                                       let update_id = MonitorUpdateId {
+                                               contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+                                       };
+                                       let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+
+                                       log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+                                       match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+                                               Ok(()) =>
+                                                       log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+                                               Err(ChannelMonitorUpdateErr::PermanentFailure) => {
+                                                       monitor_state.channel_perm_failed.store(true, Ordering::Release);
+                                                       self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
+                                               },
+                                               Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
+                                                       log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+                                                       pending_monitor_updates.push(update_id);
+                                               },
+                                       }
+                               }
 
-                       // Register any new outputs with the chain source for filtering, storing any dependent
-                       // transactions from within the block that previously had not been included in txdata.
-                       if let Some(ref chain_source) = self.chain_source {
-                               let block_hash = header.block_hash();
-                               for (txid, mut outputs) in txn_outputs.drain(..) {
-                                       for (idx, output) in outputs.drain(..) {
-                                               // Register any new outputs with the chain source for filtering and recurse
-                                               // if it indicates that there are dependent transactions within the block
-                                               // that had not been previously included in txdata.
-                                               let output = WatchedOutput {
-                                                       block_hash: Some(block_hash),
-                                                       outpoint: OutPoint { txid, index: idx as u16 },
-                                                       script_pubkey: output.script_pubkey,
-                                               };
-                                               if let Some(tx) = chain_source.register_output(output) {
-                                                       dependent_txdata.push(tx);
+                               // Register any new outputs with the chain source for filtering, storing any dependent
+                               // transactions from within the block that previously had not been included in txdata.
+                               if let Some(ref chain_source) = self.chain_source {
+                                       let block_hash = header.block_hash();
+                                       for (txid, mut outputs) in txn_outputs.drain(..) {
+                                               for (idx, output) in outputs.drain(..) {
+                                                       // Register any new outputs with the chain source for filtering and recurse
+                                                       // if it indicates that there are dependent transactions within the block
+                                                       // that had not been previously included in txdata.
+                                                       let output = WatchedOutput {
+                                                               block_hash: Some(block_hash),
+                                                               outpoint: OutPoint { txid, index: idx as u16 },
+                                                               script_pubkey: output.script_pubkey,
+                                                       };
+                                                       if let Some(tx) = chain_source.register_output(output) {
+                                                               dependent_txdata.push(tx);
+                                                       }
                                                }
                                        }
                                }
@@ -251,6 +318,7 @@ where C::Target: chain::Filter,
        pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
                Self {
                        monitors: RwLock::new(HashMap::new()),
+                       sync_persistence_id: AtomicCounter::new(),
                        chain_source,
                        broadcaster,
                        logger,
@@ -337,7 +405,7 @@ where C::Target: chain::Filter,
                pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
 
                match completed_update_id {
-                       MonitorUpdateId { .. } => {
+                       MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => {
                                // Note that we only check for `UpdateOrigin::OffChain` failures here - if
                                // we're being told that a `UpdateOrigin::OffChain` monitor update completed,
                                // we only care about ensuring we don't tell the `ChannelManager` to restore
@@ -348,8 +416,9 @@ where C::Target: chain::Filter,
                                // `MonitorEvent`s from the monitor back to the `ChannelManager` until they
                                // complete.
                                let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates);
-                               if monitor_is_pending_updates {
-                                       // If there are still monitor updates pending, we cannot yet construct an
+                               if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) {
+                                       // If there are still monitor updates pending (or an old monitor update
+                                       // finished after a later one perm-failed), we cannot yet construct an
                                        // UpdateCompleted event.
                                        return Ok(());
                                }
@@ -357,7 +426,12 @@ where C::Target: chain::Filter,
                                        funding_txo,
                                        monitor_update_id: monitor_data.monitor.get_latest_update_id(),
                                });
-                       }
+                       },
+                       MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
+                               // We've already done everything we need to, the next time
+                               // release_pending_monitor_events is called, any events for this ChannelMonitor
+                               // will be returned if there's no more SyncPersistId events left.
+                       },
                }
                Ok(())
        }
@@ -502,7 +576,11 @@ where C::Target: chain::Filter,
                                monitor.load_outputs_to_watch(chain_source);
                        }
                }
-               entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates) });
+               entry.insert(MonitorHolder {
+                       monitor,
+                       pending_monitor_updates: Mutex::new(pending_monitor_updates),
+                       channel_perm_failed: AtomicBool::new(false),
+               });
                persist_res
        }
 
@@ -534,15 +612,19 @@ where C::Target: chain::Filter,
                                // still be changed. So, persist the updated monitor despite the error.
                                let update_id = MonitorUpdateId::from_monitor_update(&update);
                                let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-                               let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor, update_id);
+                               let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
                                if let Err(e) = persist_res {
                                        if e == ChannelMonitorUpdateErr::TemporaryFailure {
                                                pending_monitor_updates.push(update_id);
+                                       } else {
+                                               monitor_state.channel_perm_failed.store(true, Ordering::Release);
                                        }
                                        log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
                                }
                                if update_res.is_err() {
                                        Err(ChannelMonitorUpdateErr::PermanentFailure)
+                               } else if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
+                                       Err(ChannelMonitorUpdateErr::PermanentFailure)
                                } else {
                                        persist_res
                                }
@@ -553,7 +635,23 @@ where C::Target: chain::Filter,
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
                let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
                for monitor_state in self.monitors.read().unwrap().values() {
-                       pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+                       let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
+                       if is_pending_monitor_update {
+                               log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
+                       } else {
+                               if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
+                                       // If a `UpdateOrigin::ChainSync` persistence failed with `PermanantFailure`,
+                                       // we don't really know if the latest `ChannelMonitor` state is on disk or not.
+                                       // We're supposed to hold monitor updates until the latest state is on disk to
+                                       // avoid duplicate events, but the user told us persistence is screw-y and may
+                                       // not complete. We can't hold events forever because we may learn some payment
+                                       // preimage, so instead we just log and hope the user complied with the
+                                       // `PermanentFailure` requirements of having at least the local-disk copy
+                                       // updated.
+                                       log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
+                               }
+                               pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+                       }
                }
                pending_monitor_events
        }
index a7506acb4dc5ba6bb8ed8db88731b9f51fa6e57c..7fad40d9c2ed70f4252115b3d2f98d16ee2ac2b7 100644 (file)
@@ -146,9 +146,16 @@ pub enum MonitorEvent {
                /// same [`ChannelMonitor`] have been applied and persisted.
                monitor_update_id: u64,
        },
+
+       /// Indicates a [`ChannelMonitor`] update has failed. See
+       /// [`ChannelMonitorUpdateErr::PermanentFailure`] for more information on how this is used.
+       ///
+       /// [`ChannelMonitorUpdateErr::PermanentFailure`]: super::ChannelMonitorUpdateErr::PermanentFailure
+       UpdateFailed(OutPoint),
 }
 impl_writeable_tlv_based_enum_upgradable!(MonitorEvent,
-       // Note that UpdateCompleted is currently never serialized to disk as it is generated only in ChainMonitor
+       // Note that UpdateCompleted and UpdateFailed are currently never serialized to disk as they are
+       // generated only in ChainMonitor
        (0, UpdateCompleted) => {
                (0, funding_txo, required),
                (2, monitor_update_id, required),
@@ -156,6 +163,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorEvent,
 ;
        (2, HTLCEvent),
        (4, CommitmentTxConfirmed),
+       (6, UpdateFailed),
 );
 
 /// Simple structure sent back by `chain::Watch` when an HTLC from a forward channel is detected on
@@ -649,7 +657,17 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
 
        payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
 
+       // Note that `MonitorEvent`s MUST NOT be generated during update processing, only generated
+       // during chain data processing. This prevents a race in `ChainMonitor::update_channel` (and
+       // presumably user implementations thereof as well) where we update the in-memory channel
+       // object, then before the persistence finishes (as it's all under a read-lock), we return
+       // pending events to the user or to the relevant `ChannelManager`. Then, on reload, we'll have
+       // the pre-event state here, but have processed the event in the `ChannelManager`.
+       // Note that because the `event_lock` in `ChainMonitor` is only taken in
+       // block/transaction-connected events and *not* during block/transaction-disconnected events,
+       // we further MUST NOT generate events during block/transaction-disconnection.
        pending_monitor_events: Vec<MonitorEvent>,
+
        pending_events: Vec<Event>,
 
        // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
index 55755073bb1a7546d39ce65fd2fd8c9d0e9b4af5..25e5a97d288df42d4a2baf2263aae9e9d493f28e 100644 (file)
@@ -285,6 +285,10 @@ pub trait Watch<ChannelSigner: Sign> {
        /// Returns any monitor events since the last call. Subsequent calls must only return new
        /// events.
        ///
+       /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no
+       /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted
+       /// to disk.
+       ///
        /// For details on asynchronous [`ChannelMonitor`] updating and returning
        /// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`].
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
index 72f0bebdf1d3c1eb7cf9e505e75daaa286612884..1f3ad5541f1ba1d4697032d668c4f39765f29cb1 100644 (file)
@@ -4090,7 +4090,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                                        }
                                },
-                               MonitorEvent::CommitmentTxConfirmed(funding_outpoint) => {
+                               MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
+                               MonitorEvent::UpdateFailed(funding_outpoint) => {
                                        let mut channel_lock = self.channel_state.lock().unwrap();
                                        let channel_state = &mut *channel_lock;
                                        let by_id = &mut channel_state.by_id;
@@ -4106,7 +4107,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                msg: update
                                                        });
                                                }
-                                               self.issue_channel_close_events(&chan, ClosureReason::CommitmentTxConfirmed);
+                                               let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
+                                                       ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
+                                               } else {
+                                                       ClosureReason::CommitmentTxConfirmed
+                                               };
+                                               self.issue_channel_close_events(&chan, reason);
                                                pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                        node_id: chan.get_counterparty_node_id(),
                                                        action: msgs::ErrorAction::SendErrorMessage {
@@ -5440,20 +5446,25 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
 /// is:
-/// 1) Deserialize all stored ChannelMonitors.
-/// 2) Deserialize the ChannelManager by filling in this struct and calling:
-///    <(BlockHash, ChannelManager)>::read(reader, args)
-///    This may result in closing some Channels if the ChannelMonitor is newer than the stored
-///    ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted.
-/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same
-///    way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and
-///    ChannelMonitor::get_funding_txo().
-/// 4) Reconnect blocks on your ChannelMonitors.
-/// 5) Disconnect/connect blocks on the ChannelManager.
-/// 6) Move the ChannelMonitors into your local chain::Watch.
+/// 1) Deserialize all stored [`ChannelMonitor`]s.
+/// 2) Deserialize the [`ChannelManager`] by filling in this struct and calling:
+///    `<(BlockHash, ChannelManager)>::read(reader, args)`
+///    This may result in closing some channels if the [`ChannelMonitor`] is newer than the stored
+///    [`ChannelManager`] state to ensure no loss of funds. Thus, transactions may be broadcasted.
+/// 3) If you are not fetching full blocks, register all relevant [`ChannelMonitor`] outpoints the
+///    same way you would handle a [`chain::Filter`] call using
+///    [`ChannelMonitor::get_outputs_to_watch`] and [`ChannelMonitor::get_funding_txo`].
+/// 4) Reconnect blocks on your [`ChannelMonitor`]s.
+/// 5) Disconnect/connect blocks on the [`ChannelManager`].
+/// 6) Re-persist the [`ChannelMonitor`]s to ensure the latest state is on disk.
+///    Note that if you're using a [`ChainMonitor`] for your [`chain::Watch`] implementation, you
+///    will likely accomplish this as a side-effect of calling [`chain::Watch::watch_channel`] in
+///    the next step.
+/// 7) Move the [`ChannelMonitor`]s into your local [`chain::Watch`]. If you're using a
+///    [`ChainMonitor`], this is done by calling [`chain::Watch::watch_channel`].
 ///
-/// Note that the ordering of #4-6 is not of importance, however all three must occur before you
-/// call any other methods on the newly-deserialized ChannelManager.
+/// Note that the ordering of #4-7 is not of importance, however all four must occur before you
+/// call any other methods on the newly-deserialized [`ChannelManager`].
 ///
 /// Note that because some channels may be closed during deserialization, it is critical that you
 /// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
@@ -5461,6 +5472,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
 /// broadcast), and then later deserialize a newer version of the same ChannelManager (which will
 /// not force-close the same channels but consider them live), you may end up revoking a state for
 /// which you've already broadcasted the transaction.
+///
+/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
 pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
        where M::Target: chain::Watch<Signer>,
         T::Target: BroadcasterInterface,
index f0e453f9ba0f57a04010d5052a32c99303fbfb2f..7ec794555507cb21bcc2dfab0dbf9823bfd87285 100644 (file)
@@ -190,7 +190,7 @@ impl<Signer: keysinterface::Sign> chainmonitor::Persist<Signer> for TestPersiste
                ret
        }
 
-       fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<Signer>, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
+       fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &Option<channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<Signer>, _id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
                let ret = self.update_ret.lock().unwrap().clone();
                if let Some(next_ret) = self.next_update_ret.lock().unwrap().take() {
                        *self.update_ret.lock().unwrap() = next_ret;