From e397cb99601e9d2849bbc3aad2b0df8bc8b7f522 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 19 Oct 2018 16:25:32 -0400 Subject: [PATCH] Split Event, move MessageSendEvent push() inside channel_state lock --- fuzz/fuzz_targets/full_stack_target.rs | 5 +- src/ln/channelmanager.rs | 620 ++++++++++++------------- src/ln/msgs.rs | 2 +- src/ln/peer_handler.rs | 69 +-- src/util/events.rs | 37 +- src/util/test_utils.rs | 6 +- 6 files changed, 326 insertions(+), 413 deletions(-) diff --git a/fuzz/fuzz_targets/full_stack_target.rs b/fuzz/fuzz_targets/full_stack_target.rs index ed5001a3e..bf6de14ae 100644 --- a/fuzz/fuzz_targets/full_stack_target.rs +++ b/fuzz/fuzz_targets/full_stack_target.rs @@ -460,7 +460,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { _ => return, } loss_detector.handler.process_events(); - for event in loss_detector.handler.get_and_clear_pending_events() { + for event in loss_detector.manager.get_and_clear_pending_events() { match event { Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, output_script, .. } => { pending_funding_generation.push((temporary_channel_id, channel_value_satoshis, output_script)); @@ -473,11 +473,10 @@ pub fn do_test(data: &[u8], logger: &Arc) { }, Event::PaymentSent {..} => {}, Event::PaymentFailed {..} => {}, - Event::PendingHTLCsForwardable {..} => { should_forward = true; }, - _ => panic!("Unknown event"), + Event::SpendableOutputs {..} => {}, } } } diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index c71013f95..bd663c924 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -257,6 +257,9 @@ struct ChannelHolder { /// guarantees are made about the channels given here actually existing anymore by the time you /// go to read them! claimable_htlcs: HashMap<[u8; 32], Vec>, + /// Messages to send to peers - pushed to in the same lock that they are generated in (except + /// for broadcast messages, where ordering isn't as strict). + pending_msg_events: Vec, } struct MutChannelHolder<'a> { by_id: &'a mut HashMap<[u8; 32], Channel>, @@ -264,6 +267,7 @@ struct MutChannelHolder<'a> { next_forward: &'a mut Instant, forward_htlcs: &'a mut HashMap>, claimable_htlcs: &'a mut HashMap<[u8; 32], Vec>, + pending_msg_events: &'a mut Vec, } impl ChannelHolder { fn borrow_parts(&mut self) -> MutChannelHolder { @@ -273,6 +277,7 @@ impl ChannelHolder { next_forward: &mut self.next_forward, forward_htlcs: &mut self.forward_htlcs, claimable_htlcs: &mut self.claimable_htlcs, + pending_msg_events: &mut self.pending_msg_events, } } } @@ -397,6 +402,7 @@ impl ChannelManager { next_forward: Instant::now(), forward_htlcs: HashMap::new(), claimable_htlcs: HashMap::new(), + pending_msg_events: Vec::new(), }), our_network_key: keys_manager.get_node_secret(), @@ -418,7 +424,7 @@ impl ChannelManager { /// create_channel call. Note that user_channel_id defaults to 0 for inbound channels, so you /// may wish to avoid using 0 for user_id here. /// - /// If successful, will generate a SendOpenChannel event, so you should probably poll + /// If successful, will generate a SendOpenChannel message event, so you should probably poll /// PeerManager::process_events afterwards. /// /// Raises APIError::APIMisuseError when channel_value_satoshis > 2**24 or push_msat being greater than channel_value_satoshis * 1k @@ -436,9 +442,7 @@ impl ChannelManager { }, hash_map::Entry::Vacant(entry) => { entry.insert(channel); } } - - let mut events = self.pending_events.lock().unwrap(); - events.push(events::Event::SendOpenChannel { + channel_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: their_network_key, msg: res, }); @@ -488,25 +492,29 @@ impl ChannelManager { /// will be accepted on the given channel, and after additional timeout/the closing of all /// pending HTLCs, the channel will be closed on chain. /// - /// May generate a SendShutdown event on success, which should be relayed. + /// May generate a SendShutdown message event on success, which should be relayed. pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let (mut res, node_id, chan_option) = { + let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); match channel_state.by_id.entry(channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { - let res = chan_entry.get_mut().get_shutdown()?; + let (shutdown_msg, failed_htlcs) = chan_entry.get_mut().get_shutdown()?; + channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: chan_entry.get().get_their_node_id(), + msg: shutdown_msg + }); if chan_entry.get().is_shutdown() { if let Some(short_id) = chan_entry.get().get_short_channel_id() { channel_state.short_to_id.remove(&short_id); } - (res, chan_entry.get().get_their_node_id(), Some(chan_entry.remove_entry().1)) - } else { (res, chan_entry.get().get_their_node_id(), None) } + (failed_htlcs, Some(chan_entry.remove_entry().1)) + } else { (failed_htlcs, None) } }, hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable{err: "No such channel"}) } }; - for htlc_source in res.1.drain(..) { + for htlc_source in failed_htlcs.drain(..) { // unknown_next_peer...I dunno who that is anymore.... self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 10, data: Vec::new() }); } @@ -516,16 +524,12 @@ impl ChannelManager { } else { None } } else { None }; - let mut events = self.pending_events.lock().unwrap(); if let Some(update) = chan_update { - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } - events.push(events::Event::SendShutdown { - node_id, - msg: res.0 - }); Ok(()) } @@ -565,9 +569,9 @@ impl ChannelManager { } }; self.finish_force_close_channel(chan.force_shutdown()); - let mut events = self.pending_events.lock().unwrap(); if let Ok(update) = self.get_channel_update(&chan) { - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -594,9 +598,9 @@ impl ChannelManager { }; mem::drop(channel_state_lock); self.finish_force_close_channel(chan.force_shutdown()); - let mut events = self.pending_events.lock().unwrap(); if let Ok(update) = self.get_channel_update(&chan) { - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -1096,7 +1100,7 @@ impl ChannelManager { /// payment_preimage tracking (which you should already be doing as they represent "proof of /// payment") and prevent double-sends yourself. /// - /// May generate a SendHTLCs event on success, which should be relayed. + /// May generate a SendHTLCs message event on success, which should be relayed. /// /// Raises APIError::RoutError when invalid route or forward parameter /// (cltv_delta, fee, node public key) is specified @@ -1124,66 +1128,52 @@ impl ChannelManager { let (onion_payloads, htlc_msat, htlc_cltv) = ChannelManager::build_onion_payloads(&route, cur_height)?; let onion_packet = ChannelManager::construct_onion_packet(onion_payloads, onion_keys, &payment_hash); - let (first_hop_node_id, update_add, commitment_signed) = { - let mut channel_state = self.channel_state.lock().unwrap(); - - let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { - None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}), - Some(id) => id.clone(), - }; - - let res = { - let res = { - let chan = channel_state.by_id.get_mut(&id).unwrap(); - if chan.get_their_node_id() != route.hops.first().unwrap().pubkey { - return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); - } - if chan.is_awaiting_monitor_update() { - return Err(APIError::MonitorUpdateFailed); - } - if !chan.is_live() { - return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"}); - } - chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { - route: route.clone(), - session_priv: session_priv.clone(), - first_hop_htlc_msat: htlc_msat, - }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? - }; - match res { - Some((update_add, commitment_signed, chan_monitor)) => { - if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst); - return Err(APIError::MonitorUpdateFailed); - } - Some((update_add, commitment_signed)) - }, - None => None, - } - }; + let mut channel_state = self.channel_state.lock().unwrap(); - let first_hop_node_id = route.hops.first().unwrap().pubkey; + let id = match channel_state.short_to_id.get(&route.hops.first().unwrap().short_channel_id) { + None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}), + Some(id) => id.clone(), + }; - match res { - Some((update_add, commitment_signed)) => { - (first_hop_node_id, update_add, commitment_signed) - }, - None => return Ok(()), + let res = { + let chan = channel_state.by_id.get_mut(&id).unwrap(); + if chan.get_their_node_id() != route.hops.first().unwrap().pubkey { + return Err(APIError::RouteError{err: "Node ID mismatch on first hop!"}); } + if chan.is_awaiting_monitor_update() { + return Err(APIError::MonitorUpdateFailed); + } + if !chan.is_live() { + return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected!"}); + } + chan.send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute { + route: route.clone(), + session_priv: session_priv.clone(), + first_hop_htlc_msat: htlc_msat, + }, onion_packet).map_err(|he| APIError::ChannelUnavailable{err: he.err})? }; + match res { + Some((update_add, commitment_signed, chan_monitor)) => { + if let Err(e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + self.handle_monitor_update_fail(channel_state, &id, e, RAACommitmentOrder::CommitmentFirst); + return Err(APIError::MonitorUpdateFailed); + } - let mut events = self.pending_events.lock().unwrap(); - events.push(events::Event::UpdateHTLCs { - node_id: first_hop_node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: vec![update_add], - update_fulfill_htlcs: Vec::new(), - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed, + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: route.hops.first().unwrap().pubkey, + updates: msgs::CommitmentUpdate { + update_add_htlcs: vec![update_add], + update_fulfill_htlcs: Vec::new(), + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + }, + }); }, - }); + None => {}, + } + Ok(()) } @@ -1194,15 +1184,6 @@ impl ChannelManager { /// May panic if the funding_txo is duplicative with some other channel (note that this should /// be trivially prevented by using unique funding transaction keys per-channel). pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) { - macro_rules! add_pending_event { - ($event: expr) => { - { - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push($event); - } - } - } - let (chan, msg, chan_monitor) = { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.remove(temporary_channel_id) { @@ -1213,8 +1194,7 @@ impl ChannelManager { }, Err(e) => { log_error!(self, "Got bad signatures: {}!", e.err); - mem::drop(channel_state); - add_pending_event!(events::Event::HandleError { + channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: chan.get_their_node_id(), action: e.action, }); @@ -1230,12 +1210,12 @@ impl ChannelManager { if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!(); } - add_pending_event!(events::Event::SendFundingCreated { + + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated { node_id: chan.get_their_node_id(), msg: msg, }); - - let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(chan.channel_id()) { hash_map::Entry::Occupied(_) => { panic!("Generated duplicate funding txid?"); @@ -1344,7 +1324,7 @@ impl ChannelManager { if let Err(_e) = self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor) { unimplemented!();// but def dont push the event... } - new_events.push(events::Event::UpdateHTLCs { + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: forward_chan.get_their_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: add_htlc_msgs, @@ -1407,19 +1387,20 @@ impl ChannelManager { /// to fail and take the channel_state lock for each iteration (as we take ownership and may /// drop it). In other words, no assumptions are made that entries in claimable_htlcs point to /// still-available channels. - fn fail_htlc_backwards_internal(&self, mut channel_state: MutexGuard, source: HTLCSource, payment_hash: &[u8; 32], onion_error: HTLCFailReason) { + fn fail_htlc_backwards_internal(&self, mut channel_state_lock: MutexGuard, source: HTLCSource, payment_hash: &[u8; 32], onion_error: HTLCFailReason) { match source { HTLCSource::OutboundRoute { .. } => { - mem::drop(channel_state); + mem::drop(channel_state_lock); if let &HTLCFailReason::ErrorPacket { ref err } = &onion_error { let (channel_update, payment_retryable) = self.process_onion_failure(&source, err.data.clone()); - let mut pending_events = self.pending_events.lock().unwrap(); - if let Some(channel_update) = channel_update { - pending_events.push(events::Event::PaymentFailureNetworkUpdate { - update: channel_update, - }); + if let Some(update) = channel_update { + self.channel_state.lock().unwrap().pending_msg_events.push( + events::MessageSendEvent::PaymentFailureNetworkUpdate { + update, + } + ); } - pending_events.push(events::Event::PaymentFailed { + self.pending_events.lock().unwrap().push(events::Event::PaymentFailed { payment_hash: payment_hash.clone(), rejected_by_dest: !payment_retryable, }); @@ -1438,35 +1419,21 @@ impl ChannelManager { } }; - let (node_id, fail_msgs) = { - let chan_id = match channel_state.short_to_id.get(&short_channel_id) { - Some(chan_id) => chan_id.clone(), - None => return - }; + let channel_state = channel_state_lock.borrow_parts(); - let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); - match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) { - Ok(Some((msg, commitment_msg, chan_monitor))) => { - if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { - unimplemented!(); - } - (chan.get_their_node_id(), Some((msg, commitment_msg))) - }, - Ok(None) => (chan.get_their_node_id(), None), - Err(_e) => { - //TODO: Do something with e? - return; - }, - } + let chan_id = match channel_state.short_to_id.get(&short_channel_id) { + Some(chan_id) => chan_id.clone(), + None => return }; - match fail_msgs { - Some((msg, commitment_msg)) => { - mem::drop(channel_state); - - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::UpdateHTLCs { - node_id, + let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); + match chan.get_update_fail_htlc_and_commit(htlc_id, err_packet) { + Ok(Some((msg, commitment_msg, chan_monitor))) => { + if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { + unimplemented!(); + } + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: chan.get_their_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), update_fulfill_htlcs: Vec::new(), @@ -1477,7 +1444,11 @@ impl ChannelManager { }, }); }, - None => {}, + Ok(None) => {}, + Err(_e) => { + //TODO: Do something with e? + return; + }, } }, } @@ -1504,10 +1475,10 @@ impl ChannelManager { true } else { false } } - fn claim_funds_internal(&self, mut channel_state: MutexGuard, source: HTLCSource, payment_preimage: [u8; 32]) { + fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard, source: HTLCSource, payment_preimage: [u8; 32]) { match source { HTLCSource::OutboundRoute { .. } => { - mem::drop(channel_state); + mem::drop(channel_state_lock); let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push(events::Event::PaymentSent { payment_preimage @@ -1515,49 +1486,46 @@ impl ChannelManager { }, HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id, htlc_id, .. }) => { //TODO: Delay the claimed_funds relaying just like we do outbound relay! - let (node_id, fulfill_msgs) = { - let chan_id = match channel_state.short_to_id.get(&short_channel_id) { - Some(chan_id) => chan_id.clone(), - None => { - // TODO: There is probably a channel manager somewhere that needs to - // learn the preimage as the channel already hit the chain and that's - // why its missing. - return - } - }; + let channel_state = channel_state_lock.borrow_parts(); + + let chan_id = match channel_state.short_to_id.get(&short_channel_id) { + Some(chan_id) => chan_id.clone(), + None => { + // TODO: There is probably a channel manager somewhere that needs to + // learn the preimage as the channel already hit the chain and that's + // why its missing. + return + } + }; - let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); - match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) { - Ok((msgs, Some(chan_monitor))) => { + let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); + match chan.get_update_fulfill_htlc_and_commit(htlc_id, payment_preimage) { + Ok((msgs, monitor_option)) => { + if let Some(chan_monitor) = monitor_option { if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!();// but def dont push the event... } - (chan.get_their_node_id(), msgs) - }, - Ok((msgs, None)) => (chan.get_their_node_id(), msgs), - Err(_e) => { - // TODO: There is probably a channel manager somewhere that needs to - // learn the preimage as the channel may be about to hit the chain. - //TODO: Do something with e? - return - }, - } - }; - - mem::drop(channel_state); - if let Some((msg, commitment_msg)) = fulfill_msgs { - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::UpdateHTLCs { - node_id: node_id, - updates: msgs::CommitmentUpdate { - update_add_htlcs: Vec::new(), - update_fulfill_htlcs: vec![msg], - update_fail_htlcs: Vec::new(), - update_fail_malformed_htlcs: Vec::new(), - update_fee: None, - commitment_signed: commitment_msg, } - }); + if let Some((msg, commitment_signed)) = msgs { + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: chan.get_their_node_id(), + updates: msgs::CommitmentUpdate { + update_add_htlcs: Vec::new(), + update_fulfill_htlcs: vec![msg], + update_fail_htlcs: Vec::new(), + update_fail_malformed_htlcs: Vec::new(), + update_fee: None, + commitment_signed, + } + }); + } + }, + Err(_e) => { + // TODO: There is probably a channel manager somewhere that needs to + // learn the preimage as the channel may be about to hit the chain. + //TODO: Do something with e? + return + }, } }, } @@ -1572,7 +1540,6 @@ impl ChannelManager { /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update /// operation. pub fn test_restore_channel_monitor(&self) { - let mut new_events = Vec::new(); let mut close_results = Vec::new(); let mut htlc_forwards = Vec::new(); let mut htlc_failures = Vec::new(); @@ -1581,6 +1548,7 @@ impl ChannelManager { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = channel_lock.borrow_parts(); let short_to_id = channel_state.short_to_id; + let pending_msg_events = channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { if channel.is_awaiting_monitor_update() { let chan_monitor = channel.channel_monitor(); @@ -1592,7 +1560,7 @@ impl ChannelManager { } close_results.push(channel.force_shutdown()); if let Ok(update) = self.get_channel_update(&channel) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -1609,7 +1577,7 @@ impl ChannelManager { macro_rules! handle_cs { () => { if let Some(update) = commitment_update { - new_events.push(events::Event::UpdateHTLCs { + pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: channel.get_their_node_id(), updates: update, }); @@ -1617,7 +1585,7 @@ impl ChannelManager { } } macro_rules! handle_raa { () => { if let Some(revoke_and_ack) = raa { - new_events.push(events::Event::SendRevokeAndACK { + pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { node_id: channel.get_their_node_id(), msg: revoke_and_ack, }); @@ -1647,8 +1615,6 @@ impl ChannelManager { for res in close_results.drain(..) { self.finish_force_close_channel(res); } - - self.pending_events.lock().unwrap().append(&mut new_events); } fn internal_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result { @@ -1802,8 +1768,8 @@ impl ChannelManager { } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update(&chan) { - let mut events = self.pending_events.lock().unwrap(); - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -1842,8 +1808,8 @@ impl ChannelManager { } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update(&chan) { - let mut events = self.pending_events.lock().unwrap(); - events.push(events::Event::BroadcastChannelUpdate { + let mut channel_state = self.channel_state.lock().unwrap(); + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2223,42 +2189,43 @@ impl ChannelManager { } fn internal_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { - let (chan_announcement, chan_update) = { - let mut channel_state = self.channel_state.lock().unwrap(); - match channel_state.by_id.get_mut(&msg.channel_id) { - Some(chan) => { - if chan.get_their_node_id() != *their_node_id { - return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); - } - if !chan.is_usable() { - return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Got an announcement_signatures before we were ready for it", action: Some(msgs::ErrorAction::IgnoreError)})); - } + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); - let our_node_id = self.get_our_node_id(); - let (announcement, our_bitcoin_sig) = chan.get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone()) - .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; + match channel_state.by_id.get_mut(&msg.channel_id) { + Some(chan) => { + if chan.get_their_node_id() != *their_node_id { + return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!", msg.channel_id)); + } + if !chan.is_usable() { + return Err(MsgHandleErrInternal::from_no_close(HandleError{err: "Got an announcement_signatures before we were ready for it", action: Some(msgs::ErrorAction::IgnoreError)})); + } - let were_node_one = announcement.node_id_1 == our_node_id; - let msghash = Message::from_slice(&Sha256dHash::from_data(&announcement.encode()[..])[..]).unwrap(); - let bad_sig_action = MsgHandleErrInternal::send_err_msg_close_chan("Bad announcement_signatures node_signature", msg.channel_id); - secp_call!(self.secp_ctx.verify(&msghash, &msg.node_signature, if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }), bad_sig_action); - secp_call!(self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }), bad_sig_action); + let our_node_id = self.get_our_node_id(); + let (announcement, our_bitcoin_sig) = chan.get_channel_announcement(our_node_id.clone(), self.genesis_hash.clone()) + .map_err(|e| MsgHandleErrInternal::from_chan_maybe_close(e, msg.channel_id))?; - let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key); + let were_node_one = announcement.node_id_1 == our_node_id; + let msghash = Message::from_slice(&Sha256dHash::from_data(&announcement.encode()[..])[..]).unwrap(); + let bad_sig_action = MsgHandleErrInternal::send_err_msg_close_chan("Bad announcement_signatures node_signature", msg.channel_id); + secp_call!(self.secp_ctx.verify(&msghash, &msg.node_signature, if were_node_one { &announcement.node_id_2 } else { &announcement.node_id_1 }), bad_sig_action); + secp_call!(self.secp_ctx.verify(&msghash, &msg.bitcoin_signature, if were_node_one { &announcement.bitcoin_key_2 } else { &announcement.bitcoin_key_1 }), bad_sig_action); - (msgs::ChannelAnnouncement { + let our_node_sig = self.secp_ctx.sign(&msghash, &self.our_network_key); + + channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { + msg: msgs::ChannelAnnouncement { node_signature_1: if were_node_one { our_node_sig } else { msg.node_signature }, node_signature_2: if were_node_one { msg.node_signature } else { our_node_sig }, bitcoin_signature_1: if were_node_one { our_bitcoin_sig } else { msg.bitcoin_signature }, bitcoin_signature_2: if were_node_one { msg.bitcoin_signature } else { our_bitcoin_sig }, contents: announcement, - }, self.get_channel_update(chan).unwrap()) // can only fail if we're not in a ready state - }, - None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) - } - }; - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::BroadcastChannelAnnouncement { msg: chan_announcement, update_msg: chan_update }); + }, + update_msg: self.get_channel_update(chan).unwrap(), // can only fail if we're not in a ready state + }); + }, + None => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel", msg.channel_id)) + } Ok(()) } @@ -2292,7 +2259,9 @@ impl ChannelManager { /// Note: This API is likely to change! #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> { - let mut channel_state = self.channel_state.lock().unwrap(); + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = channel_state_lock.borrow_parts(); + match channel_state.by_id.get_mut(&channel_id) { None => return Err(APIError::APIMisuseError{err: "Failed to find corresponding channel"}), Some(chan) => { @@ -2309,8 +2278,7 @@ impl ChannelManager { if let Err(_e) = self.monitor.add_update_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) { unimplemented!(); } - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::UpdateHTLCs { + channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: chan.get_their_node_id(), updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), @@ -2328,10 +2296,19 @@ impl ChannelManager { } } +impl events::MessageSendEventsProvider for ChannelManager { + fn get_and_clear_pending_msg_events(&self) -> Vec { + let mut ret = Vec::new(); + let mut channel_state = self.channel_state.lock().unwrap(); + mem::swap(&mut ret, &mut channel_state.pending_msg_events); + ret + } +} + impl events::EventsProvider for ChannelManager { fn get_and_clear_pending_events(&self) -> Vec { - let mut pending_events = self.pending_events.lock().unwrap(); let mut ret = Vec::new(); + let mut pending_events = self.pending_events.lock().unwrap(); mem::swap(&mut ret, &mut *pending_events); ret } @@ -2339,24 +2316,24 @@ impl events::EventsProvider for ChannelManager { impl ChainListener for ChannelManager { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) { - let mut new_events = Vec::new(); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = channel_lock.borrow_parts(); let short_to_id = channel_state.short_to_id; + let pending_msg_events = channel_state.pending_msg_events; channel_state.by_id.retain(|_, channel| { let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched); if let Ok(Some(funding_locked)) = chan_res { let announcement_sigs = self.get_announcement_sigs(channel); - new_events.push(events::Event::SendFundingLocked { + pending_msg_events.push(events::MessageSendEvent::SendFundingLocked { node_id: channel.get_their_node_id(), msg: funding_locked, announcement_sigs: announcement_sigs }); short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id()); } else if let Err(e) = chan_res { - new_events.push(events::Event::HandleError { + pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: channel.get_their_node_id(), action: e.action, }); @@ -2376,7 +2353,7 @@ impl ChainListener for ChannelManager { // some kind of SPV attack, though we expect these to be dropped. failed_channels.push(channel.force_shutdown()); if let Ok(update) = self.get_channel_update(&channel) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2395,7 +2372,7 @@ impl ChainListener for ChannelManager { // hurt anything, but does make tests a bit simpler). failed_channels.last_mut().unwrap().0 = Vec::new(); if let Ok(update) = self.get_channel_update(&channel) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2407,21 +2384,17 @@ impl ChainListener for ChannelManager { for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } - let mut pending_events = self.pending_events.lock().unwrap(); - for funding_locked in new_events.drain(..) { - pending_events.push(funding_locked); - } self.latest_block_height.store(height as usize, Ordering::Release); } /// We force-close the channel without letting our counterparty participate in the shutdown fn block_disconnected(&self, header: &BlockHeader) { - let mut new_events = Vec::new(); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = channel_lock.borrow_parts(); let short_to_id = channel_state.short_to_id; + let pending_msg_events = channel_state.pending_msg_events; channel_state.by_id.retain(|_, v| { if v.block_disconnected(header) { if let Some(short_id) = v.get_short_channel_id() { @@ -2429,7 +2402,7 @@ impl ChainListener for ChannelManager { } failed_channels.push(v.force_shutdown()); if let Ok(update) = self.get_channel_update(&v) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2442,12 +2415,6 @@ impl ChainListener for ChannelManager { for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } - if !new_events.is_empty() { - let mut pending_events = self.pending_events.lock().unwrap(); - for funding_locked in new_events.drain(..) { - pending_events.push(funding_locked); - } - } self.latest_block_height.fetch_sub(1, Ordering::AcqRel); } } @@ -2551,13 +2518,13 @@ impl ChannelMessageHandler for ChannelManager { } fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) { - let mut new_events = Vec::new(); let mut failed_channels = Vec::new(); let mut failed_payments = Vec::new(); { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = channel_state_lock.borrow_parts(); let short_to_id = channel_state.short_to_id; + let pending_msg_events = channel_state.pending_msg_events; if no_connection_possible { channel_state.by_id.retain(|_, chan| { if chan.get_their_node_id() == *their_node_id { @@ -2566,7 +2533,7 @@ impl ChannelMessageHandler for ChannelManager { } failed_channels.push(chan.force_shutdown()); if let Ok(update) = self.get_channel_update(&chan) { - new_events.push(events::Event::BroadcastChannelUpdate { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2598,12 +2565,6 @@ impl ChannelMessageHandler for ChannelManager { for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } - if !new_events.is_empty() { - let mut pending_events = self.pending_events.lock().unwrap(); - for event in new_events.drain(..) { - pending_events.push(event); - } - } for (chan_update, mut htlc_sources) in failed_payments { for (htlc_source, payment_hash) in htlc_sources.drain(..) { self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() }); @@ -2658,7 +2619,7 @@ mod tests { use ln::msgs; use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler}; use util::test_utils; - use util::events::{Event, EventsProvider}; + use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::logger::Logger; use util::ser::Writeable; @@ -2863,6 +2824,7 @@ mod tests { fn drop(&mut self) { if !::std::thread::panicking() { // Check that we processed all pending events + assert_eq!(self.node.get_and_clear_pending_msg_events().len(), 0); assert_eq!(self.node.get_and_clear_pending_events().len(), 0); assert_eq!(self.chan_monitor.added_monitors.lock().unwrap().len(), 0); } @@ -2882,10 +2844,10 @@ mod tests { fn create_chan_between_nodes_with_value_init(node_a: &Node, node_b: &Node, channel_value: u64, push_msat: u64) -> Transaction { node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42).unwrap(); - let events_1 = node_a.node.get_and_clear_pending_events(); + let events_1 = node_a.node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); let accept_chan = match events_1[0] { - Event::SendOpenChannel { ref node_id, ref msg } => { + MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { assert_eq!(*node_id, node_b.node.get_our_node_id()); node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), msg).unwrap() }, @@ -2919,10 +2881,10 @@ mod tests { _ => panic!("Unexpected event"), } - let events_3 = node_a.node.get_and_clear_pending_events(); + let events_3 = node_a.node.get_and_clear_pending_msg_events(); assert_eq!(events_3.len(), 1); let funding_signed = match events_3[0] { - Event::SendFundingCreated { ref node_id, ref msg } => { + MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { assert_eq!(*node_id, node_b.node.get_our_node_id()); let res = node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), msg).unwrap(); let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap(); @@ -2957,10 +2919,10 @@ mod tests { fn create_chan_between_nodes_with_value_confirm(node_a: &Node, node_b: &Node, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) { confirm_transaction(&node_b.chain_monitor, &tx, tx.version); - let events_5 = node_b.node.get_and_clear_pending_events(); + let events_5 = node_b.node.get_and_clear_pending_msg_events(); assert_eq!(events_5.len(), 1); match events_5[0] { - Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { + MessageSendEvent::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { assert_eq!(*node_id, node_a.node.get_our_node_id()); assert!(announcement_sigs.is_none()); node_a.node.handle_funding_locked(&node_b.node.get_our_node_id(), msg).unwrap() @@ -2971,10 +2933,10 @@ mod tests { let channel_id; confirm_transaction(&node_a.chain_monitor, &tx, tx.version); - let events_6 = node_a.node.get_and_clear_pending_events(); + let events_6 = node_a.node.get_and_clear_pending_msg_events(); assert_eq!(events_6.len(), 1); (match events_6[0] { - Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { + MessageSendEvent::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { channel_id = msg.channel_id.clone(); assert_eq!(*node_id, node_b.node.get_our_node_id()); (msg.clone(), announcement_sigs.clone().unwrap()) @@ -2996,20 +2958,20 @@ mod tests { bs_announcement_sigs }; - let events_7 = node_b.node.get_and_clear_pending_events(); + let events_7 = node_b.node.get_and_clear_pending_msg_events(); assert_eq!(events_7.len(), 1); let (announcement, bs_update) = match events_7[0] { - Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { + MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { (msg, update_msg) }, _ => panic!("Unexpected event"), }; node_a.node.handle_announcement_signatures(&node_b.node.get_our_node_id(), &bs_announcement_sigs).unwrap(); - let events_8 = node_a.node.get_and_clear_pending_events(); + let events_8 = node_a.node.get_and_clear_pending_msg_events(); assert_eq!(events_8.len(), 1); let as_update = match events_8[0] { - Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { + MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { assert!(*announcement == *msg); update_msg }, @@ -3052,10 +3014,10 @@ mod tests { let (tx_a, tx_b); node_a.close_channel(channel_id).unwrap(); - let events_1 = node_a.get_and_clear_pending_events(); + let events_1 = node_a.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); let shutdown_a = match events_1[0] { - Event::SendShutdown { ref node_id, ref msg } => { + MessageSendEvent::SendShutdown { ref node_id, ref msg } => { assert_eq!(node_id, &node_b.get_our_node_id()); msg.clone() }, @@ -3091,19 +3053,19 @@ mod tests { assert_eq!(tx_a, tx_b); check_spends!(tx_a, funding_tx); - let events_2 = node_a.get_and_clear_pending_events(); + let events_2 = node_a.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); let as_update = match events_2[0] { - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; - let events_3 = node_b.get_and_clear_pending_events(); + let events_3 = node_b.get_and_clear_pending_msg_events(); assert_eq!(events_3.len(), 1); let bs_update = match events_3[0] { - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), @@ -3126,9 +3088,9 @@ mod tests { SendEvent { node_id: node_id, msgs: updates.update_add_htlcs, commitment_msg: updates.commitment_signed } } - fn from_event(event: Event) -> SendEvent { + fn from_event(event: MessageSendEvent) -> SendEvent { match event { - Event::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates), + MessageSendEvent::UpdateHTLCs { node_id, updates } => SendEvent::from_commitment_update(node_id, updates), _ => panic!("Unexpected event type!"), } } @@ -3158,6 +3120,7 @@ mod tests { check_added_monitors!($node_b, 1); if $fail_backwards { assert!($node_a.node.get_and_clear_pending_events().is_empty()); + assert!($node_a.node.get_and_clear_pending_msg_events().is_empty()); } assert!($node_a.node.handle_revoke_and_ack(&$node_b.node.get_our_node_id(), &bs_revoke_and_ack).unwrap().is_none()); { @@ -3195,7 +3158,7 @@ mod tests { origin_node.node.send_payment(route, our_payment_hash).unwrap(); check_added_monitors!(origin_node, 1); - let mut events = origin_node.node.get_and_clear_pending_events(); + let mut events = origin_node.node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); SendEvent::from_event(events.remove(0)) }; @@ -3218,9 +3181,9 @@ mod tests { node.node.channel_state.lock().unwrap().next_forward = Instant::now(); node.node.process_pending_htlc_forwards(); - let mut events_2 = node.node.get_and_clear_pending_events(); - assert_eq!(events_2.len(), 1); if idx == expected_route.len() - 1 { + let events_2 = node.node.get_and_clear_pending_events(); + assert_eq!(events_2.len(), 1); match events_2[0] { Event::PaymentReceived { ref payment_hash, amt } => { assert_eq!(our_payment_hash, *payment_hash); @@ -3229,6 +3192,8 @@ mod tests { _ => panic!("Unexpected event"), } } else { + let mut events_2 = node.node.get_and_clear_pending_msg_events(); + assert_eq!(events_2.len(), 1); check_added_monitors!(node, 1); payment_event = SendEvent::from_event(events_2.remove(0)); assert_eq!(payment_event.msgs.len(), 1); @@ -3267,11 +3232,11 @@ mod tests { update_fulfill_dance!(node, prev_node, false); } - let events = node.node.get_and_clear_pending_events(); + let events = node.node.get_and_clear_pending_msg_events(); if !skip_last || idx != expected_route.len() - 1 { assert_eq!(events.len(), 1); match events[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert!(update_add_htlcs.is_empty()); assert_eq!(update_fulfill_htlcs.len(), 1); assert!(update_fail_htlcs.is_empty()); @@ -3367,11 +3332,11 @@ mod tests { update_fail_dance!(node, prev_node, skip_last && idx == expected_route.len() - 1); } - let events = node.node.get_and_clear_pending_events(); + let events = node.node.get_and_clear_pending_msg_events(); if !skip_last || idx != expected_route.len() - 1 { assert_eq!(events.len(), 1); match events[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert!(update_add_htlcs.is_empty()); assert!(update_fulfill_htlcs.is_empty()); assert_eq!(update_fail_htlcs.len(), 1); @@ -3477,10 +3442,10 @@ mod tests { nodes[0].node.update_fee(channel_id, get_feerate!(nodes[0]) + 20).unwrap(); check_added_monitors!(nodes[0], 1); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { // (1) - Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => { + MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3494,7 +3459,7 @@ mod tests { check_added_monitors!(nodes[1], 1); let payment_event = { - let mut events_1 = nodes[1].node.get_and_clear_pending_events(); + let mut events_1 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); SendEvent::from_event(events_1.remove(0)) }; @@ -3573,10 +3538,10 @@ mod tests { nodes[0].node.update_fee(channel_id, get_feerate!(nodes[0]) + 20).unwrap(); check_added_monitors!(nodes[0], 1); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let update_msg = match events_0[0] { // (1) - Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, .. }, .. } => { + MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, .. }, .. } => { update_fee.as_ref() }, _ => panic!("Unexpected event"), @@ -3590,7 +3555,7 @@ mod tests { check_added_monitors!(nodes[1], 1); let payment_event = { - let mut events_1 = nodes[1].node.get_and_clear_pending_events(); + let mut events_1 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); SendEvent::from_event(events_1.remove(0)) }; @@ -3647,10 +3612,10 @@ mod tests { nodes[0].node.update_fee(channel_id, initial_feerate + 20).unwrap(); check_added_monitors!(nodes[0], 1); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg_1, commitment_signed_1) = match events_0[0] { // (1) - Event::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => { + MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => { (update_fee.as_ref().unwrap(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3665,6 +3630,7 @@ mod tests { // transaction: nodes[0].node.update_fee(channel_id, initial_feerate + 40).unwrap(); assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); // Create the (3) update_fee message that nodes[0] will generate before it does... let mut update_msg_2 = msgs::UpdateFee { @@ -3729,10 +3695,10 @@ mod tests { let feerate = get_feerate!(nodes[0]); nodes[0].node.update_fee(channel_id, feerate+20).unwrap(); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { - Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3777,10 +3743,10 @@ mod tests { let feerate = get_feerate!(nodes[0]); nodes[0].node.update_fee(channel_id, feerate+20).unwrap(); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { - Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3802,8 +3768,8 @@ mod tests { assert_eq!(added_monitors.len(), 0); added_monitors.clear(); } - let events = nodes[0].node.get_and_clear_pending_events(); - assert_eq!(events.len(), 0); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); // node[1] has nothing to do let resp_option = nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &revoke_msg).unwrap(); @@ -3894,10 +3860,10 @@ mod tests { let feerate = get_feerate!(nodes[0]); nodes[0].node.update_fee(channel_id, feerate+20).unwrap(); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { - Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -3917,10 +3883,10 @@ mod tests { // Create and deliver (4)... nodes[0].node.update_fee(channel_id, feerate+30).unwrap(); - let events_0 = nodes[0].node.get_and_clear_pending_events(); + let events_0 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_0.len(), 1); let (update_msg, commitment_signed) = match events_0[0] { - Event::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { node_id:_, updates: msgs::CommitmentUpdate { update_add_htlcs:_, update_fulfill_htlcs:_, update_fail_htlcs:_, update_fail_malformed_htlcs:_, ref update_fee, ref commitment_signed } } => { (update_fee.as_ref(), commitment_signed) }, _ => panic!("Unexpected event"), @@ -4191,19 +4157,19 @@ mod tests { } fn get_announce_close_broadcast_events(nodes: &Vec, a: usize, b: usize) { - let events_1 = nodes[a].node.get_and_clear_pending_events(); + let events_1 = nodes[a].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); let as_update = match events_1[0] { - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; - let events_2 = nodes[b].node.get_and_clear_pending_events(); + let events_2 = nodes[b].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); let bs_update = match events_2[0] { - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), @@ -4262,7 +4228,7 @@ mod tests { macro_rules! expect_forward { ($node: expr) => {{ - let mut events = $node.node.get_and_clear_pending_events(); + let mut events = $node.node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); check_added_monitors!($node, 1); let payment_event = SendEvent::from_event(events.remove(0)); @@ -4345,7 +4311,7 @@ mod tests { nodes[0].node.send_payment(route_1, our_payment_hash_1).unwrap(); check_added_monitors!(nodes[0], 1); - let mut events = nodes[0].node.get_and_clear_pending_events(); + let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); SendEvent::from_event(events.remove(0)) }; @@ -4421,8 +4387,8 @@ mod tests { // this will also stuck in the holding cell nodes[0].node.send_payment(route_22, our_payment_hash_22).unwrap(); check_added_monitors!(nodes[0], 0); - let events = nodes[0].node.get_and_clear_pending_events(); - assert_eq!(events.len(), 0); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); // flush the pending htlc let (as_revoke_and_ack, as_commitment_signed) = nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event_1.commitment_msg).unwrap(); @@ -4541,10 +4507,10 @@ mod tests { assert!($node.node.claim_funds($preimage)); check_added_monitors!($node, 1); - let events = $node.node.get_and_clear_pending_events(); + let events = $node.node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); match events[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, .. } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fail_htlcs, .. } } => { assert!(update_add_htlcs.is_empty()); assert!(update_fail_htlcs.is_empty()); assert_eq!(*node_id, $prev_node.node.get_our_node_id()); @@ -4835,10 +4801,10 @@ mod tests { route_payment(&nodes[0], &[&nodes[1]], 10000000); nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id); { - let events = nodes[0].node.get_and_clear_pending_events(); + let events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); match events[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -4852,10 +4818,10 @@ mod tests { nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0], &node_txn[1]], &[1; 2]); { - let events = nodes[1].node.get_and_clear_pending_events(); + let events = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); match events[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -4882,7 +4848,7 @@ mod tests { nodes[0].node.send_payment(route, our_payment_hash).unwrap(); check_added_monitors!(nodes[0], 1); - let mut events = nodes[0].node.get_and_clear_pending_events(); + let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); SendEvent::from_event(events.remove(0)) }; @@ -4900,7 +4866,7 @@ mod tests { nodes[1].node.channel_state.lock().unwrap().next_forward = Instant::now(); nodes[1].node.process_pending_htlc_forwards(); - let mut events_2 = nodes[1].node.get_and_clear_pending_events(); + let mut events_2 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); payment_event = SendEvent::from_event(events_2.remove(0)); assert_eq!(payment_event.msgs.len(), 1); @@ -4915,10 +4881,10 @@ mod tests { // transaction and ensure nodes[1] doesn't fail-backwards (this was originally a bug!). nodes[2].node.force_close_channel(&payment_event.commitment_msg.channel_id); - let events_3 = nodes[2].node.get_and_clear_pending_events(); + let events_3 = nodes[2].node.get_and_clear_pending_msg_events(); assert_eq!(events_3.len(), 1); match events_3[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -4936,11 +4902,11 @@ mod tests { let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }; nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&tx], &[1]); - let events_4 = nodes[1].node.get_and_clear_pending_events(); + let events_4 = nodes[1].node.get_and_clear_pending_msg_events(); // Note no UpdateHTLCs event here from nodes[1] to nodes[0]! assert_eq!(events_4.len(), 1); match events_4[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -4985,10 +4951,10 @@ mod tests { nodes[0].node.block_disconnected(&headers.pop().unwrap()); } { - let events = nodes[0].node.get_and_clear_pending_events(); + let events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); match events[0] { - Event::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { + MessageSendEvent::BroadcastChannelUpdate { msg: msgs::ChannelUpdate { contents: msgs::UnsignedChannelUpdate { flags, .. }, .. } } => { assert_eq!(flags & 0b10, 0b10); }, _ => panic!("Unexpected event"), @@ -5196,7 +5162,7 @@ mod tests { nodes[0].node.send_payment(route.clone(), payment_hash_1).unwrap(); check_added_monitors!(nodes[0], 1); - let mut events = nodes[0].node.get_and_clear_pending_events(); + let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 1); SendEvent::from_event(events.remove(0)) }; @@ -5273,10 +5239,10 @@ mod tests { nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); - let events_3 = nodes[1].node.get_and_clear_pending_events(); + let events_3 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_3.len(), 1); let (update_fulfill_htlc, commitment_signed) = match events_3[0] { - Event::UpdateHTLCs { ref node_id, ref updates } => { + MessageSendEvent::UpdateHTLCs { ref node_id, ref updates } => { assert_eq!(*node_id, nodes[0].node.get_our_node_id()); assert!(updates.update_add_htlcs.is_empty()); assert!(updates.update_fail_htlcs.is_empty()); @@ -5384,10 +5350,10 @@ mod tests { nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false); confirm_transaction(&nodes[0].chain_monitor, &tx, tx.version); - let events_1 = nodes[0].node.get_and_clear_pending_events(); + let events_1 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); match events_1[0] { - Event::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => { + MessageSendEvent::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => { assert_eq!(*node_id, nodes[1].node.get_our_node_id()); assert!(announcement_sigs.is_none()); }, @@ -5395,10 +5361,10 @@ mod tests { } confirm_transaction(&nodes[1].chain_monitor, &tx, tx.version); - let events_2 = nodes[1].node.get_and_clear_pending_events(); + let events_2 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); match events_2[0] { - Event::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => { + MessageSendEvent::SendFundingLocked { ref node_id, msg: _, ref announcement_sigs } => { assert_eq!(*node_id, nodes[0].node.get_our_node_id()); assert!(announcement_sigs.is_none()); }, @@ -5434,20 +5400,20 @@ mod tests { nodes[0].node.send_payment(route.clone(), payment_hash_2).unwrap(); check_added_monitors!(nodes[0], 1); - let events_1 = nodes[0].node.get_and_clear_pending_events(); + let events_1 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); match events_1[0] { - Event::UpdateHTLCs { .. } => {}, + MessageSendEvent::UpdateHTLCs { .. } => {}, _ => panic!("Unexpected event"), } assert!(nodes[1].node.claim_funds(payment_preimage_1)); check_added_monitors!(nodes[1], 1); - let events_2 = nodes[1].node.get_and_clear_pending_events(); + let events_2 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); match events_2[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert_eq!(*node_id, nodes[0].node.get_our_node_id()); assert!(update_add_htlcs.is_empty()); assert_eq!(update_fulfill_htlcs.len(), 1); @@ -5566,10 +5532,10 @@ mod tests { if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_1) {} else { panic!(); } check_added_monitors!(nodes[0], 1); - let events_1 = nodes[0].node.get_and_clear_pending_events(); + let events_1 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 1); match events_1[0] { - Event::BroadcastChannelUpdate { .. } => {}, + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("Unexpected event"), }; @@ -5592,8 +5558,8 @@ mod tests { if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_1) {} else { panic!(); } check_added_monitors!(nodes[0], 1); - let events_1 = nodes[0].node.get_and_clear_pending_events(); - assert!(events_1.is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); assert_eq!(nodes[0].node.list_channels().len(), 1); if disconnect { @@ -5606,7 +5572,7 @@ mod tests { nodes[0].node.test_restore_channel_monitor(); check_added_monitors!(nodes[0], 1); - let mut events_2 = nodes[0].node.get_and_clear_pending_events(); + let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); let payment_event = SendEvent::from_event(events_2.pop().unwrap()); assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id()); @@ -5633,8 +5599,8 @@ mod tests { if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route, payment_hash_2) {} else { panic!(); } check_added_monitors!(nodes[0], 1); - let events_4 = nodes[0].node.get_and_clear_pending_events(); - assert!(events_4.is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); assert_eq!(nodes[0].node.list_channels().len(), 1); if disconnect { @@ -5648,10 +5614,10 @@ mod tests { nodes[0].node.test_restore_channel_monitor(); check_added_monitors!(nodes[0], 1); - let events_5 = nodes[0].node.get_and_clear_pending_events(); + let events_5 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_5.len(), 1); match events_5[0] { - Event::BroadcastChannelUpdate { .. } => {}, + MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("Unexpected event"), } @@ -5700,18 +5666,18 @@ mod tests { if let Err(APIError::MonitorUpdateFailed) = nodes[0].node.send_payment(route.clone(), payment_hash_2) {} else { panic!(); } check_added_monitors!(nodes[0], 1); - let events_1 = nodes[0].node.get_and_clear_pending_events(); - assert!(events_1.is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); assert_eq!(nodes[0].node.list_channels().len(), 1); // Claim the previous payment, which will result in a update_fulfill_htlc/CS from nodes[1] // but nodes[0] won't respond since it is frozen. assert!(nodes[1].node.claim_funds(payment_preimage_1)); check_added_monitors!(nodes[1], 1); - let events_2 = nodes[1].node.get_and_clear_pending_events(); + let events_2 = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), 1); let (bs_initial_fulfill, bs_initial_commitment_signed) = match events_2[0] { - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { assert_eq!(*node_id, nodes[0].node.get_our_node_id()); assert!(update_add_htlcs.is_empty()); assert_eq!(update_fulfill_htlcs.len(), 1); @@ -5769,8 +5735,8 @@ mod tests { } } } let (payment_event, initial_revoke_and_ack) = if disconnect_count & !disconnect_flags > 0 { - let events_4 = nodes[0].node.get_and_clear_pending_events(); - assert!(events_4.is_empty()); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id()); assert_eq!(reestablish_1.len(), 1); @@ -5833,10 +5799,10 @@ mod tests { (SendEvent::from_commitment_update(nodes[1].node.get_our_node_id(), as_resp.2.unwrap()), as_resp.1.unwrap()) } else { - let mut events_4 = nodes[0].node.get_and_clear_pending_events(); + let mut events_4 = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events_4.len(), 2); (SendEvent::from_event(events_4.remove(0)), match events_4[0] { - Event::SendRevokeAndACK { ref node_id, ref msg } => { + MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { assert_eq!(*node_id, nodes[1].node.get_our_node_id()); msg.clone() }, diff --git a/src/ln/msgs.rs b/src/ln/msgs.rs index b5db51cf2..29ff1a876 100644 --- a/src/ln/msgs.rs +++ b/src/ln/msgs.rs @@ -521,7 +521,7 @@ pub enum RAACommitmentOrder { /// /// Messages MAY be called in parallel when they originate from different their_node_ids, however /// they MUST NOT be called in parallel when the two calls have the same their_node_id. -pub trait ChannelMessageHandler : events::EventsProvider + Send + Sync { +pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Sync { //Channel init: /// Handle an incoming open_channel message from the given peer. fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &OpenChannel) -> Result; diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 82784efe5..631e48aa1 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -12,13 +12,13 @@ use ln::msgs; use util::ser::{Writeable, Writer, Readable}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use util::byte_utils; -use util::events::{EventsProvider,Event}; +use util::events::{MessageSendEvent}; use util::logger::Logger; use std::collections::{HashMap,hash_map,LinkedList}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::{cmp,error,mem,hash,fmt}; +use std::{cmp,error,hash,fmt}; /// Provides references to trait impls which handle different types of messages. pub struct MessageHandler { @@ -127,7 +127,6 @@ impl PeerHolder { pub struct PeerManager { message_handler: MessageHandler, peers: Mutex>, - pending_events: Mutex>, our_node_secret: SecretKey, initial_syncs_sent: AtomicUsize, logger: Arc, @@ -164,7 +163,6 @@ impl PeerManager { PeerManager { message_handler: message_handler, peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }), - pending_events: Mutex::new(Vec::new()), our_node_secret: our_node_secret, initial_syncs_sent: AtomicUsize::new(0), logger, @@ -757,13 +755,12 @@ impl PeerManager { /// Checks for any events generated by our handlers and processes them. May be needed after eg /// calls to ChannelManager::process_pending_htlc_forward. pub fn process_events(&self) { - let mut upstream_events = Vec::new(); { // TODO: There are some DoS attacks here where you can flood someone's outbound send // buffer by doing things like announcing channels on another node. We should be willing to // drop optional-ish messages when send buffers get full! - let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_events(); + let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); let mut peers = self.peers.lock().unwrap(); for event in events_generated.drain(..) { macro_rules! get_peer_for_forwarding { @@ -790,15 +787,7 @@ impl PeerManager { } } match event { - Event::FundingGenerationReady {..} => { /* Hand upstream */ }, - Event::FundingBroadcastSafe {..} => { /* Hand upstream */ }, - Event::PaymentReceived {..} => { /* Hand upstream */ }, - Event::PaymentSent {..} => { /* Hand upstream */ }, - Event::PaymentFailed {..} => { /* Hand upstream */ }, - Event::PendingHTLCsForwardable {..} => { /* Hand upstream */ }, - Event::SpendableOutputs { .. } => { /* Hand upstream */ }, - - Event::SendOpenChannel { ref node_id, ref msg } => { + MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.temporary_channel_id)); @@ -807,9 +796,8 @@ impl PeerManager { }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::SendFundingCreated { ref node_id, ref msg } => { + MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", log_pubkey!(node_id), log_bytes!(msg.temporary_channel_id), @@ -820,9 +808,8 @@ impl PeerManager { }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { + MessageSendEvent::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {}{} for channel {}", log_pubkey!(node_id), if announcement_sigs.is_some() { " with announcement sigs" } else { "" }, @@ -836,9 +823,8 @@ impl PeerManager { &None => {}, } Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", log_pubkey!(node_id), update_add_htlcs.len(), @@ -865,9 +851,8 @@ impl PeerManager { } peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::SendRevokeAndACK { ref node_id, ref msg } => { + MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); @@ -876,9 +861,8 @@ impl PeerManager { }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::SendShutdown { ref node_id, ref msg } => { + MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); @@ -887,9 +871,8 @@ impl PeerManager { }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { + MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { let encoded_msg = encode_msg!(msg, 256); @@ -912,9 +895,8 @@ impl PeerManager { Self::do_attempt_write_data(&mut (*descriptor).clone(), peer); } } - continue; }, - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_update(msg).is_ok() { let encoded_msg = encode_msg!(msg, 258); @@ -927,13 +909,11 @@ impl PeerManager { Self::do_attempt_write_data(&mut (*descriptor).clone(), peer); } } - continue; }, - Event::PaymentFailureNetworkUpdate { ref update } => { + MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => { self.message_handler.route_handler.handle_htlc_fail_channel_update(update); - continue; }, - Event::HandleError { ref node_id, ref action } => { + MessageSendEvent::HandleError { ref node_id, ref action } => { if let Some(ref action) = *action { match *action { msgs::ErrorAction::DisconnectPeer { ref msg } => { @@ -955,9 +935,7 @@ impl PeerManager { self.message_handler.chan_handler.peer_disconnected(&node_id, false); } }, - msgs::ErrorAction::IgnoreError => { - continue; - }, + msgs::ErrorAction::IgnoreError => {}, msgs::ErrorAction::SendErrorMessage { ref msg } => { log_trace!(self, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), @@ -972,18 +950,10 @@ impl PeerManager { } else { log_error!(self, "Got no-action HandleError Event in peer_handler for node {}, no such events should ever be generated!", log_pubkey!(node_id)); } - continue; } } - - upstream_events.push(event); } } - - let mut pending_events = self.pending_events.lock().unwrap(); - for event in upstream_events.drain(..) { - pending_events.push(event); - } } /// Indicates that the given socket descriptor's connection is now closed. @@ -1014,15 +984,6 @@ impl PeerManager { } } -impl EventsProvider for PeerManager { - fn get_and_clear_pending_events(&self) -> Vec { - let mut pending_events = self.pending_events.lock().unwrap(); - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut *pending_events); - ret - } -} - #[cfg(test)] mod tests { use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; @@ -1094,7 +1055,7 @@ mod tests { let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); let chan_handler = test_utils::TestChannelMessageHandler::new(); - chan_handler.pending_events.lock().unwrap().push(events::Event::HandleError { + chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError { node_id: their_id, action: Some(msgs::ErrorAction::DisconnectPeer { msg: None }), }); diff --git a/src/util/events.rs b/src/util/events.rs index 3a077a4b5..261ee57cc 100644 --- a/src/util/events.rs +++ b/src/util/events.rs @@ -24,7 +24,6 @@ use std::time::Instant; /// An Event which you should probably take some action in response to. pub enum Event { - // Events a user will probably have to handle /// Used to indicate that the client should generate a funding transaction with the given /// parameters and then call ChannelManager::funding_transaction_generated. /// Generated in ChannelManager message handling. @@ -97,13 +96,14 @@ pub enum Event { /// The outputs which you should store as spendable by you. outputs: Vec, }, +} - // Events indicating the network loop should send a message to a peer: - // TODO: Move these into a separate struct and make a top-level enum +/// An event generated by ChannelManager which indicates a message should be sent to a peer (or +/// broadcast to most peers). +/// These events are handled by PeerManager::process_events if you are using a PeerManager. +pub enum MessageSendEvent { /// Used to indicate that we've initialted a channel open and should send the open_channel /// message provided to the given peer. - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. SendOpenChannel { /// The node_id of the node which should receive this message node_id: PublicKey, @@ -111,8 +111,6 @@ pub enum Event { msg: msgs::OpenChannel, }, /// Used to indicate that a funding_created message should be sent to the peer with the given node_id. - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. SendFundingCreated { /// The node_id of the node which should receive this message node_id: PublicKey, @@ -120,8 +118,6 @@ pub enum Event { msg: msgs::FundingCreated, }, /// Used to indicate that a funding_locked message should be sent to the peer with the given node_id. - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. SendFundingLocked { /// The node_id of the node which should receive these message(s) node_id: PublicKey, @@ -132,8 +128,6 @@ pub enum Event { }, /// Used to indicate that a series of HTLC update messages, as well as a commitment_signed /// message should be sent to the peer with the given node_id. - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. UpdateHTLCs { /// The node_id of the node which should receive these message(s) node_id: PublicKey, @@ -141,8 +135,6 @@ pub enum Event { updates: msgs::CommitmentUpdate, }, /// Used to indicate that a revoke_and_ack message should be sent to the peer with the given node_id. - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. SendRevokeAndACK { /// The node_id of the node which should receive this message node_id: PublicKey, @@ -150,8 +142,6 @@ pub enum Event { msg: msgs::RevokeAndACK, }, /// Used to indicate that a shutdown message should be sent to the peer with the given node_id. - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. SendShutdown { /// The node_id of the node which should receive this message node_id: PublicKey, @@ -160,8 +150,6 @@ pub enum Event { }, /// Used to indicate that a channel_announcement and channel_update should be broadcast to all /// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2). - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. BroadcastChannelAnnouncement { /// The channel_announcement which should be sent. msg: msgs::ChannelAnnouncement, @@ -169,17 +157,11 @@ pub enum Event { update_msg: msgs::ChannelUpdate, }, /// Used to indicate that a channel_update should be broadcast to all peers. - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. BroadcastChannelUpdate { /// The channel_update which should be sent. msg: msgs::ChannelUpdate, }, - - //Error handling /// Broadcast an error downstream to be handled - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. HandleError { /// The node_id of the node which should receive this message node_id: PublicKey, @@ -188,14 +170,19 @@ pub enum Event { }, /// When a payment fails we may receive updates back from the hop where it failed. In such /// cases this event is generated so that we can inform the router of this information. - /// - /// This event is handled by PeerManager::process_events if you are using a PeerManager. PaymentFailureNetworkUpdate { /// The channel/node update which should be sent to router update: msgs::HTLCFailChannelUpdate, } } +/// A trait indicating an object may generate message send events +pub trait MessageSendEventsProvider { + /// Gets the list of pending events which were generated by previous actions, clearing the list + /// in the process. + fn get_and_clear_pending_msg_events(&self) -> Vec; +} + /// A trait indicating an object may generate events pub trait EventsProvider { /// Gets the list of pending events which were generated by previous actions, clearing the list diff --git a/src/util/test_utils.rs b/src/util/test_utils.rs index 2577bc9f9..31a19776a 100644 --- a/src/util/test_utils.rs +++ b/src/util/test_utils.rs @@ -74,7 +74,7 @@ impl chaininterface::BroadcasterInterface for TestBroadcaster { } pub struct TestChannelMessageHandler { - pub pending_events: Mutex>, + pub pending_events: Mutex>, } impl TestChannelMessageHandler { @@ -141,8 +141,8 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler { fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {} } -impl events::EventsProvider for TestChannelMessageHandler { - fn get_and_clear_pending_events(&self) -> Vec { +impl events::MessageSendEventsProvider for TestChannelMessageHandler { + fn get_and_clear_pending_msg_events(&self) -> Vec { let mut pending_events = self.pending_events.lock().unwrap(); let mut ret = Vec::new(); mem::swap(&mut ret, &mut *pending_events); -- 2.39.5