X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=a9e714cea7bf245b9bc536f87221ffbcfa57a183;hb=6d98aedaf5dba6565839aa1b806ed730e0c7935c;hp=9445f518c54a7d82e6d83e09b6c45f19cc3c37fb;hpb=b9a1db5ad64e45c4440e1dcfae8081f781ff987a;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 9445f518..a9e714ce 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -54,22 +54,23 @@ use ln::onion_utils; use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField}; use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner}; use util::config::UserConfig; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use util::{byte_utils, events}; use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer}; use util::chacha20::{ChaCha20, ChaChaReader}; use util::logger::Logger; use util::errors::APIError; -use std::{cmp, mem}; -use std::collections::{HashMap, hash_map, HashSet}; +use prelude::*; +use core::{cmp, mem}; +use core::cell::RefCell; use std::io::{Cursor, Read}; use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::time::Duration; #[cfg(any(test, feature = "allow_wallclock_use"))] use std::time::Instant; -use std::ops::Deref; +use core::ops::Deref; use bitcoin::hashes::hex::ToHex; // We hold various information about HTLC relay in the HTLC objects in Channel itself: @@ -496,6 +497,7 @@ pub struct ChannelManager { - log_trace!($self.logger, "Closing channel {} due to close-required error: {}", log_bytes!($channel_id[..]), msg); + log_error!($self.logger, "Closing channel {} due to close-required error: {}", log_bytes!($channel_id[..]), msg); if let Some(short_id) = $channel.get_short_channel_id() { $short_to_id.remove(&short_id); } @@ -1241,7 +1243,7 @@ impl ChannelMana #[inline] fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) { let (monitor_update_option, mut failed_htlcs) = shutdown_res; - log_trace!(self.logger, "Finishing force-closure of channel {} HTLCs to fail", failed_htlcs.len()); + log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len()); for htlc_source in failed_htlcs.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); } @@ -1272,7 +1274,7 @@ impl ChannelMana return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()}); } }; - log_trace!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); + log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); self.finish_force_close_channel(chan.force_shutdown(true)); if let Ok(update) = self.get_channel_update(&chan) { let mut channel_state = self.channel_state.lock().unwrap(); @@ -1669,6 +1671,7 @@ impl ChannelMana return Err(APIError::MonitorUpdateFailed); } + log_debug!(self.logger, "Sending payment along path resulted in a commitment_signed for channel {}", log_bytes!(chan.get().channel_id())); channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: path.first().unwrap().pubkey, updates: msgs::CommitmentUpdate { @@ -1860,6 +1863,8 @@ impl ChannelMana /// Note that this includes RBF or similar transaction replacement strategies - lightning does /// not currently support replacing a funding transaction on an existing channel. Instead, /// create a new channel with a conflicting funding transaction. + /// + /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); @@ -1923,26 +1928,31 @@ impl ChannelMana // be absurd. We ensure this by checking that at least 500 (our stated public contract on when // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB // message... - const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2; + const HALF_MESSAGE_IS_ADDRS: u32 = ::core::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2; #[deny(const_err)] #[allow(dead_code)] // ...by failing to compile if the number of addresses that would be half of a message is // smaller than 500: const STATIC_ASSERT: u32 = Self::HALF_MESSAGE_IS_ADDRS - 500; - /// Generates a signed node_announcement from the given arguments and creates a - /// BroadcastNodeAnnouncement event. Note that such messages will be ignored unless peers have - /// seen a channel_announcement from us (ie unless we have public channels open). + /// Regenerates channel_announcements and generates a signed node_announcement from the given + /// arguments, providing them in corresponding events via + /// [`get_and_clear_pending_msg_events`], if at least one public channel has been confirmed + /// on-chain. This effectively re-broadcasts all channel announcements and sends our node + /// announcement to ensure that the lightning P2P network is aware of the channels we have and + /// our network addresses. + /// + /// `rgb` is a node "color" and `alias` is a printable human-readable string to describe this + /// node to humans. They carry no in-protocol meaning. /// - /// RGB is a node "color" and alias is a printable human-readable string to describe this node - /// to humans. They carry no in-protocol meaning. + /// `addresses` represent the set (possibly empty) of socket addresses on which this node + /// accepts incoming connections. These will be included in the node_announcement, publicly + /// tying these addresses together and to this node. If you wish to preserve user privacy, + /// addresses should likely contain only Tor Onion addresses. /// - /// addresses represent the set (possibly empty) of socket addresses on which this node accepts - /// incoming connections. These will be broadcast to the network, publicly tying these - /// addresses together. If you wish to preserve user privacy, addresses should likely contain - /// only Tor Onion addresses. + /// Panics if `addresses` is absurdly large (more than 500). /// - /// Panics if addresses is absurdly large (more than 500). + /// [`get_and_clear_pending_msg_events`]: MessageSendEventsProvider::get_and_clear_pending_msg_events pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], mut addresses: Vec) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); @@ -1963,14 +1973,37 @@ impl ChannelMana excess_data: Vec::new(), }; let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]); + let node_announce_sig = self.secp_ctx.sign(&msghash, &self.our_network_key); - let mut channel_state = self.channel_state.lock().unwrap(); - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement { - msg: msgs::NodeAnnouncement { - signature: self.secp_ctx.sign(&msghash, &self.our_network_key), - contents: announcement - }, - }); + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_state_lock; + + let mut announced_chans = false; + for (_, chan) in channel_state.by_id.iter() { + if let Some(msg) = chan.get_signed_channel_announcement(&self.our_network_key, self.get_our_node_id(), self.genesis_hash.clone()) { + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { + msg, + update_msg: match self.get_channel_update(chan) { + Ok(msg) => msg, + Err(_) => continue, + }, + }); + announced_chans = true; + } else { + // If the channel is not public or has not yet reached funding_locked, check the + // next channel. If we don't yet have any public channels, we'll skip the broadcast + // below as peers may not accept it without channels on chain first. + } + } + + if announced_chans { + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement { + msg: msgs::NodeAnnouncement { + signature: node_announce_sig, + contents: announcement + }, + }); + } } /// Processes HTLCs which are pending waiting on random forward delay. @@ -2028,7 +2061,7 @@ impl ChannelMana onion_packet, .. }, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value }, prev_funding_outpoint } => { - log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", log_bytes!(payment_hash.0), prev_short_channel_id, short_chan_id); + log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, log_bytes!(payment_hash.0), short_chan_id); let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id: prev_short_channel_id, outpoint: prev_funding_outpoint, @@ -2068,11 +2101,11 @@ impl ChannelMana panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward"); }, HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => { - log_trace!(self.logger, "Failing HTLC back to channel with short id {} after delay", short_chan_id); - match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet) { + log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); + match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) { Err(e) => { if let ChannelError::Ignore(msg) = e { - log_trace!(self.logger, "Failed to fail backwards to short_id {}: {}", short_chan_id, msg); + log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg); } else { panic!("Stated return value requirements in get_update_fail_htlc() were not met"); } @@ -2126,6 +2159,8 @@ impl ChannelMana handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true))); continue; } + log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}", + add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id())); channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: chan.get().get_counterparty_node_id(), updates: msgs::CommitmentUpdate { @@ -2289,7 +2324,8 @@ impl ChannelMana } #[cfg(any(test, feature = "_test_utils"))] - pub(crate) fn test_process_background_events(&self) { + /// Process background events, for functional testing + pub fn test_process_background_events(&self) { self.process_background_events(); } @@ -2631,6 +2667,8 @@ impl ChannelMana } } if let Some((msg, commitment_signed)) = msgs { + log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", + log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: chan.get().get_counterparty_node_id(), updates: msgs::CommitmentUpdate { @@ -2894,7 +2932,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - try_chan_entry!(self, chan.get_mut().funding_locked(&msg), channel_state, chan); + try_chan_entry!(self, chan.get_mut().funding_locked(&msg, &self.logger), channel_state, chan); if let Some(announcement_sigs) = self.get_announcement_sigs(chan.get()) { log_trace!(self.logger, "Sending announcement_signatures for {} in response to funding_locked", log_bytes!(chan.get().channel_id())); // If we see locking block before receiving remote funding_locked, we broadcast our @@ -3298,39 +3336,8 @@ impl ChannelMana return Err(MsgHandleErrInternal::from_no_close(LightningError{err: "Got an announcement_signatures before we were ready for it".to_owned(), action: msgs::ErrorAction::IgnoreError})); } - let our_node_id = self.get_our_node_id(); - let (announcement, our_bitcoin_sig) = - try_chan_entry!(self, chan.get_mut().get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone()), channel_state, chan); - - let were_node_one = announcement.node_id_1 == our_node_id; - let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]); - { - let their_node_key = if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }; - let their_bitcoin_key = if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }; - match (self.secp_ctx.verify(&msghash, &msg.node_signature, their_node_key), - self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, their_bitcoin_key)) { - (Err(e), _) => { - let chan_err: ChannelError = ChannelError::Close(format!("Bad announcement_signatures. Failed to verify node_signature: {:?}. Maybe using different node_secret for transport and routing msg? UnsignedChannelAnnouncement used for verification is {:?}. their_node_key is {:?}", e, &announcement, their_node_key)); - try_chan_entry!(self, Err(chan_err), channel_state, chan); - }, - (_, Err(e)) => { - let chan_err: ChannelError = ChannelError::Close(format!("Bad announcement_signatures. Failed to verify bitcoin_signature: {:?}. UnsignedChannelAnnouncement used for verification is {:?}. their_bitcoin_key is ({:?})", e, &announcement, their_bitcoin_key)); - try_chan_entry!(self, Err(chan_err), channel_state, chan); - }, - _ => {} - } - } - - let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key); - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { - msg: msgs::ChannelAnnouncement { - node_signature_1: if were_node_one { our_node_sig } else { msg.node_signature }, - node_signature_2: if were_node_one { msg.node_signature } else { our_node_sig }, - bitcoin_signature_1: if were_node_one { our_bitcoin_sig } else { msg.bitcoin_signature }, - bitcoin_signature_2: if were_node_one { msg.bitcoin_signature } else { our_bitcoin_sig }, - contents: announcement, - }, + msg: try_chan_entry!(self, chan.get_mut().announcement_signatures(&self.our_network_key, self.get_our_node_id(), self.genesis_hash.clone(), msg), channel_state, chan), update_msg: self.get_channel_update(chan.get()).unwrap(), // can only fail if we're not in a ready state }); }, @@ -3352,8 +3359,13 @@ impl ChannelMana match channel_state.by_id.entry(chan_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_counterparty_node_id() != *counterparty_node_id { - // TODO: see issue #153, need a consistent behavior on obnoxious behavior from random node - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), chan_id)); + if chan.get().should_announce() { + // If the announcement is about a channel of ours which is public, some + // other peer may simply be forwarding all its gossip to us. Don't provide + // a scary-looking error message and return Ok instead. + return Ok(()); + } + return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id)); } try_chan_entry!(self, chan.get_mut().channel_update(&msg), channel_state, chan); }, @@ -3426,6 +3438,7 @@ impl ChannelMana if let Err(_e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { unimplemented!(); } + log_debug!(self.logger, "Updating fee resulted in a commitment_signed for channel {}", log_bytes!(chan.get().channel_id())); channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: chan.get().get_counterparty_node_id(), updates: msgs::CommitmentUpdate { @@ -3449,60 +3462,66 @@ impl ChannelMana } } - /// Process pending events from the `chain::Watch`. - fn process_pending_monitor_events(&self) { + /// Process pending events from the `chain::Watch`, returning whether any events were processed. + fn process_pending_monitor_events(&self) -> bool { let mut failed_channels = Vec::new(); - { - for monitor_event in self.chain_monitor.release_pending_monitor_events() { - match monitor_event { - MonitorEvent::HTLCEvent(htlc_update) => { - if let Some(preimage) = htlc_update.payment_preimage { - log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0)); - self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage); - } else { - log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0)); - self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); + let pending_monitor_events = self.chain_monitor.release_pending_monitor_events(); + let has_pending_monitor_events = !pending_monitor_events.is_empty(); + for monitor_event in pending_monitor_events { + match monitor_event { + MonitorEvent::HTLCEvent(htlc_update) => { + if let Some(preimage) = htlc_update.payment_preimage { + log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0)); + self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage); + } else { + log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0)); + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); + } + }, + MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => { + let mut channel_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_lock; + let by_id = &mut channel_state.by_id; + let short_to_id = &mut channel_state.short_to_id; + let pending_msg_events = &mut channel_state.pending_msg_events; + if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) { + if let Some(short_id) = chan.get_short_channel_id() { + short_to_id.remove(&short_id); } - }, - MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - let by_id = &mut channel_state.by_id; - let short_to_id = &mut channel_state.short_to_id; - let pending_msg_events = &mut channel_state.pending_msg_events; - if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) { - if let Some(short_id) = chan.get_short_channel_id() { - short_to_id.remove(&short_id); - } - failed_channels.push(chan.force_shutdown(false)); - if let Ok(update) = self.get_channel_update(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - pending_msg_events.push(events::MessageSendEvent::HandleError { - node_id: chan.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() } - }, + failed_channels.push(chan.force_shutdown(false)); + if let Ok(update) = self.get_channel_update(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update }); } - }, - } + pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: chan.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() } + }, + }); + } + }, } } for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } + + has_pending_monitor_events } /// Check the holding cell in each channel and free any pending HTLCs in them if possible. + /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor + /// update was applied. + /// /// This should only apply to HTLCs which were added to the holding cell because we were /// waiting on a monitor update to finish. In that case, we don't want to free the holding cell /// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user /// code to inform them of a channel monitor update. - fn check_free_holding_cells(&self) { + fn check_free_holding_cells(&self) -> bool { + let mut has_monitor_update = false; let mut failed_htlcs = Vec::new(); let mut handle_errors = Vec::new(); { @@ -3514,11 +3533,13 @@ impl ChannelMana by_id.retain(|channel_id, chan| { match chan.maybe_free_holding_cell_htlcs(&self.logger) { - Ok((None, ref htlcs)) if htlcs.is_empty() => true, Ok((commitment_opt, holding_cell_failed_htlcs)) => { - failed_htlcs.push((holding_cell_failed_htlcs, *channel_id)); + if !holding_cell_failed_htlcs.is_empty() { + failed_htlcs.push((holding_cell_failed_htlcs, *channel_id)); + } if let Some((commitment_update, monitor_update)) = commitment_opt { if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) { + has_monitor_update = true; let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id); handle_errors.push((chan.get_counterparty_node_id(), res)); if close_channel { return false; } @@ -3539,6 +3560,8 @@ impl ChannelMana } }); } + + let has_update = has_monitor_update || !failed_htlcs.is_empty(); for (failures, channel_id) in failed_htlcs.drain(..) { self.fail_holding_cell_htlcs(failures, channel_id); } @@ -3546,6 +3569,8 @@ impl ChannelMana for (counterparty_node_id, err) in handle_errors.drain(..) { let _ = handle_error!(self, err, counterparty_node_id); } + + has_update } /// Handle a list of channel failures during a block_connected or block_disconnected call, @@ -3670,6 +3695,14 @@ impl ChannelMana pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result { self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id) } + + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] + pub fn get_and_clear_pending_events(&self) -> Vec { + let events = core::cell::RefCell::new(Vec::new()); + let event_handler = |event| events.borrow_mut().push(event); + self.process_pending_events(&event_handler); + events.into_inner() + } } impl MessageSendEventsProvider for ChannelManager @@ -3680,35 +3713,71 @@ impl MessageSend L::Target: Logger, { fn get_and_clear_pending_msg_events(&self) -> Vec { - //TODO: This behavior should be documented. It's non-intuitive that we query - // ChannelMonitors when clearing other events. - self.process_pending_monitor_events(); + let events = RefCell::new(Vec::new()); + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + let mut result = NotifyOption::SkipPersist; - self.check_free_holding_cells(); + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } - let mut ret = Vec::new(); - let mut channel_state = self.channel_state.lock().unwrap(); - mem::swap(&mut ret, &mut channel_state.pending_msg_events); - ret + if self.check_free_holding_cells() { + result = NotifyOption::DoPersist; + } + + let mut pending_events = Vec::new(); + let mut channel_state = self.channel_state.lock().unwrap(); + mem::swap(&mut pending_events, &mut channel_state.pending_msg_events); + + if !pending_events.is_empty() { + events.replace(pending_events); + } + + result + }); + events.into_inner() } } impl EventsProvider for ChannelManager - where M::Target: chain::Watch, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, +where + M::Target: chain::Watch, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + L::Target: Logger, { - fn get_and_clear_pending_events(&self) -> Vec { - //TODO: This behavior should be documented. It's non-intuitive that we query - // ChannelMonitors when clearing other events. - self.process_pending_monitor_events(); + /// Processes events that must be periodically handled. + /// + /// An [`EventHandler`] may safely call back to the provider in order to handle an event. + /// However, it must not call [`Writeable::write`] as doing so would result in a deadlock. + /// + /// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared + /// when processed, an [`EventHandler`] must be able to handle previously seen events when + /// restarting from an old state. + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + let mut result = NotifyOption::SkipPersist; - let mut ret = Vec::new(); - let mut pending_events = self.pending_events.lock().unwrap(); - mem::swap(&mut ret, &mut *pending_events); - ret + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + + let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } + + for event in pending_events.drain(..) { + handler.handle_event(event); + } + + result + }); } } @@ -3746,7 +3815,7 @@ where *best_block = BestBlock::new(header.prev_blockhash, new_height) } - self.do_chain_event(Some(new_height), |channel| channel.best_block_updated(new_height, header.time)); + self.do_chain_event(Some(new_height), |channel| channel.best_block_updated(new_height, header.time, &self.logger)); } } @@ -3782,7 +3851,7 @@ where *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); - self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time)); + self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, &self.logger)); macro_rules! max_time { ($timestamp: expr) => { @@ -3824,7 +3893,7 @@ where self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.get_funding_txo() { if funding_txo.txid == *txid { - channel.funding_transaction_unconfirmed().map(|_| (None, Vec::new())) + channel.funding_transaction_unconfirmed(&self.logger).map(|_| (None, Vec::new())) } else { Ok((None, Vec::new())) } } else { Ok((None, Vec::new())) } }); @@ -4260,244 +4329,89 @@ impl PersistenceNotifier { const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; -impl Writeable for PendingHTLCInfo { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - match &self.routing { - &PendingHTLCRouting::Forward { ref onion_packet, ref short_channel_id } => { - 0u8.write(writer)?; - onion_packet.write(writer)?; - short_channel_id.write(writer)?; - }, - &PendingHTLCRouting::Receive { ref payment_data, ref incoming_cltv_expiry } => { - 1u8.write(writer)?; - payment_data.payment_secret.write(writer)?; - payment_data.total_msat.write(writer)?; - incoming_cltv_expiry.write(writer)?; - }, - } - self.incoming_shared_secret.write(writer)?; - self.payment_hash.write(writer)?; - self.amt_to_forward.write(writer)?; - self.outgoing_cltv_value.write(writer)?; - Ok(()) - } -} - -impl Readable for PendingHTLCInfo { - fn read(reader: &mut R) -> Result { - Ok(PendingHTLCInfo { - routing: match Readable::read(reader)? { - 0u8 => PendingHTLCRouting::Forward { - onion_packet: Readable::read(reader)?, - short_channel_id: Readable::read(reader)?, - }, - 1u8 => PendingHTLCRouting::Receive { - payment_data: msgs::FinalOnionHopData { - payment_secret: Readable::read(reader)?, - total_msat: Readable::read(reader)?, - }, - incoming_cltv_expiry: Readable::read(reader)?, - }, - _ => return Err(DecodeError::InvalidValue), - }, - incoming_shared_secret: Readable::read(reader)?, - payment_hash: Readable::read(reader)?, - amt_to_forward: Readable::read(reader)?, - outgoing_cltv_value: Readable::read(reader)?, - }) - } -} - -impl Writeable for HTLCFailureMsg { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - match self { - &HTLCFailureMsg::Relay(ref fail_msg) => { - 0u8.write(writer)?; - fail_msg.write(writer)?; - }, - &HTLCFailureMsg::Malformed(ref fail_msg) => { - 1u8.write(writer)?; - fail_msg.write(writer)?; - } - } - Ok(()) - } -} - -impl Readable for HTLCFailureMsg { - fn read(reader: &mut R) -> Result { - match ::read(reader)? { - 0 => Ok(HTLCFailureMsg::Relay(Readable::read(reader)?)), - 1 => Ok(HTLCFailureMsg::Malformed(Readable::read(reader)?)), - _ => Err(DecodeError::InvalidValue), - } - } -} - -impl Writeable for PendingHTLCStatus { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - match self { - &PendingHTLCStatus::Forward(ref forward_info) => { - 0u8.write(writer)?; - forward_info.write(writer)?; - }, - &PendingHTLCStatus::Fail(ref fail_msg) => { - 1u8.write(writer)?; - fail_msg.write(writer)?; - } - } - Ok(()) - } -} - -impl Readable for PendingHTLCStatus { - fn read(reader: &mut R) -> Result { - match ::read(reader)? { - 0 => Ok(PendingHTLCStatus::Forward(Readable::read(reader)?)), - 1 => Ok(PendingHTLCStatus::Fail(Readable::read(reader)?)), - _ => Err(DecodeError::InvalidValue), - } - } -} - -impl_writeable!(HTLCPreviousHopData, 0, { - short_channel_id, - outpoint, - htlc_id, - incoming_packet_shared_secret -}); - -impl Writeable for ClaimableHTLC { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - self.prev_hop.write(writer)?; - self.value.write(writer)?; - self.payment_data.payment_secret.write(writer)?; - self.payment_data.total_msat.write(writer)?; - self.cltv_expiry.write(writer) - } -} - -impl Readable for ClaimableHTLC { - fn read(reader: &mut R) -> Result { - Ok(ClaimableHTLC { - prev_hop: Readable::read(reader)?, - value: Readable::read(reader)?, - payment_data: msgs::FinalOnionHopData { - payment_secret: Readable::read(reader)?, - total_msat: Readable::read(reader)?, - }, - cltv_expiry: Readable::read(reader)?, - }) - } -} - -impl Writeable for HTLCSource { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - match self { - &HTLCSource::PreviousHopData(ref hop_data) => { - 0u8.write(writer)?; - hop_data.write(writer)?; - }, - &HTLCSource::OutboundRoute { ref path, ref session_priv, ref first_hop_htlc_msat } => { - 1u8.write(writer)?; - path.write(writer)?; - session_priv.write(writer)?; - first_hop_htlc_msat.write(writer)?; - } - } - Ok(()) - } -} - -impl Readable for HTLCSource { - fn read(reader: &mut R) -> Result { - match ::read(reader)? { - 0 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)), - 1 => Ok(HTLCSource::OutboundRoute { - path: Readable::read(reader)?, - session_priv: Readable::read(reader)?, - first_hop_htlc_msat: Readable::read(reader)?, - }), - _ => Err(DecodeError::InvalidValue), - } - } -} - -impl Writeable for HTLCFailReason { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - match self { - &HTLCFailReason::LightningError { ref err } => { - 0u8.write(writer)?; - err.write(writer)?; - }, - &HTLCFailReason::Reason { ref failure_code, ref data } => { - 1u8.write(writer)?; - failure_code.write(writer)?; - data.write(writer)?; - } - } - Ok(()) - } -} - -impl Readable for HTLCFailReason { - fn read(reader: &mut R) -> Result { - match ::read(reader)? { - 0 => Ok(HTLCFailReason::LightningError { err: Readable::read(reader)? }), - 1 => Ok(HTLCFailReason::Reason { - failure_code: Readable::read(reader)?, - data: Readable::read(reader)?, - }), - _ => Err(DecodeError::InvalidValue), - } - } -} - -impl Writeable for HTLCForwardInfo { - fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - match self { - &HTLCForwardInfo::AddHTLC { ref prev_short_channel_id, ref prev_funding_outpoint, ref prev_htlc_id, ref forward_info } => { - 0u8.write(writer)?; - prev_short_channel_id.write(writer)?; - prev_funding_outpoint.write(writer)?; - prev_htlc_id.write(writer)?; - forward_info.write(writer)?; - }, - &HTLCForwardInfo::FailHTLC { ref htlc_id, ref err_packet } => { - 1u8.write(writer)?; - htlc_id.write(writer)?; - err_packet.write(writer)?; - }, - } - Ok(()) - } -} - -impl Readable for HTLCForwardInfo { - fn read(reader: &mut R) -> Result { - match ::read(reader)? { - 0 => Ok(HTLCForwardInfo::AddHTLC { - prev_short_channel_id: Readable::read(reader)?, - prev_funding_outpoint: Readable::read(reader)?, - prev_htlc_id: Readable::read(reader)?, - forward_info: Readable::read(reader)?, - }), - 1 => Ok(HTLCForwardInfo::FailHTLC { - htlc_id: Readable::read(reader)?, - err_packet: Readable::read(reader)?, - }), - _ => Err(DecodeError::InvalidValue), - } - } -} - -impl_writeable!(PendingInboundPayment, 0, { - payment_secret, - expiry_time, - user_payment_id, - payment_preimage, - min_value_msat -}); +impl_writeable_tlv_based_enum!(PendingHTLCRouting, + (0, Forward) => { + (0, onion_packet), + (2, short_channel_id), + }, {}, {}, + (1, Receive) => { + (0, payment_data), + (2, incoming_cltv_expiry), + }, {}, {} +;); + +impl_writeable_tlv_based!(PendingHTLCInfo, { + (0, routing), + (2, incoming_shared_secret), + (4, payment_hash), + (6, amt_to_forward), + (8, outgoing_cltv_value) +}, {}, {}); + +impl_writeable_tlv_based_enum!(HTLCFailureMsg, ; + (0, Relay), + (1, Malformed), +); +impl_writeable_tlv_based_enum!(PendingHTLCStatus, ; + (0, Forward), + (1, Fail), +); + +impl_writeable_tlv_based!(HTLCPreviousHopData, { + (0, short_channel_id), + (2, outpoint), + (4, htlc_id), + (6, incoming_packet_shared_secret) +}, {}, {}); + +impl_writeable_tlv_based!(ClaimableHTLC, { + (0, prev_hop), + (2, value), + (4, payment_data), + (6, cltv_expiry), +}, {}, {}); + +impl_writeable_tlv_based_enum!(HTLCSource, + (0, OutboundRoute) => { + (0, session_priv), + (2, first_hop_htlc_msat), + }, {}, { + (4, path), + }; + (1, PreviousHopData) +); + +impl_writeable_tlv_based_enum!(HTLCFailReason, + (0, LightningError) => { + (0, err), + }, {}, {}, + (1, Reason) => { + (0, failure_code), + }, {}, { + (2, data), + }, +;); + +impl_writeable_tlv_based_enum!(HTLCForwardInfo, + (0, AddHTLC) => { + (0, forward_info), + (2, prev_short_channel_id), + (4, prev_htlc_id), + (6, prev_funding_outpoint), + }, {}, {}, + (1, FailHTLC) => { + (0, htlc_id), + (2, err_packet), + }, {}, {}, +;); + +impl_writeable_tlv_based!(PendingInboundPayment, { + (0, payment_secret), + (2, expiry_time), + (4, user_payment_id), + (6, payment_preimage), + (8, min_value_msat), +}, {}, {}); impl Writeable for ChannelManager where M::Target: chain::Watch, @@ -4509,8 +4423,7 @@ impl Writeable f fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { let _consistency_lock = self.total_consistency_lock.write().unwrap(); - writer.write_all(&[SERIALIZATION_VERSION; 1])?; - writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; + write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION); self.genesis_hash.write(writer)?; { @@ -4593,6 +4506,8 @@ impl Writeable f session_priv.write(writer)?; } + write_tlv_fields!(writer, {}, {}); + Ok(()) } } @@ -4716,11 +4631,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> L::Target: Logger, { fn read(reader: &mut R, mut args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result { - let _ver: u8 = Readable::read(reader)?; - let min_ver: u8 = Readable::read(reader)?; - if min_ver > SERIALIZATION_VERSION { - return Err(DecodeError::UnknownVersion); - } + let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); let genesis_hash: BlockHash = Readable::read(reader)?; let best_block_height: u32 = Readable::read(reader)?; @@ -4742,6 +4653,11 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> channel.get_cur_counterparty_commitment_transaction_number() < monitor.get_cur_counterparty_commitment_number() || channel.get_latest_monitor_update_id() > monitor.get_latest_update_id() { // If the channel is ahead of the monitor, return InvalidValue: + log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); + log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", + log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id()); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); return Err(DecodeError::InvalidValue); } else if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() || @@ -4758,6 +4674,9 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> by_id.insert(channel.channel_id(), channel); } } else { + log_error!(args.logger, "Missing ChannelMonitor for channel {} needed by ChannelManager.", log_bytes!(channel.channel_id())); + log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); + log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); return Err(DecodeError::InvalidValue); } } @@ -4840,6 +4759,8 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> } } + read_tlv_fields!(reader, {}, {}); + let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes()); @@ -4895,9 +4816,9 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> mod tests { use ln::channelmanager::PersistenceNotifier; use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; + use core::sync::atomic::{AtomicBool, Ordering}; use std::thread; - use std::time::Duration; + use core::time::Duration; #[test] fn test_wait_timeout() { @@ -4956,13 +4877,13 @@ pub mod bench { use routing::router::get_route; use util::test_utils; use util::config::UserConfig; - use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; + use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::{Block, BlockHeader, Transaction, TxOut}; - use std::sync::Mutex; + use std::sync::{Arc, Mutex}; use test::Bencher; @@ -4988,7 +4909,7 @@ pub mod bench { let network = bitcoin::Network::Testnet; let genesis_hash = bitcoin::blockdata::constants::genesis_block(network).header.block_hash(); - let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}; + let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}; let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; let mut config: UserConfig = Default::default();