From: Matt Corallo Date: Thu, 6 Feb 2020 00:39:31 +0000 (-0500) Subject: Add types for updating ChannelMonitors without copying them. X-Git-Tag: v0.0.12~119^2~9 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=3b277cc39427d1f1bdbce4a4c2eea2dee493f9f0;p=rust-lightning Add types for updating ChannelMonitors without copying them. This is the first step in migrating ChannelMonitor updating logic to use incremental Update objects instead of copying the ChannelMonitors themselves and insert_combine()ing them. This adds most of the scaffolding and updates relevant comments to refer to the new architecture, without changing how any actual updates occur. --- diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 18652d221..02de54595 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -121,6 +121,10 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMon ret } + fn update_monitor(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { + unimplemented!(); //TODO + } + fn get_and_clear_pending_htlcs_updated(&self) -> Vec { return self.simple_monitor.get_and_clear_pending_htlcs_updated(); } diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index a4abd32ad..a3f89de96 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -240,6 +240,8 @@ pub(super) struct Channel { secp_ctx: Secp256k1, channel_value_satoshis: u64, + latest_monitor_update_id: u64, + #[cfg(not(test))] local_keys: ChanSigner, #[cfg(test)] @@ -470,6 +472,8 @@ impl Channel { secp_ctx: secp_ctx, channel_value_satoshis: channel_value_satoshis, + latest_monitor_update_id: 0, + local_keys: chan_keys, shutdown_pubkey: keys_provider.get_shutdown_pubkey(), cur_local_commitment_transaction_number: INITIAL_COMMITMENT_NUMBER, @@ -691,6 +695,8 @@ impl Channel { channel_outbound: false, secp_ctx: secp_ctx, + latest_monitor_update_id: 0, + local_keys: chan_keys, shutdown_pubkey: keys_provider.get_shutdown_pubkey(), cur_local_commitment_transaction_number: INITIAL_COMMITMENT_NUMBER, @@ -2908,6 +2914,10 @@ impl Channel { self.channel_update_count } + pub fn get_latest_monitor_update_id(&self) -> u64 { + self.latest_monitor_update_id + } + pub fn should_announce(&self) -> bool { self.config.announced_channel } @@ -3673,6 +3683,8 @@ impl Writeable for Channel { self.channel_outbound.write(writer)?; self.channel_value_satoshis.write(writer)?; + self.latest_monitor_update_id.write(writer)?; + self.local_keys.write(writer)?; self.shutdown_pubkey.write(writer)?; @@ -3870,6 +3882,8 @@ impl> ReadableArgs> ReadableArgs ChannelManager chan, + None => return, + }; + if !channel.is_awaiting_monitor_update() || channel.get_latest_monitor_update_id() != highest_applied_update_id { + return; + } + + let (raa, commitment_update, order, pending_forwards, mut pending_failures, needs_broadcast_safe, funding_locked) = channel.monitor_updating_restored(); + if !pending_forwards.is_empty() { + htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), pending_forwards)); + } + htlc_failures.append(&mut pending_failures); + + macro_rules! handle_cs { () => { + if let Some(update) = commitment_update { + pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: channel.get_their_node_id(), + updates: update, + }); + } + } } + macro_rules! handle_raa { () => { + if let Some(revoke_and_ack) = raa { + pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { + node_id: channel.get_their_node_id(), + msg: revoke_and_ack, + }); + } + } } + match order { + RAACommitmentOrder::CommitmentFirst => { + handle_cs!(); + handle_raa!(); + }, + RAACommitmentOrder::RevokeAndACKFirst => { + handle_raa!(); + handle_cs!(); + }, + } + if needs_broadcast_safe { + pending_events.push(events::Event::FundingBroadcastSafe { + funding_txo: channel.get_funding_txo().unwrap(), + user_channel_id: channel.get_user_id(), + }); + } + if let Some(msg) = funding_locked { + pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { + node_id: channel.get_their_node_id(), + msg, + }); + if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { + pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + node_id: channel.get_their_node_id(), + msg: announcement_sigs, + }); + } + short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); + } + } + + self.pending_events.lock().unwrap().append(&mut pending_events); + + for failure in htlc_failures.drain(..) { + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2); + } + self.forward_htlcs(&mut htlc_forwards[..]); + + for res in close_results.drain(..) { + self.finish_force_close_channel(res); + } + } + /// Used to restore channels to normal operation after a /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update /// operation. @@ -3351,7 +3457,8 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref, T if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { if channel.get_cur_local_commitment_transaction_number() != monitor.get_cur_local_commitment_number() || channel.get_revoked_remote_commitment_transaction_number() != monitor.get_min_seen_secret() || - channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() { + channel.get_cur_remote_commitment_transaction_number() != monitor.get_cur_remote_commitment_number() || + channel.get_latest_monitor_update_id() != monitor.get_latest_update_id() { let mut force_close_res = channel.force_shutdown(); force_close_res.0 = monitor.get_latest_local_commitment_txn(); closed_channels.push(force_close_res); diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index f65b5e7f1..f01d33f70 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -45,6 +45,45 @@ use std::sync::{Arc,Mutex}; use std::{hash,cmp, mem}; use std::ops::Deref; +/// An update generated by the underlying Channel itself which contains some new information the +/// ChannelMonitor should be made aware of. +#[cfg_attr(test, derive(PartialEq))] +#[derive(Clone)] +#[must_use] +pub struct ChannelMonitorUpdate { + pub(super) updates: Vec, + /// The sequence number of this update. Updates *must* be replayed in-order according to this + /// sequence number (and updates may panic if they are not). The update_id values are strictly + /// increasing and increase by one for each new update. + /// + /// This sequence number is also used to track up to which points updates which returned + /// ChannelMonitorUpdateErr::TemporaryFailure have been applied to all copies of a given + /// ChannelMonitor when ChannelManager::channel_monitor_updated is called. + pub update_id: u64, +} + +impl Writeable for ChannelMonitorUpdate { + fn write(&self, w: &mut W) -> Result<(), ::std::io::Error> { + self.update_id.write(w)?; + (self.updates.len() as u64).write(w)?; + for update_step in self.updates.iter() { + update_step.write(w)?; + } + Ok(()) + } +} +impl Readable for ChannelMonitorUpdate { + fn read(r: &mut R) -> Result { + let update_id: u64 = Readable::read(r)?; + let len: u64 = Readable::read(r)?; + let mut updates = Vec::with_capacity(cmp::min(len as usize, MAX_ALLOC_SIZE / ::std::mem::size_of::())); + for _ in 0..len { + updates.push(Readable::read(r)?); + } + Ok(Self { update_id, updates }) + } +} + /// An error enum representing a failure to persist a channel monitor update. #[derive(Clone)] pub enum ChannelMonitorUpdateErr { @@ -52,13 +91,13 @@ pub enum ChannelMonitorUpdateErr { /// our state failed, but is expected to succeed at some point in the future). /// /// Such a failure will "freeze" a channel, preventing us from revoking old states or - /// submitting new commitment transactions to the remote party. - /// ChannelManager::test_restore_channel_monitor can be used to retry the update(s) and restore - /// the channel to an operational state. + /// submitting new commitment transactions to the remote party. Once the update(s) which failed + /// have been successfully applied, ChannelManager::channel_monitor_updated can be used to + /// restore the channel to an operational state. /// - /// Note that continuing to operate when no copy of the updated ChannelMonitor could be - /// persisted is unsafe - if you failed to store the update on your own local disk you should - /// instead return PermanentFailure to force closure of the channel ASAP. + /// Note that a given ChannelManager will *never* re-generate a given ChannelMonitorUpdate. If + /// you return a TemporaryFailure you must ensure that it is written to disk safely before + /// writing out the latest ChannelManager state. /// /// Even when a channel has been "frozen" updates to the ChannelMonitor can continue to occur /// (eg if an inbound HTLC which we forwarded was claimed upstream resulting in us attempting @@ -69,8 +108,15 @@ pub enum ChannelMonitorUpdateErr { /// been "frozen". /// /// Note that even if updates made after TemporaryFailure succeed you must still call - /// test_restore_channel_monitor to ensure you have the latest monitor and re-enable normal - /// channel operation. + /// channel_monitor_updated to ensure you have the latest monitor and re-enable normal channel + /// operation. + /// + /// Note that the update being processed here will not be replayed for you when you call + /// ChannelManager::channel_monitor_updated, so you must store the update itself along + /// with the persisted ChannelMonitor on your own local disk prior to returning a + /// TemporaryFailure. You may, of course, employ a journaling approach, storing only the + /// ChannelMonitorUpdate on disk without updating the monitor itself, replaying the journal at + /// reload-time. /// /// For deployments where a copy of ChannelMonitors and other local state are backed up in a /// remote location (with local copies persisted immediately), it is anticipated that all @@ -129,9 +175,23 @@ pub trait ManyChannelMonitor: Send + Sync { /// any spends of any of the outputs. /// /// Any spends of outputs which should have been registered which aren't passed to - /// ChannelMonitors via block_connected may result in funds loss. + /// ChannelMonitors via block_connected may result in FUNDS LOSS. fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + /// Updates a monitor for the given `funding_txo`. + /// + /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with + /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected + /// callbacks with the funding transaction, or any spends of it. + /// + /// Further, the implementer must also ensure that each output returned in + /// monitor.get_watch_outputs() is registered to ensure that the provided monitor learns about + /// any spends of any of the outputs. + /// + /// Any spends of outputs which should have been registered which aren't passed to + /// ChannelMonitors via block_connected may result in FUNDS LOSS. + fn update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr>; + /// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated /// with success or failure. /// @@ -253,6 +313,18 @@ impl Result<(), MonitorUpdateError> { + let mut monitors = self.monitors.lock().unwrap(); + match monitors.get_mut(&key) { + Some(orig_monitor) => { + log_trace!(self, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor.key_storage)); + orig_monitor.update_monitor(update) + }, + None => Err(MonitorUpdateError("No such monitor registered")) + } + } } impl ManyChannelMonitor for SimpleManyChannelMonitor @@ -265,6 +337,13 @@ impl ManyChannelMonitor Result<(), ChannelMonitorUpdateErr> { + match self.update_monitor_by_key(funding_txo, update) { + Ok(_) => Ok(()), + Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure), + } + } + fn get_and_clear_pending_htlcs_updated(&self) -> Vec { let mut pending_htlcs_updated = Vec::new(); for chan in self.monitors.lock().unwrap().values_mut() { @@ -569,6 +648,22 @@ impl Readable for ClaimTxBumpMaterial { const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; +#[cfg_attr(test, derive(PartialEq))] +#[derive(Clone)] +pub(super) enum ChannelMonitorUpdateStep { +} + +impl Writeable for ChannelMonitorUpdateStep { + fn write(&self, _w: &mut W) -> Result<(), ::std::io::Error> { + Ok(()) + } +} +impl Readable for ChannelMonitorUpdateStep { + fn read(_r: &mut R) -> Result { + unimplemented!() // We don't have any enum variants to read (and never provide Monitor Updates) + } +} + /// A ChannelMonitor handles chain events (blocks connected and disconnected) and generates /// on-chain transactions to ensure no loss of funds occurs. /// @@ -576,6 +671,7 @@ const MIN_SERIALIZATION_VERSION: u8 = 1; /// information and are actively monitoring the chain. #[derive(Clone)] pub struct ChannelMonitor { + latest_update_id: u64, commitment_transaction_number_obscure_factor: u64, key_storage: Storage, @@ -711,7 +807,8 @@ macro_rules! subtract_high_prio_fee { /// underlying object impl PartialEq for ChannelMonitor { fn eq(&self, other: &Self) -> bool { - if self.commitment_transaction_number_obscure_factor != other.commitment_transaction_number_obscure_factor || + if self.latest_update_id != other.latest_update_id || + self.commitment_transaction_number_obscure_factor != other.commitment_transaction_number_obscure_factor || self.key_storage != other.key_storage || self.their_htlc_base_key != other.their_htlc_base_key || self.their_delayed_payment_base_key != other.their_delayed_payment_base_key || @@ -751,6 +848,8 @@ impl ChannelMonitor { writer.write_all(&[SERIALIZATION_VERSION; 1])?; writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; + self.latest_update_id.write(writer)?; + // Set in initial Channel-object creation, so should always be set by now: U48(self.commitment_transaction_number_obscure_factor).write(writer)?; @@ -999,6 +1098,7 @@ impl ChannelMonitor { impl ChannelMonitor { pub(super) fn new(keys: ChanSigner, funding_key: &SecretKey, revocation_base_key: &SecretKey, delayed_payment_base_key: &SecretKey, htlc_base_key: &SecretKey, payment_base_key: &SecretKey, shutdown_pubkey: &PublicKey, our_to_self_delay: u16, destination_script: Script, logger: Arc) -> ChannelMonitor { ChannelMonitor { + latest_update_id: 0, commitment_transaction_number_obscure_factor: 0, key_storage: Storage::Local { @@ -1229,6 +1329,22 @@ impl ChannelMonitor { self.payment_preimages.insert(payment_hash.clone(), payment_preimage.clone()); } + /// Updates a ChannelMonitor on the basis of some new information provided by the Channel + /// itself. + /// + /// panics if the given update is not the next update by update_id. + pub fn update_monitor(&mut self, mut updates: ChannelMonitorUpdate) -> Result<(), MonitorUpdateError> { + if self.latest_update_id + 1 != updates.update_id { + panic!("Attempted to apply ChannelMonitorUpdates out of order, check the update_id before passing an update to update_monitor!"); + } + for update in updates.updates.drain(..) { + match update { + } + } + self.latest_update_id = updates.update_id; + Ok(()) + } + /// Combines this ChannelMonitor with the information contained in the other ChannelMonitor. /// After a successful call this ChannelMonitor is up-to-date and is safe to use to monitor the /// chain for new blocks/transactions. @@ -1320,6 +1436,12 @@ impl ChannelMonitor { self.commitment_transaction_number_obscure_factor = commitment_transaction_number_obscure_factor; } + /// Gets the update_id from the latest ChannelMonitorUpdate which was applied to this + /// ChannelMonitor. + pub fn get_latest_update_id(&self) -> u64 { + self.latest_update_id + } + /// Gets the funding transaction outpoint of the channel this ChannelMonitor is monitoring for. pub fn get_funding_txo(&self) -> Option { match self.key_storage { @@ -2981,6 +3103,7 @@ impl> ReadableArgs>::read(reader)?.0; let key_storage = match >::read(reader)? { @@ -3241,6 +3364,7 @@ impl> ReadableArgs channelmonitor::ManyChannelMonitor for TestChanne self.update_ret.lock().unwrap().clone() } + fn update_monitor(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { + // Every monitor update should survive roundtrip + let mut w = TestVecWriter(Vec::new()); + update.write(&mut w).unwrap(); + assert!(channelmonitor::ChannelMonitorUpdate::read( + &mut ::std::io::Cursor::new(&w.0)).unwrap() == update); + + assert!(self.simple_monitor.update_monitor(funding_txo, update).is_ok()); + // At every point where we get a monitor update, we should be able to send a useful monitor + // to a watchtower and disk... + let monitors = self.simple_monitor.monitors.lock().unwrap(); + let monitor = monitors.get(&funding_txo).unwrap(); + w.0.clear(); + monitor.write_for_disk(&mut w).unwrap(); + assert!(<(Sha256dHash, channelmonitor::ChannelMonitor)>::read( + &mut ::std::io::Cursor::new(&w.0), Arc::new(TestLogger::new())).unwrap().1 == *monitor); + w.0.clear(); + monitor.write_for_watchtower(&mut w).unwrap(); // This at least shouldn't crash... + self.added_monitors.lock().unwrap().push((funding_txo, monitor.clone())); + self.update_ret.lock().unwrap().clone() + } + fn get_and_clear_pending_htlcs_updated(&self) -> Vec { return self.simple_monitor.get_and_clear_pending_htlcs_updated(); }