Support serializing TLV fields which may or may not be present
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 9f9820c1dbc083218c66efce32348f7f3c1da075..b1416c30f53becd5450306dcb3daa64db02181f0 100644
@@ -54,22 +54,23 @@ use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField};
 use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner};
 use util::config::UserConfig;
-use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
 use util::{byte_utils, events};
 use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
 use util::chacha20::{ChaCha20, ChaChaReader};
 use util::logger::Logger;
 use util::errors::APIError;
 
-use std::{cmp, mem};
+use core::{cmp, mem};
+use std::cell::RefCell;
 use std::collections::{HashMap, hash_map, HashSet};
 use std::io::{Cursor, Read};
 use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::time::Duration;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::time::Duration;
 #[cfg(any(test, feature = "allow_wallclock_use"))]
 use std::time::Instant;
-use std::ops::Deref;
+use core::ops::Deref;
 use bitcoin::hashes::hex::ToHex;
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
@@ -1860,6 +1861,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// Note that this includes RBF or similar transaction replacement strategies - lightning does
        /// not currently support replacing a funding transaction on an existing channel. Instead,
        /// create a new channel with a conflicting funding transaction.
+       ///
+       /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady
        pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
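For context, a hedged sketch of the call flow this API expects. The function, `MyChannelManager`, `Wallet`, and `build_funding_tx` below are application-side stand-ins, not part of this diff: the application waits for `Event::FundingGenerationReady`, funds a transaction paying the requested script, and hands it back here, taking care never to RBF-bump it afterwards.

```rust
// Illustrative only: the types and helpers here are invented stand-ins
// for application code around ChannelManager.
fn on_event(channel_manager: &MyChannelManager, wallet: &Wallet, event: Event) -> Result<(), APIError> {
    match event {
        Event::FundingGenerationReady {
            temporary_channel_id, channel_value_satoshis, output_script, ..
        } => {
            // Fund a transaction paying `channel_value_satoshis` sats to
            // `output_script`. Never RBF-bump it afterwards: lightning does
            // not support replacing a channel's funding transaction.
            let funding_tx = build_funding_tx(wallet, &output_script, channel_value_satoshis);
            channel_manager.funding_transaction_generated(&temporary_channel_id, funding_tx)?;
        },
        _ => {},
    }
    Ok(())
}
```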
@@ -1923,7 +1926,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        // be absurd. We ensure this by checking that at least 500 (our stated public contract on when
        // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB
        // message...
-       const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
+       const HALF_MESSAGE_IS_ADDRS: u32 = ::core::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2;
        #[deny(const_err)]
        #[allow(dead_code)]
        // ...by failing to compile if the number of addresses that would be half of a message is
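The check the comment describes is elided after this hunk; a minimal sketch of the pattern (the constant name is invented): under `#[deny(const_err)]`, a `u32` subtraction that underflows when the invariant is violated becomes a compile error.

```rust
// Illustrative reconstruction: fails to compile unless at least 500
// maximum-length addresses fit in half of a 64KB message, because the
// subtraction would underflow in a const context.
#[deny(const_err)]
#[allow(dead_code)]
const CHECK_HALF_MESSAGE_IS_ADDRS: u32 = HALF_MESSAGE_IS_ADDRS - 500;
```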
@@ -3449,60 +3452,66 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
-       /// Process pending events from the `chain::Watch`.
-       fn process_pending_monitor_events(&self) {
+       /// Process pending events from the `chain::Watch`, returning whether any events were processed.
+       fn process_pending_monitor_events(&self) -> bool {
                let mut failed_channels = Vec::new();
-               {
-                       for monitor_event in self.chain_monitor.release_pending_monitor_events() {
-                               match monitor_event {
-                                       MonitorEvent::HTLCEvent(htlc_update) => {
-                                               if let Some(preimage) = htlc_update.payment_preimage {
-                                                       log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
-                                                       self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
-                                               } else {
-                                                       log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
-                                                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+               let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
+               let has_pending_monitor_events = !pending_monitor_events.is_empty();
+               for monitor_event in pending_monitor_events {
+                       match monitor_event {
+                               MonitorEvent::HTLCEvent(htlc_update) => {
+                                       if let Some(preimage) = htlc_update.payment_preimage {
+                                               log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
+                                               self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
+                                       } else {
+                                               log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
+                                               self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+                                       }
+                               },
+                               MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
+                                       let mut channel_lock = self.channel_state.lock().unwrap();
+                                       let channel_state = &mut *channel_lock;
+                                       let by_id = &mut channel_state.by_id;
+                                       let short_to_id = &mut channel_state.short_to_id;
+                                       let pending_msg_events = &mut channel_state.pending_msg_events;
+                                       if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) {
+                                               if let Some(short_id) = chan.get_short_channel_id() {
+                                                       short_to_id.remove(&short_id);
                                                }
-                                       },
-                                       MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
-                                               let mut channel_lock = self.channel_state.lock().unwrap();
-                                               let channel_state = &mut *channel_lock;
-                                               let by_id = &mut channel_state.by_id;
-                                               let short_to_id = &mut channel_state.short_to_id;
-                                               let pending_msg_events = &mut channel_state.pending_msg_events;
-                                               if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) {
-                                                       if let Some(short_id) = chan.get_short_channel_id() {
-                                                               short_to_id.remove(&short_id);
-                                                       }
-                                                       failed_channels.push(chan.force_shutdown(false));
-                                                       if let Ok(update) = self.get_channel_update(&chan) {
-                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                                       msg: update
-                                                               });
-                                                       }
-                                                       pending_msg_events.push(events::MessageSendEvent::HandleError {
-                                                               node_id: chan.get_counterparty_node_id(),
-                                                               action: msgs::ErrorAction::SendErrorMessage {
-                                                                       msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
-                                                               },
+                                               failed_channels.push(chan.force_shutdown(false));
+                                               if let Ok(update) = self.get_channel_update(&chan) {
+                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                               msg: update
                                                        });
                                                }
-                                       },
-                               }
+                                               pending_msg_events.push(events::MessageSendEvent::HandleError {
+                                                       node_id: chan.get_counterparty_node_id(),
+                                                       action: msgs::ErrorAction::SendErrorMessage {
+                                                               msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
+                                                       },
+                                               });
+                                       }
+                               },
                        }
                }
 
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
+
+               has_pending_monitor_events
        }
 
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
+       /// Returns whether any updates were made, such as pending HTLCs being freed or a monitor
+       /// update being applied.
+       ///
        /// This should only apply to HTLCs which were added to the holding cell because we were
        /// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
        /// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
        /// code to inform them of a channel monitor update.
-       fn check_free_holding_cells(&self) {
+       fn check_free_holding_cells(&self) -> bool {
+               let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();
                let mut handle_errors = Vec::new();
                {
@@ -3514,11 +3523,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
                        by_id.retain(|channel_id, chan| {
                                match chan.maybe_free_holding_cell_htlcs(&self.logger) {
-                                       Ok((None, ref htlcs)) if htlcs.is_empty() => true,
                                        Ok((commitment_opt, holding_cell_failed_htlcs)) => {
-                                               failed_htlcs.push((holding_cell_failed_htlcs, *channel_id));
+                                               if !holding_cell_failed_htlcs.is_empty() {
+                                                       failed_htlcs.push((holding_cell_failed_htlcs, *channel_id));
+                                               }
                                                if let Some((commitment_update, monitor_update)) = commitment_opt {
+                                                       has_monitor_update = true;
                                                        if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
                                                                let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
                                                                handle_errors.push((chan.get_counterparty_node_id(), res));
                                                                if close_channel { return false; }
@@ -3539,6 +3550,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                }
                        });
                }
+
+               let has_update = has_monitor_update || !failed_htlcs.is_empty();
                for (failures, channel_id) in failed_htlcs.drain(..) {
                        self.fail_holding_cell_htlcs(failures, channel_id);
                }
@@ -3546,6 +3559,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                for (counterparty_node_id, err) in handle_errors.drain(..) {
                        let _ = handle_error!(self, err, counterparty_node_id);
                }
+
+               has_update
        }
 
        /// Handle a list of channel failures during a block_connected or block_disconnected call,
@@ -3670,6 +3685,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result<PaymentSecret, APIError> {
                self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id)
        }
+
+       #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+       pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+               let events = RefCell::new(Vec::new());
+               let event_handler = |event| events.borrow_mut().push(event);
+               self.process_pending_events(&event_handler);
+               events.into_inner()
+       }
 }
 
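The helper above hands a plain closure to `process_pending_events`, which only compiles if `EventHandler` is blanket-implemented for closures. A sketch of the presumed shape of that impl (it lives in `util::events`, outside this diff):

```rust
// Presumed shape of the trait and its closure impl; this is what lets
// `|event| events.borrow_mut().push(event)` act as an EventHandler.
pub trait EventHandler {
    fn handle_event(&self, event: Event);
}

impl<F> EventHandler for F where F: Fn(Event) {
    fn handle_event(&self, event: Event) {
        self(event)
    }
}
```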
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -3680,35 +3703,71 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSend
                                L::Target: Logger,
 {
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
-               //TODO: This behavior should be documented. It's non-intuitive that we query
-               // ChannelMonitors when clearing other events.
-               self.process_pending_monitor_events();
+               let events = RefCell::new(Vec::new());
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       let mut result = NotifyOption::SkipPersist;
+
+                       // TODO: This behavior should be documented. It's unintuitive that we query
+                       // ChannelMonitors when clearing other events.
+                       if self.process_pending_monitor_events() {
+                               result = NotifyOption::DoPersist;
+                       }
 
-               self.check_free_holding_cells();
+                       if self.check_free_holding_cells() {
+                               result = NotifyOption::DoPersist;
+                       }
 
-               let mut ret = Vec::new();
-               let mut channel_state = self.channel_state.lock().unwrap();
-               mem::swap(&mut ret, &mut channel_state.pending_msg_events);
-               ret
+                       let mut pending_events = Vec::new();
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       mem::swap(&mut pending_events, &mut channel_state.pending_msg_events);
+
+                       if !pending_events.is_empty() {
+                               events.replace(pending_events);
+                       }
+
+                       result
+               });
+               events.into_inner()
        }
 }
 
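`NotifyOption` and `PersistenceNotifierGuard::optionally_notify` are defined outside the lines shown. A self-contained sketch of the contract the call sites above rely on (bodies assumed, not copied from the source): the closure runs while the consistency lock is held, and waiters on the persistence notifier are only woken when it returns `DoPersist`.

```rust
use std::sync::{Condvar, Mutex, RwLock, RwLockReadGuard};

// Assumed reconstruction for reading the call sites above.
enum NotifyOption {
    DoPersist,
    SkipPersist,
}

struct PersistenceNotifier {
    persistence_lock: Mutex<bool>,
    condvar: Condvar,
}

impl PersistenceNotifier {
    fn notify(&self) {
        *self.persistence_lock.lock().unwrap() = true;
        self.condvar.notify_all();
    }
}

struct PersistenceNotifierGuard<'a> {
    _read_guard: RwLockReadGuard<'a, ()>,
    should_persist: bool,
    notifier: &'a PersistenceNotifier,
}

impl<'a> PersistenceNotifierGuard<'a> {
    // Runs `persist_check` under the consistency lock; on drop, the guard
    // wakes persistence waiters only if the check returned DoPersist.
    fn optionally_notify<F: FnOnce() -> NotifyOption>(
        lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier, persist_check: F,
    ) -> Self {
        let read_guard = lock.read().unwrap();
        let should_persist = match persist_check() {
            NotifyOption::DoPersist => true,
            NotifyOption::SkipPersist => false,
        };
        PersistenceNotifierGuard { _read_guard: read_guard, should_persist, notifier }
    }
}

impl<'a> Drop for PersistenceNotifierGuard<'a> {
    fn drop(&mut self) {
        if self.should_persist {
            self.notifier.notify();
        }
    }
}
```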
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> EventsProvider for ChannelManager<Signer, M, T, K, F, L>
-       where M::Target: chain::Watch<Signer>,
-        T::Target: BroadcasterInterface,
-        K::Target: KeysInterface<Signer = Signer>,
-        F::Target: FeeEstimator,
-                               L::Target: Logger,
+where
+       M::Target: chain::Watch<Signer>,
+       T::Target: BroadcasterInterface,
+       K::Target: KeysInterface<Signer = Signer>,
+       F::Target: FeeEstimator,
+       L::Target: Logger,
 {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
-               //TODO: This behavior should be documented. It's non-intuitive that we query
-               // ChannelMonitors when clearing other events.
-               self.process_pending_monitor_events();
+       /// Processes events that must be periodically handled.
+       ///
+       /// An [`EventHandler`] may safely call back to the provider in order to handle an event.
+       /// However, it must not call [`Writeable::write`] as doing so would result in a deadlock.
+       ///
+       /// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared
+       /// when processed, an [`EventHandler`] must be able to handle previously seen events when
+       /// restarting from an old state.
+       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+               PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
+                       let mut result = NotifyOption::SkipPersist;
 
-               let mut ret = Vec::new();
-               let mut pending_events = self.pending_events.lock().unwrap();
-               mem::swap(&mut ret, &mut *pending_events);
-               ret
+                       // TODO: This behavior should be documented. It's unintuitive that we query
+                       // ChannelMonitors when clearing other events.
+                       if self.process_pending_monitor_events() {
+                               result = NotifyOption::DoPersist;
+                       }
+
+                       let mut pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+                       if !pending_events.is_empty() {
+                               result = NotifyOption::DoPersist;
+                       }
+
+                       for event in pending_events.drain(..) {
+                               handler.handle_event(event);
+                       }
+
+                       result
+               });
        }
 }
 
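A hedged usage sketch of the new interface (setup elided; the match arms are illustrative, and event fields vary by version). Note the documented constraint: the handler may call back into the `ChannelManager`, but must not serialize it, since `Writeable::write` takes the same `total_consistency_lock`.

```rust
// Illustrative consumer; `channel_manager` construction is omitted.
let event_handler = |event: Event| match event {
    Event::FundingGenerationReady { .. } => {
        // Build and hand back a funding transaction (see sketch above).
    },
    other => {
        // Handle payments, forwards, etc. Calling back into
        // channel_manager here is fine; serializing it would deadlock.
        let _ = other;
    },
};
channel_manager.process_pending_events(&event_handler);
```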
@@ -4509,8 +4568,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
                let _consistency_lock = self.total_consistency_lock.write().unwrap();
 
-               writer.write_all(&[SERIALIZATION_VERSION; 1])?;
-               writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
+               write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
 
                self.genesis_hash.write(writer)?;
                {
@@ -4593,6 +4651,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                        session_priv.write(writer)?;
                }
 
+               write_tlv_fields!(writer, {}, {});
+
                Ok(())
        }
 }
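This is the heart of the commit: the `ChannelManager` now terminates its serialization with a (currently empty) TLV stream, so future versions can append new fields without breaking old readers. A hedged sketch of how a later field might be written, assuming the macro's first brace group takes required fields and the second takes `Option`-typed ones (the field name is invented):

```rust
// Hypothetical future writer (field name invented): an Option-typed TLV
// in the second group is simply omitted from the stream when None.
write_tlv_fields!(writer, {}, {
    (1, self.some_future_optional_value) // Option<u64>
});
```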
@@ -4716,11 +4776,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
         L::Target: Logger,
 {
        fn read<R: ::std::io::Read>(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result<Self, DecodeError> {
-               let _ver: u8 = Readable::read(reader)?;
-               let min_ver: u8 = Readable::read(reader)?;
-               if min_ver > SERIALIZATION_VERSION {
-                       return Err(DecodeError::UnknownVersion);
-               }
+               let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
 
                let genesis_hash: BlockHash = Readable::read(reader)?;
                let best_block_height: u32 = Readable::read(reader)?;
@@ -4840,6 +4896,8 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                        }
                }
 
+               read_tlv_fields!(reader, {}, {});
+
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());
 
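The read side is symmetric: `read_tlv_fields!` consumes the TLV stream even when no fields are currently expected, which is what allows a future version to append fields here; per the usual lightning TLV convention, unknown odd types from a newer writer can be skipped by old readers. A sketch under the same assumptions as the write-side example, with an invented field:

```rust
// Hypothetical future reader (field name invented): the optional TLV
// decodes to None whenever type 1 is absent from the stream.
let mut some_future_optional_value: Option<u64> = None;
read_tlv_fields!(reader, {}, {
    (1, some_future_optional_value)
});
```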
@@ -4895,9 +4953,9 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
 mod tests {
        use ln::channelmanager::PersistenceNotifier;
        use std::sync::Arc;
-       use std::sync::atomic::{AtomicBool, Ordering};
+       use core::sync::atomic::{AtomicBool, Ordering};
        use std::thread;
-       use std::time::Duration;
+       use core::time::Duration;
 
        #[test]
        fn test_wait_timeout() {
@@ -4956,7 +5014,7 @@ pub mod bench {
        use routing::router::get_route;
        use util::test_utils;
        use util::config::UserConfig;
-       use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+       use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
 
        use bitcoin::hashes::Hash;
        use bitcoin::hashes::sha256::Hash as Sha256;