Remove redundant Vec in `get_and_clear_pending_msg_events`
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 771e7c4a8c94100232d7d1ce95b92537302dfb4c..fefa002ee7c932f101d2ea04af1af4bc9b3789c1 100644 (file)
@@ -487,6 +487,10 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
        /// Messages to send to the peer - pushed to in the same lock that they are generated in (except
        /// for broadcast messages, where ordering isn't as strict).
        pub(super) pending_msg_events: Vec<MessageSendEvent>,
+       /// The peer is currently connected (i.e. we've seen a
+       /// [`ChannelMessageHandler::peer_connected`] and no corresponding
+       /// [`ChannelMessageHandler::peer_disconnected`].
+       is_connected: bool,
 }
 
 /// Stores a PaymentSecret and any other data we may need to validate an inbound payment is
@@ -3845,7 +3849,7 @@ where
                let mut expected_amt_msat = None;
                let mut valid_mpp = true;
                let mut errs = Vec::new();
-               let mut per_peer_state = Some(self.per_peer_state.read().unwrap());
+               let per_peer_state = self.per_peer_state.read().unwrap();
                for htlc in sources.iter() {
                        let (counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) {
                                Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()),
@@ -3855,13 +3859,13 @@ where
                                }
                        };
 
-                       if let None = per_peer_state.as_ref().unwrap().get(&counterparty_node_id) {
+                       let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+                       if let None = peer_state_mutex_opt {
                                valid_mpp = false;
                                break;
                        }
 
-                       let peer_state_mutex = per_peer_state.as_ref().unwrap().get(&counterparty_node_id).unwrap();
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                       let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
 
                        if let None = peer_state.channel_by_id.get(&chan_id) {
@@ -3890,14 +3894,13 @@ where
 
                        claimable_amt_msat += htlc.value;
                }
+               mem::drop(per_peer_state);
                if sources.is_empty() || expected_amt_msat.is_none() {
-                       mem::drop(per_peer_state);
                        self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                        log_info!(self.logger, "Attempted to claim an incomplete payment which no longer had any available HTLCs!");
                        return;
                }
                if claimable_amt_msat != expected_amt_msat.unwrap() {
-                       mem::drop(per_peer_state);
                        self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
                        log_info!(self.logger, "Attempted to claim an incomplete payment, expected {} msat, had {} available to claim.",
                                expected_amt_msat.unwrap(), claimable_amt_msat);
@@ -3905,8 +3908,7 @@ where
                }
                if valid_mpp {
                        for htlc in sources.drain(..) {
-                               if per_peer_state.is_none() { per_peer_state = Some(self.per_peer_state.read().unwrap()); }
-                               if let Err((pk, err)) = self.claim_funds_from_hop(per_peer_state.take().unwrap(),
+                               if let Err((pk, err)) = self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
                                        |_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
                                {
@@ -3918,7 +3920,6 @@ where
                                }
                        }
                }
-               mem::drop(per_peer_state);
                if !valid_mpp {
                        for htlc in sources.drain(..) {
                                let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
@@ -3939,11 +3940,11 @@ where
        }
 
        fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
-               per_peer_state_lock: RwLockReadGuard<HashMap<PublicKey, Mutex<PeerState<<SP::Target as SignerProvider>::Signer>>>>,
                prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
        -> Result<(), (PublicKey, MsgHandleErrInternal)> {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
+               let per_peer_state = self.per_peer_state.read().unwrap();
                let chan_id = prev_hop.outpoint.to_channel_id();
 
                let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
@@ -3951,83 +3952,76 @@ where
                        None => None
                };
 
-               let (found_channel, mut peer_state_opt) = if counterparty_node_id_opt.is_some() && per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).is_some() {
-                       let peer_mutex = per_peer_state_lock.get(&counterparty_node_id_opt.unwrap()).unwrap();
-                       let peer_state = peer_mutex.lock().unwrap();
-                       let found_channel = peer_state.channel_by_id.contains_key(&chan_id);
-                       (found_channel, Some(peer_state))
-               }  else { (false, None) };
-
-               if found_channel {
-                       let peer_state = &mut *peer_state_opt.as_mut().unwrap();
-                       if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
-                               let counterparty_node_id = chan.get().get_counterparty_node_id();
-                               match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
-                                       Ok(msgs_monitor_option) => {
-                                               if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
-                                                       match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
-                                                               ChannelMonitorUpdateStatus::Completed => {},
-                                                               e => {
-                                                                       log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
-                                                                               "Failed to update channel monitor with preimage {:?}: {:?}",
-                                                                               payment_preimage, e);
-                                                                       let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
-                                                                       mem::drop(peer_state_opt);
-                                                                       mem::drop(per_peer_state_lock);
-                                                                       self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
-                                                                       return Err((counterparty_node_id, err));
-                                                               }
-                                                       }
-                                                       if let Some((msg, commitment_signed)) = msgs {
-                                                               log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
-                                                                       log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
-                                                               peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                                       node_id: counterparty_node_id,
-                                                                       updates: msgs::CommitmentUpdate {
-                                                                               update_add_htlcs: Vec::new(),
-                                                                               update_fulfill_htlcs: vec![msg],
-                                                                               update_fail_htlcs: Vec::new(),
-                                                                               update_fail_malformed_htlcs: Vec::new(),
-                                                                               update_fee: None,
-                                                                               commitment_signed,
-                                                                       }
-                                                               });
-                                                       }
-                                                       mem::drop(peer_state_opt);
-                                                       mem::drop(per_peer_state_lock);
-                                                       self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
-                                                       Ok(())
-                                               } else {
-                                                       Ok(())
-                                               }
-                                       },
-                                       Err((e, monitor_update)) => {
+               let mut peer_state_opt = counterparty_node_id_opt.as_ref().map(
+                       |counterparty_node_id| per_peer_state.get(counterparty_node_id).map(
+                               |peer_mutex| peer_mutex.lock().unwrap()
+                       )
+               ).unwrap_or(None);
+
+               if let Some(hash_map::Entry::Occupied(mut chan)) = peer_state_opt.as_mut().map(|peer_state| peer_state.channel_by_id.entry(chan_id))
+               {
+                       let counterparty_node_id = chan.get().get_counterparty_node_id();
+                       match chan.get_mut().get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger) {
+                               Ok(msgs_monitor_option) => {
+                                       if let UpdateFulfillCommitFetch::NewClaim { msgs, htlc_value_msat, monitor_update } = msgs_monitor_option {
                                                match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
                                                        ChannelMonitorUpdateStatus::Completed => {},
                                                        e => {
-                                                               // TODO: This needs to be handled somehow - if we receive a monitor update
-                                                               // with a preimage we *must* somehow manage to propagate it to the upstream
-                                                               // channel, or we must have an ability to receive the same update and try
-                                                               // again on restart.
-                                                               log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
-                                                                       "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
+                                                               log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Debug },
+                                                                       "Failed to update channel monitor with preimage {:?}: {:?}",
                                                                        payment_preimage, e);
-                                                       },
+                                                               let err = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, msgs.is_some()).unwrap_err();
+                                                               mem::drop(peer_state_opt);
+                                                               mem::drop(per_peer_state);
+                                                               self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
+                                                               return Err((counterparty_node_id, err));
+                                                       }
                                                }
