X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=8ff6918d4ffc7e814c871f9305a979f7e29a73c0;hb=f2a2fd0d48be78972ed81481c2dfbfb68ecda44e;hp=c863f46b613c77c27198d85e8f3121d9a2eb5c20;hpb=06091cee0fd29549e5e24c673bf361ab3a562529;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index c863f46b..8ff6918d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -25,7 +25,7 @@ use secp256k1::Secp256k1; use secp256k1::ecdh::SharedSecret; use secp256k1; -use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator}; +use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator}; use chain::transaction::OutPoint; use ln::channel::{Channel, ChannelError}; use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY}; @@ -34,7 +34,7 @@ use ln::msgs; use ln::msgs::LocalFeatures; use ln::onion_utils; use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; -use chain::keysinterface::KeysInterface; +use chain::keysinterface::{ChannelKeys, KeysInterface}; use util::config::UserConfig; use util::{byte_utils, events}; use util::ser::{Readable, ReadableArgs, Writeable, Writer}; @@ -49,6 +49,8 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; +const SIXTY_FIVE_ZEROS: [u8; 65] = [0; 65]; + // We hold various information about HTLC relay in the HTLC objects in Channel itself: // // Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should @@ -254,8 +256,8 @@ pub(super) enum RAACommitmentOrder { } // Note this is only exposed in cfg(test): -pub(super) struct ChannelHolder { - pub(super) by_id: HashMap<[u8; 32], Channel>, +pub(super) struct ChannelHolder { + pub(super) by_id: HashMap<[u8; 32], Channel>, pub(super) short_to_id: HashMap, /// short channel id -> forward infos. Key of 0 means payments received /// Note that while this is held in the same mutex as the channels themselves, no consistency @@ -272,15 +274,15 @@ pub(super) struct ChannelHolder { /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, } -pub(super) struct MutChannelHolder<'a> { - pub(super) by_id: &'a mut HashMap<[u8; 32], Channel>, +pub(super) struct MutChannelHolder<'a, ChanSigner: ChannelKeys + 'a> { + pub(super) by_id: &'a mut HashMap<[u8; 32], Channel>, pub(super) short_to_id: &'a mut HashMap, pub(super) forward_htlcs: &'a mut HashMap>, pub(super) claimable_htlcs: &'a mut HashMap>, pub(super) pending_msg_events: &'a mut Vec, } -impl ChannelHolder { - pub(super) fn borrow_parts(&mut self) -> MutChannelHolder { +impl ChannelHolder { + pub(super) fn borrow_parts(&mut self) -> MutChannelHolder { MutChannelHolder { by_id: &mut self.by_id, short_to_id: &mut self.short_to_id, @@ -318,12 +320,17 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum /// the "reorg path" (ie call block_disconnected() until you get to a common block and then call /// block_connected() to step towards your best block) upon deserialization before using the /// object! -pub struct ChannelManager { +/// +/// Note that ChannelManager is responsible for tracking liveness of its channels and generating +/// ChannelUpdate messages informing peers that the channel is temporarily disabled. To avoid +/// spam due to quick disconnection/reconnection, updates are not sent until the channel has been +/// offline for a full minute. In order to track this, you must call +/// timer_chan_freshness_every_min roughly once per minute, though it doesn't have to be perfec. +pub struct ChannelManager<'a, ChanSigner: ChannelKeys> { default_configuration: UserConfig, genesis_hash: Sha256dHash, fee_estimator: Arc, - monitor: Arc, - chain_monitor: Arc, + monitor: Arc, tx_broadcaster: Arc, #[cfg(test)] @@ -334,9 +341,9 @@ pub struct ChannelManager { secp_ctx: Secp256k1, #[cfg(test)] - pub(super) channel_state: Mutex, + pub(super) channel_state: Mutex>, #[cfg(not(test))] - channel_state: Mutex, + channel_state: Mutex>, our_network_key: SecretKey, pending_events: Mutex>, @@ -345,7 +352,7 @@ pub struct ChannelManager { /// Taken first everywhere where we are making changes before any other locks. total_consistency_lock: RwLock<()>, - keys_manager: Arc, + keys_manager: Arc>, logger: Arc, } @@ -576,7 +583,7 @@ macro_rules! maybe_break_monitor_err { } } -impl ChannelManager { +impl<'a, ChanSigner: ChannelKeys> ChannelManager<'a, ChanSigner> { /// Constructs a new ChannelManager to hold several channels and route between them. /// /// This is the main "logic hub" for all channel-related actions, and implements @@ -586,9 +593,16 @@ impl ChannelManager { /// /// panics if channel_value_satoshis is >= `MAX_FUNDING_SATOSHIS`! /// - /// User must provide the current blockchain height from which to track onchain channel + /// Users must provide the current blockchain height from which to track onchain channel /// funding outpoints and send payments with reliable timelocks. - pub fn new(network: Network, feeest: Arc, monitor: Arc, chain_monitor: Arc, tx_broadcaster: Arc, logger: Arc,keys_manager: Arc, config: UserConfig, current_blockchain_height: usize) -> Result, secp256k1::Error> { + /// + /// Users need to notify the new ChannelManager when a new block is connected or + /// disconnected using its `block_connected` and `block_disconnected` methods. + /// However, rather than calling these methods directly, the user should register + /// the ChannelManager as a listener to the BlockNotifier and call the BlockNotifier's + /// `block_(dis)connected` methods, which will notify all registered listeners in one + /// go. + pub fn new(network: Network, feeest: Arc, monitor: Arc, tx_broadcaster: Arc, logger: Arc,keys_manager: Arc>, config: UserConfig, current_blockchain_height: usize) -> Result>, secp256k1::Error> { let secp_ctx = Secp256k1::new(); let res = Arc::new(ChannelManager { @@ -596,7 +610,6 @@ impl ChannelManager { genesis_hash: genesis_block(network).header.bitcoin_hash(), fee_estimator: feeest.clone(), monitor: monitor.clone(), - chain_monitor, tx_broadcaster, latest_block_height: AtomicUsize::new(current_blockchain_height), @@ -619,8 +632,7 @@ impl ChannelManager { logger, }); - let weak_res = Arc::downgrade(&res); - res.chain_monitor.register_listener(weak_res); + Ok(res) } @@ -768,6 +780,7 @@ impl ChannelManager { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); } for tx in local_txn { + log_trace!(self, "Broadcast onchain {}", log_tx!(tx)); self.tx_broadcaster.broadcast_transaction(&tx); } } @@ -807,8 +820,7 @@ impl ChannelManager { } } - const ZERO:[u8; 65] = [0; 65]; - fn decode_update_add_htlc_onion(&self, msg: &msgs::UpdateAddHTLC) -> (PendingHTLCStatus, MutexGuard) { + fn decode_update_add_htlc_onion(&self, msg: &msgs::UpdateAddHTLC) -> (PendingHTLCStatus, MutexGuard>) { macro_rules! return_malformed_err { ($msg: expr, $err_code: expr) => { { @@ -885,6 +897,21 @@ impl ChannelManager { }; let pending_forward_info = if next_hop_data.hmac == [0; 32] { + #[cfg(test)] + { + // In tests, make sure that the initial onion pcket data is, at least, non-0. + // We could do some fancy randomness test here, but, ehh, whatever. + // This checks for the issue where you can calculate the path length given the + // onion data as all the path entries that the originator sent will be here + // as-is (and were originally 0s). + // Of course reverse path calculation is still pretty easy given naive routing + // algorithms, but this fixes the most-obvious case. + let mut new_packet_data = [0; 19*65]; + chacha.process(&msg.onion_routing_packet.hop_data[65..], &mut new_packet_data[0..19*65]); + assert_ne!(new_packet_data[0..65], [0; 65][..]); + assert_ne!(new_packet_data[..], [0; 19*65][..]); + } + // OUR PAYMENT! // final_expiry_too_soon if (msg.cltv_expiry as u64) < self.latest_block_height.load(Ordering::Acquire) as u64 + (CLTV_CLAIM_BUFFER + LATENCY_GRACE_PERIOD_BLOCKS) as u64 { @@ -915,7 +942,7 @@ impl ChannelManager { } else { let mut new_packet_data = [0; 20*65]; chacha.process(&msg.onion_routing_packet.hop_data[65..], &mut new_packet_data[0..19*65]); - chacha.process(&ChannelManager::ZERO[..], &mut new_packet_data[19*65..]); + chacha.process(&SIXTY_FIVE_ZEROS[..], &mut new_packet_data[19*65..]); let mut new_pubkey = msg.onion_routing_packet.public_key.unwrap(); @@ -1012,7 +1039,7 @@ impl ChannelManager { /// only fails if the channel does not yet have an assigned short_id /// May be called with channel_state already locked! - fn get_channel_update(&self, chan: &Channel) -> Result { + fn get_channel_update(&self, chan: &Channel) -> Result { let short_channel_id = match chan.get_short_channel_id() { None => return Err(LightningError{err: "Channel not yet established", action: msgs::ErrorAction::IgnoreError}), Some(id) => id, @@ -1078,14 +1105,14 @@ impl ChannelManager { } } - let session_priv = self.keys_manager.get_session_key(); + let (session_priv, prng_seed) = self.keys_manager.get_onion_rand(); let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1; let onion_keys = secp_call!(onion_utils::construct_onion_keys(&self.secp_ctx, &route, &session_priv), APIError::RouteError{err: "Pubkey along hop was maliciously selected"}); let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route, cur_height)?; - let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, &payment_hash); + let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, &payment_hash); let _ = self.total_consistency_lock.read().unwrap(); @@ -1241,7 +1268,7 @@ impl ChannelManager { } } - fn get_announcement_sigs(&self, chan: &Channel) -> Option { + fn get_announcement_sigs(&self, chan: &Channel) -> Option { if !chan.should_announce() { return None } let (announcement, our_bitcoin_sig) = match chan.get_channel_announcement(self.get_our_node_id(), self.genesis_hash.clone()) { @@ -1483,6 +1510,31 @@ impl ChannelManager { events.append(&mut new_events); } + /// If a peer is disconnected we mark any channels with that peer as 'disabled'. + /// After some time, if channels are still disabled we need to broadcast a ChannelUpdate + /// to inform the network about the uselessness of these channels. + /// + /// This method handles all the details, and must be called roughly once per minute. + pub fn timer_chan_freshness_every_min(&self) { + let _ = self.total_consistency_lock.read().unwrap(); + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); + for (_, chan) in channel_state.by_id { + if chan.is_disabled_staged() && !chan.is_live() { + if let Ok(update) = self.get_channel_update(&chan) { + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + chan.to_fresh(); + } else if chan.is_disabled_staged() && chan.is_live() { + chan.to_fresh(); + } else if chan.is_disabled_marked() { + chan.to_disabled_staged(); + } + } + } + /// Indicates that the preimage for payment_hash is unknown or the received amount is incorrect /// after a PaymentReceived event, failing the HTLC back to its origin and freeing resources /// along the path (including in our own channel on which we received it). @@ -1510,7 +1562,7 @@ impl ChannelManager { /// to fail and take the channel_state lock for each iteration (as we take ownership and may /// drop it). In other words, no assumptions are made that entries in claimable_htlcs point to /// still-available channels. - fn fail_htlc_backwards_internal(&self, mut channel_state_lock: MutexGuard, source: HTLCSource, payment_hash: &PaymentHash, onion_error: HTLCFailReason) { + fn fail_htlc_backwards_internal(&self, mut channel_state_lock: MutexGuard>, source: HTLCSource, payment_hash: &PaymentHash, onion_error: HTLCFailReason) { //TODO: There is a timing attack here where if a node fails an HTLC back to us they can //identify whether we sent it or not based on the (I presume) very different runtime //between the branches here. We should make this async and move it into the forward HTLCs @@ -1638,7 +1690,7 @@ impl ChannelManager { true } else { false } } - fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard, source: HTLCSource, payment_preimage: PaymentPreimage) { + fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard>, source: HTLCSource, payment_preimage: PaymentPreimage) { let (their_node_id, err) = loop { match source { HTLCSource::OutboundRoute { .. } => { @@ -1972,6 +2024,15 @@ impl ChannelManager { } try_chan_entry!(self, chan.get_mut().funding_locked(&msg), channel_state, chan); if let Some(announcement_sigs) = self.get_announcement_sigs(chan.get()) { + // If we see locking block before receiving remote funding_locked, we broadcast our + // announcement_sigs at remote funding_locked reception. If we receive remote + // funding_locked before seeing locking block, we broadcast our announcement_sigs at locking + // block connection. We should guanrantee to broadcast announcement_sigs to our peer whatever + // the order of the events but our peer may not receive it due to disconnection. The specs + // lacking an acknowledgement for announcement_sigs we may have to re-send them at peer + // connection in the future if simultaneous misses by both peers due to network/hardware + // failures is an issue. Note, to achieve its goal, only one of the announcement_sigs needs + // to be received, from then sigs are going to be flood to the whole network. channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { node_id: their_node_id.clone(), msg: announcement_sigs, @@ -2064,6 +2125,7 @@ impl ChannelManager { } }; if let Some(broadcast_tx) = tx { + log_trace!(self, "Broadcast onchain {}", log_tx!(broadcast_tx)); self.tx_broadcaster.broadcast_transaction(&broadcast_tx); } if let Some(chan) = chan_option { @@ -2505,7 +2567,7 @@ impl ChannelManager { } } -impl events::MessageSendEventsProvider for ChannelManager { +impl<'a, ChanSigner: ChannelKeys> events::MessageSendEventsProvider for ChannelManager<'a, ChanSigner> { fn get_and_clear_pending_msg_events(&self) -> Vec { // TODO: Event release to users and serialization is currently race-y: it's very easy for a // user to serialize a ChannelManager with pending events in it and lose those events on @@ -2530,7 +2592,7 @@ impl events::MessageSendEventsProvider for ChannelManager { } } -impl events::EventsProvider for ChannelManager { +impl<'a, ChanSigner: ChannelKeys> events::EventsProvider for ChannelManager<'a, ChanSigner> { fn get_and_clear_pending_events(&self) -> Vec { // TODO: Event release to users and serialization is currently race-y: it's very easy for a // user to serialize a ChannelManager with pending events in it and lose those events on @@ -2555,7 +2617,7 @@ impl events::EventsProvider for ChannelManager { } } -impl ChainListener for ChannelManager { +impl<'a, ChanSigner: ChannelKeys> ChainListener for ChannelManager<'a, ChanSigner> { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) { let header_hash = header.bitcoin_hash(); log_trace!(self, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len()); @@ -2669,7 +2731,7 @@ impl ChainListener for ChannelManager { } } -impl ChannelMessageHandler for ChannelManager { +impl<'a, ChanSigner: ChannelKeys> ChannelMessageHandler for ChannelManager<'a, ChanSigner> { //TODO: Handle errors and close channel (or so) fn handle_open_channel(&self, their_node_id: &PublicKey, their_local_features: LocalFeatures, msg: &msgs::OpenChannel) -> Result<(), LightningError> { let _ = self.total_consistency_lock.read().unwrap(); @@ -2782,8 +2844,8 @@ impl ChannelMessageHandler for ChannelManager { log_debug!(self, "Marking channels with {} disconnected and generating channel_updates", log_pubkey!(their_node_id)); channel_state.by_id.retain(|_, chan| { if chan.get_their_node_id() == *their_node_id { - //TODO: mark channel disabled (and maybe announce such after a timeout). let failed_adds = chan.remove_uncommitted_htlcs_and_mark_paused(); + chan.to_disabled_marked(); if !failed_adds.is_empty() { let chan_update = self.get_channel_update(&chan).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe failed_payments.push((chan_update, failed_adds)); @@ -3054,7 +3116,7 @@ impl Readable for HTLCForwardInfo { } } -impl Writeable for ChannelManager { +impl<'a, ChanSigner: ChannelKeys + Writeable> Writeable for ChannelManager<'a, ChanSigner> { fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { let _ = self.total_consistency_lock.write().unwrap(); @@ -3116,12 +3178,11 @@ impl Writeable for ChannelManager { /// 4) Reconnect blocks on your ChannelMonitors. /// 5) Move the ChannelMonitors into your local ManyChannelMonitor. /// 6) Disconnect/connect blocks on the ChannelManager. -/// 7) Register the new ChannelManager with your ChainWatchInterface (this does not happen -/// automatically as it does in ChannelManager::new()). -pub struct ChannelManagerReadArgs<'a> { +/// 7) Register the new ChannelManager with your ChainWatchInterface. +pub struct ChannelManagerReadArgs<'a, 'b, ChanSigner: ChannelKeys> { /// The keys provider which will give us relevant keys. Some keys will be loaded during /// deserialization. - pub keys_manager: Arc, + pub keys_manager: Arc>, /// The fee_estimator for use in the ChannelManager in the future. /// @@ -3132,11 +3193,8 @@ pub struct ChannelManagerReadArgs<'a> { /// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that /// you have deserialized ChannelMonitors separately and will add them to your /// ManyChannelMonitor after deserializing this ChannelManager. - pub monitor: Arc, - /// The ChainWatchInterface for use in the ChannelManager in the future. - /// - /// No calls to the ChainWatchInterface will be made during deserialization. - pub chain_monitor: Arc, + pub monitor: Arc, + /// The BroadcasterInterface which will be used in the ChannelManager in the future and may be /// used to broadcast the latest local commitment transactions of channels which must be /// force-closed during deserialization. @@ -3161,8 +3219,8 @@ pub struct ChannelManagerReadArgs<'a> { pub channel_monitors: &'a HashMap, } -impl<'a, R : ::std::io::Read> ReadableArgs> for (Sha256dHash, ChannelManager) { - fn read(reader: &mut R, args: ChannelManagerReadArgs<'a>) -> Result { +impl<'a, 'b, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable> ReadableArgs> for (Sha256dHash, ChannelManager<'b, ChanSigner>) { + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, 'b, ChanSigner>) -> Result { let _ver: u8 = Readable::read(reader)?; let min_ver: u8 = Readable::read(reader)?; if min_ver > SERIALIZATION_VERSION { @@ -3180,7 +3238,7 @@ impl<'a, R : ::std::io::Read> ReadableArgs> for (S let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); for _ in 0..channel_count { - let mut channel: Channel = ReadableArgs::read(reader, args.logger.clone())?; + let mut channel: Channel = ReadableArgs::read(reader, args.logger.clone())?; if channel.last_block_connected != last_block_hash { return Err(DecodeError::InvalidValue); } @@ -3239,7 +3297,6 @@ impl<'a, R : ::std::io::Read> ReadableArgs> for (S genesis_hash, fee_estimator: args.fee_estimator, monitor: args.monitor, - chain_monitor: args.chain_monitor, tx_broadcaster: args.tx_broadcaster, latest_block_height: AtomicUsize::new(latest_block_height as usize),