X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=d47eba0539bf3438b21c70447d77aee3bf6a2200;hb=d73711104482c78d501a5ea47a39b466585ca34d;hp=fdbbb43da3dd104ee5bd6699937af92df6dc4f7d;hpb=af69fae97bcfeb3d9b3b4b84d37240f45e37bb85;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index fdbbb43d..d47eba05 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -16,12 +16,11 @@ //! It does not manage routing logic (see routing::router::get_route for that) nor does it manage constructing //! on-chain transactions (it only monitors the chain to watch for any force-closes that might //! imply it needs to fail HTLCs/payments/channels it manages). +//! use bitcoin::blockdata::block::BlockHeader; -use bitcoin::blockdata::transaction::Transaction; use bitcoin::blockdata::constants::genesis_block; use bitcoin::network::constants::Network; -use bitcoin::util::hash::BitcoinHash; use bitcoin::hashes::{Hash, HashEngine}; use bitcoin::hashes::hmac::{Hmac, HmacEngine}; @@ -35,17 +34,21 @@ use bitcoin::secp256k1::Secp256k1; use bitcoin::secp256k1::ecdh::SharedSecret; use bitcoin::secp256k1; -use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator}; -use chain::transaction::OutPoint; +use chain; +use chain::Watch; +use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, ChannelMonitorUpdateErr, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID}; +use chain::transaction::{OutPoint, TransactionData}; use ln::channel::{Channel, ChannelError}; -use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent}; use ln::features::{InitFeatures, NodeFeatures}; use routing::router::{Route, RouteHop}; use ln::msgs; +use ln::msgs::NetAddress; use ln::onion_utils; use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField}; use chain::keysinterface::{ChannelKeys, KeysInterface, KeysManager, InMemoryChannelKeys}; use util::config::UserConfig; +use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use util::{byte_utils, events}; use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer}; use util::chacha20::{ChaCha20, ChaChaReader}; @@ -115,9 +118,15 @@ pub(super) enum PendingHTLCStatus { pub(super) enum HTLCForwardInfo { AddHTLC { + forward_info: PendingHTLCInfo, + + // These fields are produced in `forward_htlcs()` and consumed in + // `process_pending_htlc_forwards()` for constructing the + // `HTLCSource::PreviousHopData` for failed and forwarded + // HTLCs. prev_short_channel_id: u64, prev_htlc_id: u64, - forward_info: PendingHTLCInfo, + prev_funding_outpoint: OutPoint, }, FailHTLC { htlc_id: u64, @@ -127,10 +136,14 @@ pub(super) enum HTLCForwardInfo { /// Tracks the inbound corresponding to an outbound HTLC #[derive(Clone, PartialEq)] -pub(super) struct HTLCPreviousHopData { +pub(crate) struct HTLCPreviousHopData { short_channel_id: u64, htlc_id: u64, incoming_packet_shared_secret: [u8; 32], + + // This field is consumed by `claim_funds_from_hop()` when updating a force-closed backwards + // channel with a preimage provided by the forward channel. + outpoint: OutPoint, } struct ClaimableHTLC { @@ -146,7 +159,7 @@ struct ClaimableHTLC { /// Tracks the inbound corresponding to an outbound HTLC #[derive(Clone, PartialEq)] -pub(super) enum HTLCSource { +pub(crate) enum HTLCSource { PreviousHopData(HTLCPreviousHopData), OutboundRoute { path: Vec, @@ -179,12 +192,15 @@ pub(super) enum HTLCFailReason { } /// payment_hash type, use to cross-lock hop +/// (C-not exported) as we just use [u8; 32] directly #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)] pub struct PaymentHash(pub [u8;32]); /// payment_preimage type, use to route payment between hop +/// (C-not exported) as we just use [u8; 32] directly #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)] pub struct PaymentPreimage(pub [u8;32]); /// payment_secret type, use to authenticate sender to the receiver and tie MPP HTLCs together +/// (C-not exported) as we just use [u8; 32] directly #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)] pub struct PaymentSecret(pub [u8;32]); @@ -312,7 +328,7 @@ pub(super) struct ChannelHolder { claimable_htlcs: HashMap<(PaymentHash, Option), Vec>, /// Messages to send to peers - pushed to in the same lock that they are generated in (except /// for broadcast messages, where ordering isn't as strict). - pub(super) pending_msg_events: Vec, + pub(super) pending_msg_events: Vec, } /// State we hold per-peer. In the future we should put channels in here, but for now we only hold @@ -356,7 +372,7 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage /// /// Note that you can be a bit lazier about writing out ChannelManager than you can be with /// ChannelMonitors. With ChannelMonitors you MUST write each monitor update out to disk before -/// returning from ManyChannelMonitor::add_/update_monitor, with ChannelManagers, writing updates +/// returning from chain::Watch::watch_/update_channel, with ChannelManagers, writing updates /// happens out-of-band (and will prevent any other ChannelManager operations from occurring during /// the serialization process). If the deserialized version is out-of-date compared to the /// ChannelMonitors passed by reference to read(), those channels will be force-closed based on the @@ -380,7 +396,7 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L> = ChannelManage /// SimpleArcChannelManager when you require a ChannelManager with a static lifetime, such as when /// you're using lightning-net-tokio. pub struct ChannelManager - where M::Target: ManyChannelMonitor, + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, @@ -389,7 +405,7 @@ pub struct ChannelManager, secp_ctx: Secp256k1, - #[cfg(test)] + #[cfg(any(test, feature = "_test_utils"))] pub(super) channel_state: Mutex>, - #[cfg(not(test))] + #[cfg(not(any(test, feature = "_test_utils")))] channel_state: Mutex>, our_network_key: SecretKey, @@ -459,6 +475,7 @@ const CHECK_CLTV_EXPIRY_SANITY: u32 = CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_P const CHECK_CLTV_EXPIRY_SANITY_2: u32 = CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER; /// Details of a channel, as returned by ChannelManager::list_channels and ChannelManager::list_usable_channels +#[derive(Clone)] pub struct ChannelDetails { /// The channel's ID (prior to funding transaction generation, this is a random 32 bytes, /// thereafter this is the txid of the funding transaction xor the funding transaction output). @@ -531,7 +548,7 @@ pub enum PaymentSendFailure { } macro_rules! handle_error { - ($self: ident, $internal: expr, $their_node_id: expr) => { + ($self: ident, $internal: expr, $counterparty_node_id: expr) => { match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, shutdown_finish }) => { @@ -557,7 +574,7 @@ macro_rules! handle_error { if let msgs::ErrorAction::IgnoreError = err.action { } else { msg_events.push(events::MessageSendEvent::HandleError { - node_id: $their_node_id, + node_id: $counterparty_node_id, action: err.action.clone() }); } @@ -693,7 +710,7 @@ macro_rules! maybe_break_monitor_err { } impl ChannelManager - where M::Target: ManyChannelMonitor, + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, @@ -713,18 +730,14 @@ impl /// /// Users need to notify the new ChannelManager when a new block is connected or /// disconnected using its `block_connected` and `block_disconnected` methods. - /// However, rather than calling these methods directly, the user should register - /// the ChannelManager as a listener to the BlockNotifier and call the BlockNotifier's - /// `block_(dis)connected` methods, which will notify all registered listeners in one - /// go. - pub fn new(network: Network, fee_est: F, monitor: M, tx_broadcaster: T, logger: L, keys_manager: K, config: UserConfig, current_blockchain_height: usize) -> Self { + pub fn new(network: Network, fee_est: F, chain_monitor: M, tx_broadcaster: T, logger: L, keys_manager: K, config: UserConfig, current_blockchain_height: usize) -> Self { let secp_ctx = Secp256k1::new(); ChannelManager { default_configuration: config.clone(), - genesis_hash: genesis_block(network).header.bitcoin_hash(), + genesis_hash: genesis_block(network).header.block_hash(), fee_estimator: fee_est, - monitor, + chain_monitor, tx_broadcaster, latest_block_height: AtomicUsize::new(current_blockchain_height), @@ -774,7 +787,7 @@ impl let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, config)?; let res = channel.get_open_channel(self.genesis_hash.clone()); - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(channel.channel_id()) { hash_map::Entry::Occupied(_) => { @@ -803,7 +816,7 @@ impl res.push(ChannelDetails { channel_id: (*channel_id).clone(), short_channel_id: channel.get_short_channel_id(), - remote_network_id: channel.get_their_node_id(), + remote_network_id: channel.get_counterparty_node_id(), counterparty_features: InitFeatures::empty(), channel_value_satoshis: channel.get_value_satoshis(), inbound_capacity_msat, @@ -846,7 +859,7 @@ impl /// /// May generate a SendShutdown message event on success, which should be relayed. pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); @@ -855,7 +868,7 @@ impl hash_map::Entry::Occupied(mut chan_entry) => { let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?; channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: chan_entry.get().get_their_node_id(), + node_id: chan_entry.get().get_counterparty_node_id(), msg: shutdown_msg }); if chan_entry.get().is_shutdown() { @@ -899,14 +912,14 @@ impl // force-closing. The monitor update on the required in-memory copy should broadcast // the latest local state, which is the best we can do anyway. Thus, it is safe to // ignore the result here. - let _ = self.monitor.update_monitor(funding_txo, monitor_update); + let _ = self.chain_monitor.update_channel(funding_txo, monitor_update); } } /// Force closes a channel, immediately broadcasting the latest local commitment transaction to /// the chain and rejecting new HTLCs on the given channel. pub fn force_close_channel(&self, channel_id: &[u8; 32]) { - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut chan = { let mut channel_state_lock = self.channel_state.lock().unwrap(); @@ -1125,7 +1138,7 @@ impl PendingHTLCStatus::Forward(PendingHTLCInfo { routing: PendingHTLCRouting::Forward { onion_packet: outgoing_packet, - short_channel_id: short_channel_id, + short_channel_id, }, payment_hash: msg.payment_hash.clone(), incoming_shared_secret: shared_secret, @@ -1158,10 +1171,10 @@ impl if !chan.is_live() { // channel_disabled break Some(("Forwarding channel is not in a ready state.", 0x1000 | 20, Some(self.get_channel_update(chan).unwrap()))); } - if *amt_to_forward < chan.get_their_htlc_minimum_msat() { // amount_below_minimum + if *amt_to_forward < chan.get_counterparty_htlc_minimum_msat() { // amount_below_minimum break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, Some(self.get_channel_update(chan).unwrap()))); } - let fee = amt_to_forward.checked_mul(chan.get_fee_proportional_millionths() as u64).and_then(|prop_fee| { (prop_fee / 1000000).checked_add(chan.get_our_fee_base_msat(&self.fee_estimator) as u64) }); + let fee = amt_to_forward.checked_mul(chan.get_fee_proportional_millionths() as u64).and_then(|prop_fee| { (prop_fee / 1000000).checked_add(chan.get_holder_fee_base_msat(&self.fee_estimator) as u64) }); if fee.is_none() || msg.amount_msat < fee.unwrap() || (msg.amount_msat - fee.unwrap()) < *amt_to_forward { // fee_insufficient break Some(("Prior hop has deviated from specified fees parameters or origin node has obsolete ones", 0x1000 | 12, Some(self.get_channel_update(chan).unwrap()))); } @@ -1216,17 +1229,17 @@ impl Some(id) => id, }; - let were_node_one = PublicKey::from_secret_key(&self.secp_ctx, &self.our_network_key).serialize()[..] < chan.get_their_node_id().serialize()[..]; + let were_node_one = PublicKey::from_secret_key(&self.secp_ctx, &self.our_network_key).serialize()[..] < chan.get_counterparty_node_id().serialize()[..]; let unsigned = msgs::UnsignedChannelUpdate { chain_hash: self.genesis_hash, - short_channel_id: short_channel_id, + short_channel_id, timestamp: chan.get_update_time_counter(), flags: (!were_node_one) as u8 | ((!chan.is_live() as u8) << 1), cltv_expiry_delta: CLTV_EXPIRY_DELTA, - htlc_minimum_msat: chan.get_our_htlc_minimum_msat(), + htlc_minimum_msat: chan.get_counterparty_htlc_minimum_msat(), htlc_maximum_msat: OptionalField::Present(chan.get_announced_htlc_max_msat()), - fee_base_msat: chan.get_our_fee_base_msat(&self.fee_estimator), + fee_base_msat: chan.get_holder_fee_base_msat(&self.fee_estimator), fee_proportional_millionths: chan.get_fee_proportional_millionths(), excess_data: Vec::new(), }; @@ -1254,7 +1267,7 @@ impl } let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash); - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let err: Result<(), _> = loop { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -1266,7 +1279,7 @@ impl let channel_state = &mut *channel_lock; if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) { match { - if chan.get().get_their_node_id() != path.first().unwrap().pubkey { + if chan.get().get_counterparty_node_id() != path.first().unwrap().pubkey { return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); } if !chan.get().is_live() { @@ -1279,7 +1292,7 @@ impl }, onion_packet, &self.logger), channel_state, chan) } { Some((update_add, commitment_signed, monitor_update)) => { - if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) { + if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { maybe_break_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true); // Note that MonitorUpdateFailed here indicates (per function docs) // that we will resend the commitment update once monitor updating @@ -1422,7 +1435,7 @@ impl /// May panic if the funding_txo is duplicative with some other channel (note that this should /// be trivially prevented by using unique funding transaction keys per-channel). pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) { - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let (chan, msg) = { let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) { @@ -1435,7 +1448,7 @@ impl }, None => return }; - match handle_error!(self, res, chan.get_their_node_id()) { + match handle_error!(self, res, chan.get_counterparty_node_id()) { Ok(funding_msg) => { (chan, funding_msg) }, @@ -1445,8 +1458,8 @@ impl let mut channel_state = self.channel_state.lock().unwrap(); channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated { - node_id: chan.get_their_node_id(), - msg: msg, + node_id: chan.get_counterparty_node_id(), + msg, }); match channel_state.by_id.entry(chan.channel_id()) { hash_map::Entry::Occupied(_) => { @@ -1484,7 +1497,7 @@ impl // be absurd. We ensure this by checking that at least 500 (our stated public contract on when // broadcast_node_announcement panics) of the maximum-length addresses would fit in a 64KB // message... - const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (msgs::NetAddress::MAX_LEN as u32 + 1) / 2; + const HALF_MESSAGE_IS_ADDRS: u32 = ::std::u16::MAX as u32 / (NetAddress::MAX_LEN as u32 + 1) / 2; #[deny(const_err)] #[allow(dead_code)] // ...by failing to compile if the number of addresses that would be half of a message is @@ -1504,8 +1517,8 @@ impl /// only Tor Onion addresses. /// /// Panics if addresses is absurdly large (more than 500). - pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec) { - let _ = self.total_consistency_lock.read().unwrap(); + pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); if addresses.len() > 500 { panic!("More than half the message size was taken up by public addresses!"); @@ -1535,7 +1548,7 @@ impl /// Should only really ever be called in response to a PendingHTLCsForwardable event. /// Will likely generate further events. pub fn process_pending_htlc_forwards(&self) { - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut new_events = Vec::new(); let mut failed_forwards = Vec::new(); @@ -1552,9 +1565,11 @@ impl failed_forwards.reserve(pending_forwards.len()); for forward_info in pending_forwards.drain(..) { match forward_info { - HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info } => { + HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info, + prev_funding_outpoint } => { let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id: prev_short_channel_id, + outpoint: prev_funding_outpoint, htlc_id: prev_htlc_id, incoming_packet_shared_secret: forward_info.incoming_shared_secret, }); @@ -1581,10 +1596,12 @@ impl HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo { routing: PendingHTLCRouting::Forward { onion_packet, .. - }, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value }, } => { + }, incoming_shared_secret, payment_hash, amt_to_forward, outgoing_cltv_value }, + prev_funding_outpoint } => { log_trace!(self.logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", log_bytes!(payment_hash.0), prev_short_channel_id, short_chan_id); let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id: prev_short_channel_id, + outpoint: prev_funding_outpoint, htlc_id: prev_htlc_id, incoming_packet_shared_secret: incoming_shared_secret, }); @@ -1656,7 +1673,7 @@ impl Err(e) => { // We surely failed send_commitment due to bad keys, in that case // close channel and then send error message to peer. - let their_node_id = chan.get().get_their_node_id(); + let counterparty_node_id = chan.get().get_counterparty_node_id(); let err: Result<(), _> = match e { ChannelError::Ignore(_) => { panic!("Stated return value requirements in send_commitment() were not met"); @@ -1671,16 +1688,16 @@ impl }, ChannelError::CloseDelayBroadcast(_) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); } }; - handle_errors.push((their_node_id, err)); + handle_errors.push((counterparty_node_id, err)); continue; } }; - if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) { - handle_errors.push((chan.get().get_their_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true))); + if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { + handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true))); continue; } channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get().get_their_node_id(), + node_id: chan.get().get_counterparty_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: add_htlc_msgs, update_fulfill_htlcs: Vec::new(), @@ -1699,9 +1716,11 @@ impl match forward_info { HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo { routing: PendingHTLCRouting::Receive { payment_data, incoming_cltv_expiry }, - incoming_shared_secret, payment_hash, amt_to_forward, .. }, } => { + incoming_shared_secret, payment_hash, amt_to_forward, .. }, + prev_funding_outpoint } => { let prev_hop = HTLCPreviousHopData { short_channel_id: prev_short_channel_id, + outpoint: prev_funding_outpoint, htlc_id: prev_htlc_id, incoming_packet_shared_secret: incoming_shared_secret, }; @@ -1736,6 +1755,7 @@ impl ); failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id: htlc.prev_hop.short_channel_id, + outpoint: prev_funding_outpoint, htlc_id: htlc.prev_hop.htlc_id, incoming_packet_shared_secret: htlc.prev_hop.incoming_packet_shared_secret, }), payment_hash, @@ -1744,14 +1764,14 @@ impl } } else if total_value == data.total_msat { new_events.push(events::Event::PaymentReceived { - payment_hash: payment_hash, + payment_hash, payment_secret: Some(data.payment_secret), amt: total_value, }); } } else { new_events.push(events::Event::PaymentReceived { - payment_hash: payment_hash, + payment_hash, payment_secret: None, amt: amt_to_forward, }); @@ -1773,8 +1793,8 @@ impl self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, failure_reason); } - for (their_node_id, err) in handle_errors.drain(..) { - let _ = handle_error!(self, err, their_node_id); + for (counterparty_node_id, err) in handle_errors.drain(..) { + let _ = handle_error!(self, err, counterparty_node_id); } if new_events.is_empty() { return } @@ -1788,7 +1808,7 @@ impl /// /// This method handles all the details, and must be called roughly once per minute. pub fn timer_chan_freshness_every_min(&self) { - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; for (_, chan) in channel_state.by_id.iter_mut() { @@ -1813,7 +1833,7 @@ impl /// Returns false if no payment was found to fail backwards, true if the process of failing the /// HTLC backwards has been started. pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash, payment_secret: &Option) -> bool { - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(*payment_hash, *payment_secret)); @@ -1938,7 +1958,7 @@ impl } } }, - HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, incoming_packet_shared_secret }) => { + HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, incoming_packet_shared_secret, .. }) => { let err_packet = match onion_error { HTLCFailReason::Reason { failure_code, data } => { log_trace!(self.logger, "Failing HTLC with payment_hash {} backwards from us with code {}", log_bytes!(payment_hash.0), failure_code); @@ -1992,7 +2012,7 @@ impl pub fn claim_funds(&self, payment_preimage: PaymentPreimage, payment_secret: &Option, expected_amount: u64) -> bool { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(payment_hash, *payment_secret)); @@ -2061,9 +2081,9 @@ impl // which were generated. channel_state.take(); - for (their_node_id, err) in errs.drain(..) { + for (counterparty_node_id, err) in errs.drain(..) { let res: Result<(), _> = Err(err); - let _ = handle_error!(self, res, their_node_id); + let _ = handle_error!(self, res, counterparty_node_id); } claimed_any_htlcs @@ -2085,17 +2105,17 @@ impl match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { Ok((msgs, monitor_option)) => { if let Some(monitor_update) = monitor_option { - if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) { + if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { if was_frozen_for_monitor { assert!(msgs.is_none()); } else { - return Err(Some((chan.get().get_their_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err()))); + return Err(Some((chan.get().get_counterparty_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err()))); } } } if let Some((msg, commitment_signed)) = msgs { channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get().get_their_node_id(), + node_id: chan.get().get_counterparty_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), update_fulfill_htlcs: vec![msg], @@ -2133,19 +2153,30 @@ impl }); }, HTLCSource::PreviousHopData(hop_data) => { - if let Err((their_node_id, err)) = match self.claim_funds_from_hop(&mut channel_state_lock, hop_data, payment_preimage) { + let prev_outpoint = hop_data.outpoint; + if let Err((counterparty_node_id, err)) = match self.claim_funds_from_hop(&mut channel_state_lock, hop_data, payment_preimage) { Ok(()) => Ok(()), Err(None) => { - // TODO: There is probably a channel monitor somewhere that needs to - // learn the preimage as the channel already hit the chain and that's - // why it's missing. + let preimage_update = ChannelMonitorUpdate { + update_id: CLOSED_CHANNEL_UPDATE_ID, + updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage: payment_preimage.clone(), + }], + }; + // We update the ChannelMonitor on the backward link, after + // receiving an offchain preimage event from the forward link (the + // event being update_fulfill_htlc). + if let Err(e) = self.chain_monitor.update_channel(prev_outpoint, preimage_update) { + log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}", + payment_preimage, e); + } Ok(()) }, Err(Some(res)) => Err(res), } { mem::drop(channel_state_lock); let res: Result<(), _> = Err(err); - let _ = handle_error!(self, res, their_node_id); + let _ = handle_error!(self, res, counterparty_node_id); } }, } @@ -2169,7 +2200,7 @@ impl /// exists largely only to prevent races between this and concurrent update_monitor calls. /// /// Thus, the anticipated use is, at a high level: - /// 1) You register a ManyChannelMonitor with this ChannelManager, + /// 1) You register a chain::Watch with this ChannelManager, /// 2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of /// said ChannelMonitors as it can, returning ChannelMonitorUpdateErr::TemporaryFailures /// any time it cannot do so instantly, @@ -2177,7 +2208,7 @@ impl /// 4) once all remote copies are updated, you call this function with the update_id that /// completed, and once it is the latest the Channel will be re-enabled. pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) { - let _ = self.total_consistency_lock.read().unwrap(); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut close_results = Vec::new(); let mut htlc_forwards = Vec::new(); @@ -2199,14 +2230,14 @@ impl let (raa, commitment_update, order, pending_forwards, mut pending_failures, needs_broadcast_safe, funding_locked) = channel.monitor_updating_restored(&self.logger); if !pending_forwards.is_empty() { - htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), pending_forwards)); + htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), funding_txo.clone(), pending_forwards)); } htlc_failures.append(&mut pending_failures); macro_rules! handle_cs { () => { if let Some(update) = commitment_update { pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: channel.get_their_node_id(), + node_id: channel.get_counterparty_node_id(), updates: update, }); } @@ -2214,7 +2245,7 @@ impl macro_rules! handle_raa { () => { if let Some(revoke_and_ack) = raa { pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { - node_id: channel.get_their_node_id(), + node_id: channel.get_counterparty_node_id(), msg: revoke_and_ack, }); } @@ -2237,12 +2268,12 @@ impl } if let Some(msg) = funding_locked { pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { - node_id: channel.get_their_node_id(), + node_id: channel.get_counterparty_node_id(), msg, }); if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { - node_id: channel.get_their_node_id(), + node_id: channel.get_counterparty_node_id(), msg: announcement_sigs, }); } @@ -2262,12 +2293,12 @@ impl } } - fn internal_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { + fn internal_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone())); } - let channel = Channel::new_from_req(&self.fee_estimator, &self.keys_manager, their_node_id.clone(), their_features, msg, 0, &self.default_configuration) + let channel = Channel::new_from_req(&self.fee_estimator, &self.keys_manager, counterparty_node_id.clone(), their_features, msg, 0, &self.default_configuration) .map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id))?; let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; @@ -2275,7 +2306,7 @@ impl hash_map::Entry::Occupied(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision!".to_owned(), msg.temporary_channel_id.clone())), hash_map::Entry::Vacant(entry) => { channel_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg: channel.get_accept_channel(), }); entry.insert(channel); @@ -2284,13 +2315,13 @@ impl Ok(()) } - fn internal_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { + fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { let (value, output_script, user_id) = { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); } try_chan_entry!(self, chan.get_mut().accept_channel(&msg, &self.default_configuration, their_features), channel_state, chan); @@ -2303,19 +2334,19 @@ impl pending_events.push(events::Event::FundingGenerationReady { temporary_channel_id: msg.temporary_channel_id, channel_value_satoshis: value, - output_script: output_script, + output_script, user_channel_id: user_id, }); Ok(()) } - fn internal_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { - let ((funding_msg, monitor_update), mut chan) = { + fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { + let ((funding_msg, monitor), mut chan) = { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.temporary_channel_id.clone()) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); } (try_chan_entry!(self, chan.get_mut().funding_created(msg, &self.logger), channel_state, chan), chan.remove()) @@ -2324,8 +2355,8 @@ impl } }; // Because we have exclusive ownership of the channel here we can release the channel_state - // lock before add_monitor - if let Err(e) = self.monitor.add_monitor(monitor_update.get_funding_txo().0, monitor_update) { + // lock before watch_channel + if let Err(e) = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor) { match e { ChannelMonitorUpdateErr::PermanentFailure => { // Note that we reply with the new channel_id in error messages if we gave up on the @@ -2351,7 +2382,7 @@ impl }, hash_map::Entry::Vacant(e) => { channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg: funding_msg, }); e.insert(chan); @@ -2360,20 +2391,20 @@ impl Ok(()) } - fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { + fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { let (funding_txo, user_id) = { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } let monitor = match chan.get_mut().funding_signed(&msg, &self.logger) { Ok(update) => update, Err(e) => try_chan_entry!(self, Err(e), channel_state, chan), }; - if let Err(e) = self.monitor.add_monitor(chan.get().get_funding_txo().unwrap(), monitor) { + if let Err(e) = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) { return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, false, false); } (chan.get().get_funding_txo().unwrap(), chan.get().get_user_id()) @@ -2383,18 +2414,18 @@ impl }; let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::FundingBroadcastSafe { - funding_txo: funding_txo, + funding_txo, user_channel_id: user_id, }); Ok(()) } - fn internal_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), MsgHandleErrInternal> { + fn internal_funding_locked(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } try_chan_entry!(self, chan.get_mut().funding_locked(&msg), channel_state, chan); @@ -2410,7 +2441,7 @@ impl // failures is an issue. Note, to achieve its goal, only one of the announcement_sigs needs // to be received, from then sigs are going to be flood to the whole network. channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg: announcement_sigs, }); } @@ -2420,26 +2451,26 @@ impl } } - fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> { + fn internal_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> { let (mut dropped_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - if chan_entry.get().get_their_node_id() != *their_node_id { + if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } let (shutdown, closing_signed, dropped_htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.fee_estimator, &msg), channel_state, chan_entry); if let Some(msg) = shutdown { channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg, }); } if let Some(msg) = closing_signed { channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg, }); } @@ -2467,19 +2498,19 @@ impl Ok(()) } - fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { + fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { let (tx, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - if chan_entry.get().get_their_node_id() != *their_node_id { + if chan_entry.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } let (closing_signed, tx) = try_chan_entry!(self, chan_entry.get_mut().closing_signed(&self.fee_estimator, &msg), channel_state, chan_entry); if let Some(msg) = closing_signed { channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg, }); } @@ -2513,7 +2544,7 @@ impl Ok(()) } - fn internal_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), MsgHandleErrInternal> { + fn internal_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), MsgHandleErrInternal> { //TODO: BOLT 4 points out a specific attack where a peer may re-send an onion packet and //determine the state of the payment based on our response/if we forward anything/the time //we take to respond. We should take care to avoid allowing such an attack. @@ -2528,7 +2559,7 @@ impl match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } @@ -2578,13 +2609,13 @@ impl Ok(()) } - fn internal_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { + fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); let htlc_source = { let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), channel_state, chan) @@ -2596,12 +2627,12 @@ impl Ok(()) } - fn internal_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { + fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } try_chan_entry!(self, chan.get_mut().update_fail_htlc(&msg, HTLCFailReason::LightningError { err: msg.reason.clone() }), channel_state, chan); @@ -2611,12 +2642,12 @@ impl Ok(()) } - fn internal_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { + fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } if (msg.failure_code & 0x8000) == 0 { @@ -2630,12 +2661,12 @@ impl } } - fn internal_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> { + fn internal_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } let (revoke_and_ack, commitment_signed, closing_signed, monitor_update) = @@ -2643,23 +2674,23 @@ impl Err((None, e)) => try_chan_entry!(self, Err(e), channel_state, chan), Err((Some(update), e)) => { assert!(chan.get().is_awaiting_monitor_update()); - let _ = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), update); + let _ = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), update); try_chan_entry!(self, Err(e), channel_state, chan); unreachable!(); }, Ok(res) => res }; - if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) { + if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()); //TODO: Rebroadcast closing_signed if present on monitor update restoration } channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg: revoke_and_ack, }); if let Some(msg) = commitment_signed { channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), update_fulfill_htlcs: Vec::new(), @@ -2672,7 +2703,7 @@ impl } if let Some(msg) = closing_signed { channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg, }); } @@ -2683,8 +2714,8 @@ impl } #[inline] - fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, Vec<(PendingHTLCInfo, u64)>)]) { - for &mut (prev_short_channel_id, ref mut pending_forwards) in per_source_pending_forwards { + fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, Vec<(PendingHTLCInfo, u64)>)]) { + for &mut (prev_short_channel_id, prev_funding_outpoint, ref mut pending_forwards) in per_source_pending_forwards { let mut forward_event = None; if !pending_forwards.is_empty() { let mut channel_state = self.channel_state.lock().unwrap(); @@ -2697,10 +2728,12 @@ impl PendingHTLCRouting::Receive { .. } => 0, }) { hash_map::Entry::Occupied(mut entry) => { - entry.get_mut().push(HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info }); + entry.get_mut().push(HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_funding_outpoint, + prev_htlc_id, forward_info }); }, hash_map::Entry::Vacant(entry) => { - entry.insert(vec!(HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info })); + entry.insert(vec!(HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_funding_outpoint, + prev_htlc_id, forward_info })); } } } @@ -2717,21 +2750,21 @@ impl } } - fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { + fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { let mut htlcs_to_fail = Vec::new(); let res = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update(); let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update, htlcs_to_fail_in) = break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), channel_state, chan); htlcs_to_fail = htlcs_to_fail_in; - if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) { + if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { if was_frozen_for_monitor { assert!(commitment_update.is_none() && closing_signed.is_none() && pending_forwards.is_empty() && pending_failures.is_empty()); break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned())); @@ -2743,40 +2776,40 @@ impl } if let Some(updates) = commitment_update { channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), updates, }); } if let Some(msg) = closing_signed { channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg, }); } - break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"))) + break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap())) }, hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) } }; self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id); match res { - Ok((pending_forwards, mut pending_failures, short_channel_id)) => { + Ok((pending_forwards, mut pending_failures, short_channel_id, channel_outpoint)) => { for failure in pending_failures.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2); } - self.forward_htlcs(&mut [(short_channel_id, pending_forwards)]); + self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]); Ok(()) }, Err(e) => Err(e) } } - fn internal_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { + fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } try_chan_entry!(self, chan.get_mut().update_fee(&self.fee_estimator, &msg), channel_state, chan); @@ -2786,13 +2819,13 @@ impl Ok(()) } - fn internal_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { + fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } if !chan.get().is_usable() { @@ -2840,13 +2873,13 @@ impl Ok(()) } - fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> { + fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { - if chan.get().get_their_node_id() != *their_node_id { + if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } // Currently, we expect all holding cell update_adds to be dropped on peer @@ -2856,7 +2889,7 @@ impl let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, mut order, shutdown) = try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan); if let Some(monitor_update) = monitor_update_opt { - if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) { + if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { // channel_reestablish doesn't guarantee the order it returns is sensical // for the messages it returns, but if we're setting what messages to // re-transmit on monitor update success, we need to make sure it is sane. @@ -2872,14 +2905,14 @@ impl } if let Some(msg) = funding_locked { channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg }); } macro_rules! send_raa { () => { if let Some(msg) = revoke_and_ack { channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg }); } @@ -2887,7 +2920,7 @@ impl macro_rules! send_cu { () => { if let Some(updates) = commitment_update { channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), updates }); } @@ -2904,7 +2937,7 @@ impl } if let Some(msg) = shutdown { channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: their_node_id.clone(), + node_id: counterparty_node_id.clone(), msg, }); } @@ -2918,10 +2951,11 @@ impl /// If successful, will generate a UpdateHTLCs event, so you should probably poll /// PeerManager::process_events afterwards. /// Note: This API is likely to change! + /// (C-not exported) Cause its doc(hidden) anyway #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u32) -> Result<(), APIError> { - let _ = self.total_consistency_lock.read().unwrap(); - let their_node_id; + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let counterparty_node_id; let err: Result<(), _> = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; @@ -2938,15 +2972,15 @@ impl if !chan.get().is_live() { return Err(APIError::ChannelUnavailable{err: "Channel is either not yet fully established or peer is currently disconnected".to_owned()}); } - their_node_id = chan.get().get_their_node_id(); + counterparty_node_id = chan.get().get_counterparty_node_id(); if let Some((update_fee, commitment_signed, monitor_update)) = break_chan_entry!(self, chan.get_mut().send_update_fee_and_commit(feerate_per_kw, &self.logger), channel_state, chan) { - if let Err(_e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) { + if let Err(_e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { unimplemented!(); } channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get().get_their_node_id(), + node_id: chan.get().get_counterparty_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), update_fulfill_htlcs: Vec::new(), @@ -2962,17 +2996,17 @@ impl return Ok(()) }; - match handle_error!(self, err, their_node_id) { + match handle_error!(self, err, counterparty_node_id) { Ok(_) => unreachable!(), Err(e) => { Err(APIError::APIMisuseError { err: e.err })} } } - /// Process pending events from the ManyChannelMonitor. + /// Process pending events from the `chain::Watch`. fn process_pending_monitor_events(&self) { let mut failed_channels = Vec::new(); { - for monitor_event in self.monitor.get_and_clear_pending_monitor_events() { + for monitor_event in self.chain_monitor.release_pending_monitor_events() { match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { if let Some(preimage) = htlc_update.payment_preimage { @@ -3011,14 +3045,14 @@ impl } } -impl events::MessageSendEventsProvider for ChannelManager - where M::Target: ManyChannelMonitor, +impl MessageSendEventsProvider for ChannelManager + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { - fn get_and_clear_pending_msg_events(&self) -> Vec { + fn get_and_clear_pending_msg_events(&self) -> Vec { //TODO: This behavior should be documented. It's non-intuitive that we query // ChannelMonitors when clearing other events. self.process_pending_monitor_events(); @@ -3030,14 +3064,14 @@ impl } } -impl events::EventsProvider for ChannelManager - where M::Target: ManyChannelMonitor, +impl EventsProvider for ChannelManager + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { - fn get_and_clear_pending_events(&self) -> Vec { + fn get_and_clear_pending_events(&self) -> Vec { //TODO: This behavior should be documented. It's non-intuitive that we query // ChannelMonitors when clearing other events. self.process_pending_monitor_events(); @@ -3049,18 +3083,18 @@ impl } } -impl - ChainListener for ChannelManager - where M::Target: ManyChannelMonitor, +impl ChannelManager + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, - L::Target: Logger, + L::Target: Logger, { - fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[usize]) { - let header_hash = header.bitcoin_hash(); - log_trace!(self.logger, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len()); - let _ = self.total_consistency_lock.read().unwrap(); + /// Updates channel state based on transactions seen in a connected block. + pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { + let header_hash = header.block_hash(); + log_trace!(self.logger, "Block {} at height {} connected", header_hash, height); + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); let mut timed_out_htlcs = Vec::new(); { @@ -3069,7 +3103,7 @@ impl ChannelMessageHandler for ChannelManager - where M::Target: ManyChannelMonitor, + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { - fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_open_channel(their_node_id, their_features, msg), *their_node_id); + fn handle_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, their_features, msg), *counterparty_node_id); } - fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_accept_channel(their_node_id, their_features, msg), *their_node_id); + fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, their_features, msg), *counterparty_node_id); } - fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_funding_created(their_node_id, msg), *their_node_id); + fn handle_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_funding_created(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_funding_signed(their_node_id, msg), *their_node_id); + fn handle_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_funding_signed(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_funding_locked(their_node_id, msg), *their_node_id); + fn handle_funding_locked(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingLocked) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_funding_locked(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_shutdown(their_node_id, msg), *their_node_id); + fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_shutdown(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_closing_signed(their_node_id, msg), *their_node_id); + fn handle_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_closing_signed(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), *their_node_id); + fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), *their_node_id); + fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_update_fulfill_htlc(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), *their_node_id); + fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), *their_node_id); + fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_commitment_signed(their_node_id, msg), *their_node_id); + fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_commitment_signed(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), *their_node_id); + fn handle_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_revoke_and_ack(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_update_fee(their_node_id, msg), *their_node_id); + fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), *their_node_id); + fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_announcement_signatures(counterparty_node_id, msg), *counterparty_node_id); } - fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _ = self.total_consistency_lock.read().unwrap(); - let _ = handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), *their_node_id); + fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); } - fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { - let _ = self.total_consistency_lock.read().unwrap(); + fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) { + let _consistency_lock = self.total_consistency_lock.read().unwrap(); let mut failed_channels = Vec::new(); let mut failed_payments = Vec::new(); let mut no_channels_remain = true; @@ -3301,9 +3338,9 @@ impl node_id != their_node_id, - &events::MessageSendEvent::SendOpenChannel { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::SendFundingCreated { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::SendFundingSigned { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::SendFundingLocked { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::SendAnnouncementSignatures { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::UpdateHTLCs { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::SendRevokeAndACK { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::SendClosingSigned { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != their_node_id, - &events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != their_node_id, + &events::MessageSendEvent::SendAcceptChannel { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendOpenChannel { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendFundingCreated { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendFundingSigned { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendFundingLocked { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendAnnouncementSignatures { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::UpdateHTLCs { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendRevokeAndACK { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendClosingSigned { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != counterparty_node_id, + &events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != counterparty_node_id, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, - &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != their_node_id, + &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id, &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true, } }); } if no_channels_remain { - self.per_peer_state.write().unwrap().remove(their_node_id); + self.per_peer_state.write().unwrap().remove(counterparty_node_id); } for failure in failed_channels.drain(..) { @@ -3379,14 +3416,14 @@ impl { e.insert(Mutex::new(PeerState { latest_features: init_msg.features.clone(), @@ -3402,7 +3439,7 @@ impl(&self, writer: &mut W) -> Result<(), ::std::io::Error> { match self { - &HTLCForwardInfo::AddHTLC { ref prev_short_channel_id, ref prev_htlc_id, ref forward_info } => { + &HTLCForwardInfo::AddHTLC { ref prev_short_channel_id, ref prev_funding_outpoint, ref prev_htlc_id, ref forward_info } => { 0u8.write(writer)?; prev_short_channel_id.write(writer)?; + prev_funding_outpoint.write(writer)?; prev_htlc_id.write(writer)?; forward_info.write(writer)?; }, @@ -3634,6 +3673,7 @@ impl Readable for HTLCForwardInfo { match ::read(reader)? { 0 => Ok(HTLCForwardInfo::AddHTLC { prev_short_channel_id: Readable::read(reader)?, + prev_funding_outpoint: Readable::read(reader)?, prev_htlc_id: Readable::read(reader)?, forward_info: Readable::read(reader)?, }), @@ -3647,14 +3687,14 @@ impl Readable for HTLCForwardInfo { } impl Writeable for ChannelManager - where M::Target: ManyChannelMonitor, + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { - let _ = self.total_consistency_lock.write().unwrap(); + let _consistency_lock = self.total_consistency_lock.write().unwrap(); writer.write_all(&[SERIALIZATION_VERSION; 1])?; writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; @@ -3725,19 +3765,17 @@ impl - where M::Target: ManyChannelMonitor, + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { - /// The keys provider which will give us relevant keys. Some keys will be loaded during /// deserialization. pub keys_manager: K, @@ -3746,12 +3784,12 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: 'a + ChannelKeys, M: Deref, T: /// /// No calls to the FeeEstimator will be made during deserialization. pub fee_estimator: F, - /// The ManyChannelMonitor for use in the ChannelManager in the future. + /// The chain::Watch for use in the ChannelManager in the future. /// - /// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that + /// No calls to the chain::Watch will be made during deserialization. It is assumed that /// you have deserialized ChannelMonitors separately and will add them to your - /// ManyChannelMonitor after deserializing this ChannelManager. - pub monitor: M, + /// chain::Watch after deserializing this ChannelManager. + pub chain_monitor: M, /// The BroadcasterInterface which will be used in the ChannelManager in the future and may be /// used to broadcast the latest local commitment transactions of channels which must be @@ -3774,14 +3812,36 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: 'a + ChannelKeys, M: Deref, T: /// /// In such cases the latest local transactions will be sent to the tx_broadcaster included in /// this struct. - pub channel_monitors: &'a mut HashMap>, + /// + /// (C-not exported) because we have no HashMap bindings + pub channel_monitors: HashMap>, +} + +impl<'a, ChanSigner: 'a + ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> + ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F, L> + where M::Target: chain::Watch, + T::Target: BroadcasterInterface, + K::Target: KeysInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + /// Simple utility function to create a ChannelManagerReadArgs which creates the monitor + /// HashMap for you. This is primarily useful for C bindings where it is not practical to + /// populate a HashMap directly from C. + pub fn new(keys_manager: K, fee_estimator: F, chain_monitor: M, tx_broadcaster: T, logger: L, default_config: UserConfig, + mut channel_monitors: Vec<&'a mut ChannelMonitor>) -> Self { + Self { + keys_manager, fee_estimator, chain_monitor, tx_broadcaster, logger, default_config, + channel_monitors: channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) }).collect() + } + } } // Implement ReadableArgs for an Arc'd ChannelManager to make it a bit easier to work with the // SipmleArcChannelManager type: impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ReadableArgs> for (BlockHash, Arc>) - where M::Target: ManyChannelMonitor, + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, @@ -3795,13 +3855,13 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ReadableArgs> for (BlockHash, ChannelManager) - where M::Target: ManyChannelMonitor, + where M::Target: chain::Watch, T::Target: BroadcasterInterface, K::Target: KeysInterface, F::Target: FeeEstimator, L::Target: Logger, { - fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F, L>) -> Result { + fn read(reader: &mut R, mut args: ChannelManagerReadArgs<'a, ChanSigner, M, T, K, F, L>) -> Result { let _ver: u8 = Readable::read(reader)?; let min_ver: u8 = Readable::read(reader)?; if min_ver > SERIALIZATION_VERSION { @@ -3827,20 +3887,20 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { - if channel.get_cur_local_commitment_transaction_number() < monitor.get_cur_local_commitment_number() || - channel.get_revoked_remote_commitment_transaction_number() < monitor.get_min_seen_secret() || - channel.get_cur_remote_commitment_transaction_number() < monitor.get_cur_remote_commitment_number() || + if channel.get_cur_holder_commitment_transaction_number() < monitor.get_cur_holder_commitment_number() || + channel.get_revoked_counterparty_commitment_transaction_number() < monitor.get_min_seen_secret() || + channel.get_cur_counterparty_commitment_transaction_number() < monitor.get_cur_counterparty_commitment_number() || channel.get_latest_monitor_update_id() > monitor.get_latest_update_id() { // If the channel is ahead of the monitor, return InvalidValue: return Err(DecodeError::InvalidValue); - } else if channel.get_cur_local_commitment_transaction_number() > monitor.get_cur_local_commitment_number() || - channel.get_revoked_remote_commitment_transaction_number() > monitor.get_min_seen_secret() || - channel.get_cur_remote_commitment_transaction_number() > monitor.get_cur_remote_commitment_number() || + } else if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() || + channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() || + channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() || channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() { // But if the channel is behind of the monitor, close the channel: let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true); failed_htlcs.append(&mut new_failed_htlcs); - monitor.broadcast_latest_local_commitment_txn(&args.tx_broadcaster, &args.logger); + monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger); } else { if let Some(short_channel_id) = channel.get_short_channel_id() { short_to_id.insert(short_channel_id, channel.channel_id()); @@ -3854,7 +3914,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De for (ref funding_txo, ref mut monitor) in args.channel_monitors.iter_mut() { if !funding_txo_set.contains(funding_txo) { - monitor.broadcast_latest_local_commitment_txn(&args.tx_broadcaster, &args.logger); + monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger); } } @@ -3907,7 +3967,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De let channel_manager = ChannelManager { genesis_hash, fee_estimator: args.fee_estimator, - monitor: args.monitor, + chain_monitor: args.chain_monitor, tx_broadcaster: args.tx_broadcaster, latest_block_height: AtomicUsize::new(latest_block_height as usize),