From 5d7db56ba2cbde7650438d1ed24ea42347984563 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Viktor=20Tigerstr=C3=B6m?= <11711198+ViktorTigerstrom@users.noreply.github.com> Date: Mon, 19 Dec 2022 20:51:07 +0100 Subject: [PATCH] Store `pending_msg_events` per peer --- lightning/src/ln/channelmanager.rs | 281 ++++++++++++---------- lightning/src/ln/functional_test_utils.rs | 18 +- 2 files changed, 165 insertions(+), 134 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index fefcefacc..fa754f872 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -436,9 +436,6 @@ struct ClaimablePayments { // Note this is only exposed in cfg(test): pub(super) struct ChannelHolder { - /// Messages to send to peers - pushed to in the same lock that they are generated in (except - /// for broadcast messages, where ordering isn't as strict). - pub(super) pending_msg_events: Vec, } /// Events which we process internally but cannot be procsesed immediately at the generation site @@ -470,6 +467,9 @@ pub(super) struct PeerState { pub(super) channel_by_id: HashMap<[u8; 32], Channel>, /// The latest `InitFeatures` we heard from the peer. latest_features: InitFeatures, + /// Messages to send to the peer - pushed to in the same lock that they are generated in (except + /// for broadcast messages, where ordering isn't as strict). + pub(super) pending_msg_events: Vec, } /// Stores a PaymentSecret and any other data we may need to validate an inbound payment is @@ -1165,6 +1165,10 @@ macro_rules! handle_error { // entering the macro. assert!($self.channel_state.try_lock().is_ok()); assert!($self.pending_events.try_lock().is_ok()); + #[cfg(feature = "std")] + { + assert!($self.per_peer_state.try_write().is_ok()); + } } let mut msg_events = Vec::with_capacity(2); @@ -1194,7 +1198,31 @@ macro_rules! handle_error { } if !msg_events.is_empty() { - $self.channel_state.lock().unwrap().pending_msg_events.append(&mut msg_events); + let per_peer_state = $self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) { + let mut peer_state = peer_state_mutex.lock().unwrap(); + peer_state.pending_msg_events.append(&mut msg_events); + } + #[cfg(debug_assertions)] + { + if let None = per_peer_state.get(&$counterparty_node_id) { + // This shouldn't occour in tests unless an unkown counterparty_node_id + // has been passed to our message handling functions. + let expected_error_str = format!("Can't find a peer matching the passed counterparty node_id {}", $counterparty_node_id); + match err.action { + msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { ref channel_id, ref data } + } + => { + assert_eq!(*data, expected_error_str); + if let Some((err_channel_id, _user_channel_id)) = chan_id { + assert_eq!(*channel_id, err_channel_id); + } + } + _ => panic!("Unexpected event"), + } + } + } } // Return error in case higher-API need one @@ -1429,7 +1457,6 @@ where best_block: RwLock::new(params.best_block), channel_state: Mutex::new(ChannelHolder{ - pending_msg_events: Vec::new(), }), outbound_scid_aliases: Mutex::new(HashSet::new()), pending_inbound_payments: Mutex::new(HashMap::new()), @@ -1560,7 +1587,7 @@ where hash_map::Entry::Vacant(entry) => { entry.insert(channel); } } - channel_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: their_network_key, msg: res, }); @@ -1702,7 +1729,7 @@ where } } - channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: *counterparty_node_id, msg: shutdown_msg }); @@ -1710,7 +1737,7 @@ where if chan_entry.get().is_shutdown() { let channel = remove_channel!(self, chan_entry); if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) { - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: channel_update }); } @@ -1799,13 +1826,13 @@ where /// user closes, which will be re-exposed as the `ChannelClosed` reason. fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: &PublicKey, peer_msg: Option<&String>, broadcast: bool) -> Result { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(peer_node_id); let mut chan = { - let per_peer_state = self.per_peer_state.read().unwrap(); - if let None = per_peer_state.get(peer_node_id) { + if let None = peer_state_mutex_opt { return Err(APIError::APIMisuseError{ err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) }); } - let peer_state_mutex = per_peer_state.get(peer_node_id).unwrap(); - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(chan) = peer_state.channel_by_id.entry(channel_id.clone()) { if let Some(peer_msg) = peer_msg { @@ -1821,8 +1848,8 @@ where log_error!(self.logger, "Force-closing channel {}", log_bytes!(channel_id[..])); self.finish_force_close_channel(chan.force_shutdown(broadcast)); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut channel_state = self.channel_state.lock().unwrap(); - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -1834,14 +1861,18 @@ where let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); match self.force_close_channel_with_peer(channel_id, counterparty_node_id, None, broadcast) { Ok(counterparty_node_id) => { - self.channel_state.lock().unwrap().pending_msg_events.push( - events::MessageSendEvent::HandleError { - node_id: counterparty_node_id, - action: msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { channel_id: *channel_id, data: "Channel force-closed".to_owned() } - }, - } - ); + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state = peer_state_mutex.lock().unwrap(); + peer_state.pending_msg_events.push( + events::MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: *channel_id, data: "Channel force-closed".to_owned() } + }, + } + ); + } Ok(()) }, Err(e) => Err(e) @@ -2339,7 +2370,7 @@ where } log_debug!(self.logger, "Sending payment along path resulted in a commitment_signed for channel {}", log_bytes!(chan_id)); - channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: path.first().unwrap().pubkey, updates: msgs::CommitmentUpdate { update_add_htlcs: vec![update_add], @@ -2565,7 +2596,7 @@ where } }; - channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated { node_id: chan.get_counterparty_node_id(), msg, }); @@ -2726,9 +2757,9 @@ where continue; } if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { node_id: channel.get_counterparty_node_id(), msg, }); @@ -3352,11 +3383,11 @@ where { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - let pending_msg_events = &mut channel_state.pending_msg_events; let per_peer_state = self.per_peer_state.read().unwrap(); for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; peer_state.channel_by_id.retain(|chan_id, chan| { let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate); if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; } @@ -3797,7 +3828,8 @@ where } else { (false, None) }; if found_channel { - if let hash_map::Entry::Occupied(mut chan) = peer_state_opt.as_mut().unwrap().channel_by_id.entry(chan_id) { + let peer_state = &mut *peer_state_opt.as_mut().unwrap(); + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) { let counterparty_node_id = chan.get().get_counterparty_node_id(); match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) { Ok(msgs_monitor_option) => { @@ -3819,8 +3851,8 @@ where if let Some((msg, commitment_signed)) = msgs { log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}", log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id())); - channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id: chan.get().get_counterparty_node_id(), + peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + node_id: counterparty_node_id, updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), update_fulfill_htlcs: vec![msg], @@ -4045,11 +4077,11 @@ where }; let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock; + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if let None = peer_state_mutex_opt { return } + peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; let mut channel = { - if let None = per_peer_state.get(&counterparty_node_id) { return } - let peer_state_mutex = per_peer_state.get(&counterparty_node_id).unwrap(); - peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()){ hash_map::Entry::Occupied(chan) => chan, hash_map::Entry::Vacant(_) => return, @@ -4073,9 +4105,9 @@ where }) } else { None } } else { None }; - htlc_forwards = self.handle_channel_resumption(&mut channel_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs); + htlc_forwards = self.handle_channel_resumption(&mut peer_state.pending_msg_events, channel.get_mut(), updates.raa, updates.commitment_update, updates.order, updates.accepted_htlcs, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs); if let Some(upd) = channel_update { - channel_state.pending_msg_events.push(upd); + peer_state.pending_msg_events.push(upd); } (updates.failed_htlcs, updates.finalized_claimed_htlcs, counterparty_node_id) @@ -4158,12 +4190,12 @@ where msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "No zero confirmation channels accepted".to_owned(), } } }; - channel_state.pending_msg_events.push(send_msg_err_event); + peer_state.pending_msg_events.push(send_msg_err_event); let _ = remove_channel!(self, channel); return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() }); } - channel_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { node_id: channel.get().get_counterparty_node_id(), msg: channel.get_mut().accept_inbound_channel(user_channel_id), }); @@ -4218,7 +4250,7 @@ where if channel.get_channel_type().requires_zero_conf() { return Err(MsgHandleErrInternal::send_err_msg_no_close("No zero confirmation channels accepted".to_owned(), msg.temporary_channel_id.clone())); } - channel_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { node_id: counterparty_node_id.clone(), msg: channel.accept_inbound_channel(user_channel_id), }); @@ -4334,12 +4366,12 @@ where i_e.insert(chan.get_counterparty_node_id()); } } - channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { node_id: counterparty_node_id.clone(), msg: funding_msg, }); if let Some(msg) = channel_ready { - send_channel_ready!(self, channel_state.pending_msg_events, chan, msg); + send_channel_ready!(self, peer_state.pending_msg_events, chan, msg); } e.insert(chan); } @@ -4382,7 +4414,7 @@ where }, } if let Some(msg) = channel_ready { - send_channel_ready!(self, channel_state.pending_msg_events, chan.get(), msg); + send_channel_ready!(self, peer_state.pending_msg_events, chan.get(), msg); } funding_tx }, @@ -4410,7 +4442,7 @@ where self.genesis_hash.clone(), &self.best_block.read().unwrap(), &self.logger), chan); if let Some(announcement_sigs) = announcement_sigs_opt { log_trace!(self.logger, "Sending announcement_signatures for channel {}", log_bytes!(chan.get().channel_id())); - channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures { node_id: counterparty_node_id.clone(), msg: announcement_sigs, }); @@ -4422,7 +4454,7 @@ where // announcement_signatures. log_trace!(self.logger, "Sending private initial channel_update for our counterparty on channel {}", log_bytes!(chan.get().channel_id())); if let Ok(msg) = self.get_channel_update_for_unicast(chan.get()) { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { node_id: counterparty_node_id.clone(), msg, }); @@ -4473,7 +4505,7 @@ where } if let Some(msg) = shutdown { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: *counterparty_node_id, msg, }); @@ -4495,21 +4527,19 @@ where } fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if let None = peer_state_mutex_opt { + return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) + } let (tx, chan_option) = { - let mut channel_state_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_state_lock; - let per_peer_state = self.per_peer_state.read().unwrap(); - if let None = per_peer_state.get(counterparty_node_id) { - return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)) - } - let peer_state_mutex = per_peer_state.get(counterparty_node_id).unwrap(); - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_entry) => { let (closing_signed, tx) = try_chan_entry!(self, chan_entry.get_mut().closing_signed(&self.fee_estimator, &msg), chan_entry); if let Some(msg) = closing_signed { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { node_id: counterparty_node_id.clone(), msg, }); @@ -4532,8 +4562,9 @@ where } if let Some(chan) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut channel_state = self.channel_state.lock().unwrap(); - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -4679,12 +4710,12 @@ where return Err(e); } - channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { node_id: counterparty_node_id.clone(), msg: revoke_and_ack, }); if let Some(msg) = commitment_signed { - channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: counterparty_node_id.clone(), updates: msgs::CommitmentUpdate { update_add_htlcs: Vec::new(), @@ -4831,7 +4862,7 @@ where } else { unreachable!(); } } if let Some(updates) = raa_updates.commitment_update { - channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { + peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { node_id: counterparty_node_id.clone(), updates, }); @@ -4896,7 +4927,7 @@ where return Err(MsgHandleErrInternal::from_no_close(LightningError{err: "Got an announcement_signatures before we were ready for it".to_owned(), action: msgs::ErrorAction::IgnoreError})); } - channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { + peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { msg: try_chan_entry!(self, chan.get_mut().announcement_signatures( self.get_our_node_id(), self.genesis_hash.clone(), self.best_block.read().unwrap().height(), msg), chan), // Note that announcement_signatures fails if the channel cannot be announced, @@ -4974,7 +5005,7 @@ where &*self.best_block.read().unwrap()), chan); let mut channel_update = None; if let Some(msg) = responses.shutdown_msg { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { node_id: counterparty_node_id.clone(), msg, }); @@ -4991,10 +5022,10 @@ where } let need_lnd_workaround = chan.get_mut().workaround_lnd_bug_4006.take(); htlc_forwards = self.handle_channel_resumption( - &mut channel_state.pending_msg_events, chan.get_mut(), responses.raa, responses.commitment_update, responses.order, + &mut peer_state.pending_msg_events, chan.get_mut(), responses.raa, responses.commitment_update, responses.order, Vec::new(), None, responses.channel_ready, responses.announcement_sigs); if let Some(upd) = channel_update { - channel_state.pending_msg_events.push(upd); + peer_state.pending_msg_events.push(upd); } need_lnd_workaround }, @@ -5047,9 +5078,9 @@ where if let Some(counterparty_node_id) = counterparty_node_id_opt { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { - let pending_msg_events = &mut channel_state.pending_msg_events; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; if let hash_map::Entry::Occupied(chan_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) { let mut chan = remove_channel!(self, chan_entry); failed_channels.push(chan.force_shutdown(false)); @@ -5106,12 +5137,12 @@ where { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - let pending_msg_events = &mut channel_state.pending_msg_events; let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; peer_state.channel_by_id.retain(|channel_id, chan| { match chan.maybe_free_holding_cell_htlcs(&self.logger) { Ok((commitment_opt, holding_cell_failed_htlcs)) => { @@ -5172,12 +5203,12 @@ where { let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - let pending_msg_events = &mut channel_state.pending_msg_events; let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; peer_state.channel_by_id.retain(|channel_id, chan| { match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) { Ok((msg_opt, tx_opt)) => { @@ -5525,6 +5556,19 @@ where R::Target: Router, L::Target: Logger, { + /// Returns `MessageSendEvent`s strictly ordered per-peer, in the order they were generated. + /// The returned array will contain `MessageSendEvent`s for different peers if + /// `MessageSendEvent`s to more than one peer exists, but `MessageSendEvent`s to the same peer + /// is always placed next to each other. + /// + /// Note that that while `MessageSendEvent`s are strictly ordered per-peer, the peer order for + /// the chunks of `MessageSendEvent`s for different peers is random. I.e. if the array contains + /// `MessageSendEvent`s for both `node_a` and `node_b`, the `MessageSendEvent`s for `node_a` + /// will randomly be placed first or last in the returned array. + /// + /// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate` + /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among + /// the `MessageSendEvent`s to the specific peer they were generated under. fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { @@ -5544,8 +5588,16 @@ where } let mut pending_events = Vec::new(); - let mut channel_state = self.channel_state.lock().unwrap(); - mem::swap(&mut pending_events, &mut channel_state.pending_msg_events); + let per_peer_state = self.per_peer_state.read().unwrap(); + for (_cp_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + if peer_state.pending_msg_events.len() > 0 { + let mut peer_pending_events = Vec::new(); + mem::swap(&mut peer_pending_events, &mut peer_state.pending_msg_events); + pending_events.append(&mut peer_pending_events); + } + } if !pending_events.is_empty() { events.replace(pending_events); @@ -5746,11 +5798,11 @@ where { let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; - let pending_msg_events = &mut channel_state.pending_msg_events; let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; peer_state.channel_by_id.retain(|_, channel| { let res = f(channel); if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res { @@ -6036,12 +6088,12 @@ where let mut channel_state = self.channel_state.lock().unwrap(); let mut per_peer_state = self.per_peer_state.write().unwrap(); { - let pending_msg_events = &mut channel_state.pending_msg_events; log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates. We believe we {} make future connections to this peer.", log_pubkey!(counterparty_node_id), if no_connection_possible { "cannot" } else { "can" }); if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; peer_state.channel_by_id.retain(|_, chan| { chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); if chan.is_shutdown() { @@ -6053,31 +6105,31 @@ where } true }); + pending_msg_events.retain(|msg| { + match msg { + &events::MessageSendEvent::SendAcceptChannel { .. } => false, + &events::MessageSendEvent::SendOpenChannel { .. } => false, + &events::MessageSendEvent::SendFundingCreated { .. } => false, + &events::MessageSendEvent::SendFundingSigned { .. } => false, + &events::MessageSendEvent::SendChannelReady { .. } => false, + &events::MessageSendEvent::SendAnnouncementSignatures { .. } => false, + &events::MessageSendEvent::UpdateHTLCs { .. } => false, + &events::MessageSendEvent::SendRevokeAndACK { .. } => false, + &events::MessageSendEvent::SendClosingSigned { .. } => false, + &events::MessageSendEvent::SendShutdown { .. } => false, + &events::MessageSendEvent::SendChannelReestablish { .. } => false, + &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, + &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, + &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, + &events::MessageSendEvent::SendChannelUpdate { .. } => false, + &events::MessageSendEvent::HandleError { .. } => false, + &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, + &events::MessageSendEvent::SendShortIdsQuery { .. } => false, + &events::MessageSendEvent::SendReplyChannelRange { .. } => false, + &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, + } + }); } - pending_msg_events.retain(|msg| { - match msg { - &events::MessageSendEvent::SendAcceptChannel { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendOpenChannel { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendFundingCreated { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendFundingSigned { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendChannelReady { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendAnnouncementSignatures { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::UpdateHTLCs { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendRevokeAndACK { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendClosingSigned { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendShutdown { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendChannelReestablish { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendChannelAnnouncement { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, - &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, - &events::MessageSendEvent::SendChannelUpdate { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id, - &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, - &events::MessageSendEvent::SendShortIdsQuery { .. } => false, - &events::MessageSendEvent::SendReplyChannelRange { .. } => false, - &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false, - } - }); mem::drop(channel_state); } if no_channels_remain { @@ -6107,6 +6159,7 @@ where e.insert(Mutex::new(PeerState { channel_by_id: HashMap::new(), latest_features: init_msg.features.clone(), + pending_msg_events: Vec::new(), })); }, hash_map::Entry::Occupied(e) => { @@ -6117,12 +6170,12 @@ where let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; - let pending_msg_events = &mut channel_state.pending_msg_events; let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; peer_state.channel_by_id.retain(|_, chan| { let retain = if chan.get_counterparty_node_id() == *counterparty_node_id { if !chan.have_received_message() { @@ -6183,7 +6236,7 @@ where let peer_state = &mut *peer_state_lock; if let Some(chan) = peer_state.channel_by_id.get_mut(&msg.channel_id) { if let Ok(msg) = chan.maybe_handle_error_without_close(self.genesis_hash) { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendOpenChannel { node_id: *counterparty_node_id, msg, }); @@ -7100,6 +7153,7 @@ where let peer_state = PeerState { channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()), latest_features: Readable::read(reader)?, + pending_msg_events: Vec::new(), }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); } @@ -7433,7 +7487,6 @@ where best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)), channel_state: Mutex::new(ChannelHolder { - pending_msg_events: Vec::new(), }), inbound_payment_key: expanded_inbound_key, pending_inbound_payments: Mutex::new(pending_inbound_payments), @@ -8064,23 +8117,6 @@ mod tests { check_closed_event!(nodes[1], 1, ClosureReason::CooperativeClosure); } - fn check_unkown_peer_msg_event<'a, 'b: 'a, 'c: 'b>(node: &Node<'a, 'b, 'c>, closing_node_id: PublicKey, closed_channel_id: [u8; 32]){ - let close_msg_ev = node.node.get_and_clear_pending_msg_events(); - let expected_error_str = format!("Can't find a peer matching the passed counterparty node_id {}", closing_node_id); - match close_msg_ev[0] { - MessageSendEvent::HandleError { - ref node_id, action: msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { ref channel_id, ref data } - } - } => { - assert_eq!(*node_id, closing_node_id); - assert_eq!(*data, expected_error_str); - assert_eq!(*channel_id, closed_channel_id); - } - _ => panic!("Unexpected event"), - } - } - fn check_not_connected_to_peer_error(res_err: Result, expected_public_key: PublicKey) { let expected_message = format!("Not connected to node: {}", expected_public_key); check_api_misuse_error_message(expected_message, res_err) @@ -8225,23 +8261,18 @@ mod tests { check_not_connected_to_peer_error(nodes[0].node.create_channel(unkown_public_key, 1_000_000, 500_000_000, 42, None), unkown_public_key); nodes[1].node.handle_open_channel(&unkown_public_key, channelmanager::provided_init_features(), &open_channel_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, open_channel_msg.temporary_channel_id); nodes[0].node.handle_accept_channel(&unkown_public_key, channelmanager::provided_init_features(), &accept_channel_msg); - check_unkown_peer_msg_event(&nodes[0], unkown_public_key, open_channel_msg.temporary_channel_id); check_unkown_peer_error(nodes[0].node.accept_inbound_channel(&open_channel_msg.temporary_channel_id, &unkown_public_key, 42), unkown_public_key); + nodes[1].node.handle_funding_created(&unkown_public_key, &funding_created_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, open_channel_msg.temporary_channel_id); nodes[0].node.handle_funding_signed(&unkown_public_key, &funding_signed_msg); - check_unkown_peer_msg_event(&nodes[0], unkown_public_key, channel_id); nodes[0].node.handle_channel_ready(&unkown_public_key, &channel_ready_msg); - check_unkown_peer_msg_event(&nodes[0], unkown_public_key, channel_id); nodes[1].node.handle_announcement_signatures(&unkown_public_key, &announcement_signatures_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); check_unkown_peer_error(nodes[0].node.close_channel(&channel_id, &unkown_public_key), unkown_public_key); @@ -8254,34 +8285,24 @@ mod tests { check_unkown_peer_error(nodes[0].node.update_channel_config(&unkown_public_key, &[channel_id], &ChannelConfig::default()), unkown_public_key); nodes[0].node.handle_shutdown(&unkown_public_key, &channelmanager::provided_init_features(), &shutdown_msg); - check_unkown_peer_msg_event(&nodes[0], unkown_public_key, channel_id); nodes[1].node.handle_closing_signed(&unkown_public_key, &closing_signed_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); nodes[0].node.handle_channel_reestablish(&unkown_public_key, &channel_reestablish_msg); - check_unkown_peer_msg_event(&nodes[0], unkown_public_key, channel_id); nodes[1].node.handle_update_add_htlc(&unkown_public_key, &update_add_htlc_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); nodes[1].node.handle_commitment_signed(&unkown_public_key, &commitment_signed_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); nodes[1].node.handle_update_fail_malformed_htlc(&unkown_public_key, &malformed_update_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); nodes[1].node.handle_update_fail_htlc(&unkown_public_key, &fail_update_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); nodes[1].node.handle_update_fulfill_htlc(&unkown_public_key, &fulfill_update_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); nodes[1].node.handle_revoke_and_ack(&unkown_public_key, &revoke_and_ack_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); nodes[1].node.handle_update_fee(&unkown_public_key, &update_fee_msg); - check_unkown_peer_msg_event(&nodes[1], unkown_public_key, channel_id); } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 4430128ea..e813d012f 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -1324,10 +1324,20 @@ macro_rules! commitment_signed_dance { check_added_monitors!($node_a, 1); let channel_state = $node_a.node.channel_state.lock().unwrap(); - assert_eq!(channel_state.pending_msg_events.len(), 1); - if let MessageSendEvent::UpdateHTLCs { ref node_id, .. } = channel_state.pending_msg_events[0] { - assert_ne!(*node_id, $node_b.node.get_our_node_id()); - } else { panic!("Unexpected event"); } + let node_a_per_peer_state = $node_a.node.per_peer_state.read().unwrap(); + let mut number_of_msg_events = 0; + for (cp_id, peer_state_mutex) in node_a_per_peer_state.iter() { + let peer_state = peer_state_mutex.lock().unwrap(); + let cp_pending_msg_events = &peer_state.pending_msg_events; + number_of_msg_events += cp_pending_msg_events.len(); + if cp_pending_msg_events.len() == 1 { + if let MessageSendEvent::UpdateHTLCs { .. } = cp_pending_msg_events[0] { + assert_ne!(*cp_id, $node_b.node.get_our_node_id()); + } else { panic!("Unexpected event"); } + } + } + // Expecting the failure backwards event to the previous hop (not `node_b`) + assert_eq!(number_of_msg_events, 1); } else { assert!($node_a.node.get_and_clear_pending_msg_events().is_empty()); } -- 2.39.5