X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=60c7a6a2bef70801fe08949c623dd18aab599dbe;hb=a0f8c2519b522e7d600946cf5d90f97c2188bc19;hp=632800b2cd8db29217c3aa5eb98ed8ce66e3f429;hpb=31cc243e6f04e82f7754d26df7a2bb0275d7ebd1;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 632800b2..60c7a6a2 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -30,26 +30,26 @@ use chain::transaction::OutPoint; use ln::channel::{Channel, ChannelError}; use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY}; use ln::router::Route; +use ln::features::{InitFeatures, NodeFeatures}; use ln::msgs; -use ln::msgs::InitFeatures; use ln::onion_utils; use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; -use chain::keysinterface::{ChannelKeys, KeysInterface}; +use chain::keysinterface::{ChannelKeys, KeysInterface, InMemoryChannelKeys}; use util::config::UserConfig; use util::{byte_utils, events}; use util::ser::{Readable, ReadableArgs, Writeable, Writer}; -use util::chacha20::ChaCha20; +use util::chacha20::{ChaCha20, ChaChaReader}; use util::logger::Logger; use util::errors::APIError; use std::{cmp, mem}; use std::collections::{HashMap, hash_map, HashSet}; -use std::io::Cursor; +use std::io::{Cursor, Read}; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; - -const SIXTY_FIVE_ZEROS: [u8; 65] = [0; 65]; +use std::marker::{Sync, Send}; +use std::ops::Deref; // We hold various information about HTLC relay in the HTLC objects in Channel itself: // @@ -274,28 +274,31 @@ pub(super) struct ChannelHolder { /// for broadcast messages, where ordering isn't as strict). pub(super) pending_msg_events: Vec, } -pub(super) struct MutChannelHolder<'a, ChanSigner: ChannelKeys + 'a> { - pub(super) by_id: &'a mut HashMap<[u8; 32], Channel>, - pub(super) short_to_id: &'a mut HashMap, - pub(super) forward_htlcs: &'a mut HashMap>, - pub(super) claimable_htlcs: &'a mut HashMap>, - pub(super) pending_msg_events: &'a mut Vec, -} -impl ChannelHolder { - pub(super) fn borrow_parts(&mut self) -> MutChannelHolder { - MutChannelHolder { - by_id: &mut self.by_id, - short_to_id: &mut self.short_to_id, - forward_htlcs: &mut self.forward_htlcs, - claimable_htlcs: &mut self.claimable_htlcs, - pending_msg_events: &mut self.pending_msg_events, - } - } + +/// State we hold per-peer. In the future we should put channels in here, but for now we only hold +/// the latest Init features we heard from the peer. +struct PeerState { + latest_features: InitFeatures, } #[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))] const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assume they're the same) for ChannelManager::latest_block_height"; +/// SimpleArcChannelManager is useful when you need a ChannelManager with a static lifetime, e.g. +/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static +/// lifetimes). Other times you can afford a reference, which is more efficient, in which case +/// SimpleRefChannelManager is the more appropriate type. Defining these type aliases prevents +/// issues such as overly long function definitions. +pub type SimpleArcChannelManager = Arc>>; + +/// SimpleRefChannelManager is a type alias for a ChannelManager reference, and is the reference +/// counterpart to the SimpleArcChannelManager type alias. Use this type by default when you don't +/// need a ChannelManager with a static lifetime. You'll need a static lifetime in cases such as +/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes). +/// But if this is not necessary, using a reference is more efficient. Defining these type aliases +/// helps with issues such as long function definitions. +pub type SimpleRefChannelManager<'a, M> = ChannelManager; + /// Manager which keeps track of a number of channels and sends messages to the appropriate /// channel, also tracking HTLC preimages and forwarding onion packets appropriately. /// @@ -325,12 +328,18 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum /// ChannelUpdate messages informing peers that the channel is temporarily disabled. To avoid /// spam due to quick disconnection/reconnection, updates are not sent until the channel has been /// offline for a full minute. In order to track this, you must call -/// timer_chan_freshness_every_min roughly once per minute, though it doesn't have to be perfec. -pub struct ChannelManager { +/// timer_chan_freshness_every_min roughly once per minute, though it doesn't have to be perfect. +/// +/// Rather than using a plain ChannelManager, it is preferable to use either a SimpleArcChannelManager +/// a SimpleRefChannelManager, for conciseness. See their documentation for more details, but +/// essentially you should default to using a SimpleRefChannelManager, and use a +/// SimpleArcChannelManager when you require a ChannelManager with a static lifetime, such as when +/// you're using lightning-net-tokio. +pub struct ChannelManager where M::Target: ManyChannelMonitor { default_configuration: UserConfig, genesis_hash: Sha256dHash, fee_estimator: Arc, - monitor: Arc, + monitor: M, tx_broadcaster: Arc, #[cfg(test)] @@ -346,6 +355,16 @@ pub struct ChannelManager { channel_state: Mutex>, our_network_key: SecretKey, + last_node_announcement_serial: AtomicUsize, + + /// The bulk of our storage will eventually be here (channels and message queues and the like). + /// If we are connected to a peer we always at least have an entry here, even if no channels + /// are currently open with that peer. + /// Because adding or removing an entry is rare, we usually take an outer read lock and then + /// operate on the inner value freely. Sadly, this prevents parallel operation when opening a + /// new channel. + per_peer_state: RwLock>>, + pending_events: Mutex>, /// Used when we have to take a BIG lock to make sure everything is self-consistent. /// Essentially just when we're serializing ourselves out. @@ -408,6 +427,10 @@ pub struct ChannelDetails { pub short_channel_id: Option, /// The node_id of our counterparty pub remote_network_id: PublicKey, + /// The Features the channel counterparty provided upon last connection. + /// Useful for routing as it is the most up-to-date copy of the counterparty's features and + /// many routing-relevant features are present in the init context. + pub counterparty_features: InitFeatures, /// The value, in satoshis, of this channel as appears in the funding output pub channel_value_satoshis: u64, /// The user_id passed in to create_channel, or 0 if the channel was inbound. @@ -586,7 +609,7 @@ macro_rules! maybe_break_monitor_err { } } -impl ChannelManager { +impl ChannelManager where M::Target: ManyChannelMonitor { /// Constructs a new ChannelManager to hold several channels and route between them. /// /// This is the main "logic hub" for all channel-related actions, and implements @@ -605,14 +628,14 @@ impl ChannelManager { /// the ChannelManager as a listener to the BlockNotifier and call the BlockNotifier's /// `block_(dis)connected` methods, which will notify all registered listeners in one /// go. - pub fn new(network: Network, feeest: Arc, monitor: Arc, tx_broadcaster: Arc, logger: Arc,keys_manager: Arc>, config: UserConfig, current_blockchain_height: usize) -> Result>, secp256k1::Error> { + pub fn new(network: Network, feeest: Arc, monitor: M, tx_broadcaster: Arc, logger: Arc,keys_manager: Arc>, config: UserConfig, current_blockchain_height: usize) -> Result, secp256k1::Error> { let secp_ctx = Secp256k1::new(); - let res = Arc::new(ChannelManager { + let res = ChannelManager { default_configuration: config.clone(), genesis_hash: genesis_block(network).header.bitcoin_hash(), fee_estimator: feeest.clone(), - monitor: monitor.clone(), + monitor, tx_broadcaster, latest_block_height: AtomicUsize::new(current_blockchain_height), @@ -628,13 +651,17 @@ impl ChannelManager { }), our_network_key: keys_manager.get_node_secret(), + last_node_announcement_serial: AtomicUsize::new(0), + + per_peer_state: RwLock::new(HashMap::new()), + pending_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), keys_manager, logger, - }); + }; Ok(res) } @@ -678,56 +705,53 @@ impl ChannelManager { Ok(()) } - /// Gets the list of open channels, in random order. See ChannelDetail field documentation for - /// more information. - pub fn list_channels(&self) -> Vec { - let channel_state = self.channel_state.lock().unwrap(); - let mut res = Vec::with_capacity(channel_state.by_id.len()); - for (channel_id, channel) in channel_state.by_id.iter() { - let (inbound_capacity_msat, outbound_capacity_msat) = channel.get_inbound_outbound_available_balance_msat(); - res.push(ChannelDetails { - channel_id: (*channel_id).clone(), - short_channel_id: channel.get_short_channel_id(), - remote_network_id: channel.get_their_node_id(), - channel_value_satoshis: channel.get_value_satoshis(), - inbound_capacity_msat, - outbound_capacity_msat, - user_id: channel.get_user_id(), - is_live: channel.is_live(), - }); - } - res - } - - /// Gets the list of usable channels, in random order. Useful as an argument to - /// Router::get_route to ensure non-announced channels are used. - /// - /// These are guaranteed to have their is_live value set to true, see the documentation for - /// ChannelDetails::is_live for more info on exactly what the criteria are. - pub fn list_usable_channels(&self) -> Vec { - let channel_state = self.channel_state.lock().unwrap(); - let mut res = Vec::with_capacity(channel_state.by_id.len()); - for (channel_id, channel) in channel_state.by_id.iter() { - // Note we use is_live here instead of usable which leads to somewhat confused - // internal/external nomenclature, but that's ok cause that's probably what the user - // really wanted anyway. - if channel.is_live() { + fn list_channels_with_filter)) -> bool>(&self, f: F) -> Vec { + let mut res = Vec::new(); + { + let channel_state = self.channel_state.lock().unwrap(); + res.reserve(channel_state.by_id.len()); + for (channel_id, channel) in channel_state.by_id.iter().filter(f) { let (inbound_capacity_msat, outbound_capacity_msat) = channel.get_inbound_outbound_available_balance_msat(); res.push(ChannelDetails { channel_id: (*channel_id).clone(), short_channel_id: channel.get_short_channel_id(), remote_network_id: channel.get_their_node_id(), + counterparty_features: InitFeatures::empty(), channel_value_satoshis: channel.get_value_satoshis(), inbound_capacity_msat, outbound_capacity_msat, user_id: channel.get_user_id(), - is_live: true, + is_live: channel.is_live(), }); } } + let per_peer_state = self.per_peer_state.read().unwrap(); + for chan in res.iter_mut() { + if let Some(peer_state) = per_peer_state.get(&chan.remote_network_id) { + chan.counterparty_features = peer_state.lock().unwrap().latest_features.clone(); + } + } res } + /// Gets the list of open channels, in random order. See ChannelDetail field documentation for + /// more information. + pub fn list_channels(&self) -> Vec { + self.list_channels_with_filter(|_| true) + } + + /// Gets the list of usable channels, in random order. Useful as an argument to + /// Router::get_route to ensure non-announced channels are used. + /// + /// These are guaranteed to have their is_live value set to true, see the documentation for + /// ChannelDetails::is_live for more info on exactly what the criteria are. + pub fn list_usable_channels(&self) -> Vec { + // Note we use is_live here instead of usable which leads to somewhat confused + // internal/external nomenclature, but that's ok cause that's probably what the user + // really wanted anyway. + self.list_channels_with_filter(|&(_, ref channel)| channel.is_live()) + } + /// Begins the process of closing a channel. After this call (plus some timeout), no new HTLCs /// will be accepted on the given channel, and after additional timeout/the closing of all /// pending HTLCs, the channel will be closed on chain. @@ -738,7 +762,7 @@ impl ChannelManager { let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?; @@ -795,7 +819,7 @@ impl ChannelManager { let mut chan = { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; if let Some(chan) = channel_state.by_id.remove(channel_id) { if let Some(short_id) = chan.get_short_channel_id() { channel_state.short_to_id.remove(&short_id); @@ -884,22 +908,30 @@ impl ChannelManager { } let mut chacha = ChaCha20::new(&rho, &[0u8; 8]); - let next_hop_data = { - let mut decoded = [0; 65]; - chacha.process(&msg.onion_routing_packet.hop_data[0..65], &mut decoded); - match msgs::OnionHopData::read(&mut Cursor::new(&decoded[..])) { + let mut chacha_stream = ChaChaReader { chacha: &mut chacha, read: Cursor::new(&msg.onion_routing_packet.hop_data[..]) }; + let (next_hop_data, next_hop_hmac) = { + match msgs::OnionHopData::read(&mut chacha_stream) { Err(err) => { let error_code = match err { msgs::DecodeError::UnknownVersion => 0x4000 | 1, // unknown realm byte + msgs::DecodeError::UnknownRequiredFeature| + msgs::DecodeError::InvalidValue| + msgs::DecodeError::ShortRead => 0x4000 | 22, // invalid_onion_payload _ => 0x2000 | 2, // Should never happen }; return_err!("Unable to decode our hop data", error_code, &[0;0]); }, - Ok(msg) => msg + Ok(msg) => { + let mut hmac = [0; 32]; + if let Err(_) = chacha_stream.read_exact(&mut hmac[..]) { + return_err!("Unable to decode hop data", 0x4000 | 22, &[0;0]); + } + (msg, hmac) + }, } }; - let pending_forward_info = if next_hop_data.hmac == [0; 32] { + let pending_forward_info = if next_hop_hmac == [0; 32] { #[cfg(test)] { // In tests, make sure that the initial onion pcket data is, at least, non-0. @@ -909,10 +941,11 @@ impl ChannelManager { // as-is (and were originally 0s). // Of course reverse path calculation is still pretty easy given naive routing // algorithms, but this fixes the most-obvious case. - let mut new_packet_data = [0; 19*65]; - chacha.process(&msg.onion_routing_packet.hop_data[65..], &mut new_packet_data[0..19*65]); - assert_ne!(new_packet_data[0..65], [0; 65][..]); - assert_ne!(new_packet_data[..], [0; 19*65][..]); + let mut next_bytes = [0; 32]; + chacha_stream.read_exact(&mut next_bytes).unwrap(); + assert_ne!(next_bytes[..], [0; 32][..]); + chacha_stream.read_exact(&mut next_bytes).unwrap(); + assert_ne!(next_bytes[..], [0; 32][..]); } // OUR PAYMENT! @@ -921,11 +954,11 @@ impl ChannelManager { return_err!("The final CLTV expiry is too soon to handle", 17, &[0;0]); } // final_incorrect_htlc_amount - if next_hop_data.data.amt_to_forward > msg.amount_msat { + if next_hop_data.amt_to_forward > msg.amount_msat { return_err!("Upstream node sent less than we were supposed to receive in payment", 19, &byte_utils::be64_to_array(msg.amount_msat)); } // final_incorrect_cltv_expiry - if next_hop_data.data.outgoing_cltv_value != msg.cltv_expiry { + if next_hop_data.outgoing_cltv_value != msg.cltv_expiry { return_err!("Upstream node set CLTV to the wrong value", 18, &byte_utils::be32_to_array(msg.cltv_expiry)); } @@ -939,13 +972,24 @@ impl ChannelManager { payment_hash: msg.payment_hash.clone(), short_channel_id: 0, incoming_shared_secret: shared_secret, - amt_to_forward: next_hop_data.data.amt_to_forward, - outgoing_cltv_value: next_hop_data.data.outgoing_cltv_value, + amt_to_forward: next_hop_data.amt_to_forward, + outgoing_cltv_value: next_hop_data.outgoing_cltv_value, }) } else { let mut new_packet_data = [0; 20*65]; - chacha.process(&msg.onion_routing_packet.hop_data[65..], &mut new_packet_data[0..19*65]); - chacha.process(&SIXTY_FIVE_ZEROS[..], &mut new_packet_data[19*65..]); + let read_pos = chacha_stream.read(&mut new_packet_data).unwrap(); + #[cfg(debug_assertions)] + { + // Check two things: + // a) that the behavior of our stream here will return Ok(0) even if the TLV + // read above emptied out our buffer and the unwrap() wont needlessly panic + // b) that we didn't somehow magically end up with extra data. + let mut t = [0; 1]; + debug_assert!(chacha_stream.read(&mut t).unwrap() == 0); + } + // Once we've emptied the set of bytes our peer gave us, encrypt 0 bytes until we + // fill the onion hop data we'll forward to our next-hop peer. + chacha_stream.chacha.process_in_place(&mut new_packet_data[read_pos..]); let mut new_pubkey = msg.onion_routing_packet.public_key.unwrap(); @@ -964,16 +1008,24 @@ impl ChannelManager { version: 0, public_key, hop_data: new_packet_data, - hmac: next_hop_data.hmac.clone(), + hmac: next_hop_hmac.clone(), + }; + + let short_channel_id = match next_hop_data.format { + msgs::OnionHopDataFormat::Legacy { short_channel_id } => short_channel_id, + msgs::OnionHopDataFormat::NonFinalNode { short_channel_id } => short_channel_id, + msgs::OnionHopDataFormat::FinalNode => { + return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0;0]); + }, }; PendingHTLCStatus::Forward(PendingForwardHTLCInfo { onion_packet: Some(outgoing_packet), payment_hash: msg.payment_hash.clone(), - short_channel_id: next_hop_data.data.short_channel_id, + short_channel_id: short_channel_id, incoming_shared_secret: shared_secret, - amt_to_forward: next_hop_data.data.amt_to_forward, - outgoing_cltv_value: next_hop_data.data.outgoing_cltv_value, + amt_to_forward: next_hop_data.amt_to_forward, + outgoing_cltv_value: next_hop_data.outgoing_cltv_value, }) }; @@ -1115,6 +1167,9 @@ impl ChannelManager { let onion_keys = secp_call!(onion_utils::construct_onion_keys(&self.secp_ctx, &route, &session_priv), APIError::RouteError{err: "Pubkey along hop was maliciously selected"}); let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route, cur_height)?; + if onion_utils::route_size_insane(&onion_payloads) { + return Err(APIError::RouteError{err: "Route had too large size once"}); + } let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, &payment_hash); let _ = self.total_consistency_lock.read().unwrap(); @@ -1127,7 +1182,7 @@ impl ChannelManager { Some(id) => id.clone(), }; - let channel_state = channel_lock.borrow_parts(); + let channel_state = &mut *channel_lock; if let hash_map::Entry::Occupied(mut chan) = channel_state.by_id.entry(id) { match { if chan.get().get_their_node_id() != route.hops.first().unwrap().pubkey { @@ -1263,6 +1318,37 @@ impl ChannelManager { }) } + /// Generates a signed node_announcement from the given arguments and creates a + /// BroadcastNodeAnnouncement event. + /// + /// RGB is a node "color" and alias a printable human-readable string to describe this node to + /// humans. They carry no in-protocol meaning. + /// + /// addresses represent the set (possibly empty) of socket addresses on which this node accepts + /// incoming connections. + pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: msgs::NetAddressSet) { + let _ = self.total_consistency_lock.read().unwrap(); + + let announcement = msgs::UnsignedNodeAnnouncement { + features: NodeFeatures::supported(), + timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32, + node_id: self.get_our_node_id(), + rgb, alias, + addresses: addresses.to_vec(), + excess_address_data: Vec::new(), + excess_data: Vec::new(), + }; + let msghash = hash_to_message!(&Sha256dHash::hash(&announcement.encode()[..])[..]); + + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastNodeAnnouncement { + msg: msgs::NodeAnnouncement { + signature: self.secp_ctx.sign(&msghash, &self.our_network_key), + contents: announcement + }, + }); + } + /// Processes HTLCs which are pending waiting on random forward delay. /// /// Should only really ever be called in response to a PendingHTLCsForwardable event. @@ -1275,7 +1361,7 @@ impl ChannelManager { let mut handle_errors = Vec::new(); { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; for (short_chan_id, mut pending_forwards) in channel_state.forward_htlcs.drain() { if short_chan_id != 0 { @@ -1473,8 +1559,8 @@ impl ChannelManager { pub fn timer_chan_freshness_every_min(&self) { let _ = self.total_consistency_lock.read().unwrap(); let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); - for (_, chan) in channel_state.by_id { + let channel_state = &mut *channel_state_lock; + for (_, chan) in channel_state.by_id.iter_mut() { if chan.is_disabled_staged() && !chan.is_live() { if let Ok(update) = self.get_channel_update(&chan) { channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { @@ -1657,7 +1743,7 @@ impl ChannelManager { }, HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, .. }) => { //TODO: Delay the claimed_funds relaying just like we do outbound relay! - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; let chan_id = match channel_state.short_to_id.get(&short_channel_id) { Some(chan_id) => chan_id.clone(), @@ -1729,9 +1815,9 @@ impl ChannelManager { { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); - let short_to_id = channel_state.short_to_id; - let pending_msg_events = channel_state.pending_msg_events; + let channel_state = &mut *channel_lock; + let short_to_id = &mut channel_state.short_to_id; + let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { if channel.is_awaiting_monitor_update() { let chan_monitor = channel.channel_monitor().clone(); @@ -1836,7 +1922,7 @@ impl ChannelManager { let channel = Channel::new_from_req(&*self.fee_estimator, &self.keys_manager, their_node_id.clone(), their_features, msg, 0, Arc::clone(&self.logger), &self.default_configuration) .map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id))?; let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(channel.channel_id()) { hash_map::Entry::Occupied(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision!", msg.temporary_channel_id.clone())), hash_map::Entry::Vacant(entry) => { @@ -1853,7 +1939,7 @@ impl ChannelManager { fn internal_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { let (value, output_script, user_id) = { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); + let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.temporary_channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -1878,7 +1964,7 @@ impl ChannelManager { fn internal_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { let ((funding_msg, monitor_update), mut chan) = { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); + let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.temporary_channel_id.clone()) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -1910,7 +1996,7 @@ impl ChannelManager { } } let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(funding_msg.channel_id) { hash_map::Entry::Occupied(_) => { return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id", funding_msg.channel_id)) @@ -1929,7 +2015,7 @@ impl ChannelManager { fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { let (funding_txo, user_id) = { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); + let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -1954,7 +2040,7 @@ impl ChannelManager { fn internal_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -1985,7 +2071,7 @@ impl ChannelManager { fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> { let (mut dropped_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { @@ -2032,7 +2118,7 @@ impl ChannelManager { fn internal_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { let (tx, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { if chan_entry.get().get_their_node_id() != *their_node_id { @@ -2086,7 +2172,7 @@ impl ChannelManager { //but we should prevent it anyway. let (mut pending_forward_info, mut channel_state_lock) = self.decode_update_add_htlc_onion(msg); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -2135,7 +2221,7 @@ impl ChannelManager { fn internal_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); let htlc_source = { - let channel_state = channel_lock.borrow_parts(); + let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -2152,7 +2238,7 @@ impl ChannelManager { fn internal_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); + let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -2167,7 +2253,7 @@ impl ChannelManager { fn internal_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); + let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -2185,7 +2271,7 @@ impl ChannelManager { fn internal_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -2261,7 +2347,7 @@ impl ChannelManager { fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { let (pending_forwards, mut pending_failures, short_channel_id) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -2305,7 +2391,7 @@ impl ChannelManager { fn internal_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); + let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { if chan.get().get_their_node_id() != *their_node_id { @@ -2320,7 +2406,7 @@ impl ChannelManager { fn internal_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -2362,7 +2448,7 @@ impl ChannelManager { fn internal_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> { let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { @@ -2440,7 +2526,7 @@ impl ChannelManager { let mut channel_state_lock = self.channel_state.lock().unwrap(); let their_node_id; let err: Result<(), _> = loop { - let channel_state = channel_state_lock.borrow_parts(); + let channel_state = &mut *channel_state_lock; match channel_state.by_id.entry(channel_id) { hash_map::Entry::Vacant(_) => return Err(APIError::APIMisuseError{err: "Failed to find corresponding channel"}), @@ -2485,7 +2571,7 @@ impl ChannelManager { } } -impl events::MessageSendEventsProvider for ChannelManager { +impl events::MessageSendEventsProvider for ChannelManager where M::Target: ManyChannelMonitor { fn get_and_clear_pending_msg_events(&self) -> Vec { // TODO: Event release to users and serialization is currently race-y: it's very easy for a // user to serialize a ChannelManager with pending events in it and lose those events on @@ -2510,7 +2596,7 @@ impl events::MessageSendEventsProvider for ChannelManag } } -impl events::EventsProvider for ChannelManager { +impl events::EventsProvider for ChannelManager where M::Target: ManyChannelMonitor { fn get_and_clear_pending_events(&self) -> Vec { // TODO: Event release to users and serialization is currently race-y: it's very easy for a // user to serialize a ChannelManager with pending events in it and lose those events on @@ -2535,7 +2621,7 @@ impl events::EventsProvider for ChannelManager ChainListener for ChannelManager { +impl ChainListener for ChannelManager where M::Target: ManyChannelMonitor { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) { let header_hash = header.bitcoin_hash(); log_trace!(self, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len()); @@ -2543,9 +2629,9 @@ impl ChainListener for ChannelManager { let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); - let short_to_id = channel_state.short_to_id; - let pending_msg_events = channel_state.pending_msg_events; + let channel_state = &mut *channel_lock; + let short_to_id = &mut channel_state.short_to_id; + let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); if let Ok(Some(funding_locked)) = chan_res { @@ -2621,9 +2707,9 @@ impl ChainListener for ChannelManager { let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); - let short_to_id = channel_state.short_to_id; - let pending_msg_events = channel_state.pending_msg_events; + let channel_state = &mut *channel_lock; + let short_to_id = &mut channel_state.short_to_id; + let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, v| { if v.block_disconnected(header) { if let Some(short_id) = v.get_short_channel_id() { @@ -2649,7 +2735,7 @@ impl ChainListener for ChannelManager { } } -impl ChannelMessageHandler for ChannelManager { +impl ChannelMessageHandler for ChannelManager where M::Target: ManyChannelMonitor { fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) { let _ = self.total_consistency_lock.read().unwrap(); let res = self.internal_open_channel(their_node_id, their_features, msg); @@ -2798,11 +2884,12 @@ impl ChannelMessageHandler for ChannelManager ChannelMessageHandler for ChannelManager ChannelMessageHandler for ChannelManager node_id != their_node_id, &events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != their_node_id, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, + &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != their_node_id, &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true, } }); } + if no_channels_remain { + self.per_peer_state.write().unwrap().remove(their_node_id); + } + for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } @@ -2871,13 +2965,28 @@ impl ChannelMessageHandler for ChannelManager { + e.insert(Mutex::new(PeerState { + latest_features: init_msg.features.clone(), + })); + }, + hash_map::Entry::Occupied(e) => { + e.get().lock().unwrap().latest_features = init_msg.features.clone(); + }, + } + } + let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_state_lock.borrow_parts(); - let pending_msg_events = channel_state.pending_msg_events; + let channel_state = &mut *channel_state_lock; + let pending_msg_events = &mut channel_state.pending_msg_events; channel_state.by_id.retain(|_, chan| { if chan.get_their_node_id() == *their_node_id { if !chan.have_received_message() { @@ -3097,7 +3206,7 @@ impl Readable for HTLCForwardInfo { } } -impl Writeable for ChannelManager { +impl Writeable for ChannelManager where M::Target: ManyChannelMonitor { fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { let _ = self.total_consistency_lock.write().unwrap(); @@ -3141,6 +3250,16 @@ impl Writeable for ChannelManager Writeable for ChannelManager { +pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys, M: Deref> where M::Target: ManyChannelMonitor { /// The keys provider which will give us relevant keys. Some keys will be loaded during /// deserialization. pub keys_manager: Arc>, @@ -3174,7 +3293,7 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys> { /// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that /// you have deserialized ChannelMonitors separately and will add them to your /// ManyChannelMonitor after deserializing this ChannelManager. - pub monitor: Arc, + pub monitor: M, /// The BroadcasterInterface which will be used in the ChannelManager in the future and may be /// used to broadcast the latest local commitment transactions of channels which must be @@ -3200,8 +3319,8 @@ pub struct ChannelManagerReadArgs<'a, ChanSigner: ChannelKeys> { pub channel_monitors: &'a mut HashMap, } -impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable> ReadableArgs> for (Sha256dHash, ChannelManager) { - fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner>) -> Result { +impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable, M: Deref> ReadableArgs> for (Sha256dHash, ChannelManager) where M::Target: ManyChannelMonitor { + fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, ChanSigner, M>) -> Result { let _ver: u8 = Readable::read(reader)?; let min_ver: u8 = Readable::read(reader)?; if min_ver > SERIALIZATION_VERSION { @@ -3274,6 +3393,18 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable> ReadableArg claimable_htlcs.insert(payment_hash, previous_hops); } + let peer_count: u64 = Readable::read(reader)?; + let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, 128)); + for _ in 0..peer_count { + let peer_pubkey = Readable::read(reader)?; + let peer_state = PeerState { + latest_features: Readable::read(reader)?, + }; + per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); + } + + let last_node_announcement_serial: u32 = Readable::read(reader)?; + let channel_manager = ChannelManager { genesis_hash, fee_estimator: args.fee_estimator, @@ -3293,6 +3424,10 @@ impl<'a, R : ::std::io::Read, ChanSigner: ChannelKeys + Readable> ReadableArg }), our_network_key: args.keys_manager.get_node_secret(), + last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize), + + per_peer_state: RwLock::new(per_peer_state), + pending_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), keys_manager: args.keys_manager,