diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 9f15a2a2..2479e8f7 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -27,7 +27,7 @@ use bitcoin::blockdata::block::BlockHeader;
use bitcoin::hash_types::Txid;

use chain;
-use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
+use chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
use chain::transaction::{OutPoint, TransactionData};
@@ -78,20 +78,21 @@ impl MonitorUpdateId {
///
/// Each method can return three possible values:
/// * If persistence (including any relevant `fsync()` calls) happens immediately, the
-/// implementation should return `Ok(())`, indicating normal channel operation should continue.
+/// implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal
+/// channel operation should continue.
/// * If persistence happens asynchronously, implementations should first ensure the
/// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return
-/// `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the
-/// background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be
-/// called with the corresponding [`MonitorUpdateId`].
+/// [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background.
+/// Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with
+/// the corresponding [`MonitorUpdateId`].
///
/// Note that unlike the direct [`chain::Watch`] interface,
/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
///
/// * If persistence fails for some reason, implementations should return
-/// `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be
+/// [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be
/// closed without broadcasting the latest state. See
-/// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details.
+/// [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details.
pub trait Persist<ChannelSigner: Sign> {
/// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
/// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup.
@@ -101,14 +102,14 @@ pub trait Persist<ChannelSigner: Sign> {
/// and the stored channel data). Note that you **must** persist every new monitor to disk.
///
/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
- /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
+ /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
///
/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
- /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+ /// and [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
/// [`Writeable::write`]: crate::util::ser::Writeable::write
- fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+ fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;

/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
/// update.
@@ -136,14 +137,14 @@ pub trait Persist<ChannelSigner: Sign> {
/// whereas updates are small and `O(1)`.
///
/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
- /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
+ /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
///
/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
/// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
- /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+ /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
- fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+ fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
}

struct MonitorHolder<ChannelSigner: Sign> {
@@ -151,9 +152,9 @@ struct MonitorHolder<ChannelSigner: Sign> {
/// The full set of pending monitor updates for this Channel.
///
/// Note that this lock must be held during updates to prevent a race where we call
- /// update_persisted_channel, the user returns a TemporaryFailure, and then calls
- /// channel_monitor_updated immediately, racing our insertion of the pending update into the
- /// contained Vec.
+ /// update_persisted_channel, the user returns a
+ /// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
+ /// immediately, racing our insertion of the pending update into the contained Vec.
///
/// Beyond the synchronization of updates themselves, we cannot handle user events until after
/// any chain updates have been stored on disk. Thus, we scan this list when returning updates
@@ -287,20 +288,20 @@ where C::Target: chain::Filter,
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
// If there are not ChainSync persists awaiting completion, go ahead and
// set last_chain_persist_height here - we wouldn't want the first
- // TemporaryFailure to always immediately be considered "overly delayed".
+ // InProgress to always immediately be considered "overly delayed".
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); } } log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { - Ok(()) => + ChannelMonitorUpdateStatus::Completed => log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), - Err(ChannelMonitorUpdateErr::PermanentFailure) => { + ChannelMonitorUpdateStatus::PermanentFailure => { monitor_state.channel_perm_failed.store(true, Ordering::Release); self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); }, - Err(ChannelMonitorUpdateErr::TemporaryFailure) => { + ChannelMonitorUpdateStatus::InProgress => { log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); }, @@ -400,12 +401,12 @@ where C::Target: chain::Filter, } /// Indicates the persistence of a [`ChannelMonitor`] has completed after - /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation. + /// [`ChannelMonitorUpdateStatus::InProgress`] was returned from an update operation. /// /// Thus, the anticipated use is, at a high level: /// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the /// update to disk and begins updating any remote (e.g. watchtower/backup) copies, - /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`], + /// returning [`ChannelMonitorUpdateStatus::InProgress`], /// 2) once all remote copies are updated, you call this function with the /// `completed_update_id` that completed, and once all pending updates have completed the /// channel will be re-enabled. @@ -438,10 +439,10 @@ where C::Target: chain::Filter, if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) { // If there are still monitor updates pending (or an old monitor update // finished after a later one perm-failed), we cannot yet construct an - // UpdateCompleted event. + // Completed event. return Ok(()); } - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted { + self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { funding_txo, monitor_update_id: monitor_data.monitor.get_latest_update_id(), }], monitor_data.monitor.get_counterparty_node_id())); @@ -464,7 +465,7 @@ where C::Target: chain::Filter, pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id()); - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted { + self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { funding_txo, monitor_update_id, }], counterparty_node_id)); @@ -570,27 +571,31 @@ where C::Target: chain::Filter, /// /// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor` /// monitors lock. 
- fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
+ fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus {
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(funding_outpoint) {
hash_map::Entry::Occupied(_) => {
log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
- return Err(ChannelMonitorUpdateErr::PermanentFailure)},
+ return ChannelMonitorUpdateStatus::PermanentFailure
+ },
hash_map::Entry::Vacant(e) => e,
};
log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_id = MonitorUpdateId::from_new_monitor(&monitor);
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
- if persist_res.is_err() {
- log_error!(self.logger, "Failed to persist new ChannelMonitor for channel {}: {:?}", log_funding_info!(monitor), persist_res);
- } else {
- log_trace!(self.logger, "Finished persisting new ChannelMonitor for channel {}", log_funding_info!(monitor));
- }
- if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) {
- return persist_res;
- } else if persist_res.is_err() {
- pending_monitor_updates.push(update_id);
+ match persist_res {
+ ChannelMonitorUpdateStatus::InProgress => {
+ log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
+ pending_monitor_updates.push(update_id);
+ },
+ ChannelMonitorUpdateStatus::PermanentFailure => {
+ log_error!(self.logger, "Persistence of new ChannelMonitor for channel {} failed", log_funding_info!(monitor));
+ return persist_res;
+ },
+ ChannelMonitorUpdateStatus::Completed => {
+ log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
+ }
}
if let Some(ref chain_source) = self.chain_source {
monitor.load_outputs_to_watch(chain_source);
@@ -606,7 +611,7 @@ where C::Target: chain::Filter,
}

/// Note that we persist the given `ChannelMonitor` update while holding the
/// `ChainMonitor` monitors lock.
- fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
+ fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
// Update the monitor that watches the channel referred to by the given outpoint.
let monitors = self.monitors.read().unwrap();
match monitors.get(&funding_txo) {
@@ -619,7 +624,7 @@ where C::Target: chain::Filter,
#[cfg(any(test, fuzzing))]
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
#[cfg(not(any(test, fuzzing)))]
- Err(ChannelMonitorUpdateErr::PermanentFailure)
+ ChannelMonitorUpdateStatus::PermanentFailure
},
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
@@ -633,20 +638,23 @@ where C::Target: chain::Filter,
let update_id = MonitorUpdateId::from_monitor_update(&update);
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
- if let Err(e) = persist_res {
- if e == ChannelMonitorUpdateErr::TemporaryFailure {
+ match persist_res {
+ ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);
- } else {
+ log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
+ },
+ ChannelMonitorUpdateStatus::PermanentFailure => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
- }
- log_error!(self.logger, "Failed to persist ChannelMonitor update for channel {}: {:?}", log_funding_info!(monitor), e);
- } else {
- log_trace!(self.logger, "Finished persisting ChannelMonitor update for channel {}", log_funding_info!(monitor));
+ log_error!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} failed", log_funding_info!(monitor));
+ },
+ ChannelMonitorUpdateStatus::Completed => {
+ log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
+ },
}
if update_res.is_err() {
- Err(ChannelMonitorUpdateErr::PermanentFailure)
+ ChannelMonitorUpdateStatus::PermanentFailure
} else if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
- Err(ChannelMonitorUpdateErr::PermanentFailure)
+ ChannelMonitorUpdateStatus::PermanentFailure
} else {
persist_res
}
@@ -723,7 +731,7 @@ mod tests {
use ::{check_added_monitors, check_closed_broadcast, check_closed_event};
use ::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
- use chain::{ChannelMonitorUpdateErr, Confirm, Watch};
+ use chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
use ln::channelmanager::{self, PaymentSendFailure};
use ln::functional_test_utils::*;
@@ -747,7 +755,7 @@ mod tests {
let (payment_preimage_2, payment_hash_2, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);

chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
- chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+ chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);

nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
@@ -756,7 +764,7 @@ mod tests {
check_added_monitors!(nodes[1], 1);
expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);

- chanmon_cfgs[1].persister.set_update_ret(Ok(()));
+ chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);

let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
assert_eq!(persistences.len(), 1);
@@ -834,7 +842,7 @@ mod tests {
// Temp-fail the block connection which will hold the channel-closed event
chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
- chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+ chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);

// Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The
// channel is now closed, but the ChannelManager doesn't know that yet.
@@ -850,7 +858,7 @@ mod tests {

// If the ChannelManager tries to update the channel, however, the ChainMonitor will pass
// the update through to the ChannelMonitor which will refuse it (as the channel is closed).
- chanmon_cfgs[0].persister.set_update_ret(Ok(()));
+ chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)),
true, APIError::ChannelUnavailable { ref err },
assert!(err.contains("ChannelMonitor storage failure")));
@@ -896,7 +904,7 @@ mod tests {
create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());

chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
- chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::PermanentFailure));
+ chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
connect_blocks(&nodes[0], 1);

// Before processing events, the ChannelManager will still think the Channel is open and
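
To illustrate the renamed API from the consumer side, here is a minimal sketch of a file-backed `Persist` implementation. It is illustrative only: the `FilePersister` type and its `write_monitor` helper are hypothetical, the import paths and `Sign` bound are assumed from rust-lightning as of this commit, and only the `Persist` methods, `MonitorUpdateId`, and `ChannelMonitorUpdateStatus` come from the patch above.

// Illustrative sketch, not part of the patch. `FilePersister` and
// `write_monitor` are hypothetical; the trait and enum are from the diff.
use std::fs;
use std::io::Write;
use std::path::PathBuf;

use lightning::chain::ChannelMonitorUpdateStatus;
use lightning::chain::chainmonitor::{MonitorUpdateId, Persist};
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
use lightning::chain::keysinterface::Sign;
use lightning::chain::transaction::OutPoint;
use lightning::util::ser::Writeable;

pub struct FilePersister {
	pub path: PathBuf, // directory holding one serialized monitor per channel
}

impl FilePersister {
	// Write the full monitor atomically and fsync before returning, since the
	// trait docs above require persistence (including any relevant fsync()
	// calls) to be finished before `Completed` is returned.
	fn write_monitor<S: Sign>(&self, id: OutPoint, monitor: &ChannelMonitor<S>) -> std::io::Result<()> {
		let final_path = self.path.join(format!("{}_{}", id.txid, id.index));
		let tmp_path = final_path.with_extension("tmp");
		let mut file = fs::File::create(&tmp_path)?;
		file.write_all(&monitor.encode())?;
		file.sync_all()?;
		fs::rename(&tmp_path, &final_path)
	}
}

impl<S: Sign> Persist<S> for FilePersister {
	fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor<S>,
		_update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus
	{
		match self.write_monitor(id, data) {
			// Synchronous success: normal channel operation continues.
			Ok(()) => ChannelMonitorUpdateStatus::Completed,
			// Unrecoverable failure: the channel will likely be closed.
			Err(_) => ChannelMonitorUpdateStatus::PermanentFailure,
		}
	}

	fn update_persisted_channel(&self, id: OutPoint, _update: &Option<ChannelMonitorUpdate>,
		data: &ChannelMonitor<S>, _update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus
	{
		// Rewriting the whole monitor instead of applying the O(1) update is
		// always correct, just not optimal; see the trait docs in the patch.
		match self.write_monitor(id, data) {
			Ok(()) => ChannelMonitorUpdateStatus::Completed,
			Err(_) => ChannelMonitorUpdateStatus::PermanentFailure,
		}
	}
}

An asynchronous store would instead return `ChannelMonitorUpdateStatus::InProgress` from these methods and, once the data is durably written, call `ChainMonitor::channel_monitor_updated` with the `MonitorUpdateId` it was handed, once per update, as the trait documentation in this patch requires.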