///
/// # Pruning stale channel updates
///
-/// Stale updates are pruned when a full monitor is written. The old monitor is first read, and if
-/// that succeeds, updates in the range between the old and new monitors are deleted. The `lazy`
-/// flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
+/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
+/// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates`
+/// are deleted.
+/// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
/// will complete. However, stale updates are not a problem for data integrity, since updates are
/// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`.
///
) -> chain::ChannelMonitorUpdateStatus {
// Determine the proper key for this monitor
let monitor_name = MonitorName::from(funding_txo);
- let maybe_old_monitor = self.read_monitor(&monitor_name);
- match maybe_old_monitor {
- Ok((_, ref old_monitor)) => {
- // Check that this key isn't already storing a monitor with a higher update_id
- // (collision)
- if old_monitor.get_latest_update_id() > monitor.get_latest_update_id() {
- log_error!(
- self.logger,
- "Tried to write a monitor at the same outpoint {} with a higher update_id!",
- monitor_name.as_str()
- );
- return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
- }
- }
- // This means the channel monitor is new.
- Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
- _ => return chain::ChannelMonitorUpdateStatus::UnrecoverableError,
- }
// Serialize and write the new monitor
let mut monitor_bytes = Vec::with_capacity(
MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
&monitor_bytes,
) {
Ok(_) => {
- // Assess cleanup. Typically, we'll clean up only between the last two known full
- // monitors.
- if let Ok((_, old_monitor)) = maybe_old_monitor {
- let start = old_monitor.get_latest_update_id();
- let end = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
- // We don't want to clean the rest of u64, so just do possible pending
- // updates. Note that we never write updates at
- // `CLOSED_CHANNEL_UPDATE_ID`.
- cmp::min(
- start.saturating_add(self.maximum_pending_updates),
- CLOSED_CHANNEL_UPDATE_ID - 1,
- )
- } else {
- monitor.get_latest_update_id().saturating_sub(1)
- };
- // We should bother cleaning up only if there's at least one update
- // expected.
- for update_id in start..=end {
- let update_name = UpdateName::from(update_id);
- #[cfg(debug_assertions)]
- {
- if let Ok(update) =
- self.read_monitor_update(&monitor_name, &update_name)
- {
- // Assert that we are reading what we think we are.
- debug_assert_eq!(update.update_id, update_name.0);
- } else if update_id != start && monitor.get_latest_update_id() != CLOSED_CHANNEL_UPDATE_ID
- {
- // We're deleting something we should know doesn't exist.
- panic!(
- "failed to read monitor update {}",
- update_name.as_str()
- );
- }
- // On closed channels, we will unavoidably try to read
- // non-existent updates since we have to guess at the range of
- // stale updates, so do nothing.
- }
- if let Err(e) = self.kv_store.remove(
- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
- monitor_name.as_str(),
- update_name.as_str(),
- true,
- ) {
- log_error!(
- self.logger,
- "error cleaning up channel monitor updates for monitor {}, reason: {}",
- monitor_name.as_str(),
- e
- );
- };
- }
- };
chain::ChannelMonitorUpdateStatus::Completed
}
Err(e) => {
log_error!(
self.logger,
- "error writing channel monitor {}/{}/{} reason: {}",
+ "Failed to write ChannelMonitor {}/{}/{} reason: {}",
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
Err(e) => {
log_error!(
self.logger,
- "error writing channel monitor update {}/{}/{} reason: {}",
+ "Failed to write ChannelMonitorUpdate {}/{}/{} reason: {}",
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
update_name.as_str(),
}
}
} else {
- // We could write this update, but it meets criteria of our design that call for a full monitor write.
- self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
+ let monitor_name = MonitorName::from(funding_txo);
+ // In case of channel-close monitor update, we need to read old monitor before persisting
+ // the new one in order to determine the cleanup range.
+ let maybe_old_monitor = match monitor.get_latest_update_id() {
+ CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
+ _ => None
+ };
+
+ // We could write this update, but it meets criteria of our design that calls for a full monitor write.
+ let monitor_update_status = self.persist_new_channel(funding_txo, monitor, monitor_update_call_id);
+
+ if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
+ let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+ // If there is an error while reading old monitor, we skip clean up.
+ maybe_old_monitor.map(|(_, ref old_monitor)| {
+ let start = old_monitor.get_latest_update_id();
+ // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
+ let end = cmp::min(
+ start.saturating_add(self.maximum_pending_updates),
+ CLOSED_CHANNEL_UPDATE_ID - 1,
+ );
+ (start, end)
+ })
+ } else {
+ let end = monitor.get_latest_update_id();
+ let start = end.saturating_sub(self.maximum_pending_updates);
+ Some((start, end))
+ };
+
+ if let Some((start, end)) = cleanup_range {
+ self.cleanup_in_range(monitor_name, start, end);
+ }
+ }
+
+ monitor_update_status
}
} else {
// There is no update given, so we must persist a new monitor.
}
}
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
+where
+ ES::Target: EntropySource + Sized,
+ K::Target: KVStore,
+ L::Target: Logger,
+ SP::Target: SignerProvider + Sized
+{
+ // Cleans up monitor updates for given monitor in range `start..=end`.
+ fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
+ for update_id in start..=end {
+ let update_name = UpdateName::from(update_id);
+ if let Err(e) = self.kv_store.remove(
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+ monitor_name.as_str(),
+ update_name.as_str(),
+ true,
+ ) {
+ log_error!(
+ self.logger,
+ "Failed to clean up channel monitor updates for monitor {}, reason: {}",
+ monitor_name.as_str(),
+ e
+ );
+ };
+ }
+ }
+}
+
/// A struct representing a name for a monitor.
#[derive(Debug)]
struct MonitorName(String);
#[test]
fn persister_with_real_monitors() {
// This value is used later to limit how many iterations we perform.
- let test_max_pending_updates = 7;
+ let persister_0_max_pending_updates = 7;
+ // Intentionally set this to a smaller value to test a different alignment.
+ let persister_1_max_pending_updates = 3;
let chanmon_cfgs = create_chanmon_cfgs(4);
let persister_0 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
- maximum_pending_updates: test_max_pending_updates,
+ maximum_pending_updates: persister_0_max_pending_updates,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
};
let persister_1 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
- // Intentionally set this to a smaller value to test a different alignment.
- maximum_pending_updates: 3,
+ maximum_pending_updates: persister_1_max_pending_updates,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
};
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-
let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;
for (_, mon) in persisted_chan_data_0.iter() {
// check that when we read it, we got the right update id
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
- // if the CM is at the correct update id without updates, ensure no updates are stored
+
+ // if the CM is at consolidation threshold, ensure no updates are stored.
let monitor_name = MonitorName::from(mon.get_funding_txo().0);
- let (_, cm_0) = persister_0.read_monitor(&monitor_name).unwrap();
- if cm_0.get_latest_update_id() == $expected_update_id {
+ if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
+ || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
assert_eq!(
persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()).unwrap().len(),
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = MonitorName::from(mon.get_funding_txo().0);
- let (_, cm_1) = persister_1.read_monitor(&monitor_name).unwrap();
- if cm_1.get_latest_update_id() == $expected_update_id {
+ // if the CM is at consolidation threshold, ensure no updates are stored.
+ if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
+ || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
assert_eq!(
persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str()).unwrap().len(),
// Send a few more payments to try all the alignments of max pending updates with
// updates for a payment sent and received.
let mut sender = 0;
- for i in 3..=test_max_pending_updates * 2 {
+ for i in 3..=persister_0_max_pending_updates * 2 {
let receiver;
if sender == 0 {
sender = 1;