-                                               let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
-                                               if drop {
-                                                       chan.remove_entry();
+                                               if let Some((msg, commitment_signed)) = msgs {
+                                                       log_debug!(self.logger, "Claiming funds for HTLC with preimage {} resulted in a commitment_signed for channel {}",
+                                                               log_bytes!(payment_preimage.0), log_bytes!(chan.get().channel_id()));
+                                                       peer_state_opt.as_mut().unwrap().pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                               node_id: counterparty_node_id,
+                                                               updates: msgs::CommitmentUpdate {
+                                                                       update_add_htlcs: Vec::new(),
+                                                                       update_fulfill_htlcs: vec![msg],
+                                                                       update_fail_htlcs: Vec::new(),
+                                                                       update_fail_malformed_htlcs: Vec::new(),
+                                                                       update_fee: None,
+                                                                       commitment_signed,
+                                                               }
+                                                       });
                                                }
                                                mem::drop(peer_state_opt);
-                                               mem::drop(per_peer_state_lock);
-                                               self.handle_monitor_update_completion_actions(completion_action(None));
-                                               Err((counterparty_node_id, res))
-                                       },
-                               }
-                       } else {
-                               // We've held the peer_state mutex since finding the channel and setting
-                               // found_channel to true, so the channel can't have been dropped.
-                               unreachable!()
+                                               mem::drop(per_peer_state);
+                                               self.handle_monitor_update_completion_actions(completion_action(Some(htlc_value_msat)));
+                                               Ok(())
+                                       } else {
+                                               Ok(())
+                                       }
+                               },
+                               Err((e, monitor_update)) => {
+                                       match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), &monitor_update) {
+                                               ChannelMonitorUpdateStatus::Completed => {},
+                                               e => {
+                                                       // TODO: This needs to be handled somehow - if we receive a monitor update
+                                                       // with a preimage we *must* somehow manage to propagate it to the upstream
+                                                       // channel, or we must have an ability to receive the same update and try
+                                                       // again on restart.
+                                                       log_given_level!(self.logger, if e == ChannelMonitorUpdateStatus::PermanentFailure { Level::Error } else { Level::Info },
+                                                               "Failed to update channel monitor with preimage {:?} immediately prior to force-close: {:?}",
+                                                               payment_preimage, e);
+                                               },
+                                       }
+                                       let (drop, res) = convert_chan_err!(self, e, chan.get_mut(), &chan_id);
+                                       if drop {
+                                               chan.remove_entry();
+                                       }
+                                       mem::drop(peer_state_opt);
+                                       mem::drop(per_peer_state);
+                                       self.handle_monitor_update_completion_actions(completion_action(None));
+                                       Err((counterparty_node_id, res))
+                               },
                        }
                } else {
                        let preimage_update = ChannelMonitorUpdate {
@@ -4048,7 +4042,7 @@ where
                                        payment_preimage, update_res);
                        }
                        mem::drop(peer_state_opt);
