use ln::msgs::{HandleError,ChannelMessageHandler,MsgEncodable,MsgDecodable};
use util::{byte_utils, events, internal_traits, rng};
use util::sha2::Sha256;
+use util::chacha20poly1305rfc::ChaCha20;
+use util::logger::{Logger, Record};
use crypto;
use crypto::mac::{Mac,MacResult};
use crypto::hmac::Hmac;
use crypto::digest::Digest;
use crypto::symmetriccipher::SynchronousStreamCipher;
-use crypto::chacha20::ChaCha20;
use std::{ptr, mem};
use std::collections::HashMap;
short_to_id: HashMap<u64, [u8; 32]>,
next_forward: Instant,
/// short channel id -> forward infos. Key of 0 means payments received
+ /// Note that while this is held in the same mutex as the channels themselves, no consistency
+ /// guarantees are made about there existing a channel with the short id here, nor the short
+ /// ids in the PendingForwardHTLCInfo!
forward_htlcs: HashMap<u64, Vec<PendingForwardHTLCInfo>>,
+ /// Note that while this is held in the same mutex as the channels themselves, no consistency
+ /// guarantees are made about the channels given here actually existing anymore by the time you
+ /// go to read them!
claimable_htlcs: HashMap<[u8; 32], PendingOutboundHTLC>,
}
struct MutChannelHolder<'a> {
by_id: &'a mut HashMap<[u8; 32], Channel>,
short_to_id: &'a mut HashMap<u64, [u8; 32]>,
next_forward: &'a mut Instant,
- /// short channel id -> forward infos. Key of 0 means payments received
forward_htlcs: &'a mut HashMap<u64, Vec<PendingForwardHTLCInfo>>,
claimable_htlcs: &'a mut HashMap<[u8; 32], PendingOutboundHTLC>,
}
by_id: &mut self.by_id,
short_to_id: &mut self.short_to_id,
next_forward: &mut self.next_forward,
- /// short channel id -> forward infos. Key of 0 means payments received
forward_htlcs: &mut self.forward_htlcs,
claimable_htlcs: &mut self.claimable_htlcs,
}
}
}
+#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
+const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assume they're the same) for ChannelManager::latest_block_height";
+
/// Manager which keeps track of a number of channels and sends messages to the appropriate
/// channel, also tracking HTLC preimages and forwarding onion packets appropriately.
/// Implements ChannelMessageHandler, handling the multi-channel parts and passing things through
announce_channels_publicly: bool,
fee_proportional_millionths: u32,
- latest_block_height: AtomicUsize, //TODO: Compile-time assert this is at least 32-bits long
+ latest_block_height: AtomicUsize,
secp_ctx: Secp256k1,
channel_state: Mutex<ChannelHolder>,
our_network_key: SecretKey,
pending_events: Mutex<Vec<events::Event>>,
+
+ logger: Arc<Logger>,
}
const CLTV_EXPIRY_DELTA: u16 = 6 * 24 * 2; //TODO?
/// fee_proportional_millionths is an optional fee to charge any payments routed through us.
/// Non-proportional fees are fixed according to our risk using the provided fee estimator.
/// panics if channel_value_satoshis is >= `MAX_FUNDING_SATOSHIS`!
- pub fn new(our_network_key: SecretKey, fee_proportional_millionths: u32, announce_channels_publicly: bool, network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor>, chain_monitor: Arc<ChainWatchInterface>, tx_broadcaster: Arc<BroadcasterInterface>) -> Result<Arc<ChannelManager>, secp256k1::Error> {
+ pub fn new(our_network_key: SecretKey, fee_proportional_millionths: u32, announce_channels_publicly: bool, network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor>, chain_monitor: Arc<ChainWatchInterface>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>) -> Result<Arc<ChannelManager>, secp256k1::Error> {
let secp_ctx = Secp256k1::new();
let res = Arc::new(ChannelManager {
our_network_key,
pending_events: Mutex::new(Vec::new()),
+
+ logger,
});
let weak_res = Arc::downgrade(&res);
res.chain_monitor.register_listener(weak_res);
}
};
- let channel = Channel::new_outbound(&*self.fee_estimator, chan_keys, their_network_key, channel_value_satoshis, self.announce_channels_publicly, user_id);
+ let channel = Channel::new_outbound(&*self.fee_estimator, chan_keys, their_network_key, channel_value_satoshis, self.announce_channels_publicly, user_id, Arc::clone(&self.logger));
let res = channel.get_open_channel(self.genesis_hash.clone(), &*self.fee_estimator)?;
let mut channel_state = self.channel_state.lock().unwrap();
match channel_state.by_id.insert(channel.channel_id(), channel) {
Ok(())
}
+ #[inline]
+ fn finish_force_close_channel(&self, shutdown_res: (Vec<Transaction>, Vec<[u8; 32]>)) {
+ let (local_txn, failed_htlcs) = shutdown_res;
+ for payment_hash in failed_htlcs {
+ // unknown_next_peer...I dunno who that is anymore....
+ self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() });
+ }
+ for tx in local_txn {
+ self.tx_broadcaster.broadcast_transaction(&tx);
+ }
+ //TODO: We need to have a way where outbound HTLC claims can result in us claiming the
+ //now-on-chain HTLC output for ourselves (and, thereafter, passing the HTLC backwards).
+ //TODO: We need to handle monitoring of pending offered HTLCs which just hit the chain and
+ //may be claimed, resulting in us claiming the inbound HTLCs (and back-failing after
+ //timeouts are hit and our claims confirm).
+ }
+
+ /// Force closes a channel, immediately broadcasting the latest local commitment transaction to
+ /// the chain and rejecting new HTLCs on the given channel.
+ pub fn force_close_channel(&self, channel_id: &[u8; 32]) {
+ let mut chan = {
+ let mut channel_state_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_state_lock.borrow_parts();
+ if let Some(chan) = channel_state.by_id.remove(channel_id) {
+ if let Some(short_id) = chan.get_short_channel_id() {
+ channel_state.short_to_id.remove(&short_id);
+ }
+ chan
+ } else {
+ return;
+ }
+ };
+ self.finish_force_close_channel(chan.force_shutdown());
+ let mut events = self.pending_events.lock().unwrap();
+ if let Ok(update) = self.get_channel_update(&chan) {
+ events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ }
+
#[inline]
fn gen_rho_mu_from_shared_secret(shared_secret: &SharedSecret) -> ([u8; 32], [u8; 32]) {
({
/// Call this upon creation of a funding transaction for the given channel.
/// Panics if a funding transaction has already been provided for this channel.
+ /// May panic if the funding_txo is duplicative with some other channel (note that this should
+ /// be trivially prevented by using unique funding transaction keys per-channel).
pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) {
+
macro_rules! add_pending_event {
($event: expr) => {
{
(chan, funding_msg.0, funding_msg.1)
},
Err(e) => {
+ log_error!(self, "Got bad signatures: {}!", e.err);
mem::drop(channel_state);
- add_pending_event!(events::Event::DisconnectPeer {
+ add_pending_event!(events::Event::HandleError {
node_id: chan.get_their_node_id(),
- msg: if let Some(msgs::ErrorAction::DisconnectPeer { msg } ) = e.action { msg } else { None },
+ action: e.action,
});
-
return;
},
}
});
let mut channel_state = self.channel_state.lock().unwrap();
- channel_state.by_id.insert(chan.channel_id(), chan);
+ match channel_state.by_id.entry(chan.channel_id()) {
+ hash_map::Entry::Occupied(_) => {
+ panic!("Generated duplicate funding txid?");
+ },
+ hash_map::Entry::Vacant(e) => {
+ e.insert(chan);
+ }
+ }
}
fn get_announcement_sigs(&self, chan: &Channel) -> Result<Option<msgs::AnnouncementSignatures>, HandleError> {
if !add_htlc_msgs.is_empty() {
let (commitment_msg, monitor) = match forward_chan.send_commitment() {
Ok(res) => res,
- Err(_) => {
+ Err(_e) => {
//TODO: Handle...this is bad!
continue;
},
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 15, data: Vec::new() })
}
+ /// Fails an HTLC backwards to the sender of it to us.
+ /// Note that while we take a channel_state lock as input, we do *not* assume consistency here.
+ /// There are several callsites that do stupid things like loop over a list of payment_hashes
+ /// to fail and take the channel_state lock for each iteration (as we take ownership and may
+ /// drop it). In other words, no assumptions are made that entries in claimable_htlcs point to
+ /// still-available channels.
fn fail_htlc_backwards_internal(&self, mut channel_state: MutexGuard<ChannelHolder>, payment_hash: &[u8; 32], onion_error: HTLCFailReason) -> bool {
let mut pending_htlc = {
match channel_state.claimable_htlcs.remove(payment_hash) {
}
match pending_htlc {
- PendingOutboundHTLC::CycledRoute { .. } => { panic!("WAT"); },
+ PendingOutboundHTLC::CycledRoute { .. } => unreachable!(),
PendingOutboundHTLC::OutboundRoute { .. } => {
mem::drop(channel_state);
}
let mut pending_events = self.pending_events.lock().unwrap();
+ //TODO: replace by HandleError ? UpdateFailHTLC in handle_update_add_htlc need also to build a CommitmentSigned
pending_events.push(events::Event::SendFailHTLC {
node_id,
msg: msg,
}
match pending_htlc {
- PendingOutboundHTLC::CycledRoute { .. } => { panic!("WAT"); },
+ PendingOutboundHTLC::CycledRoute { .. } => unreachable!(),
PendingOutboundHTLC::OutboundRoute { .. } => {
if from_user {
panic!("Called claim_funds with a preimage for an outgoing payment. There is nothing we can do with this, and something is seriously wrong if you knew this...");
let (node_id, fulfill_msgs) = {
let chan_id = match channel_state.short_to_id.get(&source_short_channel_id) {
Some(chan_id) => chan_id.clone(),
- None => return false
+ None => {
+ // TODO: There is probably a channel manager somewhere that needs to
+ // learn the preimage as the channel already hit the chain and that's
+ // why its missing.
+ return false
+ }
};
let chan = channel_state.by_id.get_mut(&chan_id).unwrap();
match chan.get_update_fulfill_htlc_and_commit(payment_preimage) {
Ok(msg) => (chan.get_their_node_id(), msg),
Err(_e) => {
+ // TODO: There is probably a channel manager somewhere that needs to
+ // learn the preimage as the channel may be about to hit the chain.
//TODO: Do something with e?
return false;
},
};
mem::drop(channel_state);
- match fulfill_msgs {
- Some((msg, commitment_msg, chan_monitor)) => {
- if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
- unimplemented!();// but def dont push the event...
- }
+ if let Some(chan_monitor) = fulfill_msgs.1 {
+ if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
+ unimplemented!();// but def dont push the event...
+ }
+ }
- let mut pending_events = self.pending_events.lock().unwrap();
- pending_events.push(events::Event::SendFulfillHTLC {
- node_id: node_id,
- msg,
- commitment_msg,
- });
- },
- None => {},
+ if let Some((msg, commitment_msg)) = fulfill_msgs.0 {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ pending_events.push(events::Event::SendFulfillHTLC {
+ node_id: node_id,
+ msg,
+ commitment_msg,
+ });
}
true
},
impl ChainListener for ChannelManager {
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
let mut new_events = Vec::new();
+ let mut failed_channels = Vec::new();
{
- let mut channel_state = self.channel_state.lock().unwrap();
- let mut short_to_ids_to_insert = Vec::new();
- let mut short_to_ids_to_remove = Vec::new();
+ let mut channel_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_lock.borrow_parts();
+ let short_to_id = channel_state.short_to_id;
channel_state.by_id.retain(|_, channel| {
- if let Some(funding_locked) = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched) {
+ let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched);
+ if let Ok(Some(funding_locked)) = chan_res {
let announcement_sigs = match self.get_announcement_sigs(channel) {
Ok(res) => res,
- Err(_e) => {
+ Err(e) => {
+ log_error!(self, "Got error handling message: {}!", e.err);
//TODO: push e on events and blow up the channel (it has bad keys)
return true;
}
msg: funding_locked,
announcement_sigs: announcement_sigs
});
- short_to_ids_to_insert.push((channel.get_short_channel_id().unwrap(), channel.channel_id()));
+ short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
+ } else if let Err(e) = chan_res {
+ new_events.push(events::Event::HandleError {
+ node_id: channel.get_their_node_id(),
+ action: e.action,
+ });
+ if channel.is_shutdown() {
+ return false;
+ }
}
if let Some(funding_txo) = channel.get_funding_txo() {
for tx in txn_matched {
for inp in tx.input.iter() {
if inp.prev_hash == funding_txo.txid && inp.prev_index == funding_txo.index as u32 {
if let Some(short_id) = channel.get_short_channel_id() {
- short_to_ids_to_remove.push(short_id);
+ short_to_id.remove(&short_id);
}
- channel.force_shutdown();
+ // It looks like our counterparty went on-chain. We go ahead and
+ // broadcast our latest local state as well here, just in case its
+ // some kind of SPV attack, though we expect these to be dropped.
+ failed_channels.push(channel.force_shutdown());
if let Ok(update) = self.get_channel_update(&channel) {
new_events.push(events::Event::BroadcastChannelUpdate {
msg: update
}
}
}
- if channel.channel_monitor().would_broadcast_at_height(height) {
+ if channel.is_funding_initiated() && channel.channel_monitor().would_broadcast_at_height(height) {
if let Some(short_id) = channel.get_short_channel_id() {
- short_to_ids_to_remove.push(short_id);
+ short_to_id.remove(&short_id);
}
- channel.force_shutdown();
+ failed_channels.push(channel.force_shutdown());
+ // If would_broadcast_at_height() is true, the channel_monitor will broadcast
+ // the latest local tx for us, so we should skip that here (it doesn't really
+ // hurt anything, but does make tests a bit simpler).
+ failed_channels.last_mut().unwrap().0 = Vec::new();
if let Ok(update) = self.get_channel_update(&channel) {
new_events.push(events::Event::BroadcastChannelUpdate {
msg: update
}
true
});
- for to_remove in short_to_ids_to_remove {
- channel_state.short_to_id.remove(&to_remove);
- }
- for to_insert in short_to_ids_to_insert {
- channel_state.short_to_id.insert(to_insert.0, to_insert.1);
- }
+ }
+ for failure in failed_channels.drain(..) {
+ self.finish_force_close_channel(failure);
}
let mut pending_events = self.pending_events.lock().unwrap();
for funding_locked in new_events.drain(..) {
/// We force-close the channel without letting our counterparty participate in the shutdown
fn block_disconnected(&self, header: &BlockHeader) {
- let mut channel_lock = self.channel_state.lock().unwrap();
- let channel_state = channel_lock.borrow_parts();
- let short_to_id = channel_state.short_to_id;
- channel_state.by_id.retain(|_, v| {
- if v.block_disconnected(header) {
- let tx = v.force_shutdown();
- for broadcast_tx in tx {
- self.tx_broadcaster.broadcast_transaction(&broadcast_tx);
- }
- if let Some(short_id) = v.get_short_channel_id() {
- short_to_id.remove(&short_id);
+ let mut new_events = Vec::new();
+ let mut failed_channels = Vec::new();
+ {
+ let mut channel_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_lock.borrow_parts();
+ let short_to_id = channel_state.short_to_id;
+ channel_state.by_id.retain(|_, v| {
+ if v.block_disconnected(header) {
+ if let Some(short_id) = v.get_short_channel_id() {
+ short_to_id.remove(&short_id);
+ }
+ failed_channels.push(v.force_shutdown());
+ if let Ok(update) = self.get_channel_update(&v) {
+ new_events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ false
+ } else {
+ true
}
- false
- } else {
- true
+ });
+ }
+ for failure in failed_channels.drain(..) {
+ self.finish_force_close_channel(failure);
+ }
+ if !new_events.is_empty() {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ for funding_locked in new_events.drain(..) {
+ pending_events.push(funding_locked);
}
- });
+ }
self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
}
}
let chan_keys = if cfg!(feature = "fuzztarget") {
ChannelKeys {
- funding_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
- revocation_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
- payment_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
- delayed_payment_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
- htlc_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
- channel_close_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
- channel_monitor_claim_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(),
+ funding_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]).unwrap(),
+ revocation_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0]).unwrap(),
+ payment_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0]).unwrap(),
+ delayed_payment_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0]).unwrap(),
+ htlc_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0]).unwrap(),
+ channel_close_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0]).unwrap(),
+ channel_monitor_claim_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0]).unwrap(),
commitment_seed: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
}
} else {
}
};
- let channel = Channel::new_from_req(&*self.fee_estimator, chan_keys, their_node_id.clone(), msg, 0, false, self.announce_channels_publicly)?;
+ let channel = Channel::new_from_req(&*self.fee_estimator, chan_keys, their_node_id.clone(), msg, 0, false, self.announce_channels_publicly, Arc::clone(&self.logger))?;
let accept_msg = channel.get_accept_channel()?;
channel_state.by_id.insert(channel.channel_id(), channel);
Ok(accept_msg)
}
fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<msgs::FundingSigned, HandleError> {
- //TODO: broke this - a node shouldn't be able to get their channel removed by sending a
- //funding_created a second time, or long after the first, or whatever (note this also
- //leaves the short_to_id map in a busted state.
let (chan, funding_msg, monitor_update) = {
let mut channel_state = self.channel_state.lock().unwrap();
- match channel_state.by_id.remove(&msg.temporary_channel_id) {
- Some(mut chan) => {
- if chan.get_their_node_id() != *their_node_id {
+ match channel_state.by_id.entry(msg.temporary_channel_id.clone()) {
+ hash_map::Entry::Occupied(mut chan) => {
+ if chan.get().get_their_node_id() != *their_node_id {
return Err(HandleError{err: "Got a message for a channel from the wrong node!", action: None})
}
- match chan.funding_created(msg) {
+ match chan.get_mut().funding_created(msg) {
Ok((funding_msg, monitor_update)) => {
- (chan, funding_msg, monitor_update)
+ (chan.remove(), funding_msg, monitor_update)
},
Err(e) => {
+ //TODO: Possibly remove the channel depending on e.action
return Err(e);
}
}
},
- None => return Err(HandleError{err: "Failed to find corresponding channel", action: None})
+ hash_map::Entry::Vacant(_) => return Err(HandleError{err: "Failed to find corresponding channel", action: None})
}
}; // Release channel lock for install_watch_outpoint call,
// note that this means if the remote end is misbehaving and sends a message for the same
unimplemented!();
}
let mut channel_state = self.channel_state.lock().unwrap();
- channel_state.by_id.insert(funding_msg.channel_id, chan);
+ match channel_state.by_id.entry(funding_msg.channel_id) {
+ hash_map::Entry::Occupied(_) => {
+ return Err(HandleError {
+ err: "Duplicate channel_id!",
+ action: Some(msgs::ErrorAction::SendErrorMessage { msg: msgs::ErrorMessage { channel_id: funding_msg.channel_id, data: "Already had channel with the new channel_id".to_owned() } })
+ });
+ },
+ hash_map::Entry::Vacant(e) => {
+ e.insert(chan);
+ }
+ }
Ok(funding_msg)
}
// destination. That's OK since those nodes are probably busted or trying to do network
// mapping through repeated loops. In either case, we want them to stop talking to us, so
// we send permanent_node_failure.
- match &claimable_htlcs_entry {
- &hash_map::Entry::Occupied(ref e) => {
- let mut acceptable_cycle = false;
- match e.get() {
- &PendingOutboundHTLC::OutboundRoute { .. } => {
- acceptable_cycle = pending_forward_info.short_channel_id == 0;
- },
- _ => {},
- }
- if !acceptable_cycle {
- return_err!("Payment looped through us twice", 0x4000 | 0x2000 | 2, &[0;0]);
- }
- },
- _ => {},
+ if let &hash_map::Entry::Occupied(ref e) = &claimable_htlcs_entry {
+ let mut acceptable_cycle = false;
+ if let &PendingOutboundHTLC::OutboundRoute { .. } = e.get() {
+ acceptable_cycle = pending_forward_info.short_channel_id == 0;
+ }
+ if !acceptable_cycle {
+ return_err!("Payment looped through us twice", 0x4000 | 0x2000 | 2, &[0;0]);
+ }
}
let (source_short_channel_id, res) = match channel_state.by_id.get_mut(&msg.channel_id) {
pending_forward_info.prev_short_channel_id = short_channel_id;
(short_channel_id, chan.update_add_htlc(&msg, pending_forward_info)?)
},
- None => return Err(HandleError{err: "Failed to find corresponding channel", action: None}), //TODO: panic?
+ None => return Err(HandleError{err: "Failed to find corresponding channel", action: None}),
};
match claimable_htlcs_entry {
&mut PendingOutboundHTLC::OutboundRoute { ref route, ref session_priv } => {
(route.clone(), session_priv.clone())
},
- _ => { panic!("WAT") },
+ _ => unreachable!(),
};
*outbound_route = PendingOutboundHTLC::CycledRoute {
source_short_channel_id,
// is broken, we may have enough info to get our own money!
self.claim_funds_internal(msg.payment_preimage.clone(), false);
- let monitor = {
- let mut channel_state = self.channel_state.lock().unwrap();
- match channel_state.by_id.get_mut(&msg.channel_id) {
- Some(chan) => {
- if chan.get_their_node_id() != *their_node_id {
- return Err(HandleError{err: "Got a message for a channel from the wrong node!", action: None})
- }
- chan.update_fulfill_htlc(&msg)?
- },
- None => return Err(HandleError{err: "Failed to find corresponding channel", action: None})
- }
- };
- if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) {
- unimplemented!();
+ let mut channel_state = self.channel_state.lock().unwrap();
+ match channel_state.by_id.get_mut(&msg.channel_id) {
+ Some(chan) => {
+ if chan.get_their_node_id() != *their_node_id {
+ return Err(HandleError{err: "Got a message for a channel from the wrong node!", action: None})
+ }
+ chan.update_fulfill_htlc(&msg)
+ },
+ None => return Err(HandleError{err: "Failed to find corresponding channel", action: None})
}
- Ok(())
}
fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<Option<msgs::HTLCFailChannelUpdate>, HandleError> {
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
let mut new_events = Vec::new();
+ let mut failed_channels = Vec::new();
{
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = channel_state_lock.borrow_parts();
if let Some(short_id) = chan.get_short_channel_id() {
short_to_id.remove(&short_id);
}
- let txn_to_broadcast = chan.force_shutdown();
- for tx in txn_to_broadcast {
- self.tx_broadcaster.broadcast_transaction(&tx);
- }
+ failed_channels.push(chan.force_shutdown());
if let Ok(update) = self.get_channel_update(&chan) {
new_events.push(events::Event::BroadcastChannelUpdate {
msg: update
}
}
}
+ for failure in failed_channels.drain(..) {
+ self.finish_force_close_channel(failure);
+ }
if !new_events.is_empty() {
let mut pending_events = self.pending_events.lock().unwrap();
for event in new_events.drain(..) {
use ln::msgs::{MsgEncodable,ChannelMessageHandler,RoutingMessageHandler};
use util::test_utils;
use util::events::{Event, EventsProvider};
+ use util::logger::Logger;
use bitcoin::util::hash::Sha256dHash;
use bitcoin::blockdata::block::{Block, BlockHeader};
{
let mut added_monitors = $node.chan_monitor.added_monitors.lock().unwrap();
if $last_node {
- assert_eq!(added_monitors.len(), 1);
+ assert_eq!(added_monitors.len(), 0);
} else {
- assert_eq!(added_monitors.len(), 2);
- assert!(added_monitors[0].0 != added_monitors[1].0);
+ assert_eq!(added_monitors.len(), 1);
}
added_monitors.clear();
}
let mut nodes = Vec::new();
let mut rng = thread_rng();
let secp_ctx = Secp256k1::new();
+ let logger: Arc<Logger> = Arc::new(test_utils::TestLogger::new());
for _ in 0..node_count {
let feeest = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
- let chain_monitor = Arc::new(chaininterface::ChainWatchInterfaceUtil::new());
+ let chain_monitor = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Arc::clone(&logger)));
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone()));
let node_id = {
rng.fill_bytes(&mut key_slice);
SecretKey::from_slice(&secp_ctx, &key_slice).unwrap()
};
- let node = ChannelManager::new(node_id.clone(), 0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone()).unwrap();
- let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &node_id).unwrap());
+ let node = ChannelManager::new(node_id.clone(), 0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger)).unwrap();
+ let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &node_id).unwrap(), Arc::clone(&logger));
nodes.push(Node { feeest, chain_monitor, tx_broadcaster, chan_monitor, node_id, node, router });
}
let mut node_txn = test_txn_broadcast(&nodes[1], &chan_1, None, HTLCType::NONE);
let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
- assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+ test_txn_broadcast(&nodes[0], &chan_1, None, HTLCType::NONE);
}
get_announce_close_broadcast_events(&nodes, 0, 1);
assert_eq!(nodes[0].node.list_channels().len(), 0);
let mut node_txn = test_txn_broadcast(&nodes[1], &chan_2, None, HTLCType::TIMEOUT);
let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
- assert_eq!(nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+ test_txn_broadcast(&nodes[2], &chan_2, None, HTLCType::NONE);
}
get_announce_close_broadcast_events(&nodes, 1, 2);
assert_eq!(nodes[1].node.list_channels().len(), 0);
nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
{
let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
- assert_eq!(node_txn.len(), 1);
+ assert_eq!(node_txn.len(), 2);
assert_eq!(node_txn[0].input.len(), 1);
let mut funding_tx_map = HashMap::new();
funding_tx_map.insert(revoked_local_txn[0].txid(), revoked_local_txn[0].clone());
node_txn[0].verify(&funding_tx_map).unwrap();
- node_txn.clear();
+ node_txn.swap_remove(0);
}
+ test_txn_broadcast(&nodes[1], &chan_5, None, HTLCType::NONE);
nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);