Add `ChainMonitor::archive_fully_resolved_monitor_channels`
rust-lightning/lightning/src/util/persist.rs
1 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
2 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
4 // You may not use this file except in accordance with one or both of these
5 // licenses.
6
7 //! This module contains a simple key-value store trait [`KVStore`] that
8 //! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`],
9 //! and [`ChannelMonitor`] all in one place.
10 //!
11 //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
12
13 use core::cmp;
14 use core::ops::Deref;
15 use core::str::FromStr;
16 use bitcoin::{BlockHash, Txid};
17
18 use crate::{io, log_error};
19 use crate::prelude::*;
20
21 use crate::chain;
22 use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
23 use crate::chain::chainmonitor::{Persist, MonitorUpdateId};
24 use crate::sign::{EntropySource, ecdsa::WriteableEcdsaChannelSigner, SignerProvider};
25 use crate::chain::transaction::OutPoint;
26 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
27 use crate::ln::channelmanager::AChannelManager;
28 use crate::routing::gossip::NetworkGraph;
29 use crate::routing::scoring::WriteableScore;
30 use crate::util::logger::Logger;
31 use crate::util::ser::{Readable, ReadableArgs, Writeable};
32
33 /// The alphabet of characters allowed for namespaces and keys.
34 pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
35
36 /// The maximum number of characters namespaces and keys may have.
37 pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;
38
39 /// The primary namespace under which the [`ChannelManager`] will be persisted.
40 ///
41 /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
42 pub const CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
43 /// The secondary namespace under which the [`ChannelManager`] will be persisted.
44 ///
45 /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
46 pub const CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
47 /// The key under which the [`ChannelManager`] will be persisted.
48 ///
49 /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
50 pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
51
52 /// The primary namespace under which [`ChannelMonitor`]s will be persisted.
53 pub const CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitors";
54 /// The secondary namespace under which [`ChannelMonitor`]s will be persisted.
55 pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
56 /// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
57 pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";
58
59 /// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
60 pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
61 /// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
62 pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
63
64 /// The primary namespace under which the [`NetworkGraph`] will be persisted.
65 pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
66 /// The secondary namespace under which the [`NetworkGraph`] will be persisted.
67 pub const NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
68 /// The key under which the [`NetworkGraph`] will be persisted.
69 pub const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph";
70
71 /// The primary namespace under which the [`WriteableScore`] will be persisted.
72 pub const SCORER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
73 /// The secondary namespace under which the [`WriteableScore`] will be persisted.
74 pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
75 /// The key under which the [`WriteableScore`] will be persisted.
76 pub const SCORER_PERSISTENCE_KEY: &str = "scorer";
77
78 /// A sentinel value to be prepended to monitors persisted by the [`MonitorUpdatingPersister`].
79 ///
80 /// This serves to prevent someone from accidentally loading such monitors (which may need
81 /// updates applied to be current) with another implementation.
82 pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];
83
84 /// Provides an interface that allows storage and retrieval of persisted values that are associated
85 /// with given keys.
86 ///
87 /// In order to avoid collisions, the key space is segmented based on the given `primary_namespace`s
88 /// and `secondary_namespace`s. Implementations of this trait are free to handle them in different
89 /// ways, as long as per-namespace key uniqueness is asserted.
90 ///
91 /// Keys and namespaces are required to be valid ASCII strings in the range of
92 /// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty
93 /// primary namespaces and secondary namespaces (`""`) are assumed to be valid; however, if
94 /// `primary_namespace` is empty, `secondary_namespace` is required to be empty, too. This means
95 /// that concerns should always be separated by primary namespace first, before secondary
96 /// namespaces are used. While the number of primary namespaces will be relatively small and is
97 /// determined at compile time, there may be many secondary namespaces per primary namespace. Note
98 /// that per-namespace uniqueness must also hold for keys *and* namespaces within any given
99 /// namespace, i.e., conflicts between keys and equally named primary or secondary namespaces
100 /// must be avoided.
101 ///
102 /// **Note:** Users migrating custom persistence backends from the pre-v0.0.117 `KVStorePersister`
103 /// interface can use a concatenation of `[{primary_namespace}/[{secondary_namespace}/]]{key}` to
104 /// recover a `key` compatible with the data model previously assumed by `KVStorePersister::persist`.
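///
/// # Example
///
/// A minimal sketch of an in-memory implementation (the `MemoryStore` type here is hypothetical
/// and shown only to illustrate the per-namespace data model):
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::Mutex;
/// use lightning::io;
/// use lightning::util::persist::KVStore;
///
/// // Maps (primary_namespace, secondary_namespace, key) tuples to stored blobs.
/// struct MemoryStore(Mutex<HashMap<(String, String, String), Vec<u8>>>);
///
/// impl KVStore for MemoryStore {
///     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result<Vec<u8>, io::Error> {
///         self.0.lock().unwrap()
///             .get(&(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string()))
///             .cloned()
///             // `NotFound` here is load-bearing: readers rely on it to detect missing keys.
///             .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
///     }
///     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Result<(), io::Error> {
///         self.0.lock().unwrap().insert(
///             (primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string()),
///             buf.to_vec());
///         Ok(())
///     }
///     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool) -> Result<(), io::Error> {
///         self.0.lock().unwrap().remove(
///             &(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string()));
///         Ok(())
///     }
///     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result<Vec<String>, io::Error> {
///         Ok(self.0.lock().unwrap().keys()
///             .filter(|(p, s, _)| p.as_str() == primary_namespace && s.as_str() == secondary_namespace)
///             .map(|(_, _, k)| k.clone()).collect())
///     }
/// }
/// ```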
105 pub trait KVStore {
106         /// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and
107         /// `key`.
108         ///
109         /// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given
110         /// `primary_namespace` and `secondary_namespace`.
111         ///
112         /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
113         fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result<Vec<u8>, io::Error>;
114         /// Persists the given data under the given `key`.
115         ///
116         /// Will create the given `primary_namespace` and `secondary_namespace` if not already present
117         /// in the store.
118         fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Result<(), io::Error>;
119         /// Removes any data that had previously been persisted under the given `key`.
120         ///
121         /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
122         /// remove the given `key` at some point in time after the method returns, e.g., as part of an
123         /// eventual batch deletion of multiple keys. As a consequence, subsequent calls to
124         /// [`KVStore::list`] might include the removed key until the changes are actually persisted.
125         ///
126         /// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent
127         /// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could
128         /// potentially get lost on crash after the method returns. Therefore, this flag should only be
129         /// set for `remove` operations that can be safely replayed at a later time.
130         ///
131         /// Returns successfully if no data will be stored for the given `primary_namespace`,
132         /// `secondary_namespace`, and `key`, independently of whether it was present before its
133 /// invocation or not.
134         fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Result<(), io::Error>;
135         /// Returns a list of keys that are stored under the given `secondary_namespace` in
136         /// `primary_namespace`.
137         ///
138         /// Returns the keys in arbitrary order, so users requiring a particular order need to sort the
139         /// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown.
140         fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result<Vec<String>, io::Error>;
141 }
142
143 /// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
144 ///
145 /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
146 pub trait Persister<'a, CM: Deref, L: Deref, S: WriteableScore<'a>>
147 where
148         CM::Target: 'static + AChannelManager,
149         L::Target: 'static + Logger,
150 {
151         /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed.
152         ///
153         /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
154         fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>;
155
156         /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
157         fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error>;
158
159         /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
160         fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
161 }
162
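// Because `Persister` is blanket-implemented for any `KVStore` (just below), persisting the
// node's objects can be one-liners. A usage sketch, assuming `store` implements `KVStore` and
// `channel_manager` is an `Arc<ChannelManager<...>>`:
//
//     store.persist_manager(&channel_manager)?;
//     store.persist_graph(&network_graph)?;
//     store.persist_scorer(&scorer)?;
//
// In practice these calls are typically driven by `lightning-background-processor` rather than
// made by hand.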
163
164 impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, CM, L, S> for A
165 where
166         CM::Target: 'static + AChannelManager,
167         L::Target: 'static + Logger,
168 {
169         fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> {
170                 self.write(CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
171                         CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
172                         CHANNEL_MANAGER_PERSISTENCE_KEY,
173                         &channel_manager.get_cm().encode())
174         }
175
176         fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
177                 self.write(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
178                         NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
179                         NETWORK_GRAPH_PERSISTENCE_KEY,
180                         &network_graph.encode())
181         }
182
183         fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
184                 self.write(SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
185                         SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
186                         SCORER_PERSISTENCE_KEY,
187                         &scorer.encode())
188         }
189 }
190
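// Because of this blanket impl, any `KVStore` can also be handed directly to a `ChainMonitor`
// as its persister. A wiring sketch (assuming `fs_store` is, for example, an
// `Arc<FilesystemStore>` from the `lightning-persister` crate):
//
//     let chain_monitor = ChainMonitor::new(
//         Some(chain_source), broadcaster, logger, fee_estimator, fs_store);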
191 impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSigner> for K {
192         // TODO: We really need a way for the persister to inform the user that it's time to crash/shut
193         // down once these start returning failure.
194         // Then we should return InProgress rather than UnrecoverableError, implying we should probably
195         // just shut down the node since we're not retrying persistence!
196
197         fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
198                 let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
199                 match self.write(
200                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
201                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
202                         &key, &monitor.encode())
203                 {
204                         Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
205                         Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
206                 }
207         }
208
209         fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
210                 let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
211                 match self.write(
212                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
213                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
214                         &key, &monitor.encode())
215                 {
216                         Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
217                         Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
218                 }
219         }
220
221         fn archive_persisted_channel(&self, funding_txo: OutPoint) {
222                 let monitor_name = MonitorName::from(funding_txo);
223                 let monitor = match self.read(
224                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
225                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
226                         monitor_name.as_str(),
227                 ) {
228                         Ok(monitor) => monitor,
229                         Err(_) => return
230                 };
231                 match self.write(
232                         ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
233                         ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
234                         monitor_name.as_str(),
235                         &monitor,
236                 ) {
237                         Ok(()) => {}
238                         Err(_e) => return
239                 };
240                 let _ = self.remove(
241                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
242                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
243                         monitor_name.as_str(),
244                         true,
245                 );
246         }
247 }
248
249 /// Read previously persisted [`ChannelMonitor`]s from the store.
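///
/// A usage sketch, assuming `kv_store` implements [`KVStore`] and `keys_manager` is a
/// `KeysManager`-like type serving as both entropy source and signer provider:
///
/// ```ignore
/// let monitors = read_channel_monitors(&kv_store, &keys_manager, &keys_manager)?;
/// for (_block_hash, monitor) in monitors.iter() {
///     // Hand each monitor loaded at startup back to chain sync and the ChainMonitor.
/// }
/// ```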
250 pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
251         kv_store: K, entropy_source: ES, signer_provider: SP,
252 ) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
253 where
254         K::Target: KVStore,
255         ES::Target: EntropySource + Sized,
256         SP::Target: SignerProvider + Sized,
257 {
258         let mut res = Vec::new();
259
260         for stored_key in kv_store.list(
261                 CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE)?
262         {
263                 if stored_key.len() < 66 {
264                         return Err(io::Error::new(
265                                 io::ErrorKind::InvalidData,
266                                 "Stored key has invalid length"));
267                 }
268
269                 let txid = Txid::from_str(stored_key.split_at(64).0).map_err(|_| {
270                         io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
271                 })?;
272
273                 let index: u16 = stored_key.split_at(65).1.parse().map_err(|_| {
274                         io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
275                 })?;
276
277                 match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read(
278                         &mut io::Cursor::new(
279                                 kv_store.read(CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, &stored_key)?),
280                         (&*entropy_source, &*signer_provider),
281                 ) {
282                         Ok((block_hash, channel_monitor)) => {
283                                 if channel_monitor.get_funding_txo().0.txid != txid
284                                         || channel_monitor.get_funding_txo().0.index != index
285                                 {
286                                         return Err(io::Error::new(
287                                                 io::ErrorKind::InvalidData,
288                                                 "ChannelMonitor was stored under the wrong key",
289                                         ));
290                                 }
291                                 res.push((block_hash, channel_monitor));
292                         }
293                         Err(_) => {
294                                 return Err(io::Error::new(
295                                         io::ErrorKind::InvalidData,
296                                         "Failed to read ChannelMonitor"
297                                 ))
298                         }
299                 }
300         }
301         Ok(res)
302 }
303
304 /// Implements [`Persist`] in a way that writes and reads both [`ChannelMonitor`]s and
305 /// [`ChannelMonitorUpdate`]s.
306 ///
307 /// # Overview
308 ///
309 /// The main benefit this provides over the [`KVStore`]'s [`Persist`] implementation is decreased
310 /// I/O bandwidth and storage churn, at the expense of more IOPS (including listing, reading, and
311 /// deleting) and complexity. This is because it writes channel monitor differential updates,
312 /// whereas the other (default) implementation rewrites the entire monitor on each update. For
313 /// routing nodes, updates can happen many times per second to a channel, and monitors can be tens
314 /// of megabytes (or more). Updates can be as small as a few hundred bytes.
315 ///
316 /// Note that monitors written with `MonitorUpdatingPersister` are _not_ backward-compatible with
317 /// the default [`KVStore`]'s [`Persist`] implementation. They have a prepended byte sequence,
318 /// [`MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL`], applied to prevent deserialization with other
319 /// persisters. This is because monitors written by this struct _may_ have unapplied updates. In
320 /// order to downgrade, you must ensure that all updates are applied to the monitor, and remove the
321 /// sentinel bytes.
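///
/// For example, a downgrade sketch that strips the sentinel from a fully consolidated monitor's
/// serialized bytes (assuming `monitor_bytes` was read from the store):
///
/// ```ignore
/// if monitor_bytes.starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
///     monitor_bytes.drain(0..MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len());
/// }
/// ```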
322 ///
323 /// # Storing monitors
324 ///
325 /// Monitors are stored by implementing the [`Persist`] trait, which has two functions:
326 ///
327 ///   - [`Persist::persist_new_channel`], which persists whole [`ChannelMonitor`]s.
328 ///   - [`Persist::update_persisted_channel`], which persists only a [`ChannelMonitorUpdate`].
329 ///
330 /// Whole [`ChannelMonitor`]s are stored in the [`CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE`],
331 /// using the familiar encoding of an [`OutPoint`] (for example, `[SOME-64-CHAR-HEX-STRING]_1`).
332 ///
333 /// Each [`ChannelMonitorUpdate`] is stored in a dynamic secondary namespace, as follows:
334 ///
335 ///   - primary namespace: [`CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE`]
336 ///   - secondary namespace: [the monitor's encoded outpoint name]
337 ///
338 /// Under that secondary namespace, each update is stored with a number string, like `21`, which
339 /// represents its `update_id` value.
340 ///
341 /// For example, consider this channel, named for its transaction ID and index, or [`OutPoint`]:
342 ///
343 ///   - Transaction ID: `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef`
344 ///   - Index: `1`
345 ///
346 /// Full channel monitors would be stored at a single key:
347 ///
348 /// `[CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1`
349 ///
350 /// Updates would be stored as follows (with `/` delimiting primary_namespace/secondary_namespace/key):
351 ///
352 /// ```text
353 /// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/1
354 /// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/2
355 /// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/3
356 /// ```
357 /// ... and so on.
358 ///
359 /// # Reading channel state from storage
360 ///
361 /// Channel state can be reconstructed by calling
362 /// [`MonitorUpdatingPersister::read_all_channel_monitors_with_updates`]. Alternatively, users can
363 /// list channel monitors themselves and load channels individually using
364 /// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
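///
/// For example, a sketch assuming a `persister`, `broadcaster`, and `fee_estimator` have
/// already been constructed:
///
/// ```ignore
/// let channels = persister.read_all_channel_monitors_with_updates(
///     &broadcaster, &fee_estimator)?;
/// ```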
365 ///
366 /// ## EXTREMELY IMPORTANT
367 ///
368 /// It is extremely important that your [`KVStore::read`] implementation uses the
369 /// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
370 /// that circumstance (not when there is really a permissions error, for example). This is because
371 /// neither channel monitor reading function lists updates. Instead, each reads the monitor and,
372 /// using its stored `update_id`, synthesizes update storage keys, trying them in sequence until
373 /// one is not found. All _other_ errors will be bubbled up in the function's [`Result`].
374 ///
375 /// # Pruning stale channel updates
376 ///
377 /// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
378 /// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates`
379 /// are deleted.
380 /// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
381 /// will complete. However, stale updates are not a problem for data integrity, since only updates
382 /// with an `update_id` higher than the stored [`ChannelMonitor`]'s are ever read.
383 ///
384 /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
385 /// would like to get rid of them, consider using the
386 /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
387 pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref>
388 where
389         K::Target: KVStore,
390         L::Target: Logger,
391         ES::Target: EntropySource + Sized,
392         SP::Target: SignerProvider + Sized,
393 {
394         kv_store: K,
395         logger: L,
396         maximum_pending_updates: u64,
397         entropy_source: ES,
398         signer_provider: SP,
399 }
400
401 #[allow(dead_code)]
402 impl<K: Deref, L: Deref, ES: Deref, SP: Deref>
403         MonitorUpdatingPersister<K, L, ES, SP>
404 where
405         K::Target: KVStore,
406         L::Target: Logger,
407         ES::Target: EntropySource + Sized,
408         SP::Target: SignerProvider + Sized,
409 {
410         /// Constructs a new [`MonitorUpdatingPersister`].
411         ///
412         /// The `maximum_pending_updates` parameter controls how many updates may be stored before a
413         /// [`MonitorUpdatingPersister`] consolidates updates by writing a full monitor. Note that
414         /// consolidation will frequently occur with fewer updates than what you set here; this number
415         /// is merely the maximum that may be stored. When setting this value, consider that for higher
416         /// values of `maximum_pending_updates`:
417         ///
418         ///   - [`MonitorUpdatingPersister`] will tend to write more [`ChannelMonitorUpdate`]s than
419         /// [`ChannelMonitor`]s, approaching one [`ChannelMonitor`] write for every
420         /// `maximum_pending_updates` [`ChannelMonitorUpdate`]s.
421         ///   - [`MonitorUpdatingPersister`] will issue deletes differently. Lazy deletes will come in
422         /// "waves" for each [`ChannelMonitor`] write. A larger `maximum_pending_updates` means bigger,
423         /// less frequent "waves."
424         ///   - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run
425         /// [`MonitorUpdatingPersister::cleanup_stale_updates`].
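        ///
        /// A construction sketch, assuming `kv_store` and `logger` deref to `KVStore` and `Logger`
        /// implementations and `keys_manager` serves as both entropy source and signer provider:
        ///
        /// ```ignore
        /// let persister = MonitorUpdatingPersister::new(
        ///     kv_store, logger, 100, &keys_manager, &keys_manager);
        /// ```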
426         pub fn new(
427                 kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
428                 signer_provider: SP,
429         ) -> Self {
430                 MonitorUpdatingPersister {
431                         kv_store,
432                         logger,
433                         maximum_pending_updates,
434                         entropy_source,
435                         signer_provider,
436                 }
437         }
438
439         /// Reads all stored channel monitors, along with any stored updates for them.
440         ///
441         /// It is extremely important that your [`KVStore::read`] implementation uses the
442         /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
443         /// documentation for [`MonitorUpdatingPersister`].
444         pub fn read_all_channel_monitors_with_updates<B: Deref, F: Deref>(
445                 &self, broadcaster: &B, fee_estimator: &F,
446         ) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
447         where
448                 B::Target: BroadcasterInterface,
449                 F::Target: FeeEstimator,
450         {
451                 let monitor_list = self.kv_store.list(
452                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
453                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
454                 )?;
455                 let mut res = Vec::with_capacity(monitor_list.len());
456                 for monitor_key in monitor_list {
457                         res.push(self.read_channel_monitor_with_updates(
458                                 broadcaster,
459                                 fee_estimator,
460                                 monitor_key,
461                         )?)
462                 }
463                 Ok(res)
464         }
465
466         /// Read a single channel monitor, along with any stored updates for it.
467         ///
468         /// It is extremely important that your [`KVStore::read`] implementation uses the
469         /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
470         /// documentation for [`MonitorUpdatingPersister`].
471         ///
472         /// For `monitor_key`, channel storage keys are the channel's transaction ID and index, or
473         /// [`OutPoint`], with an underscore `_` between them. For example, given:
474         ///
475         ///   - Transaction ID: `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef`
476         ///   - Index: `1`
477         ///
478         /// The correct `monitor_key` would be:
479         /// `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1`
480         ///
481         /// Loading a large number of monitors will be faster if done in parallel. You can use this
482         /// function to accomplish this; take care to limit the number of parallel readers.
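        ///
        /// For example, deriving the key from a channel's funding [`OutPoint`] (assuming
        /// `persister`, `broadcaster`, and `fee_estimator` are in scope):
        ///
        /// ```ignore
        /// let monitor_key = format!("{}_{}", funding_txo.txid, funding_txo.index);
        /// let (block_hash, monitor) = persister.read_channel_monitor_with_updates(
        ///     &broadcaster, &fee_estimator, monitor_key)?;
        /// ```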
483         pub fn read_channel_monitor_with_updates<B: Deref, F: Deref>(
484                 &self, broadcaster: &B, fee_estimator: &F, monitor_key: String,
485         ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
486         where
487                 B::Target: BroadcasterInterface,
488                 F::Target: FeeEstimator,
489         {
490                 let monitor_name = MonitorName::new(monitor_key)?;
491                 let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
492                 let mut current_update_id = monitor.get_latest_update_id();
493                 loop {
494                         current_update_id = match current_update_id.checked_add(1) {
495                                 Some(next_update_id) => next_update_id,
496                                 None => break,
497                         };
498                         let update_name = UpdateName::from(current_update_id);
499                         let update = match self.read_monitor_update(&monitor_name, &update_name) {
500                                 Ok(update) => update,
501                                 Err(err) if err.kind() == io::ErrorKind::NotFound => {
502                                         // We can't find any more updates, so we are done.
503                                         break;
504                                 }
505                                 Err(err) => return Err(err),
506                         };
507
508                         monitor.update_monitor(&update, broadcaster, fee_estimator, &self.logger)
509                                 .map_err(|e| {
510                                         log_error!(
511                                                 self.logger,
512                                                 "Monitor update failed. monitor: {} update: {} reason: {:?}",
513                                                 monitor_name.as_str(),
514                                                 update_name.as_str(),
515                                                 e
516                                         );
517                                         io::Error::new(io::ErrorKind::Other, "Monitor update failed")
518                                 })?;
519                 }
520                 Ok((block_hash, monitor))
521         }
522
523         /// Read a channel monitor.
524         fn read_monitor(
525                 &self, monitor_name: &MonitorName,
526         ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
527                 let outpoint: OutPoint = monitor_name.try_into()?;
528                 let mut monitor_cursor = io::Cursor::new(self.kv_store.read(
529                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
530                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
531                         monitor_name.as_str(),
532                 )?);
533                 // Discard the sentinel bytes if found.
534                 if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
535                         monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64);
536                 }
537                 match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read(
538                         &mut monitor_cursor,
539                         (&*self.entropy_source, &*self.signer_provider),
540                 ) {
541                         Ok((blockhash, channel_monitor)) => {
542                                 if channel_monitor.get_funding_txo().0.txid != outpoint.txid
543                                         || channel_monitor.get_funding_txo().0.index != outpoint.index
544                                 {
545                                         log_error!(
546                                                 self.logger,
547                                                 "ChannelMonitor {} was stored under the wrong key!",
548                                                 monitor_name.as_str()
549                                         );
550                                         Err(io::Error::new(
551                                                 io::ErrorKind::InvalidData,
552                                                 "ChannelMonitor was stored under the wrong key",
553                                         ))
554                                 } else {
555                                         Ok((blockhash, channel_monitor))
556                                 }
557                         }
558                         Err(e) => {
559                                 log_error!(
560                                         self.logger,
561                                         "Failed to read ChannelMonitor {}, reason: {}",
562                                         monitor_name.as_str(),
563                                         e,
564                                 );
565                                 Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
566                         }
567                 }
568         }
569
570         /// Read a channel monitor update.
571         fn read_monitor_update(
572                 &self, monitor_name: &MonitorName, update_name: &UpdateName,
573         ) -> Result<ChannelMonitorUpdate, io::Error> {
574                 let update_bytes = self.kv_store.read(
575                         CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
576                         monitor_name.as_str(),
577                         update_name.as_str(),
578                 )?;
579                 ChannelMonitorUpdate::read(&mut io::Cursor::new(update_bytes)).map_err(|e| {
580                         log_error!(
581                                 self.logger,
582                                 "Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}",
583                                 CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
584                                 monitor_name.as_str(),
585                                 update_name.as_str(),
586                                 e,
587                         );
588                         io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitorUpdate")
589                 })
590         }
591
592         /// Cleans up stale updates for all monitors.
593         ///
594         /// This function works by first listing all monitors, and then for each of them, listing all
595         /// updates. The updates that have an `update_id` less than or equal to the stored monitor's
596         /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will
597         /// be passed to [`KVStore::remove`].
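        ///
        /// For example, to lazily delete any stale updates left over after a crash:
        ///
        /// ```ignore
        /// persister.cleanup_stale_updates(true)?;
        /// ```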
598         pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
599                 let monitor_keys = self.kv_store.list(
600                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
601                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
602                 )?;
603                 for monitor_key in monitor_keys {
604                         let monitor_name = MonitorName::new(monitor_key)?;
605                         let (_, current_monitor) = self.read_monitor(&monitor_name)?;
606                         let updates = self
607                                 .kv_store
608                                 .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())?;
609                         for update in updates {
610                                 let update_name = UpdateName::new(update)?;
611                                 // If the update_id is less than or equal to the stored monitor's, delete it.
612                                 if update_name.0 <= current_monitor.get_latest_update_id() {
613                                         self.kv_store.remove(
614                                                 CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
615                                                 monitor_name.as_str(),
616                                                 update_name.as_str(),
617                                                 lazy,
618                                         )?;
619                                 }
620                         }
621                 }
622                 Ok(())
623         }
624 }
625
626 impl<ChannelSigner: WriteableEcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref>
627         Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP>
628 where
629         K::Target: KVStore,
630         L::Target: Logger,
631         ES::Target: EntropySource + Sized,
632         SP::Target: SignerProvider + Sized,
633 {
634         /// Persists a new channel. This means writing the entire monitor to the
635         /// parameterized [`KVStore`].
636         fn persist_new_channel(
637                 &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
638                 _monitor_update_call_id: MonitorUpdateId,
639         ) -> chain::ChannelMonitorUpdateStatus {
640                 // Determine the proper key for this monitor
641                 let monitor_name = MonitorName::from(funding_txo);
642                 // Serialize and write the new monitor
643                 let mut monitor_bytes = Vec::with_capacity(
644                         MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
645                 );
646                 monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
647                 monitor.write(&mut monitor_bytes).unwrap();
648                 match self.kv_store.write(
649                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
650                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
651                         monitor_name.as_str(),
652                         &monitor_bytes,
653                 ) {
654                         Ok(_) => {
655                                 chain::ChannelMonitorUpdateStatus::Completed
656                         }
657                         Err(e) => {
658                                 log_error!(
659                                         self.logger,
660                                         "Failed to write ChannelMonitor {}/{}/{} reason: {}",
661                                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
662                                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
663                                         monitor_name.as_str(),
664                                         e
665                                 );
666                                 chain::ChannelMonitorUpdateStatus::UnrecoverableError
667                         }
668                 }
669         }
670
671         /// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible.
672         ///
673         /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]:
674         ///
675         ///   - No full monitor is found in [`KVStore`]
676         ///   - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`]
677         ///   - LDK commands re-persisting the entire monitor through this function, specifically when
678         ///     `update` is `None`.
679         ///   - The update is at [`CLOSED_CHANNEL_UPDATE_ID`]
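        ///
        /// For example, with `maximum_pending_updates` set to 5 in [`Self::new`], updates with
        /// `update_id`s 5, 10, 15, and so on trigger a full monitor write, while the updates in
        /// between are written individually.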
680         fn update_persisted_channel(
681                 &self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
682                 monitor: &ChannelMonitor<ChannelSigner>, monitor_update_call_id: MonitorUpdateId,
683         ) -> chain::ChannelMonitorUpdateStatus {
684                 // IMPORTANT: monitor_update_call_id: MonitorUpdateId is not to be confused with
685                 // ChannelMonitorUpdate's update_id.
686                 if let Some(update) = update {
687                         if update.update_id != CLOSED_CHANNEL_UPDATE_ID
688                                 && update.update_id % self.maximum_pending_updates != 0
689                         {
690                                 let monitor_name = MonitorName::from(funding_txo);
691                                 let update_name = UpdateName::from(update.update_id);
692                                 match self.kv_store.write(
693                                         CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
694                                         monitor_name.as_str(),
695                                         update_name.as_str(),
696                                         &update.encode(),
697                                 ) {
698                                         Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
699                                         Err(e) => {
700                                                 log_error!(
701                                                         self.logger,
702                                                         "Failed to write ChannelMonitorUpdate {}/{}/{} reason: {}",
703                                                         CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
704                                                         monitor_name.as_str(),
705                                                         update_name.as_str(),
706                                                         e
707                                                 );
708                                                 chain::ChannelMonitorUpdateStatus::UnrecoverableError
709                                         }
710                                 }
711                         } else {
712                                 let monitor_name = MonitorName::from(funding_txo);
713                                 // In the case of a channel-close monitor update, we need to read the old monitor before
714                                 // persisting the new one in order to determine the cleanup range.
715                                 let maybe_old_monitor = match monitor.get_latest_update_id() {
716                                         CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
717                                         _ => None
718                                 };
719
720                                 // We could write this update, but it meets the criteria of our design that calls for a full monitor write.
721                                 let monitor_update_status = self.persist_new_channel(funding_txo, monitor, monitor_update_call_id);
722
723                                 if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
724                                         let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
725                                                 // If there is an error while reading the old monitor, we skip cleanup.
726                                                 maybe_old_monitor.map(|(_, ref old_monitor)| {
727                                                         let start = old_monitor.get_latest_update_id();
728                                                         // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
729                                                         let end = cmp::min(
730                                                                 start.saturating_add(self.maximum_pending_updates),
731                                                                 CLOSED_CHANNEL_UPDATE_ID - 1,
732                                                         );
733                                                         (start, end)
734                                                 })
735                                         } else {
736                                                 let end = monitor.get_latest_update_id();
737                                                 let start = end.saturating_sub(self.maximum_pending_updates);
738                                                 Some((start, end))
739                                         };
740
741                                         if let Some((start, end)) = cleanup_range {
742                                                 self.cleanup_in_range(monitor_name, start, end);
743                                         }
744                                 }
745
746                                 monitor_update_status
747                         }
748                 } else {
749                         // There is no update given, so we must persist a new monitor.
750                         self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
751                 }
752         }
753
754         fn archive_persisted_channel(&self, funding_txo: OutPoint) {
755                 let monitor_name = MonitorName::from(funding_txo);
756                 let monitor = match self.read_monitor(&monitor_name) {
757                         Ok((_block_hash, monitor)) => monitor,
758                         Err(_) => return
759                 };
760                 match self.kv_store.write(
761                         ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
762                         ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
763                         monitor_name.as_str(),
764                         &monitor.encode()
765                 ) {
766                         Ok(()) => {},
767                         Err(_e) => return,
768                 };
769                 let _ = self.kv_store.remove(
770                         CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
771                         CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
772                         monitor_name.as_str(),
773                         true,
774                 );
775         }
776 }
777
778 impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
779 where
780         ES::Target: EntropySource + Sized,
781         K::Target: KVStore,
782         L::Target: Logger,
783         SP::Target: SignerProvider + Sized
784 {
785         // Cleans up monitor updates for the given monitor in range `start..=end`.
786         fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
787                 for update_id in start..=end {
788                         let update_name = UpdateName::from(update_id);
789                         if let Err(e) = self.kv_store.remove(
790                                 CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
791                                 monitor_name.as_str(),
792                                 update_name.as_str(),
793                                 true,
794                         ) {
795                                 log_error!(
796                                         self.logger,
797                                         "Failed to clean up channel monitor updates for monitor {}, reason: {}",
798                                         monitor_name.as_str(),
799                                         e
800                                 );
801                         };
802                 }
803         }
804 }
805
806 /// A struct representing a name for a monitor.
807 #[derive(Debug)]
808 struct MonitorName(String);
809
810 impl MonitorName {
811         /// Constructs a [`MonitorName`], after verifying that an [`OutPoint`] can
812         /// be formed from the given `name`.
813         pub fn new(name: String) -> Result<Self, io::Error> {
814                 MonitorName::do_try_into_outpoint(&name)?;
815                 Ok(Self(name))
816         }
817         /// Convert this monitor name to a str.
818         pub fn as_str(&self) -> &str {
819                 &self.0
820         }
821         /// Attempt to form a valid [`OutPoint`] from a given name string.
822         fn do_try_into_outpoint(name: &str) -> Result<OutPoint, io::Error> {
823                 let mut parts = name.splitn(2, '_');
824                 let txid = if let Some(part) = parts.next() {
825                         Txid::from_str(part).map_err(|_| {
826                                 io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
827                         })?
828                 } else {
829                         return Err(io::Error::new(
830                                 io::ErrorKind::InvalidData,
831                                 "Stored monitor key is not a splittable string",
832                         ));
833                 };
834                 let index = if let Some(part) = parts.next() {
835                         part.parse().map_err(|_| {
836                                 io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
837                         })?
838                 } else {
839                         return Err(io::Error::new(
840                                 io::ErrorKind::InvalidData,
841                                 "No tx index value found after underscore in stored key",
842                         ));
843                 };
844                 Ok(OutPoint { txid, index })
845         }
846 }
847
848 impl TryFrom<&MonitorName> for OutPoint {
849         type Error = io::Error;
850
851         fn try_from(value: &MonitorName) -> Result<Self, io::Error> {
852                 MonitorName::do_try_into_outpoint(&value.0)
853         }
854 }
855
856 impl From<OutPoint> for MonitorName {
857         fn from(value: OutPoint) -> Self {
858                 MonitorName(format!("{}_{}", value.txid.to_string(), value.index))
859         }
860 }
861
862 /// A struct representing a name for an update.
863 #[derive(Debug)]
864 struct UpdateName(u64, String);
865
866 impl UpdateName {
867         /// Constructs an [`UpdateName`], after verifying that an update sequence ID
868         /// can be derived from the given `name`.
869         pub fn new(name: String) -> Result<Self, io::Error> {
870                 match name.parse::<u64>() {
871                         Ok(u) => Ok(u.into()),
872                         Err(_) => {
873                                 Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
874                         }
875                 }
876         }
877
878         /// Convert this monitor update name to a &str
879         /// Convert this monitor update name to a `&str`.
880                 &self.1
881         }
882 }
883
884 impl From<u64> for UpdateName {
885         fn from(value: u64) -> Self {
886                 Self(value, value.to_string())
887         }
888 }
889
890 #[cfg(test)]
891 mod tests {
892         use super::*;
893         use crate::chain::ChannelMonitorUpdateStatus;
894         use crate::events::{ClosureReason, MessageSendEventsProvider};
895         use crate::ln::functional_test_utils::*;
896         use crate::util::test_utils::{self, TestLogger, TestStore};
897         use crate::{check_added_monitors, check_closed_broadcast};
898         use crate::sync::Arc;
899         use crate::util::test_channel_signer::TestChannelSigner;
900
901         const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
902
903         #[test]
904         fn converts_u64_to_update_name() {
905                 assert_eq!(UpdateName::from(0).as_str(), "0");
906                 assert_eq!(UpdateName::from(21).as_str(), "21");
907                 assert_eq!(UpdateName::from(u64::MAX).as_str(), "18446744073709551615");
908         }
909
910         #[test]
911         fn bad_update_name_fails() {
912                 assert!(UpdateName::new("deadbeef".to_string()).is_err());
913                 assert!(UpdateName::new("-1".to_string()).is_err());
914         }
915
916         #[test]
917         fn monitor_from_outpoint_works() {
918                 let monitor_name1 = MonitorName::from(OutPoint {
919                         txid: Txid::from_str("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
920                         index: 1,
921                 });
922                 assert_eq!(monitor_name1.as_str(), "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1");
923
924                 let monitor_name2 = MonitorName::from(OutPoint {
925                         txid: Txid::from_str("f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef").unwrap(),
926                         index: u16::MAX,
927                 });
928                 assert_eq!(monitor_name2.as_str(), "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535");
929         }
930
931         #[test]
932         fn bad_monitor_string_fails() {
933                 assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()).is_err());
934                 assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()).is_err());
935                 assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()).is_err());
936         }
937
938         // Exercise the `MonitorUpdatingPersister` with real channels and payments.
939         #[test]
940         fn persister_with_real_monitors() {
941                 // This value is used later to limit how many iterations we perform.
942                 let persister_0_max_pending_updates = 7;
943                 // Intentionally set this to a smaller value to test a different alignment.
944                 let persister_1_max_pending_updates = 3;
945                 let chanmon_cfgs = create_chanmon_cfgs(4);
946                 let persister_0 = MonitorUpdatingPersister {
947                         kv_store: &TestStore::new(false),
948                         logger: &TestLogger::new(),
949                         maximum_pending_updates: persister_0_max_pending_updates,
950                         entropy_source: &chanmon_cfgs[0].keys_manager,
951                         signer_provider: &chanmon_cfgs[0].keys_manager,
952                 };
953                 let persister_1 = MonitorUpdatingPersister {
954                         kv_store: &TestStore::new(false),
955                         logger: &TestLogger::new(),
956                         maximum_pending_updates: persister_1_max_pending_updates,
957                         entropy_source: &chanmon_cfgs[1].keys_manager,
958                         signer_provider: &chanmon_cfgs[1].keys_manager,
959                 };
960                 let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
961                 let chain_mon_0 = test_utils::TestChainMonitor::new(
962                         Some(&chanmon_cfgs[0].chain_source),
963                         &chanmon_cfgs[0].tx_broadcaster,
964                         &chanmon_cfgs[0].logger,
965                         &chanmon_cfgs[0].fee_estimator,
966                         &persister_0,
967                         &chanmon_cfgs[0].keys_manager,
968                 );
969                 let chain_mon_1 = test_utils::TestChainMonitor::new(
970                         Some(&chanmon_cfgs[1].chain_source),
971                         &chanmon_cfgs[1].tx_broadcaster,
972                         &chanmon_cfgs[1].logger,
973                         &chanmon_cfgs[1].fee_estimator,
974                         &persister_1,
975                         &chanmon_cfgs[1].keys_manager,
976                 );
977                 node_cfgs[0].chain_monitor = chain_mon_0;
978                 node_cfgs[1].chain_monitor = chain_mon_1;
979                 let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
980                 let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
981                 let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
982                 let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;
983
984                 // Check that the persisted channel data is empty before any channels are
985                 // open.
986                 let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
987                         &broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
988                 assert_eq!(persisted_chan_data_0.len(), 0);
989                 let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
990                         &broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
991                 assert_eq!(persisted_chan_data_1.len(), 0);
992
993                 // Helper to make sure the channel is on the expected update ID.
		macro_rules! check_persisted_data {
			($expected_update_id: expr) => {
				persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
					&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
				// Check that we stored only one monitor.
				assert_eq!(persisted_chan_data_0.len(), 1);
				for (_, mon) in persisted_chan_data_0.iter() {
					// Check that when we read it back, we got the right update ID.
					assert_eq!(mon.get_latest_update_id(), $expected_update_id);

					// If the `ChannelMonitor` is at a consolidation threshold, ensure no
					// updates are stored.
					let monitor_name = MonitorName::from(mon.get_funding_txo().0);
					if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
							|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
						assert_eq!(
							persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
								monitor_name.as_str()).unwrap().len(),
							0,
							"updates stored when they shouldn't be in persister 0"
						);
					}
				}
				persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
					&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
				assert_eq!(persisted_chan_data_1.len(), 1);
				for (_, mon) in persisted_chan_data_1.iter() {
					assert_eq!(mon.get_latest_update_id(), $expected_update_id);
					let monitor_name = MonitorName::from(mon.get_funding_txo().0);
					// If the `ChannelMonitor` is at a consolidation threshold, ensure no
					// updates are stored.
					if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
							|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
						assert_eq!(
							persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
								monitor_name.as_str()).unwrap().len(),
							0,
							"updates stored when they shouldn't be in persister 1"
						);
					}
				}
			};
		}

		// Create an initial channel and check that a single monitor was persisted.
		let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
		check_persisted_data!(0);

		// Send a few payments and make sure the monitors are updated to the latest.
		// Each successful payment advances each node's monitor by
		// `EXPECTED_UPDATES_PER_PAYMENT` updates, as checked below.
		send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
		check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
		send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
		check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);

		// Send a few more payments, alternating direction, to exercise every alignment
		// of the two `maximum_pending_updates` values against the updates generated by
		// payments sent and received.
		let mut sender = 0;
		for i in 3..=persister_0_max_pending_updates * 2 {
			let receiver;
			if sender == 0 {
				sender = 1;
				receiver = 0;
			} else {
				sender = 0;
				receiver = 1;
			}
			send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
			check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
		}

		// Force close because cooperative close doesn't result in any persisted
		// updates.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();

		check_closed_event(&nodes[0], 1, ClosureReason::HolderForceClosed, false, &[nodes[1].node.get_our_node_id()], 100000);
		check_closed_broadcast!(nodes[0], true);
		check_added_monitors!(nodes[0], 1);

		let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
		assert_eq!(node_txn.len(), 1);

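		// Confirm node 0's commitment transaction on node 1's chain so that node 1
		// also sees the channel as closed on-chain.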
		connect_block(&nodes[1], &create_dummy_block(nodes[0].best_block_hash(), 42, vec![node_txn[0].clone(), node_txn[0].clone()]));

		check_closed_broadcast!(nodes[1], true);
		check_closed_event(&nodes[1], 1, ClosureReason::CommitmentTxConfirmed, false, &[nodes[0].node.get_our_node_id()], 100000);
		check_added_monitors!(nodes[1], 1);

		// Make sure everything is persisted as expected after close.
		check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);

		// Make sure no stale updates are left behind after the close: both persisters
		// wrote a full monitor at `CLOSED_CHANNEL_UPDATE_ID` and consolidated, so the
		// channel should have 0 stored updates.
		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
		let (_, monitor) = &persisted_chan_data[0];
		let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
		assert_eq!(persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0);
		assert_eq!(persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0);
	}

	// Test that if the `MonitorUpdatingPersister`'s `KVStore` can't actually write,
	// trying to persist a monitor or update with it results in the persister
	// returning an `UnrecoverableError` status.
	#[test]
	fn unrecoverable_error_on_write_failure() {
		// Set up a dummy channel and force close. This will produce a monitor
		// that we can then use to test persistence.
		let chanmon_cfgs = create_chanmon_cfgs(2);
		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
		nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
		check_closed_event(&nodes[1], 1, ClosureReason::HolderForceClosed, false, &[nodes[0].node.get_our_node_id()], 100000);
		{
			let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
			let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
			let update_id = update_map.get(&added_monitors[0].1.channel_id()).unwrap();
			let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
			let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];
			let test_txo = OutPoint { txid: Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), index: 0 };

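			// `TestStore::new(true)` creates a read-only store, so every write this
			// persister attempts below should fail.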
			let ro_persister = MonitorUpdatingPersister {
				kv_store: &TestStore::new(true),
				logger: &TestLogger::new(),
				maximum_pending_updates: 11,
				entropy_source: node_cfgs[0].keys_manager,
				signer_provider: node_cfgs[0].keys_manager,
			};
			match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
				ChannelMonitorUpdateStatus::UnrecoverableError => {
					// correct result
				}
				ChannelMonitorUpdateStatus::Completed => {
					panic!("Completed persisting new channel when shouldn't have")
				}
				ChannelMonitorUpdateStatus::InProgress => {
					panic!("Returned InProgress when shouldn't have")
				}
			}
			match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1, update_id.2) {
				ChannelMonitorUpdateStatus::UnrecoverableError => {
					// correct result
				}
				ChannelMonitorUpdateStatus::Completed => {
					panic!("Completed updating the channel when shouldn't have")
				}
				ChannelMonitorUpdateStatus::InProgress => {
					panic!("Returned InProgress when shouldn't have")
				}
			}
			added_monitors.clear();
		}
		nodes[1].node.get_and_clear_pending_msg_events();
	}

	// Confirm that the `clean_stale_updates` function finds and deletes stale updates.
	#[test]
	fn clean_stale_updates_works() {
		let test_max_pending_updates = 7;
		let chanmon_cfgs = create_chanmon_cfgs(3);
		let persister_0 = MonitorUpdatingPersister {
			kv_store: &TestStore::new(false),
			logger: &TestLogger::new(),
			maximum_pending_updates: test_max_pending_updates,
			entropy_source: &chanmon_cfgs[0].keys_manager,
			signer_provider: &chanmon_cfgs[0].keys_manager,
		};
		let persister_1 = MonitorUpdatingPersister {
			kv_store: &TestStore::new(false),
			logger: &TestLogger::new(),
			maximum_pending_updates: test_max_pending_updates,
			entropy_source: &chanmon_cfgs[1].keys_manager,
			signer_provider: &chanmon_cfgs[1].keys_manager,
		};
		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
		let chain_mon_0 = test_utils::TestChainMonitor::new(
			Some(&chanmon_cfgs[0].chain_source),
			&chanmon_cfgs[0].tx_broadcaster,
			&chanmon_cfgs[0].logger,
			&chanmon_cfgs[0].fee_estimator,
			&persister_0,
			&chanmon_cfgs[0].keys_manager,
		);
		let chain_mon_1 = test_utils::TestChainMonitor::new(
			Some(&chanmon_cfgs[1].chain_source),
			&chanmon_cfgs[1].tx_broadcaster,
			&chanmon_cfgs[1].logger,
			&chanmon_cfgs[1].fee_estimator,
			&persister_1,
			&chanmon_cfgs[1].keys_manager,
		);
		node_cfgs[0].chain_monitor = chain_mon_0;
		node_cfgs[1].chain_monitor = chain_mon_1;
		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

		let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;

		// Check that the persisted channel data is empty before any channels are open.
		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
		assert_eq!(persisted_chan_data.len(), 0);

		// Create an initial channel.
		let _ = create_announced_chan_between_nodes(&nodes, 0, 1);

		// Send a few payments to advance the updates a bit.
		send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
		send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);

		// Get the monitor and write a fake stale update at update_id=1 (the lowest
		// possible update ID).
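		// Updates are stored under the `monitor_updates` primary namespace, with the
		// monitor name as the secondary namespace and the update name as the key, so
		// writing a value directly there is enough to fake a leftover update.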
		let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
		let (_, monitor) = &persisted_chan_data[0];
		let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
		persister_0
			.kv_store
			.write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str(), &[0u8; 1])
			.unwrap();

		// Do the stale update cleanup.
		persister_0.cleanup_stale_updates(false).unwrap();

		// Confirm the stale update is unreadable/gone.
		assert!(persister_0
			.kv_store
			.read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str())
			.is_err());

		// Force close.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		check_closed_event(&nodes[0], 1, ClosureReason::HolderForceClosed, false, &[nodes[1].node.get_our_node_id()], 100000);
		check_closed_broadcast!(nodes[0], true);
		check_added_monitors!(nodes[0], 1);

		// Write an update at u64::MAX - 1, just below `CLOSED_CHANNEL_UPDATE_ID`
		// (u64::MAX), to make sure cleanup also handles the top of the update ID range.
		persister_0
			.kv_store
			.write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str(), &[0u8; 1])
			.unwrap();

		// Do the stale update cleanup.
		persister_0.cleanup_stale_updates(false).unwrap();

		// Confirm the stale update is unreadable/gone.
		assert!(persister_0
			.kv_store
			.read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str())
			.is_err());
	}

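	// Compile-time helper: accepts anything that derefs to a `Persist` implementation.
	// Used below to check that a `KVStore` trait object satisfies that bound.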
	fn persist_fn<P: Deref, ChannelSigner: WriteableEcdsaChannelSigner>(_persist: P) -> bool where P::Target: Persist<ChannelSigner> {
		true
	}

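	// Test that an `Arc<dyn KVStore + Send + Sync>` trait object can be used wherever
	// a `Persist` implementation is required.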
	#[test]
	fn kvstore_trait_object_usage() {
		let store: Arc<dyn KVStore + Send + Sync> = Arc::new(TestStore::new(false));
		assert!(persist_fn::<_, TestChannelSigner>(store.clone()));
	}
}