-                       mem::drop(per_peer_state_lock);
+                       mem::drop(per_peer_state);
                        // Note that we do process the completion action here. This totally could be a
                        // duplicate claim, but we have no way of knowing without interrogating the
                        // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
@@ -4070,7 +4064,7 @@ where
                        },
                        HTLCSource::PreviousHopData(hop_data) => {
                                let prev_outpoint = hop_data.outpoint;
-                               let res = self.claim_funds_from_hop(self.per_peer_state.read().unwrap(), hop_data, payment_preimage,
+                               let res = self.claim_funds_from_hop(hop_data, payment_preimage,
                                        |htlc_claim_value_msat| {
                                                if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
                                                        let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -5712,9 +5706,7 @@ where
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                if peer_state.pending_msg_events.len() > 0 {
-                                       let mut peer_pending_events = Vec::new();
-                                       mem::swap(&mut peer_pending_events, &mut peer_state.pending_msg_events);
-                                       pending_events.append(&mut peer_pending_events);
+                                       pending_events.append(&mut peer_state.pending_msg_events);
                                }
                        }
 
@@ -6289,6 +6281,8 @@ where
                                                &events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
                                        }
                                });
+                               debug_assert!(peer_state.is_connected, "A disconnected peer cannot disconnect");
+                               peer_state.is_connected = false;
                        }
                }
                if no_channels_remain {
@@ -6319,10 +6313,14 @@ where
                                                channel_by_id: HashMap::new(),
                                                latest_features: init_msg.features.clone(),
                                                pending_msg_events: Vec::new(),
+                                               is_connected: true,
                                        }));
                                },
                                hash_map::Entry::Occupied(e) => {
-                                       e.get().lock().unwrap().latest_features = init_msg.features.clone();
+                                       let mut peer_state = e.get().lock().unwrap();
+                                       peer_state.latest_features = init_msg.features.clone();
+                                       debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice");
+                                       peer_state.is_connected = true;
                                },
                        }
                }
@@ -7341,6 +7339,7 @@ where
                                channel_by_id: peer_channels.remove(&peer_pubkey).unwrap_or(HashMap::new()),
                                latest_features: Readable::read(reader)?,
                                pending_msg_events: Vec::new(),
+                               is_connected: false,
                        };
                        per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
                }
@@ -8054,8 +8053,6 @@ mod tests {
 
                let payer_pubkey = nodes[0].node.get_our_node_id();
                let payee_pubkey = nodes[1].node.get_our_node_id();
-               nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap();
-               nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap();
 
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]);
                let route_params = RouteParameters {
@@ -8099,8 +8096,6 @@ mod tests {
 
                let payer_pubkey = nodes[0].node.get_our_node_id();
                let payee_pubkey = nodes[1].node.get_our_node_id();
-               nodes[0].node.peer_connected(&payee_pubkey, &msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None }).unwrap();
-               nodes[1].node.peer_connected(&payer_pubkey, &msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None }).unwrap();
 
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1]);
                let route_params = RouteParameters {