X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=31c8dccae637e6c93a9ae41affa3f94da129ea3e;hb=47ad3d6bd87affc14281ac8dbf62d69b6c066072;hp=b56ce5d9d052aa38c955d172e498d6cde4260713;hpb=4894d52d30399c21b7994952a8de0d1d7848c58d;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index b56ce5d9..31c8dcca 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -39,6 +39,9 @@ use chain::Watch; use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, ChannelMonitorUpdateErr, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID}; use chain::transaction::{OutPoint, TransactionData}; +// Since this struct is returned in `list_channels` methods, expose it here in case users want to +// construct one themselves. +pub use ln::channel::CounterpartyForwardingInfo; use ln::channel::{Channel, ChannelError}; use ln::features::{InitFeatures, NodeFeatures}; use routing::router::{Route, RouteHop}; @@ -206,7 +209,7 @@ pub struct PaymentPreimage(pub [u8;32]); #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)] pub struct PaymentSecret(pub [u8;32]); -type ShutdownResult = (Option, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>); +type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>); /// Error type returned across the channel_state mutex boundary. When an Err is generated for a /// Channel, we generally end up with a ChannelError::Close for which we have to close the channel @@ -333,6 +336,15 @@ pub(super) struct ChannelHolder { pub(super) pending_msg_events: Vec, } +/// Events which we process internally but cannot be procsesed immediately at the generation site +/// for some reason. They are handled in timer_chan_freshness_every_min, so may be processed with +/// quite some time lag. +enum BackgroundEvent { + /// Handle a ChannelMonitorUpdate that closes a channel, broadcasting its current latest holder + /// commitment transaction. + ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)), +} + /// State we hold per-peer. In the future we should put channels in here, but for now we only hold /// the latest Init features we heard from the peer. struct PeerState { @@ -380,7 +392,7 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage /// ChannelMonitors passed by reference to read(), those channels will be force-closed based on the /// ChannelMonitor state and no funds will be lost (mod on-chain transaction fees). /// -/// Note that the deserializer is only implemented for (Option, ChannelManager), which +/// Note that the deserializer is only implemented for (BlockHash, ChannelManager), which /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along /// the "reorg path" (ie call block_disconnected() until you get to a common block and then call /// block_connected() to step towards your best block) upon deserialization before using the @@ -414,7 +426,7 @@ pub struct ChannelManager, + last_block_hash: RwLock, secp_ctx: Secp256k1, #[cfg(any(test, feature = "_test_utils"))] @@ -436,6 +448,7 @@ pub struct ChannelManager>>, pending_events: Mutex>, + pending_background_events: Mutex>, /// Used when we have to take a BIG lock to make sure everything is self-consistent. /// Essentially just when we're serializing ourselves out. /// Taken first everywhere where we are making changes before any other locks. @@ -451,11 +464,30 @@ pub struct ChannelManager { persistence_notifier: &'a PersistenceNotifier, // We hold onto this result so the lock doesn't get released immediately. @@ -479,18 +511,28 @@ impl<'a> Drop for PersistenceNotifierGuard<'a> { } } -/// The amount of time we require our counterparty wait to claim their money (ie time between when -/// we, or our watchtower, must check for them having broadcast a theft transaction). -pub(crate) const BREAKDOWN_TIMEOUT: u16 = 6 * 24; -/// The amount of time we're willing to wait to claim money back to us -pub(crate) const MAX_LOCAL_BREAKDOWN_TIMEOUT: u16 = 6 * 24 * 7; +/// The amount of time in blocks we require our counterparty wait to claim their money (ie time +/// between when we, or our watchtower, must check for them having broadcast a theft transaction). +/// +/// This can be increased (but not decreased) through [`ChannelHandshakeConfig::our_to_self_delay`] +/// +/// [`ChannelHandshakeConfig::our_to_self_delay`]: crate::util::config::ChannelHandshakeConfig::our_to_self_delay +pub const BREAKDOWN_TIMEOUT: u16 = 6 * 24; +/// The amount of time in blocks we're willing to wait to claim money back to us. This matches +/// the maximum required amount in lnd as of March 2021. +pub(crate) const MAX_LOCAL_BREAKDOWN_TIMEOUT: u16 = 2 * 6 * 24 * 7; /// The minimum number of blocks between an inbound HTLC's CLTV and the corresponding outbound -/// HTLC's CLTV. This should always be a few blocks greater than channelmonitor::CLTV_CLAIM_BUFFER, -/// ie the node we forwarded the payment on to should always have enough room to reliably time out -/// the HTLC via a full update_fail_htlc/commitment_signed dance before we hit the -/// CLTV_CLAIM_BUFFER point (we static assert that it's at least 3 blocks more). -const CLTV_EXPIRY_DELTA: u16 = 6 * 12; //TODO? +/// HTLC's CLTV. The current default represents roughly six hours of blocks at six blocks/hour. +/// +/// This can be increased (but not decreased) through [`ChannelConfig::cltv_expiry_delta`] +/// +/// [`ChannelConfig::cltv_expiry_delta`]: crate::util::config::ChannelConfig::cltv_expiry_delta +// This should always be a few blocks greater than channelmonitor::CLTV_CLAIM_BUFFER, +// i.e. the node we forwarded the payment on to should always have enough room to reliably time out +// the HTLC via a full update_fail_htlc/commitment_signed dance before we hit the +// CLTV_CLAIM_BUFFER point (we static assert that it's at least 3 blocks more). +pub const MIN_CLTV_EXPIRY_DELTA: u16 = 6 * 6; pub(super) const CLTV_FAR_FAR_AWAY: u32 = 6 * 24 * 7; //TODO? // Check that our CLTV_EXPIRY is at least CLTV_CLAIM_BUFFER + ANTI_REORG_DELAY + LATENCY_GRACE_PERIOD_BLOCKS, @@ -501,13 +543,13 @@ pub(super) const CLTV_FAR_FAR_AWAY: u32 = 6 * 24 * 7; //TODO? // LATENCY_GRACE_PERIOD_BLOCKS. #[deny(const_err)] #[allow(dead_code)] -const CHECK_CLTV_EXPIRY_SANITY: u32 = CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - CLTV_CLAIM_BUFFER - ANTI_REORG_DELAY - LATENCY_GRACE_PERIOD_BLOCKS; +const CHECK_CLTV_EXPIRY_SANITY: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - CLTV_CLAIM_BUFFER - ANTI_REORG_DELAY - LATENCY_GRACE_PERIOD_BLOCKS; // Check for ability of an attacker to make us fail on-chain by delaying inbound claim. See // ChannelMontior::would_broadcast_at_height for a description of why this is needed. #[deny(const_err)] #[allow(dead_code)] -const CHECK_CLTV_EXPIRY_SANITY_2: u32 = CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER; +const CHECK_CLTV_EXPIRY_SANITY_2: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER; /// Details of a channel, as returned by ChannelManager::list_channels and ChannelManager::list_usable_channels #[derive(Clone)] @@ -544,6 +586,10 @@ pub struct ChannelDetails { /// True if the channel is (a) confirmed and funding_locked messages have been exchanged, (b) /// the peer is connected, and (c) no monitor update failure is pending resolution. pub is_live: bool, + + /// Information on the fees and requirements that the counterparty requires when forwarding + /// payments to us through this channel. + pub counterparty_forwarding_info: Option, } /// If a payment fails to send, it can be in one of several states. This enum is returned as the @@ -760,24 +806,22 @@ impl ChannelMana /// /// panics if channel_value_satoshis is >= `MAX_FUNDING_SATOSHIS`! /// - /// Users must provide the current blockchain height from which to track onchain channel - /// funding outpoints and send payments with reliable timelocks. - /// /// Users need to notify the new ChannelManager when a new block is connected or - /// disconnected using its `block_connected` and `block_disconnected` methods. - pub fn new(network: Network, fee_est: F, chain_monitor: M, tx_broadcaster: T, logger: L, keys_manager: K, config: UserConfig, current_blockchain_height: usize) -> Self { + /// disconnected using its `block_connected` and `block_disconnected` methods, starting + /// from after `params.latest_hash`. + pub fn new(fee_est: F, chain_monitor: M, tx_broadcaster: T, logger: L, keys_manager: K, config: UserConfig, params: ChainParameters) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes()); ChannelManager { default_configuration: config.clone(), - genesis_hash: genesis_block(network).header.block_hash(), + genesis_hash: genesis_block(params.network).header.block_hash(), fee_estimator: fee_est, chain_monitor, tx_broadcaster, - latest_block_height: AtomicUsize::new(current_blockchain_height), - last_block_hash: Mutex::new(Default::default()), + latest_block_height: AtomicUsize::new(params.latest_height), + last_block_hash: RwLock::new(params.latest_hash), secp_ctx, channel_state: Mutex::new(ChannelHolder{ @@ -794,6 +838,7 @@ impl ChannelMana per_peer_state: RwLock::new(HashMap::new()), pending_events: Mutex::new(Vec::new()), + pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), persistence_notifier: PersistenceNotifier::new(), @@ -863,6 +908,7 @@ impl ChannelMana outbound_capacity_msat, user_id: channel.get_user_id(), is_live: channel.is_live(), + counterparty_forwarding_info: channel.counterparty_forwarding_info(), }); } } @@ -942,12 +988,12 @@ impl ChannelMana #[inline] fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) { - let (funding_txo_option, monitor_update, mut failed_htlcs) = shutdown_res; + let (monitor_update_option, mut failed_htlcs) = shutdown_res; log_trace!(self.logger, "Finishing force-closure of channel {} HTLCs to fail", failed_htlcs.len()); for htlc_source in failed_htlcs.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); } - if let Some(funding_txo) = funding_txo_option { + if let Some((funding_txo, monitor_update)) = monitor_update_option { // There isn't anything we can do if we get an update failure - we're already // force-closing. The monitor update on the required in-memory copy should broadcast // the latest local state, which is the best we can do anyway. Thus, it is safe to @@ -956,16 +1002,14 @@ impl ChannelMana } } - fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>) -> Result<(), APIError> { + fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>) -> Result { let mut chan = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; if let hash_map::Entry::Occupied(chan) = channel_state.by_id.entry(channel_id.clone()) { if let Some(node_id) = peer_node_id { if chan.get().get_counterparty_node_id() != *node_id { - // Error or Ok here doesn't matter - the result is only exposed publicly - // when peer_node_id is None anyway. - return Ok(()); + return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()}); } } if let Some(short_id) = chan.get().get_short_channel_id() { @@ -985,14 +1029,27 @@ impl ChannelMana }); } - Ok(()) + Ok(chan.get_counterparty_node_id()) } /// Force closes a channel, immediately broadcasting the latest local commitment transaction to /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager. pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); - self.force_close_channel_with_peer(channel_id, None) + match self.force_close_channel_with_peer(channel_id, None) { + Ok(counterparty_node_id) => { + self.channel_state.lock().unwrap().pending_msg_events.push( + events::MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: *channel_id, data: "Channel force-closed".to_owned() } + }, + } + ); + Ok(()) + }, + Err(e) => Err(e) + } } /// Force close all channels, immediately broadcasting the latest local commitment transaction @@ -1230,7 +1287,7 @@ impl ChannelMana if fee.is_none() || msg.amount_msat < fee.unwrap() || (msg.amount_msat - fee.unwrap()) < *amt_to_forward { // fee_insufficient break Some(("Prior hop has deviated from specified fees parameters or origin node has obsolete ones", 0x1000 | 12, Some(self.get_channel_update(chan).unwrap()))); } - if (msg.cltv_expiry as u64) < (*outgoing_cltv_value) as u64 + CLTV_EXPIRY_DELTA as u64 { // incorrect_cltv_expiry + if (msg.cltv_expiry as u64) < (*outgoing_cltv_value) as u64 + chan.get_cltv_expiry_delta() as u64 { // incorrect_cltv_expiry break Some(("Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta", 0x1000 | 13, Some(self.get_channel_update(chan).unwrap()))); } let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1; @@ -1288,7 +1345,7 @@ impl ChannelMana short_channel_id, timestamp: chan.get_update_time_counter(), flags: (!were_node_one) as u8 | ((!chan.is_live() as u8) << 1), - cltv_expiry_delta: CLTV_EXPIRY_DELTA, + cltv_expiry_delta: chan.get_cltv_expiry_delta(), htlc_minimum_msat: chan.get_counterparty_htlc_minimum_msat(), htlc_maximum_msat: OptionalField::Present(chan.get_announced_htlc_max_msat()), fee_base_msat: chan.get_holder_fee_base_msat(&self.fee_estimator), @@ -1854,13 +1911,42 @@ impl ChannelMana events.append(&mut new_events); } + /// Free the background events, generally called from timer_chan_freshness_every_min. + /// + /// Exposed for testing to allow us to process events quickly without generating accidental + /// BroadcastChannelUpdate events in timer_chan_freshness_every_min. + /// + /// Expects the caller to have a total_consistency_lock read lock. + fn process_background_events(&self) { + let mut background_events = Vec::new(); + mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events); + for event in background_events.drain(..) { + match event { + BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => { + // The channel has already been closed, so no use bothering to care about the + // monitor updating completing. + let _ = self.chain_monitor.update_channel(funding_txo, update); + }, + } + } + } + + #[cfg(any(test, feature = "_test_utils"))] + pub(crate) fn test_process_background_events(&self) { + self.process_background_events(); + } + /// If a peer is disconnected we mark any channels with that peer as 'disabled'. /// After some time, if channels are still disabled we need to broadcast a ChannelUpdate /// to inform the network about the uselessness of these channels. /// /// This method handles all the details, and must be called roughly once per minute. + /// + /// Note that in some rare cases this may generate a `chain::Watch::update_channel` call. pub fn timer_chan_freshness_every_min(&self) { let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + self.process_background_events(); + let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; for (_, chan) in channel_state.by_id.iter_mut() { @@ -1953,6 +2039,10 @@ impl ChannelMana //identify whether we sent it or not based on the (I presume) very different runtime //between the branches here. We should make this async and move it into the forward HTLCs //timer handling. + + // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called + // from block_connected which may run during initialization prior to the chain_monitor + // being fully configured. See the docs for `ChannelManagerReadArgs` for more. match source { HTLCSource::OutboundRoute { ref path, .. } => { log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0)); @@ -2394,6 +2484,7 @@ impl ChannelMana fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { let ((funding_msg, monitor), mut chan) = { + let last_block_hash = *self.last_block_hash.read().unwrap(); let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.temporary_channel_id.clone()) { @@ -2401,7 +2492,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); } - (try_chan_entry!(self, chan.get_mut().funding_created(msg, &self.logger), channel_state, chan), chan.remove()) + (try_chan_entry!(self, chan.get_mut().funding_created(msg, last_block_hash, &self.logger), channel_state, chan), chan.remove()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id)) } @@ -2418,7 +2509,7 @@ impl ChannelMana // We do not do a force-close here as that would generate a monitor update for // a monitor that we didn't manage to store (and that we don't care about - we // don't respond with the funding_signed so the channel can never go on chain). - let (_funding_txo_option, _monitor_update, failed_htlcs) = chan.force_shutdown(true); + let (_monitor_update, failed_htlcs) = chan.force_shutdown(true); assert!(failed_htlcs.is_empty()); return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id)); }, @@ -2450,6 +2541,7 @@ impl ChannelMana fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { let (funding_txo, user_id) = { + let last_block_hash = *self.last_block_hash.read().unwrap(); let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { @@ -2457,7 +2549,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - let monitor = match chan.get_mut().funding_signed(&msg, &self.logger) { + let monitor = match chan.get_mut().funding_signed(&msg, last_block_hash, &self.logger) { Ok(update) => update, Err(e) => try_chan_entry!(self, Err(e), channel_state, chan), }; @@ -2930,6 +3022,29 @@ impl ChannelMana Ok(()) } + fn internal_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) -> Result<(), MsgHandleErrInternal> { + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_state_lock; + let chan_id = match channel_state.short_to_id.get(&msg.contents.short_channel_id) { + Some(chan_id) => chan_id.clone(), + None => { + // It's not a local channel + return Ok(()) + } + }; + match channel_state.by_id.entry(chan_id) { + hash_map::Entry::Occupied(mut chan) => { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { + // TODO: see issue #153, need a consistent behavior on obnoxious behavior from random node + return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), chan_id)); + } + try_chan_entry!(self, chan.get_mut().channel_update(&msg), channel_state, chan); + }, + hash_map::Entry::Vacant(_) => unreachable!() + } + Ok(()) + } + fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; @@ -3090,6 +3205,12 @@ impl ChannelMana msg: update }); } + pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: chan.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() } + }, + }); } }, } @@ -3100,6 +3221,29 @@ impl ChannelMana self.finish_force_close_channel(failure); } } + + /// Handle a list of channel failures during a block_connected or block_disconnected call, + /// pushing the channel monitor update (if any) to the background events queue and removing the + /// Channel object. + fn handle_init_event_channel_failures(&self, mut failed_channels: Vec) { + for mut failure in failed_channels.drain(..) { + // Either a commitment transactions has been confirmed on-chain or + // Channel::block_disconnected detected that the funding transaction has been + // reorganized out of the main chain. + // We cannot broadcast our latest local state via monitor update (as + // Channel::force_shutdown tries to make us do) as we may still be in initialization, + // so we track the update internally and handle it when the user next calls + // timer_chan_freshness_every_min, guaranteeing we're running normally. + if let Some((funding_txo, update)) = failure.0.take() { + assert_eq!(update.updates.len(), 1); + if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] { + assert!(should_broadcast); + } else { unreachable!(); } + self.pending_background_events.lock().unwrap().push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, update))); + } + self.finish_force_close_channel(failure); + } + } } impl MessageSendEventsProvider for ChannelManager @@ -3149,12 +3293,26 @@ where L::Target: Logger, { fn block_connected(&self, block: &Block, height: u32) { + assert_eq!(*self.last_block_hash.read().unwrap(), block.header.prev_blockhash, + "Blocks must be connected in chain-order - the connected header must build on the last connected header"); + assert_eq!(self.latest_block_height.load(Ordering::Acquire) as u64, height as u64 - 1, + "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); - ChannelManager::block_connected(self, &block.header, &txdata, height); + self.transactions_confirmed(&block.header, height, &txdata); + self.update_best_block(&block.header, height); } - fn block_disconnected(&self, header: &BlockHeader, _height: u32) { - ChannelManager::block_disconnected(self, header); + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + assert_eq!(*self.last_block_hash.read().unwrap(), header.block_hash(), + "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); + + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + let new_height = self.latest_block_height.fetch_sub(1, Ordering::AcqRel) as u32 - 1; + assert_eq!(new_height, height - 1, + "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); + *self.last_block_hash.write().unwrap() = header.prev_blockhash; + + self.do_chain_event(new_height, |channel| channel.update_best_block(new_height, header.time)); } } @@ -3165,11 +3323,12 @@ impl ChannelMana F::Target: FeeEstimator, L::Target: Logger, { - /// Updates channel state based on transactions seen in a connected block. - pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { - let header_hash = header.block_hash(); - log_trace!(self.logger, "Block {} at height {} connected", header_hash, height); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + fn do_chain_event) -> Result<(Option, Vec<(HTLCSource, PaymentHash)>), msgs::ErrorMessage>> + (&self, height: u32, f: FN) { + // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called + // during initialization prior to the chain_monitor being fully configured in some cases. + // See the docs for `ChannelManagerReadArgs` for more. + let mut failed_channels = Vec::new(); let mut timed_out_htlcs = Vec::new(); { @@ -3178,7 +3337,7 @@ impl ChannelMana let short_to_id = &mut channel_state.short_to_id; let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { - let res = channel.block_connected(header, txdata, height); + let res = f(channel); if let Ok((chan_res, mut timed_out_pending_htlcs)) = res { for (source, payment_hash) in timed_out_pending_htlcs.drain(..) { let chan_update = self.get_channel_update(&channel).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe @@ -3204,34 +3363,23 @@ impl ChannelMana short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); } } else if let Err(e) = res { + if let Some(short_id) = channel.get_short_channel_id() { + short_to_id.remove(&short_id); + } + // It looks like our counterparty went on-chain or funding transaction was + // reorged out of the main chain. Close the channel. + failed_channels.push(channel.force_shutdown(true)); + if let Ok(update) = self.get_channel_update(&channel) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: channel.get_counterparty_node_id(), action: msgs::ErrorAction::SendErrorMessage { msg: e }, }); return false; } - if let Some(funding_txo) = channel.get_funding_txo() { - for &(_, tx) in txdata.iter() { - for inp in tx.input.iter() { - if inp.previous_output == funding_txo.into_bitcoin_outpoint() { - log_trace!(self.logger, "Detected channel-closing tx {} spending {}:{}, closing channel {}", tx.txid(), inp.previous_output.txid, inp.previous_output.vout, log_bytes!(channel.channel_id())); - if let Some(short_id) = channel.get_short_channel_id() { - short_to_id.remove(&short_id); - } - // It looks like our counterparty went on-chain. We go ahead and - // broadcast our latest local state as well here, just in case its - // some kind of SPV attack, though we expect these to be dropped. - failed_channels.push(channel.force_shutdown(true)); - if let Ok(update) = self.get_channel_update(&channel) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - return false; - } - } - } - } true }); @@ -3254,15 +3402,71 @@ impl ChannelMana !htlcs.is_empty() // Only retain this entry if htlcs has at least one entry. }); } - for failure in failed_channels.drain(..) { - self.finish_force_close_channel(failure); - } + + self.handle_init_event_channel_failures(failed_channels); for (source, payment_hash, reason) in timed_out_htlcs.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, reason); } + } + + /// Updates channel state to take note of transactions which were confirmed in the given block + /// at the given height. + /// + /// Note that you must still call (or have called) [`update_best_block`] with the block + /// information which is included here. + /// + /// This method may be called before or after [`update_best_block`] for a given block's + /// transaction data and may be called multiple times with additional transaction data for a + /// given block. + /// + /// This method may be called for a previous block after an [`update_best_block`] call has + /// been made for a later block, however it must *not* be called with transaction data from a + /// block which is no longer in the best chain (ie where [`update_best_block`] has already + /// been informed about a blockchain reorganization which no longer includes the block which + /// corresponds to `header`). + /// + /// [`update_best_block`]: `Self::update_best_block` + pub fn transactions_confirmed(&self, header: &BlockHeader, height: u32, txdata: &TransactionData) { + // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called + // during initialization prior to the chain_monitor being fully configured in some cases. + // See the docs for `ChannelManagerReadArgs` for more. + + let block_hash = header.block_hash(); + log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); + + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + self.do_chain_event(height, |channel| channel.transactions_confirmed(&block_hash, height, txdata, &self.logger).map(|a| (a, Vec::new()))); + } + + /// Updates channel state with the current best blockchain tip. You should attempt to call this + /// quickly after a new block becomes available, however if multiple new blocks become + /// available at the same time, only a single `update_best_block()` call needs to be made. + /// + /// This method should also be called immediately after any block disconnections, once at the + /// reorganization fork point, and once with the new chain tip. Calling this method at the + /// blockchain reorganization fork point ensures we learn when a funding transaction which was + /// previously confirmed is reorganized out of the blockchain, ensuring we do not continue to + /// accept payments which cannot be enforced on-chain. + /// + /// In both the block-connection and block-disconnection case, this method may be called either + /// once per block connected or disconnected, or simply at the fork point and new tip(s), + /// skipping any intermediary blocks. + pub fn update_best_block(&self, header: &BlockHeader, height: u32) { + // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called + // during initialization prior to the chain_monitor being fully configured in some cases. + // See the docs for `ChannelManagerReadArgs` for more. + + let block_hash = header.block_hash(); + log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); + + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + self.latest_block_height.store(height as usize, Ordering::Release); - *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header_hash; + *self.last_block_hash.write().unwrap() = block_hash; + + self.do_chain_event(height, |channel| channel.update_best_block(height, header.time)); + loop { // Update last_node_announcement_serial to be the max of its current value and the // block timestamp. This should keep us close to the current time without relying on @@ -3277,54 +3481,20 @@ impl ChannelMana } } - /// Updates channel state based on a disconnected block. - /// - /// If necessary, the channel may be force-closed without letting the counterparty participate - /// in the shutdown. - pub fn block_disconnected(&self, header: &BlockHeader) { - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); - let mut failed_channels = Vec::new(); - { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - let short_to_id = &mut channel_state.short_to_id; - let pending_msg_events = &mut channel_state.pending_msg_events; - channel_state.by_id.retain(|_, v| { - if v.block_disconnected(header) { - if let Some(short_id) = v.get_short_channel_id() { - short_to_id.remove(&short_id); - } - failed_channels.push(v.force_shutdown(true)); - if let Ok(update) = self.get_channel_update(&v) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - false - } else { - true - } - }); - } - for failure in failed_channels.drain(..) { - self.finish_force_close_channel(failure); - } - self.latest_block_height.fetch_sub(1, Ordering::AcqRel); - *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.block_hash(); - } - /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool - /// indicating whether persistence is necessary. Only one listener on `wait_timeout` is - /// guaranteed to be woken up. + /// indicating whether persistence is necessary. Only one listener on + /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken + /// up. /// Note that the feature `allow_wallclock_use` must be enabled to use this function. #[cfg(any(test, feature = "allow_wallclock_use"))] - pub fn wait_timeout(&self, max_wait: Duration) -> bool { + pub fn await_persistable_update_timeout(&self, max_wait: Duration) -> bool { self.persistence_notifier.wait_timeout(max_wait) } - /// Blocks until ChannelManager needs to be persisted. Only one listener on `wait` is - /// guaranteed to be woken up. - pub fn wait(&self) { + /// Blocks until ChannelManager needs to be persisted. Only one listener on + /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken + /// up. + pub fn await_persistable_update(&self) { self.persistence_notifier.wait() } @@ -3420,6 +3590,11 @@ impl true, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, &events::MessageSendEvent::SendShortIdsQuery { .. } => false, + &events::MessageSendEvent::SendReplyChannelRange { .. } => false, } }); } @@ -3576,7 +3752,7 @@ impl Writeable f self.genesis_hash.write(writer)?; (self.latest_block_height.load(Ordering::Acquire) as u32).write(writer)?; - self.last_block_hash.lock().unwrap().write(writer)?; + self.last_block_hash.read().unwrap().write(writer)?; let channel_state = self.channel_state.lock().unwrap(); let mut unfunded_channels = 0; @@ -3914,6 +4090,18 @@ impl Writeable f event.write(writer)?; } + let background_events = self.pending_background_events.lock().unwrap(); + (background_events.len() as u64).write(writer)?; + for event in background_events.iter() { + match event { + BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => { + 0u8.write(writer)?; + funding_txo.write(writer)?; + monitor_update.write(writer)?; + }, + } + } + (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?; Ok(()) @@ -3925,15 +4113,26 @@ impl Writeable f /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation /// is: /// 1) Deserialize all stored ChannelMonitors. -/// 2) Deserialize the ChannelManager by filling in this struct and calling <(Option, -/// ChannelManager)>::read(reader, args). +/// 2) Deserialize the ChannelManager by filling in this struct and calling: +/// <(BlockHash, ChannelManager)>::read(reader, args) /// This may result in closing some Channels if the ChannelMonitor is newer than the stored /// ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted. -/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using -/// ChannelMonitor::get_outputs_to_watch() and ChannelMonitor::get_funding_txo(). +/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same +/// way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and +/// ChannelMonitor::get_funding_txo(). /// 4) Reconnect blocks on your ChannelMonitors. -/// 5) Move the ChannelMonitors into your local chain::Watch. -/// 6) Disconnect/connect blocks on the ChannelManager. +/// 5) Disconnect/connect blocks on the ChannelManager. +/// 6) Move the ChannelMonitors into your local chain::Watch. +/// +/// Note that the ordering of #4-6 is not of importance, however all three must occur before you +/// call any other methods on the newly-deserialized ChannelManager. +/// +/// Note that because some channels may be closed during deserialization, it is critical that you +/// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to +/// you. If you deserialize an old ChannelManager (during which force-closure transactions may be +/// broadcast), and then later deserialize a newer version of the same ChannelManager (which will +/// not force-close the same channels but consider them live), you may end up revoking a state for +/// which you've already broadcasted the transaction. pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> where M::Target: chain::Watch, T::Target: BroadcasterInterface, @@ -4006,7 +4205,7 @@ impl<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> // Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the // SipmleArcChannelManager type: impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> - ReadableArgs> for (Option, Arc>) + ReadableArgs> for (BlockHash, Arc>) where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, @@ -4014,13 +4213,13 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> L::Target: Logger, { fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, Signer, M, T, K, F, L>) -> Result { - let (blockhash, chan_manager) = <(Option, ChannelManager)>::read(reader, args)?; + let (blockhash, chan_manager) = <(BlockHash, ChannelManager)>::read(reader, args)?; Ok((blockhash, Arc::new(chan_manager))) } } impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> - ReadableArgs> for (Option, ChannelManager) + ReadableArgs> for (BlockHash, ChannelManager) where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, @@ -4046,10 +4245,6 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); for _ in 0..channel_count { let mut channel: Channel = Channel::read(reader, &args.keys_manager)?; - if channel.last_block_connected != Default::default() && channel.last_block_connected != last_block_hash { - return Err(DecodeError::InvalidValue); - } - let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { @@ -4064,7 +4259,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() { // But if the channel is behind of the monitor, close the channel: - let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true); + let (_, mut new_failed_htlcs) = channel.force_shutdown(true); failed_htlcs.append(&mut new_failed_htlcs); monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger); } else { @@ -4128,6 +4323,15 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> } } + let background_event_count: u64 = Readable::read(reader)?; + let mut pending_background_events_read: Vec = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::())); + for _ in 0..background_event_count { + match ::read(reader)? { + 0 => pending_background_events_read.push(BackgroundEvent::ClosingMonitorUpdate((Readable::read(reader)?, Readable::read(reader)?))), + _ => return Err(DecodeError::InvalidValue), + } + } + let last_node_announcement_serial: u32 = Readable::read(reader)?; let mut secp_ctx = Secp256k1::new(); @@ -4140,7 +4344,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> tx_broadcaster: args.tx_broadcaster, latest_block_height: AtomicUsize::new(latest_block_height as usize), - last_block_hash: Mutex::new(last_block_hash), + last_block_hash: RwLock::new(last_block_hash), secp_ctx, channel_state: Mutex::new(ChannelHolder { @@ -4157,6 +4361,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> per_peer_state: RwLock::new(per_peer_state), pending_events: Mutex::new(pending_events_read), + pending_background_events: Mutex::new(pending_background_events_read), total_consistency_lock: RwLock::new(()), persistence_notifier: PersistenceNotifier::new(), @@ -4172,12 +4377,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. - let last_seen_block_hash = if last_block_hash == Default::default() { - None - } else { - Some(last_block_hash) - }; - Ok((last_seen_block_hash, channel_manager)) + Ok((last_block_hash.clone(), channel_manager)) } }