X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fln%2Fchannelmanager.rs;h=4777a3e810a220349d0f66bbaa921e77e489d9d4;hb=e382a7b4b3e5f3e59a9300b9d8a4d8bff06366fe;hp=d6ba67c2b36dcf24d23e3c880ccf64ddcb5a7b19;hpb=497643a65b67ee0ecc7d09a94459c0f42cc5ac99;p=rust-lightning diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index d6ba67c2..4777a3e8 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -22,11 +22,12 @@ use secp256k1; use chain::chaininterface::{BroadcasterInterface,ChainListener,ChainWatchInterface,FeeEstimator}; use chain::transaction::OutPoint; -use ln::channel::{Channel, ChannelError, ChannelKeys}; +use ln::channel::{Channel, ChannelError}; use ln::channelmonitor::{ChannelMonitorUpdateErr, ManyChannelMonitor, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; use ln::router::{Route,RouteHop}; use ln::msgs; use ln::msgs::{ChannelMessageHandler, HandleError, RAACommitmentOrder}; +use chain::keysinterface::KeysInterface; use util::{byte_utils, events, internal_traits, rng}; use util::sha2::Sha256; use util::ser::{Readable, Writeable}; @@ -221,6 +222,16 @@ impl MsgHandleErrInternal { } } +/// Pass to fail_htlc_backwwards to indicate the reason to fail the payment +/// after a PaymentReceived event. +#[derive(PartialEq)] +pub enum PaymentFailReason { + /// Indicate the preimage for payment_hash is not known after a PaymentReceived event + PreimageUnknown, + /// Indicate the payment amount is incorrect ( received is < expected or > 2*expected ) after a PaymentReceived event + AmountMismatch, +} + /// We hold back HTLCs we intend to relay for a random interval in the range (this, 5*this). This /// provides some limited amount of privacy. Ideally this would range from somewhere like 1 second /// to 30 seconds, but people expect lightning to be, you know, kinda fast, sadly. We could @@ -246,6 +257,9 @@ struct ChannelHolder { /// guarantees are made about the channels given here actually existing anymore by the time you /// go to read them! claimable_htlcs: HashMap<[u8; 32], Vec>, + /// Messages to send to peers - pushed to in the same lock that they are generated in (except + /// for broadcast messages, where ordering isn't as strict). + pending_msg_events: Vec, } struct MutChannelHolder<'a> { by_id: &'a mut HashMap<[u8; 32], Channel>, @@ -253,6 +267,7 @@ struct MutChannelHolder<'a> { next_forward: &'a mut Instant, forward_htlcs: &'a mut HashMap>, claimable_htlcs: &'a mut HashMap<[u8; 32], Vec>, + pending_msg_events: &'a mut Vec, } impl ChannelHolder { fn borrow_parts(&mut self) -> MutChannelHolder { @@ -262,6 +277,7 @@ impl ChannelHolder { next_forward: &mut self.next_forward, forward_htlcs: &mut self.forward_htlcs, claimable_htlcs: &mut self.claimable_htlcs, + pending_msg_events: &mut self.pending_msg_events, } } } @@ -291,6 +307,8 @@ pub struct ChannelManager { pending_events: Mutex>, + keys_manager: Arc, + logger: Arc, } @@ -363,7 +381,7 @@ impl ChannelManager { /// Non-proportional fees are fixed according to our risk using the provided fee estimator. /// /// panics if channel_value_satoshis is >= `MAX_FUNDING_SATOSHIS`! - pub fn new(our_network_key: SecretKey, fee_proportional_millionths: u32, announce_channels_publicly: bool, network: Network, feeest: Arc, monitor: Arc, chain_monitor: Arc, tx_broadcaster: Arc, logger: Arc) -> Result, secp256k1::Error> { + pub fn new(fee_proportional_millionths: u32, announce_channels_publicly: bool, network: Network, feeest: Arc, monitor: Arc, chain_monitor: Arc, tx_broadcaster: Arc, logger: Arc, keys_manager: Arc) -> Result, secp256k1::Error> { let secp_ctx = Secp256k1::new(); let res = Arc::new(ChannelManager { @@ -384,11 +402,14 @@ impl ChannelManager { next_forward: Instant::now(), forward_htlcs: HashMap::new(), claimable_htlcs: HashMap::new(), + pending_msg_events: Vec::new(), }), - our_network_key, + our_network_key: keys_manager.get_node_secret(), pending_events: Mutex::new(Vec::new()), + keys_manager, + logger, }); let weak_res = Arc::downgrade(&res); @@ -403,32 +424,12 @@ impl ChannelManager { /// create_channel call. Note that user_channel_id defaults to 0 for inbound channels, so you /// may wish to avoid using 0 for user_id here. /// - /// If successful, will generate a SendOpenChannel event, so you should probably poll + /// If successful, will generate a SendOpenChannel message event, so you should probably poll /// PeerManager::process_events afterwards. /// /// Raises APIError::APIMisuseError when channel_value_satoshis > 2**24 or push_msat being greater than channel_value_satoshis * 1k pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64) -> Result<(), APIError> { - let chan_keys = if cfg!(feature = "fuzztarget") { - ChannelKeys { - funding_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(), - revocation_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(), - payment_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(), - delayed_payment_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(), - htlc_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(), - channel_close_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(), - channel_monitor_claim_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap(), - commitment_seed: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], - } - } else { - let mut key_seed = [0u8; 32]; - rng::fill_bytes(&mut key_seed); - match ChannelKeys::new_from_seed(&key_seed) { - Ok(key) => key, - Err(_) => panic!("RNG is busted!") - } - }; - - let channel = Channel::new_outbound(&*self.fee_estimator, chan_keys, their_network_key, channel_value_satoshis, push_msat, self.announce_channels_publicly, user_id, Arc::clone(&self.logger))?; + let channel = Channel::new_outbound(&*self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, self.announce_channels_publicly, user_id, Arc::clone(&self.logger))?; let res = channel.get_open_channel(self.genesis_hash.clone(), &*self.fee_estimator); let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(channel.channel_id()) { @@ -441,9 +442,7 @@ impl ChannelManager { }, hash_map::Entry::Vacant(entry) => { entry.insert(channel); } } - - let mut events = self.pending_events.lock().unwrap(); - events.push(events::Event::SendOpenChannel { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: their_network_key, msg: res, }); @@ -493,25 +492,29 @@ impl ChannelManager { /// will be accepted on the given channel, and after additional timeout/the closing of all /// pending HTLCs, the channel will be closed on chain. /// - /// May generate a SendShutdown event on success, which should be relayed. + /// May generate a SendShutdown message event on success, which should be relayed. pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let (mut res, node_id, chan_option) = { + let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); match channel_state.by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - let res = chan_entry.get_mut().get_shutdown()?; + let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?; + channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: chan_entry.get().get_their_node_id(), + msg: shutdown_msg + }); if chan_entry.get().is_shutdown() { if let Some(short_id) = chan_entry.get().get_short_channel_id() { channel_state.short_to_id.remove(&short_id); } - (res, chan_entry.get().get_their_node_id(), Some(chan_entry.remove_entry().1)) - } else { (res, chan_entry.get().get_their_node_id(), None) } + (failed_htlcs, Some(chan_entry.remove_entry().1)) + } else { (failed_htlcs, None) } }, hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: "No such channel"}) } }; - for htlc_source in res.1.drain(..) { + for htlc_source in failed_htlcs.drain(..) { // unknown_next_peer...I dunno who that is anymore.... self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() }); } @@ -521,16 +524,12 @@ impl ChannelManager { } else { None } } else { None }; - let mut events = self.pending_events.lock().unwrap(); if let Some(update) = chan_update { - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } - events.push(events::Event::SendShutdown { - node_id, - msg: res.0 - }); Ok(()) } @@ -570,9 +569,9 @@ impl ChannelManager { } }; self.finish_force_close_channel(chan.force_shutdown()); - let mut events = self.pending_events.lock().unwrap(); if let Ok(update) = self.get_channel_update(&chan) { - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -599,9 +598,9 @@ impl ChannelManager { }; mem::drop(channel_state_lock); self.finish_force_close_channel(chan.force_shutdown()); - let mut events = self.pending_events.lock().unwrap(); if let Ok(update) = self.get_channel_update(&chan) { - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -1101,7 +1100,7 @@ impl ChannelManager { /// payment_preimage tracking (which you should already be doing as they represent "proof of /// payment") and prevent double-sends yourself. /// - /// May generate a SendHTLCs event on success, which should be relayed. + /// May generate a SendHTLCs message event on success, which should be relayed. /// /// Raises APIError::RoutError when invalid route or forward parameter /// (cltv_delta, fee, node public key) is specified @@ -1129,66 +1128,52 @@ impl ChannelManager { let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?; let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash); - let (first_hop_node_id, update_add, commitment_signed) = { - let mut channel_state = self.channel_state.lock().unwrap(); - - let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { - None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}), - Some(id) => id.clone(), - }; - - let res = { - let res = { - let chan = channel_state.by_id.get_mut(&id).unwrap(); - if chan.get_their_node_id() != route.hops.first().unwrap().pubkey { - return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); - } - if chan.is_awaiting_monitor_update() { - return Err(APIError::MonitorUpdateFailed); - } - if !chan.is_live() { - return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"}); - } - chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { - route: route.clone(), - session_priv: session_priv.clone(), - first_hop_htlc_msat: htlc_msat, - }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? - }; - match res { - Some((update_add, commitment_signed, chan_monitor)) => { - if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst); - return Err(APIError::MonitorUpdateFailed); - } - Some((update_add, commitment_signed)) - }, - None => None, - } - }; + let mut channel_state = self.channel_state.lock().unwrap(); - let first_hop_node_id = route.hops.first().unwrap().pubkey; + let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { + None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}), + Some(id) => id.clone(), + }; - match res { - Some((update_add, commitment_signed)) => { - (first_hop_node_id, update_add, commitment_signed) - }, - None => return Ok(()), + let res = { + let chan = channel_state.by_id.get_mut(&id).unwrap(); + if chan.get_their_node_id() != route.hops.first().unwrap().pubkey { + return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); } + if chan.is_awaiting_monitor_update() { + return Err(APIError::MonitorUpdateFailed); + } + if !chan.is_live() { + return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"}); + } + chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { + route: route.clone(), + session_priv: session_priv.clone(), + first_hop_htlc_msat: htlc_msat, + }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? }; + match res { + Some((update_add, commitment_signed, chan_monitor)) => { + if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst); + return Err(APIError::MonitorUpdateFailed); + } - let mut events = self.pending_events.lock().unwrap(); - events.push(events::Event::UpdateHTLCs { - node_id: first_hop_node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: vec![update_add], - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: route.hops.first().unwrap().pubkey, + updates: msgs::CommitmentUpdate { + update_add_htlcs: vec![update_add], + update_fulfill_htlcs: Vec::new(), + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + }, + }); }, - }); + None => {}, + } + Ok(()) } @@ -1199,15 +1184,6 @@ impl ChannelManager { /// May panic if the funding_txo is duplicative with some other channel (note that this should /// be trivially prevented by using unique funding transaction keys per-channel). pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) { - macro_rules! add_pending_event { - ($event: expr) => { - { - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push($event); - } - } - } - let (chan, msg, chan_monitor) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.remove(temporary_channel_id) { @@ -1218,8 +1194,7 @@ impl ChannelManager { }, Err(e) => { log_error!(self, "Got bad signatures: {}!", e.err); - mem::drop(channel_state); - add_pending_event!(events::Event::HandleError { + channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: chan.get_their_node_id(), action: e.action, }); @@ -1235,12 +1210,12 @@ impl ChannelManager { if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!(); } - add_pending_event!(events::Event::SendFundingCreated { + + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated { node_id: chan.get_their_node_id(), msg: msg, }); - - let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(chan.channel_id()) { hash_map::Entry::Occupied(_) => { panic!("Generated duplicate funding txid?"); @@ -1349,7 +1324,7 @@ impl ChannelManager { if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { unimplemented!();// but def dont push the event... } - new_events.push(events::Event::UpdateHTLCs { + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: forward_chan.get_their_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: add_htlc_msgs, @@ -1393,16 +1368,14 @@ impl ChannelManager { events.append(&mut new_events); } - /// Indicates that the preimage for payment_hash is unknown after a PaymentReceived event. - pub fn fail_htlc_backwards(&self, payment_hash: &[u8; 32]) -> bool { - // TODO: Add ability to return 0x4000|16 (incorrect_payment_amount) if the amount we - // received is < expected or > 2*expected + /// Indicates that the preimage for payment_hash is unknown or the received amount is incorrect after a PaymentReceived event. + pub fn fail_htlc_backwards(&self, payment_hash: &[u8; 32], reason: PaymentFailReason) -> bool { let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(payment_hash); if let Some(mut sources) = removed_source { for htlc_with_hash in sources.drain(..) { if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); } - self.fail_htlc_backwards_internal(channel_state.take().unwrap(), HTLCSource::PreviousHopData(htlc_with_hash), payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 15, data: Vec::new() }); + self.fail_htlc_backwards_internal(channel_state.take().unwrap(), HTLCSource::PreviousHopData(htlc_with_hash), payment_hash, HTLCFailReason::Reason { failure_code: if reason == PaymentFailReason::PreimageUnknown {0x4000 | 15} else {0x4000 | 16}, data: Vec::new() }); } true } else { false } @@ -1414,19 +1387,20 @@ impl ChannelManager { /// to fail and take the channel_state lock for each iteration (as we take ownership and may /// drop it). In other words, no assumptions are made that entries in claimable_htlcs point to /// still-available channels. - fn fail_htlc_backwards_internal(&self, mut channel_state: MutexGuard, source: HTLCSource, payment_hash: &[u8; 32], onion_error: HTLCFailReason) { + fn fail_htlc_backwards_internal(&self, mut channel_state_lock: MutexGuard, source: HTLCSource, payment_hash: &[u8; 32], onion_error: HTLCFailReason) { match source { HTLCSource::OutboundRoute { .. } => { - mem::drop(channel_state); + mem::drop(channel_state_lock); if let &HTLCFailReason::ErrorPacket { ref err } = &onion_error { let (channel_update, payment_retryable) = self.process_onion_failure(&source, err.data.clone()); - let mut pending_events = self.pending_events.lock().unwrap(); - if let Some(channel_update) = channel_update { - pending_events.push(events::Event::PaymentFailureNetworkUpdate { - update: channel_update, - }); + if let Some(update) = channel_update { + self.channel_state.lock().unwrap().pending_msg_events.push( + events::MessageSendEvent::PaymentFailureNetworkUpdate { + update, + } + ); } - pending_events.push(events::Event::PaymentFailed { + self.pending_events.lock().unwrap().push(events::Event::PaymentFailed { payment_hash: payment_hash.clone(), rejected_by_dest: !payment_retryable, }); @@ -1445,35 +1419,21 @@ impl ChannelManager { } }; - let (node_id, fail_msgs) = { - let chan_id = match channel_state.short_to_id.get(&short_channel_id) { - Some(chan_id) => chan_id.clone(), - None => return - }; + let channel_state = channel_state_lock.borrow_parts(); - let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); - match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) { - Ok(Some((msg, commitment_msg, chan_monitor))) => { - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!(); - } - (chan.get_their_node_id(), Some((msg, commitment_msg))) - }, - Ok(None) => (chan.get_their_node_id(), None), - Err(_e) => { - //TODO: Do something with e? - return; - }, - } + let chan_id = match channel_state.short_to_id.get(&short_channel_id) { + Some(chan_id) => chan_id.clone(), + None => return }; - match fail_msgs { - Some((msg, commitment_msg)) => { - mem::drop(channel_state); - - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::UpdateHTLCs { - node_id, + let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); + match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) { + Ok(Some((msg, commitment_msg, chan_monitor))) => { + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: chan.get_their_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), update_fulfill_htlcs: Vec::new(), @@ -1484,7 +1444,11 @@ impl ChannelManager { }, }); }, - None => {}, + Ok(None) => {}, + Err(_e) => { + //TODO: Do something with e? + return; + }, } }, } @@ -1511,10 +1475,10 @@ impl ChannelManager { true } else { false } } - fn claim_funds_internal(&self, mut channel_state: MutexGuard, source: HTLCSource, payment_preimage: [u8; 32]) { + fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard, source: HTLCSource, payment_preimage: [u8; 32]) { match source { HTLCSource::OutboundRoute { .. } => { - mem::drop(channel_state); + mem::drop(channel_state_lock); let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::PaymentSent { payment_preimage @@ -1522,49 +1486,46 @@ impl ChannelManager { }, HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, .. }) => { //TODO: Delay the claimed_funds relaying just like we do outbound relay! - let (node_id, fulfill_msgs) = { - let chan_id = match channel_state.short_to_id.get(&short_channel_id) { - Some(chan_id) => chan_id.clone(), - None => { - // TODO: There is probably a channel manager somewhere that needs to - // learn the preimage as the channel already hit the chain and that's - // why its missing. - return - } - }; + let channel_state = channel_state_lock.borrow_parts(); + + let chan_id = match channel_state.short_to_id.get(&short_channel_id) { + Some(chan_id) => chan_id.clone(), + None => { + // TODO: There is probably a channel manager somewhere that needs to + // learn the preimage as the channel already hit the chain and that's + // why its missing. + return + } + }; - let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); - match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) { - Ok((msgs, Some(chan_monitor))) => { + let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); + match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) { + Ok((msgs, monitor_option)) => { + if let Some(chan_monitor) = monitor_option { if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!();// but def dont push the event... } - (chan.get_their_node_id(), msgs) - }, - Ok((msgs, None)) => (chan.get_their_node_id(), msgs), - Err(_e) => { - // TODO: There is probably a channel manager somewhere that needs to - // learn the preimage as the channel may be about to hit the chain. - //TODO: Do something with e? - return - }, - } - }; - - mem::drop(channel_state); - if let Some((msg, commitment_msg)) = fulfill_msgs { - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::UpdateHTLCs { - node_id: node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: vec![msg], - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed: commitment_msg, } - }); + if let Some((msg, commitment_signed)) = msgs { + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: chan.get_their_node_id(), + updates: msgs::CommitmentUpdate { + update_add_htlcs: Vec::new(), + update_fulfill_htlcs: vec![msg], + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + } + }); + } + }, + Err(_e) => { + // TODO: There is probably a channel manager somewhere that needs to + // learn the preimage as the channel may be about to hit the chain. + //TODO: Do something with e? + return + }, } }, } @@ -1579,7 +1540,6 @@ impl ChannelManager { /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update /// operation. pub fn test_restore_channel_monitor(&self) { - let mut new_events = Vec::new(); let mut close_results = Vec::new(); let mut htlc_forwards = Vec::new(); let mut htlc_failures = Vec::new(); @@ -1588,6 +1548,7 @@ impl ChannelManager { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = channel_lock.borrow_parts(); let short_to_id = channel_state.short_to_id; + let pending_msg_events = channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { if channel.is_awaiting_monitor_update() { let chan_monitor = channel.channel_monitor(); @@ -1599,7 +1560,7 @@ impl ChannelManager { } close_results.push(channel.force_shutdown()); if let Ok(update) = self.get_channel_update(&channel) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -1616,7 +1577,7 @@ impl ChannelManager { macro_rules! handle_cs { () => { if let Some(update) = commitment_update { - new_events.push(events::Event::UpdateHTLCs { + pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: channel.get_their_node_id(), updates: update, }); @@ -1624,7 +1585,7 @@ impl ChannelManager { } } macro_rules! handle_raa { () => { if let Some(revoke_and_ack) = raa { - new_events.push(events::Event::SendRevokeAndACK { + pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { node_id: channel.get_their_node_id(), msg: revoke_and_ack, }); @@ -1654,44 +1615,28 @@ impl ChannelManager { for res in close_results.drain(..) { self.finish_force_close_channel(res); } - - self.pending_events.lock().unwrap().append(&mut new_events); } - fn internal_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result { + fn internal_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash", msg.temporary_channel_id.clone())); } - let mut channel_state = self.channel_state.lock().unwrap(); - if channel_state.by_id.contains_key(&msg.temporary_channel_id) { - return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision!", msg.temporary_channel_id.clone())); - } - - let chan_keys = if cfg!(feature = "fuzztarget") { - ChannelKeys { - funding_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]).unwrap(), - revocation_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0]).unwrap(), - payment_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0]).unwrap(), - delayed_payment_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0]).unwrap(), - htlc_base_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 0]).unwrap(), - channel_close_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0]).unwrap(), - channel_monitor_claim_key: SecretKey::from_slice(&self.secp_ctx, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0]).unwrap(), - commitment_seed: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], - } - } else { - let mut key_seed = [0u8; 32]; - rng::fill_bytes(&mut key_seed); - match ChannelKeys::new_from_seed(&key_seed) { - Ok(key) => key, - Err(_) => panic!("RNG is busted!") - } - }; - let channel = Channel::new_from_req(&*self.fee_estimator, chan_keys, their_node_id.clone(), msg, 0, false, self.announce_channels_publicly, Arc::clone(&self.logger)) + let channel = Channel::new_from_req(&*self.fee_estimator, &self.keys_manager, their_node_id.clone(), msg, 0, false, self.announce_channels_publicly, Arc::clone(&self.logger)) .map_err(|e| MsgHandleErrInternal::from_chan_no_close(e, msg.temporary_channel_id))?; - let accept_msg = channel.get_accept_channel(); - channel_state.by_id.insert(channel.channel_id(), channel); - Ok(accept_msg) + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); + match channel_state.by_id.entry(channel.channel_id()) { + hash_map::Entry::Occupied(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("temporary_channel_id collision!", msg.temporary_channel_id.clone())), + hash_map::Entry::Vacant(entry) => { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { + node_id: their_node_id.clone(), + msg: channel.get_accept_channel(), + }); + entry.insert(channel); + } + } + Ok(()) } fn internal_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { @@ -1721,7 +1666,7 @@ impl ChannelManager { Ok(()) } - fn internal_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result { + fn internal_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { let (chan, funding_msg, monitor_update) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(msg.temporary_channel_id.clone()) { @@ -1747,16 +1692,21 @@ impl ChannelManager { if let Err(_e) = self.monitor.add_update_monitor(monitor_update.get_funding_txo().unwrap(), monitor_update) { unimplemented!(); } - let mut channel_state = self.channel_state.lock().unwrap(); + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); match channel_state.by_id.entry(funding_msg.channel_id) { hash_map::Entry::Occupied(_) => { return Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id", funding_msg.channel_id)) }, hash_map::Entry::Vacant(e) => { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { + node_id: their_node_id.clone(), + msg: funding_msg, + }); e.insert(chan); } } - Ok(funding_msg) + Ok(()) } fn internal_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { @@ -1785,8 +1735,9 @@ impl ChannelManager { Ok(()) } - fn internal_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result, MsgHandleErrInternal> { - let mut channel_state = self.channel_state.lock().unwrap(); + fn internal_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), MsgHandleErrInternal> { + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { if chan.get_their_node_id() != *their_node_id { @@ -1795,10 +1746,16 @@ impl ChannelManager { } chan.funding_locked(&msg) .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; - return Ok(self.get_announcement_sigs(chan)); + if let Some(announcement_sigs) = self.get_announcement_sigs(chan) { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + node_id: their_node_id.clone(), + msg: announcement_sigs, + }); + } + Ok(()) }, - None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) - }; + None => Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) + } } fn internal_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option, Option), MsgHandleErrInternal> { @@ -1829,8 +1786,8 @@ impl ChannelManager { } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update(&chan) { - let mut events = self.pending_events.lock().unwrap(); - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -1869,8 +1826,8 @@ impl ChannelManager { } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update(&chan) { - let mut events = self.pending_events.lock().unwrap(); - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2250,42 +2207,43 @@ impl ChannelManager { } fn internal_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { - let (chan_announcement, chan_update) = { - let mut channel_state = self.channel_state.lock().unwrap(); - match channel_state.by_id.get_mut(&msg.channel_id) { - Some(chan) => { - if chan.get_their_node_id() != *their_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); - } - if !chan.is_usable() { - return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Got an announcement_signatures before we were ready for it", action: Some(msgs::ErrorAction::IgnoreError)})); - } + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); - let our_node_id = self.get_our_node_id(); - let (announcement, our_bitcoin_sig) = chan.get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone()) - .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; + match channel_state.by_id.get_mut(&msg.channel_id) { + Some(chan) => { + if chan.get_their_node_id() != *their_node_id { + return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); + } + if !chan.is_usable() { + return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Got an announcement_signatures before we were ready for it", action: Some(msgs::ErrorAction::IgnoreError)})); + } + + let our_node_id = self.get_our_node_id(); + let (announcement, our_bitcoin_sig) = chan.get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone()) + .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; - let were_node_one = announcement.node_id_1 == our_node_id; - let msghash = Message::from_slice(&Sha256dHash::from_data(&announcement.encode()[..])[..]).unwrap(); - let bad_sig_action = MsgHandleErrInternal::send_err_msg_close_chan("Bad announcement_signatures node_signature", msg.channel_id); - secp_call!(self.secp_ctx.verify(&msghash, &msg.node_signature, if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }), bad_sig_action); - secp_call!(self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }), bad_sig_action); + let were_node_one = announcement.node_id_1 == our_node_id; + let msghash = Message::from_slice(&Sha256dHash::from_data(&announcement.encode()[..])[..]).unwrap(); + let bad_sig_action = MsgHandleErrInternal::send_err_msg_close_chan("Bad announcement_signatures node_signature", msg.channel_id); + secp_call!(self.secp_ctx.verify(&msghash, &msg.node_signature, if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }), bad_sig_action); + secp_call!(self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }), bad_sig_action); - let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key); + let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key); - (msgs::ChannelAnnouncement { + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { + msg: msgs::ChannelAnnouncement { node_signature_1: if were_node_one { our_node_sig } else { msg.node_signature }, node_signature_2: if were_node_one { msg.node_signature } else { our_node_sig }, bitcoin_signature_1: if were_node_one { our_bitcoin_sig } else { msg.bitcoin_signature }, bitcoin_signature_2: if were_node_one { msg.bitcoin_signature } else { our_bitcoin_sig }, contents: announcement, - }, self.get_channel_update(chan).unwrap()) // can only fail if we're not in a ready state - }, - None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) - } - }; - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::BroadcastChannelAnnouncement { msg: chan_announcement, update_msg: chan_update }); + }, + update_msg: self.get_channel_update(chan).unwrap(), // can only fail if we're not in a ready state + }); + }, + None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) + } Ok(()) } @@ -2319,7 +2277,9 @@ impl ChannelManager { /// Note: This API is likely to change! #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> { - let mut channel_state = self.channel_state.lock().unwrap(); + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); + match channel_state.by_id.get_mut(&channel_id) { None => return Err(APIError::APIMisuseError{err: "Failed to find corresponding channel"}), Some(chan) => { @@ -2336,8 +2296,7 @@ impl ChannelManager { if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!(); } - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::UpdateHTLCs { + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: chan.get_their_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), @@ -2355,10 +2314,19 @@ impl ChannelManager { } } +impl events::MessageSendEventsProvider for ChannelManager { + fn get_and_clear_pending_msg_events(&self) -> Vec { + let mut ret = Vec::new(); + let mut channel_state = self.channel_state.lock().unwrap(); + mem::swap(&mut ret, &mut channel_state.pending_msg_events); + ret + } +} + impl events::EventsProvider for ChannelManager { fn get_and_clear_pending_events(&self) -> Vec { - let mut pending_events = self.pending_events.lock().unwrap(); let mut ret = Vec::new(); + let mut pending_events = self.pending_events.lock().unwrap(); mem::swap(&mut ret, &mut *pending_events); ret } @@ -2366,24 +2334,28 @@ impl events::EventsProvider for ChannelManager { impl ChainListener for ChannelManager { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) { - let mut new_events = Vec::new(); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = channel_lock.borrow_parts(); let short_to_id = channel_state.short_to_id; + let pending_msg_events = channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); if let Ok(Some(funding_locked)) = chan_res { - let announcement_sigs = self.get_announcement_sigs(channel); - new_events.push(events::Event::SendFundingLocked { + pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { node_id: channel.get_their_node_id(), msg: funding_locked, - announcement_sigs: announcement_sigs }); + if let Some(announcement_sigs) = self.get_announcement_sigs(channel) { + pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + node_id: channel.get_their_node_id(), + msg: announcement_sigs, + }); + } short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); } else if let Err(e) = chan_res { - new_events.push(events::Event::HandleError { + pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: channel.get_their_node_id(), action: e.action, }); @@ -2403,7 +2375,7 @@ impl ChainListener for ChannelManager { // some kind of SPV attack, though we expect these to be dropped. failed_channels.push(channel.force_shutdown()); if let Ok(update) = self.get_channel_update(&channel) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2422,7 +2394,7 @@ impl ChainListener for ChannelManager { // hurt anything, but does make tests a bit simpler). failed_channels.last_mut().unwrap().0 = Vec::new(); if let Ok(update) = self.get_channel_update(&channel) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2434,21 +2406,17 @@ impl ChainListener for ChannelManager { for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } - let mut pending_events = self.pending_events.lock().unwrap(); - for funding_locked in new_events.drain(..) { - pending_events.push(funding_locked); - } self.latest_block_height.store(height as usize, Ordering::Release); } /// We force-close the channel without letting our counterparty participate in the shutdown fn block_disconnected(&self, header: &BlockHeader) { - let mut new_events = Vec::new(); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = channel_lock.borrow_parts(); let short_to_id = channel_state.short_to_id; + let pending_msg_events = channel_state.pending_msg_events; channel_state.by_id.retain(|_, v| { if v.block_disconnected(header) { if let Some(short_id) = v.get_short_channel_id() { @@ -2456,7 +2424,7 @@ impl ChainListener for ChannelManager { } failed_channels.push(v.force_shutdown()); if let Ok(update) = self.get_channel_update(&v) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2469,12 +2437,6 @@ impl ChainListener for ChannelManager { for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } - if !new_events.is_empty() { - let mut pending_events = self.pending_events.lock().unwrap(); - for funding_locked in new_events.drain(..) { - pending_events.push(funding_locked); - } - } self.latest_block_height.fetch_sub(1, Ordering::AcqRel); } } @@ -2513,7 +2475,7 @@ macro_rules! handle_error { impl ChannelMessageHandler for ChannelManager { //TODO: Handle errors and close channel (or so) - fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result { + fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), HandleError> { handle_error!(self, self.internal_open_channel(their_node_id, msg), their_node_id) } @@ -2521,7 +2483,7 @@ impl ChannelMessageHandler for ChannelManager { handle_error!(self, self.internal_accept_channel(their_node_id, msg), their_node_id) } - fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result { + fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), HandleError> { handle_error!(self, self.internal_funding_created(their_node_id, msg), their_node_id) } @@ -2529,7 +2491,7 @@ impl ChannelMessageHandler for ChannelManager { handle_error!(self, self.internal_funding_signed(their_node_id, msg), their_node_id) } - fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result, HandleError> { + fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<(), HandleError> { handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id) } @@ -2578,13 +2540,13 @@ impl ChannelMessageHandler for ChannelManager { } fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { - let mut new_events = Vec::new(); let mut failed_channels = Vec::new(); let mut failed_payments = Vec::new(); { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); let short_to_id = channel_state.short_to_id; + let pending_msg_events = channel_state.pending_msg_events; if no_connection_possible { channel_state.by_id.retain(|_, chan| { if chan.get_their_node_id() == *their_node_id { @@ -2593,7 +2555,7 @@ impl ChannelMessageHandler for ChannelManager { } failed_channels.push(chan.force_shutdown()); if let Ok(update) = self.get_channel_update(&chan) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2625,12 +2587,6 @@ impl ChannelMessageHandler for ChannelManager { for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } - if !new_events.is_empty() { - let mut pending_events = self.pending_events.lock().unwrap(); - for event in new_events.drain(..) { - pending_events.push(event); - } - } for (chan_update, mut htlc_sources) in failed_payments { for (htlc_source, payment_hash) in htlc_sources.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() }); @@ -2677,13 +2633,15 @@ mod tests { use chain::chaininterface; use chain::transaction::OutPoint; use chain::chaininterface::ChainListener; - use ln::channelmanager::{ChannelManager,OnionKeys}; + use chain::keysinterface::KeysInterface; + use chain::keysinterface; + use ln::channelmanager::{ChannelManager,OnionKeys,PaymentFailReason}; use ln::channelmonitor::{ChannelMonitorUpdateErr, CLTV_CLAIM_BUFFER, HTLC_FAIL_TIMEOUT_BLOCKS}; use ln::router::{Route, RouteHop, Router}; use ln::msgs; use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler}; use util::test_utils; - use util::events::{Event, EventsProvider}; + use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::logger::Logger; use util::ser::Writeable; @@ -2888,6 +2846,7 @@ mod tests { fn drop(&mut self) { if !::std::thread::panicking() { // Check that we processed all pending events + assert_eq!(self.node.get_and_clear_pending_msg_events().len(), 0); assert_eq!(self.node.get_and_clear_pending_events().len(), 0); assert_eq!(self.chan_monitor.added_monitors.lock().unwrap().len(), 0); } @@ -2904,20 +2863,26 @@ mod tests { (announcement, as_update, bs_update, channel_id, tx) } + macro_rules! get_event_msg { + ($node: expr, $event_type: path, $node_id: expr) => { + { + let events = $node.node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match events[0] { + $event_type { ref node_id, ref msg } => { + assert_eq!(*node_id, $node_id); + (*msg).clone() + }, + _ => panic!("Unexpected event"), + } + } + } + } + fn create_chan_between_nodes_with_value_init(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64) -> Transaction { node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42).unwrap(); - - let events_1 = node_a.node.get_and_clear_pending_events(); - assert_eq!(events_1.len(), 1); - let accept_chan = match events_1[0] { - Event::SendOpenChannel { ref node_id, ref msg } => { - assert_eq!(*node_id, node_b.node.get_our_node_id()); - node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), msg).unwrap() - }, - _ => panic!("Unexpected event"), - }; - - node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), &accept_chan).unwrap(); + node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), &get_event_msg!(node_a, MessageSendEvent::SendOpenChannel, node_b.node.get_our_node_id())).unwrap(); + node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendAcceptChannel, node_a.node.get_our_node_id())).unwrap(); let chan_id = *node_a.network_chan_count.borrow(); let tx; @@ -2944,22 +2909,15 @@ mod tests { _ => panic!("Unexpected event"), } - let events_3 = node_a.node.get_and_clear_pending_events(); - assert_eq!(events_3.len(), 1); - let funding_signed = match events_3[0] { - Event::SendFundingCreated { ref node_id, ref msg } => { - assert_eq!(*node_id, node_b.node.get_our_node_id()); - let res = node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), msg).unwrap(); - let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap(); - assert_eq!(added_monitors.len(), 1); - assert_eq!(added_monitors[0].0, funding_output); - added_monitors.clear(); - res - }, - _ => panic!("Unexpected event"), - }; + node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), &get_event_msg!(node_a, MessageSendEvent::SendFundingCreated, node_b.node.get_our_node_id())).unwrap(); + { + let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 1); + assert_eq!(added_monitors[0].0, funding_output); + added_monitors.clear(); + } - node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &funding_signed).unwrap(); + node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id())).unwrap(); { let mut added_monitors = node_a.chan_monitor.added_monitors.lock().unwrap(); assert_eq!(added_monitors.len(), 1); @@ -2982,30 +2940,27 @@ mod tests { fn create_chan_between_nodes_with_value_confirm(node_a: &Node, node_b: &Node, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) { confirm_transaction(&node_b.chain_monitor, &tx, tx.version); - let events_5 = node_b.node.get_and_clear_pending_events(); - assert_eq!(events_5.len(), 1); - match events_5[0] { - Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { - assert_eq!(*node_id, node_a.node.get_our_node_id()); - assert!(announcement_sigs.is_none()); - node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), msg).unwrap() - }, - _ => panic!("Unexpected event"), - }; + node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingLocked, node_a.node.get_our_node_id())).unwrap(); let channel_id; confirm_transaction(&node_a.chain_monitor, &tx, tx.version); - let events_6 = node_a.node.get_and_clear_pending_events(); - assert_eq!(events_6.len(), 1); - (match events_6[0] { - Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { + let events_6 = node_a.node.get_and_clear_pending_msg_events(); + assert_eq!(events_6.len(), 2); + ((match events_6[0] { + MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => { channel_id = msg.channel_id.clone(); assert_eq!(*node_id, node_b.node.get_our_node_id()); - (msg.clone(), announcement_sigs.clone().unwrap()) + msg.clone() + }, + _ => panic!("Unexpected event"), + }, match events_6[1] { + MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { + assert_eq!(*node_id, node_b.node.get_our_node_id()); + msg.clone() }, _ => panic!("Unexpected event"), - }, channel_id) + }), channel_id) } fn create_chan_between_nodes_with_value_a(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32], Transaction) { @@ -3015,26 +2970,24 @@ mod tests { } fn create_chan_between_nodes_with_value_b(node_a: &Node, node_b: &Node, as_funding_msgs: &(msgs::FundingLocked, msgs::AnnouncementSignatures)) -> (msgs::ChannelAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate) { - let bs_announcement_sigs = { - let bs_announcement_sigs = node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &as_funding_msgs.0).unwrap().unwrap(); - node_b.node.handle_announcement_signatures(&node_a.node.get_our_node_id(), &as_funding_msgs.1).unwrap(); - bs_announcement_sigs - }; + node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &as_funding_msgs.0).unwrap(); + let bs_announcement_sigs = get_event_msg!(node_b, MessageSendEvent::SendAnnouncementSignatures, node_a.node.get_our_node_id()); + node_b.node.handle_announcement_signatures(&node_a.node.get_our_node_id(), &as_funding_msgs.1).unwrap(); - let events_7 = node_b.node.get_and_clear_pending_events(); + let events_7 = node_b.node.get_and_clear_pending_msg_events(); assert_eq!(events_7.len(), 1); let (announcement, bs_update) = match events_7[0] { - Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { + MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { (msg, update_msg) }, _ => panic!("Unexpected event"), }; node_a.node.handle_announcement_signatures(&node_b.node.get_our_node_id(), &bs_announcement_sigs).unwrap(); - let events_8 = node_a.node.get_and_clear_pending_events(); + let events_8 = node_a.node.get_and_clear_pending_msg_events(); assert_eq!(events_8.len(), 1); let as_update = match events_8[0] { - Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { + MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { assert!(*announcement == *msg); update_msg }, @@ -3077,10 +3030,10 @@ mod tests { let (tx_a, tx_b); node_a.close_channel(channel_id).unwrap(); - let events_1 = node_a.get_and_clear_pending_events(); + let events_1 = node_a.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); let shutdown_a = match events_1[0] { - Event::SendShutdown { ref node_id, ref msg } => { + MessageSendEvent::SendShutdown { ref node_id, ref msg } => { assert_eq!(node_id, &node_b.get_our_node_id()); msg.clone() }, @@ -3116,19 +3069,19 @@ mod tests { assert_eq!(tx_a, tx_b); check_spends!(tx_a, funding_tx); - let events_2 = node_a.get_and_clear_pending_events(); + let events_2 = node_a.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); let as_update = match events_2[0] { - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; - let events_3 = node_b.get_and_clear_pending_events(); + let events_3 = node_b.get_and_clear_pending_msg_events(); assert_eq!(events_3.len(), 1); let bs_update = match events_3[0] { - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), @@ -3151,9 +3104,9 @@ mod tests { SendEvent { node_id: node_id, msgs: updates.update_add_htlcs, commitment_msg: updates.commitment_signed } } - fn from_event(event: Event) -> SendEvent { + fn from_event(event: MessageSendEvent) -> SendEvent { match event { - Event::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates), + MessageSendEvent::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates), _ => panic!("Unexpected event type!"), } } @@ -3183,6 +3136,7 @@ mod tests { check_added_monitors!($node_b, 1); if $fail_backwards { assert!($node_a.node.get_and_clear_pending_events().is_empty()); + assert!($node_a.node.get_and_clear_pending_msg_events().is_empty()); } assert!($node_a.node.handle_revoke_and_ack(&$node_b.node.get_our_node_id(), &bs_revoke_and_ack).unwrap().is_none()); { @@ -3220,7 +3174,7 @@ mod tests { origin_node.node.send_payment(route, our_payment_hash).unwrap(); check_added_monitors!(origin_node, 1); - let mut events = origin_node.node.get_and_clear_pending_events(); + let mut events = origin_node.node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); SendEvent::from_event(events.remove(0)) }; @@ -3243,9 +3197,9 @@ mod tests { node.node.channel_state.lock().unwrap().next_forward = Instant::now(); node.node.process_pending_htlc_forwards(); - let mut events_2 = node.node.get_and_clear_pending_events(); - assert_eq!(events_2.len(), 1); if idx == expected_route.len() - 1 { + let events_2 = node.node.get_and_clear_pending_events(); + assert_eq!(events_2.len(), 1); match events_2[0] { Event::PaymentReceived { ref payment_hash, amt } => { assert_eq!(our_payment_hash, *payment_hash); @@ -3254,6 +3208,8 @@ mod tests { _ => panic!("Unexpected event"), } } else { + let mut events_2 = node.node.get_and_clear_pending_msg_events(); + assert_eq!(events_2.len(), 1); check_added_monitors!(node, 1); payment_event = SendEvent::from_event(events_2.remove(0)); assert_eq!(payment_event.msgs.len(), 1); @@ -3292,11 +3248,11 @@ mod tests { update_fulfill_dance!(node, prev_node, false); } - let events = node.node.get_and_clear_pending_events(); + let events = node.node.get_and_clear_pending_msg_events(); if !skip_last || idx != expected_route.len() - 1 { assert_eq!(events.len(), 1); match events[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert!(update_add_htlcs.is_empty()); assert_eq!(update_fulfill_htlcs.len(), 1); assert!(update_fail_htlcs.is_empty()); @@ -3368,7 +3324,7 @@ mod tests { } fn fail_payment_along_route(origin_node: &Node, expected_route: &[&Node], skip_last: bool, our_payment_hash: [u8; 32]) { - assert!(expected_route.last().unwrap().node.fail_htlc_backwards(&our_payment_hash)); + assert!(expected_route.last().unwrap().node.fail_htlc_backwards(&our_payment_hash, PaymentFailReason::PreimageUnknown)); check_added_monitors!(expected_route.last().unwrap(), 1); let mut next_msgs: Option<(msgs::UpdateFailHTLC, msgs::CommitmentSigned)> = None; @@ -3392,11 +3348,11 @@ mod tests { update_fail_dance!(node, prev_node, skip_last && idx == expected_route.len() - 1); } - let events = node.node.get_and_clear_pending_events(); + let events = node.node.get_and_clear_pending_msg_events(); if !skip_last || idx != expected_route.len() - 1 { assert_eq!(events.len(), 1); match events[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert!(update_add_htlcs.is_empty()); assert!(update_fulfill_htlcs.is_empty()); assert_eq!(update_fail_htlcs.len(), 1); @@ -3449,14 +3405,12 @@ mod tests { let feeest = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }); let chain_monitor = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet, Arc::clone(&logger))); let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}); + let mut seed = [0; 32]; + rng.fill_bytes(&mut seed); + let keys_manager = Arc::new(keysinterface::KeysManager::new(&seed, Network::Testnet, Arc::clone(&logger))); let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone())); - let node_id = { - let mut key_slice = [0; 32]; - rng.fill_bytes(&mut key_slice); - SecretKey::from_slice(&secp_ctx, &key_slice).unwrap() - }; - let node = ChannelManager::new(node_id.clone(), 0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger)).unwrap(); - let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &node_id), chain_monitor.clone(), Arc::clone(&logger)); + let node = ChannelManager::new(0, true, Network::Testnet, feeest.clone(), chan_monitor.clone(), chain_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone()).unwrap(); + let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()), chain_monitor.clone(), Arc::clone(&logger)); nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, network_payment_count: payment_count.clone(), network_chan_count: chan_count.clone(), @@ -3504,10 +3458,10 @@ mod tests { nodes[0].node.update_fee(channel_id, get_feerate!(nodes[0]) + 20).unwrap(); check_added_monitors!(nodes[0], 1); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { // (1) - Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => { + MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3521,7 +3475,7 @@ mod tests { check_added_monitors!(nodes[1], 1); let payment_event = { - let mut events_1 = nodes[1].node.get_and_clear_pending_events(); + let mut events_1 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); SendEvent::from_event(events_1.remove(0)) }; @@ -3600,10 +3554,10 @@ mod tests { nodes[0].node.update_fee(channel_id, get_feerate!(nodes[0]) + 20).unwrap(); check_added_monitors!(nodes[0], 1); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let update_msg = match events_0[0] { // (1) - Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, .. }, .. } => { + MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, .. }, .. } => { update_fee.as_ref() }, _ => panic!("Unexpected event"), @@ -3617,7 +3571,7 @@ mod tests { check_added_monitors!(nodes[1], 1); let payment_event = { - let mut events_1 = nodes[1].node.get_and_clear_pending_events(); + let mut events_1 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); SendEvent::from_event(events_1.remove(0)) }; @@ -3674,10 +3628,10 @@ mod tests { nodes[0].node.update_fee(channel_id, initial_feerate + 20).unwrap(); check_added_monitors!(nodes[0], 1); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg_1, commitment_signed_1) = match events_0[0] { // (1) - Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => { + MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => { (update_fee.as_ref().unwrap(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3692,6 +3646,7 @@ mod tests { // transaction: nodes[0].node.update_fee(channel_id, initial_feerate + 40).unwrap(); assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); // Create the (3) update_fee message that nodes[0] will generate before it does... let mut update_msg_2 = msgs::UpdateFee { @@ -3756,10 +3711,10 @@ mod tests { let feerate = get_feerate!(nodes[0]); nodes[0].node.update_fee(channel_id, feerate+20).unwrap(); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { - Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3804,10 +3759,10 @@ mod tests { let feerate = get_feerate!(nodes[0]); nodes[0].node.update_fee(channel_id, feerate+20).unwrap(); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { - Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3829,8 +3784,8 @@ mod tests { assert_eq!(added_monitors.len(), 0); added_monitors.clear(); } - let events = nodes[0].node.get_and_clear_pending_events(); - assert_eq!(events.len(), 0); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); // node[1] has nothing to do let resp_option = nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &revoke_msg).unwrap(); @@ -3921,10 +3876,10 @@ mod tests { let feerate = get_feerate!(nodes[0]); nodes[0].node.update_fee(channel_id, feerate+20).unwrap(); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { - Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3944,10 +3899,10 @@ mod tests { // Create and deliver (4)... nodes[0].node.update_fee(channel_id, feerate+30).unwrap(); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { - Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -4218,19 +4173,19 @@ mod tests { } fn get_announce_close_broadcast_events(nodes: &Vec, a: usize, b: usize) { - let events_1 = nodes[a].node.get_and_clear_pending_events(); + let events_1 = nodes[a].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); let as_update = match events_1[0] { - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; - let events_2 = nodes[b].node.get_and_clear_pending_events(); + let events_2 = nodes[b].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); let bs_update = match events_2[0] { - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), @@ -4289,7 +4244,7 @@ mod tests { macro_rules! expect_forward { ($node: expr) => {{ - let mut events = $node.node.get_and_clear_pending_events(); + let mut events = $node.node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); check_added_monitors!($node, 1); let payment_event = SendEvent::from_event(events.remove(0)); @@ -4372,7 +4327,7 @@ mod tests { nodes[0].node.send_payment(route_1, our_payment_hash_1).unwrap(); check_added_monitors!(nodes[0], 1); - let mut events = nodes[0].node.get_and_clear_pending_events(); + let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); SendEvent::from_event(events.remove(0)) }; @@ -4448,8 +4403,8 @@ mod tests { // this will also stuck in the holding cell nodes[0].node.send_payment(route_22, our_payment_hash_22).unwrap(); check_added_monitors!(nodes[0], 0); - let events = nodes[0].node.get_and_clear_pending_events(); - assert_eq!(events.len(), 0); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); // flush the pending htlc let (as_revoke_and_ack, as_commitment_signed) = nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event_1.commitment_msg).unwrap(); @@ -4568,10 +4523,10 @@ mod tests { assert!($node.node.claim_funds($preimage)); check_added_monitors!($node, 1); - let events = $node.node.get_and_clear_pending_events(); + let events = $node.node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); match events[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, .. } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, .. } } => { assert!(update_add_htlcs.is_empty()); assert!(update_fail_htlcs.is_empty()); assert_eq!(*node_id, $prev_node.node.get_our_node_id()); @@ -4862,10 +4817,10 @@ mod tests { route_payment(&nodes[0], &[&nodes[1]], 10000000); nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id); { - let events = nodes[0].node.get_and_clear_pending_events(); + let events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); match events[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -4879,10 +4834,10 @@ mod tests { nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0], &node_txn[1]], &[1; 2]); { - let events = nodes[1].node.get_and_clear_pending_events(); + let events = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); match events[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -4909,7 +4864,7 @@ mod tests { nodes[0].node.send_payment(route, our_payment_hash).unwrap(); check_added_monitors!(nodes[0], 1); - let mut events = nodes[0].node.get_and_clear_pending_events(); + let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); SendEvent::from_event(events.remove(0)) }; @@ -4927,7 +4882,7 @@ mod tests { nodes[1].node.channel_state.lock().unwrap().next_forward = Instant::now(); nodes[1].node.process_pending_htlc_forwards(); - let mut events_2 = nodes[1].node.get_and_clear_pending_events(); + let mut events_2 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); payment_event = SendEvent::from_event(events_2.remove(0)); assert_eq!(payment_event.msgs.len(), 1); @@ -4942,10 +4897,10 @@ mod tests { // transaction and ensure nodes[1] doesn't fail-backwards (this was originally a bug!). nodes[2].node.force_close_channel(&payment_event.commitment_msg.channel_id); - let events_3 = nodes[2].node.get_and_clear_pending_events(); + let events_3 = nodes[2].node.get_and_clear_pending_msg_events(); assert_eq!(events_3.len(), 1); match events_3[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -4963,11 +4918,11 @@ mod tests { let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&tx], &[1]); - let events_4 = nodes[1].node.get_and_clear_pending_events(); + let events_4 = nodes[1].node.get_and_clear_pending_msg_events(); // Note no UpdateHTLCs event here from nodes[1] to nodes[0]! assert_eq!(events_4.len(), 1); match events_4[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -5012,10 +4967,10 @@ mod tests { nodes[0].node.block_disconnected(&headers.pop().unwrap()); } { - let events = nodes[0].node.get_and_clear_pending_events(); + let events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); match events[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -5058,9 +5013,14 @@ mod tests { for chan_msgs in resp_1.drain(..) { if pre_all_htlcs { - let a = node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), &chan_msgs.0.unwrap()); - let _announcement_sigs_opt = a.unwrap(); - //TODO: Test announcement_sigs re-sending when we've implemented it + node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap(); + let announcement_event = node_a.node.get_and_clear_pending_msg_events(); + if !announcement_event.is_empty() { + assert_eq!(announcement_event.len(), 1); + if let MessageSendEvent::SendAnnouncementSignatures { .. } = announcement_event[0] { + //TODO: Test announcement_sigs re-sending + } else { panic!("Unexpected event!"); } + } } else { assert!(chan_msgs.0.is_none()); } @@ -5107,8 +5067,14 @@ mod tests { for chan_msgs in resp_2.drain(..) { if pre_all_htlcs { - let _announcement_sigs_opt = node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap(); - //TODO: Test announcement_sigs re-sending when we've implemented it + node_b.node.handle_funding_locked(&node_a.node.get_our_node_id(), &chan_msgs.0.unwrap()).unwrap(); + let announcement_event = node_b.node.get_and_clear_pending_msg_events(); + if !announcement_event.is_empty() { + assert_eq!(announcement_event.len(), 1); + if let MessageSendEvent::SendAnnouncementSignatures { .. } = announcement_event[0] { + //TODO: Test announcement_sigs re-sending + } else { panic!("Unexpected event!"); } + } } else { assert!(chan_msgs.0.is_none()); } @@ -5223,7 +5189,7 @@ mod tests { nodes[0].node.send_payment(route.clone(), payment_hash_1).unwrap(); check_added_monitors!(nodes[0], 1); - let mut events = nodes[0].node.get_and_clear_pending_events(); + let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); SendEvent::from_event(events.remove(0)) }; @@ -5300,10 +5266,10 @@ mod tests { nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); - let events_3 = nodes[1].node.get_and_clear_pending_events(); + let events_3 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_3.len(), 1); let (update_fulfill_htlc, commitment_signed) = match events_3[0] { - Event::UpdateHTLCs { ref node_id, ref updates } => { + MessageSendEvent::UpdateHTLCs { ref node_id, ref updates } => { assert_eq!(*node_id, nodes[0].node.get_our_node_id()); assert!(updates.update_add_htlcs.is_empty()); assert!(updates.update_fail_htlcs.is_empty()); @@ -5411,23 +5377,21 @@ mod tests { nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); confirm_transaction(&nodes[0].chain_monitor, &tx, tx.version); - let events_1 = nodes[0].node.get_and_clear_pending_events(); + let events_1 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); match events_1[0] { - Event::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => { + MessageSendEvent::SendFundingLocked { ref node_id, msg: _ } => { assert_eq!(*node_id, nodes[1].node.get_our_node_id()); - assert!(announcement_sigs.is_none()); }, _ => panic!("Unexpected event"), } confirm_transaction(&nodes[1].chain_monitor, &tx, tx.version); - let events_2 = nodes[1].node.get_and_clear_pending_events(); + let events_2 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); match events_2[0] { - Event::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => { + MessageSendEvent::SendFundingLocked { ref node_id, msg: _ } => { assert_eq!(*node_id, nodes[0].node.get_our_node_id()); - assert!(announcement_sigs.is_none()); }, _ => panic!("Unexpected event"), } @@ -5461,20 +5425,20 @@ mod tests { nodes[0].node.send_payment(route.clone(), payment_hash_2).unwrap(); check_added_monitors!(nodes[0], 1); - let events_1 = nodes[0].node.get_and_clear_pending_events(); + let events_1 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); match events_1[0] { - Event::UpdateHTLCs { .. } => {}, + MessageSendEvent::UpdateHTLCs { .. } => {}, _ => panic!("Unexpected event"), } assert!(nodes[1].node.claim_funds(payment_preimage_1)); check_added_monitors!(nodes[1], 1); - let events_2 = nodes[1].node.get_and_clear_pending_events(); + let events_2 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); match events_2[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert_eq!(*node_id, nodes[0].node.get_our_node_id()); assert!(update_add_htlcs.is_empty()); assert_eq!(update_fulfill_htlcs.len(), 1); @@ -5593,10 +5557,10 @@ mod tests { if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_1) {} else { panic!(); } check_added_monitors!(nodes[0], 1); - let events_1 = nodes[0].node.get_and_clear_pending_events(); + let events_1 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); match events_1[0] { - Event::BroadcastChannelUpdate { .. } => {}, + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("Unexpected event"), }; @@ -5619,8 +5583,8 @@ mod tests { if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_1) {} else { panic!(); } check_added_monitors!(nodes[0], 1); - let events_1 = nodes[0].node.get_and_clear_pending_events(); - assert!(events_1.is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); assert_eq!(nodes[0].node.list_channels().len(), 1); if disconnect { @@ -5633,7 +5597,7 @@ mod tests { nodes[0].node.test_restore_channel_monitor(); check_added_monitors!(nodes[0], 1); - let mut events_2 = nodes[0].node.get_and_clear_pending_events(); + let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); let payment_event = SendEvent::from_event(events_2.pop().unwrap()); assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id()); @@ -5660,8 +5624,8 @@ mod tests { if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_2) {} else { panic!(); } check_added_monitors!(nodes[0], 1); - let events_4 = nodes[0].node.get_and_clear_pending_events(); - assert!(events_4.is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); assert_eq!(nodes[0].node.list_channels().len(), 1); if disconnect { @@ -5675,10 +5639,10 @@ mod tests { nodes[0].node.test_restore_channel_monitor(); check_added_monitors!(nodes[0], 1); - let events_5 = nodes[0].node.get_and_clear_pending_events(); + let events_5 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_5.len(), 1); match events_5[0] { - Event::BroadcastChannelUpdate { .. } => {}, + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("Unexpected event"), } @@ -5727,18 +5691,18 @@ mod tests { if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_2) {} else { panic!(); } check_added_monitors!(nodes[0], 1); - let events_1 = nodes[0].node.get_and_clear_pending_events(); - assert!(events_1.is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); assert_eq!(nodes[0].node.list_channels().len(), 1); // Claim the previous payment, which will result in a update_fulfill_htlc/CS from nodes[1] // but nodes[0] won't respond since it is frozen. assert!(nodes[1].node.claim_funds(payment_preimage_1)); check_added_monitors!(nodes[1], 1); - let events_2 = nodes[1].node.get_and_clear_pending_events(); + let events_2 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); let (bs_initial_fulfill, bs_initial_commitment_signed) = match events_2[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert_eq!(*node_id, nodes[0].node.get_our_node_id()); assert!(update_add_htlcs.is_empty()); assert_eq!(update_fulfill_htlcs.len(), 1); @@ -5796,8 +5760,8 @@ mod tests { } } } let (payment_event, initial_revoke_and_ack) = if disconnect_count & !disconnect_flags > 0 { - let events_4 = nodes[0].node.get_and_clear_pending_events(); - assert!(events_4.is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); assert_eq!(reestablish_1.len(), 1); @@ -5860,10 +5824,10 @@ mod tests { (SendEvent::from_commitment_update(nodes[1].node.get_our_node_id(), as_resp.2.unwrap()), as_resp.1.unwrap()) } else { - let mut events_4 = nodes[0].node.get_and_clear_pending_events(); + let mut events_4 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_4.len(), 2); (SendEvent::from_event(events_4.remove(0)), match events_4[0] { - Event::SendRevokeAndACK { ref node_id, ref msg } => { + MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { assert_eq!(*node_id, nodes[1].node.get_our_node_id()); msg.clone() },