X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=src%2Fln%2Fchannelmanager.rs;h=f39d0c093ceac2055c7105821a8396c7be431fd8;hb=b17211ffe42d029280fc45df42dd1d6152eadd13;hp=67837af719c31829452ebff10849ae6c0f4da80d;hpb=9aed28fbf0a42f8d623523761f6b1fb14fa6f87e;p=rust-lightning diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index 67837af7..f39d0c09 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -115,14 +115,19 @@ struct ChannelHolder { short_to_id: HashMap, next_forward: Instant, /// short channel id -> forward infos. Key of 0 means payments received + /// Note that while this is held in the same mutex as the channels themselves, no consistency + /// guarantees are made about there existing a channel with the short id here, nor the short + /// ids in the PendingForwardHTLCInfo! forward_htlcs: HashMap>, + /// Note that while this is held in the same mutex as the channels themselves, no consistency + /// guarantees are made about the channels given here actually existing anymore by the time you + /// go to read them! claimable_htlcs: HashMap<[u8; 32], PendingOutboundHTLC>, } struct MutChannelHolder<'a> { by_id: &'a mut HashMap<[u8; 32], Channel>, short_to_id: &'a mut HashMap, next_forward: &'a mut Instant, - /// short channel id -> forward infos. Key of 0 means payments received forward_htlcs: &'a mut HashMap>, claimable_htlcs: &'a mut HashMap<[u8; 32], PendingOutboundHTLC>, } @@ -132,13 +137,15 @@ impl ChannelHolder { by_id: &mut self.by_id, short_to_id: &mut self.short_to_id, next_forward: &mut self.next_forward, - /// short channel id -> forward infos. Key of 0 means payments received forward_htlcs: &mut self.forward_htlcs, claimable_htlcs: &mut self.claimable_htlcs, } } } +#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))] +const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assume they're the same) for ChannelManager::latest_block_height"; + /// Manager which keeps track of a number of channels and sends messages to the appropriate /// channel, also tracking HTLC preimages and forwarding onion packets appropriately. /// Implements ChannelMessageHandler, handling the multi-channel parts and passing things through @@ -152,7 +159,7 @@ pub struct ChannelManager { announce_channels_publicly: bool, fee_proportional_millionths: u32, - latest_block_height: AtomicUsize, //TODO: Compile-time assert this is at least 32-bits long + latest_block_height: AtomicUsize, secp_ctx: Secp256k1, channel_state: Mutex, @@ -360,6 +367,47 @@ impl ChannelManager { Ok(()) } + #[inline] + fn finish_force_close_channel(&self, shutdown_res: (Vec, Vec<[u8; 32]>)) { + let (local_txn, failed_htlcs) = shutdown_res; + for payment_hash in failed_htlcs { + // unknown_next_peer...I dunno who that is anymore.... + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() }); + } + for tx in local_txn { + self.tx_broadcaster.broadcast_transaction(&tx); + } + //TODO: We need to have a way where outbound HTLC claims can result in us claiming the + //now-on-chain HTLC output for ourselves (and, thereafter, passing the HTLC backwards). + //TODO: We need to handle monitoring of pending offered HTLCs which just hit the chain and + //may be claimed, resulting in us claiming the inbound HTLCs (and back-failing after + //timeouts are hit and our claims confirm). + } + + /// Force closes a channel, immediately broadcasting the latest local commitment transaction to + /// the chain and rejecting new HTLCs on the given channel. + pub fn force_close_channel(&self, channel_id: &[u8; 32]) { + let mut chan = { + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); + if let Some(chan) = channel_state.by_id.remove(channel_id) { + if let Some(short_id) = chan.get_short_channel_id() { + channel_state.short_to_id.remove(&short_id); + } + chan + } else { + return; + } + }; + self.finish_force_close_channel(chan.force_shutdown()); + let mut events = self.pending_events.lock().unwrap(); + if let Ok(update) = self.get_channel_update(&chan) { + events.push(events::Event::BroadcastChannelUpdate { + msg: update + }); + } + } + #[inline] fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) { ({ @@ -884,6 +932,12 @@ impl ChannelManager { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 15, data: Vec::new() }) } + /// Fails an HTLC backwards to the sender of it to us. + /// Note that while we take a channel_state lock as input, we do *not* assume consistency here. + /// There are several callsites that do stupid things like loop over a list of payment_hashes + /// to fail and take the channel_state lock for each iteration (as we take ownership and may + /// drop it). In other words, no assumptions are made that entries in claimable_htlcs point to + /// still-available channels. fn fail_htlc_backwards_internal(&self, mut channel_state: MutexGuard, payment_hash: &[u8; 32], onion_error: HTLCFailReason) -> bool { let mut pending_htlc = { match channel_state.claimable_htlcs.remove(payment_hash) { @@ -904,7 +958,7 @@ impl ChannelManager { } match pending_htlc { - PendingOutboundHTLC::CycledRoute { .. } => { panic!("WAT"); }, + PendingOutboundHTLC::CycledRoute { .. } => unreachable!(), PendingOutboundHTLC::OutboundRoute { .. } => { mem::drop(channel_state); @@ -1000,7 +1054,7 @@ impl ChannelManager { } match pending_htlc { - PendingOutboundHTLC::CycledRoute { .. } => { panic!("WAT"); }, + PendingOutboundHTLC::CycledRoute { .. } => unreachable!(), PendingOutboundHTLC::OutboundRoute { .. } => { if from_user { panic!("Called claim_funds with a preimage for an outgoing payment. There is nothing we can do with this, and something is seriously wrong if you knew this..."); @@ -1016,13 +1070,20 @@ impl ChannelManager { let (node_id, fulfill_msgs) = { let chan_id = match channel_state.short_to_id.get(&source_short_channel_id) { Some(chan_id) => chan_id.clone(), - None => return false + None => { + // TODO: There is probably a channel manager somewhere that needs to + // learn the preimage as the channel already hit the chain and that's + // why its missing. + return false + } }; let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); match chan.get_update_fulfill_htlc_and_commit(payment_preimage) { Ok(msg) => (chan.get_their_node_id(), msg), Err(_e) => { + // TODO: There is probably a channel manager somewhere that needs to + // learn the preimage as the channel may be about to hit the chain. //TODO: Do something with e? return false; }, @@ -1074,12 +1135,14 @@ impl events::EventsProvider for ChannelManager { impl ChainListener for ChannelManager { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) { let mut new_events = Vec::new(); + let mut failed_channels = Vec::new(); { - let mut channel_state = self.channel_state.lock().unwrap(); - let mut short_to_ids_to_insert = Vec::new(); - let mut short_to_ids_to_remove = Vec::new(); + let mut channel_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_lock.borrow_parts(); + let short_to_id = channel_state.short_to_id; channel_state.by_id.retain(|_, channel| { - if let Some(funding_locked) = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched) { + let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); + if let Ok(Some(funding_locked)) = chan_res { let announcement_sigs = match self.get_announcement_sigs(channel) { Ok(res) => res, Err(_e) => { @@ -1092,16 +1155,29 @@ impl ChainListener for ChannelManager { msg: funding_locked, announcement_sigs: announcement_sigs }); - short_to_ids_to_insert.push((channel.get_short_channel_id().unwrap(), channel.channel_id())); + short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); + } else if let Err(e) = chan_res { + if let Some(msgs::ErrorAction::DisconnectPeer { msg }) = e.action { + new_events.push(events::Event::DisconnectPeer { + node_id: channel.get_their_node_id(), + msg: msg + }); + } else { unreachable!(); } + if channel.is_shutdown() { + return false; + } } if let Some(funding_txo) = channel.get_funding_txo() { for tx in txn_matched { for inp in tx.input.iter() { if inp.prev_hash == funding_txo.txid && inp.prev_index == funding_txo.index as u32 { if let Some(short_id) = channel.get_short_channel_id() { - short_to_ids_to_remove.push(short_id); + short_to_id.remove(&short_id); } - channel.force_shutdown(); + // It looks like our counterparty went on-chain. We go ahead and + // broadcast our latest local state as well here, just in case its + // some kind of SPV attack, though we expect these to be dropped. + failed_channels.push(channel.force_shutdown()); if let Ok(update) = self.get_channel_update(&channel) { new_events.push(events::Event::BroadcastChannelUpdate { msg: update @@ -1114,9 +1190,13 @@ impl ChainListener for ChannelManager { } if channel.channel_monitor().would_broadcast_at_height(height) { if let Some(short_id) = channel.get_short_channel_id() { - short_to_ids_to_remove.push(short_id); + short_to_id.remove(&short_id); } - channel.force_shutdown(); + failed_channels.push(channel.force_shutdown()); + // If would_broadcast_at_height() is true, the channel_monitor will broadcast + // the latest local tx for us, so we should skip that here (it doesn't really + // hurt anything, but does make tests a bit simpler). + failed_channels.last_mut().unwrap().0 = Vec::new(); if let Ok(update) = self.get_channel_update(&channel) { new_events.push(events::Event::BroadcastChannelUpdate { msg: update @@ -1126,12 +1206,9 @@ impl ChainListener for ChannelManager { } true }); - for to_remove in short_to_ids_to_remove { - channel_state.short_to_id.remove(&to_remove); - } - for to_insert in short_to_ids_to_insert { - channel_state.short_to_id.insert(to_insert.0, to_insert.1); - } + } + for failure in failed_channels.drain(..) { + self.finish_force_close_channel(failure); } let mut pending_events = self.pending_events.lock().unwrap(); for funding_locked in new_events.drain(..) { @@ -1142,23 +1219,38 @@ impl ChainListener for ChannelManager { /// We force-close the channel without letting our counterparty participate in the shutdown fn block_disconnected(&self, header: &BlockHeader) { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = channel_lock.borrow_parts(); - let short_to_id = channel_state.short_to_id; - channel_state.by_id.retain(|_, v| { - if v.block_disconnected(header) { - let tx = v.force_shutdown(); - for broadcast_tx in tx { - self.tx_broadcaster.broadcast_transaction(&broadcast_tx); - } - if let Some(short_id) = v.get_short_channel_id() { - short_to_id.remove(&short_id); + let mut new_events = Vec::new(); + let mut failed_channels = Vec::new(); + { + let mut channel_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_lock.borrow_parts(); + let short_to_id = channel_state.short_to_id; + channel_state.by_id.retain(|_, v| { + if v.block_disconnected(header) { + if let Some(short_id) = v.get_short_channel_id() { + short_to_id.remove(&short_id); + } + failed_channels.push(v.force_shutdown()); + if let Ok(update) = self.get_channel_update(&v) { + new_events.push(events::Event::BroadcastChannelUpdate { + msg: update + }); + } + false + } else { + true } - false - } else { - true + }); + } + for failure in failed_channels.drain(..) { + self.finish_force_close_channel(failure); + } + if !new_events.is_empty() { + let mut pending_events = self.pending_events.lock().unwrap(); + for funding_locked in new_events.drain(..) { + pending_events.push(funding_locked); } - }); + } self.latest_block_height.fetch_sub(1, Ordering::AcqRel); } } @@ -1543,20 +1635,14 @@ impl ChannelMessageHandler for ChannelManager { // destination. That's OK since those nodes are probably busted or trying to do network // mapping through repeated loops. In either case, we want them to stop talking to us, so // we send permanent_node_failure. - match &claimable_htlcs_entry { - &hash_map::Entry::Occupied(ref e) => { - let mut acceptable_cycle = false; - match e.get() { - &PendingOutboundHTLC::OutboundRoute { .. } => { - acceptable_cycle = pending_forward_info.short_channel_id == 0; - }, - _ => {}, - } - if !acceptable_cycle { - return_err!("Payment looped through us twice", 0x4000 | 0x2000 | 2, &[0;0]); - } - }, - _ => {}, + if let &hash_map::Entry::Occupied(ref e) = &claimable_htlcs_entry { + let mut acceptable_cycle = false; + if let &PendingOutboundHTLC::OutboundRoute { .. } = e.get() { + acceptable_cycle = pending_forward_info.short_channel_id == 0; + } + if !acceptable_cycle { + return_err!("Payment looped through us twice", 0x4000 | 0x2000 | 2, &[0;0]); + } } let (source_short_channel_id, res) = match channel_state.by_id.get_mut(&msg.channel_id) { @@ -1571,7 +1657,7 @@ impl ChannelMessageHandler for ChannelManager { pending_forward_info.prev_short_channel_id = short_channel_id; (short_channel_id, chan.update_add_htlc(&msg, pending_forward_info)?) }, - None => return Err(HandleError{err: "Failed to find corresponding channel", action: None}), //TODO: panic? + None => return Err(HandleError{err: "Failed to find corresponding channel", action: None}), }; match claimable_htlcs_entry { @@ -1581,7 +1667,7 @@ impl ChannelMessageHandler for ChannelManager { &mut PendingOutboundHTLC::OutboundRoute { ref route, ref session_priv } => { (route.clone(), session_priv.clone()) }, - _ => { panic!("WAT") }, + _ => unreachable!(), }; *outbound_route = PendingOutboundHTLC::CycledRoute { source_short_channel_id, @@ -1607,22 +1693,16 @@ impl ChannelMessageHandler for ChannelManager { // is broken, we may have enough info to get our own money! self.claim_funds_internal(msg.payment_preimage.clone(), false); - let monitor = { - let mut channel_state = self.channel_state.lock().unwrap(); - match channel_state.by_id.get_mut(&msg.channel_id) { - Some(chan) => { - if chan.get_their_node_id() != *their_node_id { - return Err(HandleError{err: "Got a message for a channel from the wrong node!", action: None}) - } - chan.update_fulfill_htlc(&msg)? - }, - None => return Err(HandleError{err: "Failed to find corresponding channel", action: None}) - } - }; - if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { - unimplemented!(); + let mut channel_state = self.channel_state.lock().unwrap(); + match channel_state.by_id.get_mut(&msg.channel_id) { + Some(chan) => { + if chan.get_their_node_id() != *their_node_id { + return Err(HandleError{err: "Got a message for a channel from the wrong node!", action: None}) + } + chan.update_fulfill_htlc(&msg) + }, + None => return Err(HandleError{err: "Failed to find corresponding channel", action: None}) } - Ok(()) } fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result, HandleError> { @@ -1837,6 +1917,7 @@ impl ChannelMessageHandler for ChannelManager { fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { let mut new_events = Vec::new(); + let mut failed_channels = Vec::new(); { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); @@ -1847,10 +1928,7 @@ impl ChannelMessageHandler for ChannelManager { if let Some(short_id) = chan.get_short_channel_id() { short_to_id.remove(&short_id); } - let txn_to_broadcast = chan.force_shutdown(); - for tx in txn_to_broadcast { - self.tx_broadcaster.broadcast_transaction(&tx); - } + failed_channels.push(chan.force_shutdown()); if let Ok(update) = self.get_channel_update(&chan) { new_events.push(events::Event::BroadcastChannelUpdate { msg: update @@ -1871,6 +1949,9 @@ impl ChannelMessageHandler for ChannelManager { } } } + for failure in failed_channels.drain(..) { + self.finish_force_close_channel(failure); + } if !new_events.is_empty() { let mut pending_events = self.pending_events.lock().unwrap(); for event in new_events.drain(..) { @@ -2416,10 +2497,9 @@ mod tests { { let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap(); if $last_node { - assert_eq!(added_monitors.len(), 1); + assert_eq!(added_monitors.len(), 0); } else { - assert_eq!(added_monitors.len(), 2); - assert!(added_monitors[0].0 != added_monitors[1].0); + assert_eq!(added_monitors.len(), 1); } added_monitors.clear(); } @@ -2862,7 +2942,7 @@ mod tests { let mut node_txn = test_txn_broadcast(&nodes[1], &chan_1, None, HTLCType::NONE); let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1); - assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0); + test_txn_broadcast(&nodes[0], &chan_1, None, HTLCType::NONE); } get_announce_close_broadcast_events(&nodes, 0, 1); assert_eq!(nodes[0].node.list_channels().len(), 0); @@ -2877,7 +2957,7 @@ mod tests { let mut node_txn = test_txn_broadcast(&nodes[1], &chan_2, None, HTLCType::TIMEOUT); let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1); - assert_eq!(nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0); + test_txn_broadcast(&nodes[2], &chan_2, None, HTLCType::NONE); } get_announce_close_broadcast_events(&nodes, 1, 2); assert_eq!(nodes[1].node.list_channels().len(), 0); @@ -2972,14 +3052,15 @@ mod tests { nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1); { let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap(); - assert_eq!(node_txn.len(), 1); + assert_eq!(node_txn.len(), 2); assert_eq!(node_txn[0].input.len(), 1); let mut funding_tx_map = HashMap::new(); funding_tx_map.insert(revoked_local_txn[0].txid(), revoked_local_txn[0].clone()); node_txn[0].verify(&funding_tx_map).unwrap(); - node_txn.clear(); + node_txn.swap_remove(0); } + test_txn_broadcast(&nodes[1], &chan_5, None, HTLCType::NONE); nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1); let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);