Merge pull request #2006 from TheBlueMatt/2023-02-no-recursive-read-locks
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 48c73a5be1edf31d32251e748062762be7c07a31..0757e117ce2e7660648856053070c12773b2b6b1 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -1417,7 +1417,7 @@ macro_rules! emit_channel_ready_event {
 }
 
 macro_rules! handle_monitor_update_completion {
-       ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr) => { {
+       ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
                let mut updates = $chan.monitor_updating_restored(&$self.logger,
                        &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
                        $self.best_block.read().unwrap().height());
@@ -1450,6 +1450,7 @@ macro_rules! handle_monitor_update_completion {
 
                let channel_id = $chan.channel_id();
                core::mem::drop($peer_state_lock);
+               core::mem::drop($per_peer_state_lock);
 
                $self.handle_monitor_update_completion_actions(update_actions);
 
@@ -1465,7 +1466,7 @@ macro_rules! handle_monitor_update_completion {
 }
 
 macro_rules! handle_new_monitor_update {
-       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
                // update_maps_on_chan_removal needs to be able to take the id_to_peer lock, so
                // make sure we can take it here in any case, ensuring we won't deadlock below.
                debug_assert!($self.id_to_peer.try_lock().is_ok());
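The `debug_assert!(try_lock().is_ok())` above is a cheap debug-build deadlock guard: if any thread already holds `id_to_peer`, the removal path that needs that map would wedge. The same guard in isolation, as a minimal standalone sketch (hypothetical helper, plain std types):

```rust
use std::sync::Mutex;

// Debug-only guard: try_lock() fails if *any* thread currently holds the
// lock, so this fires before code that must take `m` would deadlock.
fn debug_assert_lockable<T>(m: &Mutex<T>) {
    let guard = m.try_lock();
    debug_assert!(guard.is_ok());
    // the guard (if Ok) is dropped here, releasing the lock immediately
}
```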
@@ -1492,14 +1493,14 @@ macro_rules! handle_new_monitor_update {
                                        .update_id == $update_id) &&
                                        $chan.get_latest_monitor_update_id() == $update_id
                                {
-                                       handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $chan);
+                                       handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
                                }
                                Ok(())
                        },
                }
        } };
-       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $chan_entry: expr) => {
-               handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
+       ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
+               handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
        }
 }
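The shape of the fix: both macros now receive the top-level `per_peer_state` read guard so it can be dropped together with the per-peer lock before any completion actions run, since those actions may take `per_peer_state.read()` again and a recursive read while a writer is queued can deadlock. A minimal sketch of the pattern with plain std locks (hypothetical types, not the real ChannelManager):

```rust
use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};

struct PeerState { pending_actions: Vec<u32> }

// The caller passes in *both* guards so this function can release them in
// inner-to-outer order before running side effects that re-lock the map.
fn complete_update(
    map: &RwLock<Vec<Mutex<PeerState>>>,
    peer_state_lock: MutexGuard<'_, PeerState>,
    per_peer_state_lock: RwLockReadGuard<'_, Vec<Mutex<PeerState>>>,
) {
    let actions = peer_state_lock.pending_actions.clone();
    core::mem::drop(peer_state_lock);     // release the per-peer mutex first
    core::mem::drop(per_peer_state_lock); // then the outer read lock
    // Only now is it safe to do work that takes the read lock again:
    let _reacquired = map.read().unwrap();
    let _ = actions;
}
```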
 
@@ -1835,7 +1836,7 @@ where
                                        if let Some(monitor_update) = monitor_update_opt.take() {
                                                let update_id = monitor_update.update_id;
                                                let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
-                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry);
                                        }
 
                                        if chan_entry.get().is_shutdown() {
@@ -2464,7 +2465,7 @@ where
                                        Some(monitor_update) => {
                                                let update_id = monitor_update.update_id;
                                                let update_res = self.chain_monitor.update_channel(funding_txo, monitor_update);
-                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan) {
+                                               if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan) {
                                                        break Err(e);
                                                }
                                                if update_res == ChannelMonitorUpdateStatus::InProgress {
@@ -3991,7 +3992,8 @@ where
                        )
                ).unwrap_or(None);
 
-               if let Some(mut peer_state_lock) = peer_state_opt.take() {
+               if peer_state_opt.is_some() {
+                       let mut peer_state_lock = peer_state_opt.unwrap();
                        let peer_state = &mut *peer_state_lock;
                        if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
                                let counterparty_node_id = chan.get().get_counterparty_node_id();
@@ -4006,7 +4008,7 @@ where
                                        let update_id = monitor_update.update_id;
                                        let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, monitor_update);
                                        let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-                                               peer_state, chan);
+                                               peer_state, per_peer_state, chan);
                                        if let Err(e) = res {
                                                // TODO: This is a *critical* error - we probably updated the outbound edge
                                                // of the HTLC's monitor with a preimage. We should retry this monitor
@@ -4207,7 +4209,7 @@ where
                if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
                        return;
                }
-               handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, channel.get_mut());
+               handle_monitor_update_completion!(self, highest_applied_update_id, peer_state_lock, peer_state, per_peer_state, channel.get_mut());
        }
 
        /// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
@@ -4513,7 +4515,8 @@ where
                                let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
 
                                let chan = e.insert(chan);
-                               let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
+                               let mut res = handle_new_monitor_update!(self, monitor_res, 0, peer_state_lock, peer_state,
+                                       per_peer_state, chan, MANUALLY_REMOVING, { peer_state.channel_by_id.remove(&new_channel_id) });
 
                                // Note that we reply with the new channel_id in error messages if we gave up on the
                                // channel, not the temporary_channel_id. This is compatible with ourselves, but the
@@ -4546,7 +4549,7 @@ where
                                let monitor = try_chan_entry!(self,
                                        chan.get_mut().funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan);
                                let update_res = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor);
-                               let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, chan);
+                               let mut res = handle_new_monitor_update!(self, update_res, 0, peer_state_lock, peer_state, per_peer_state, chan);
                                if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
                                        // We weren't able to watch the channel to begin with, so no updates should be made on
                                        // it. Previously, full_stack_target found an (unreachable) panic when the
@@ -4642,7 +4645,7 @@ where
                                        if let Some(monitor_update) = monitor_update_opt {
                                                let update_id = monitor_update.update_id;
                                                let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
-                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+                                               break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, per_peer_state, chan_entry);
                                        }
                                        break Ok(());
                                },
@@ -4834,7 +4837,7 @@ where
                                let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
                                let update_id = monitor_update.update_id;
                                handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-                                       peer_state, chan)
+                                       peer_state, per_peer_state, chan)
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
                }
@@ -4940,12 +4943,11 @@ where
        fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
                let (htlcs_to_fail, res) = {
                        let per_peer_state = self.per_peer_state.read().unwrap();
-                       let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+                       let mut peer_state_lock = per_peer_state.get(counterparty_node_id)
                                .ok_or_else(|| {
                                        debug_assert!(false);
                                        MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
-                               })?;
-                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                               }).map(|mtx| mtx.lock().unwrap())?;
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
@@ -4953,8 +4955,8 @@ where
                                        let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
                                        let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
                                        let update_id = monitor_update.update_id;
-                                       let res = handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-                                               peer_state, chan);
+                                       let res = handle_new_monitor_update!(self, update_res, update_id,
+                                               peer_state_lock, peer_state, per_peer_state, chan);
                                        (htlcs_to_fail, res)
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
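Note the lookup-then-lock above is collapsed into one chained expression so only the guard is bound, not the `&Mutex` itself. Roughly this shape, as a simplified standalone sketch (hypothetical types):

```rust
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

// Chained lookup-and-lock: map the &Mutex to its guard in one expression so
// no separate `peer_state_mutex` binding has to outlive the `?`.
fn lock_peer<'a>(
    peers: &'a HashMap<u64, Mutex<u32>>,
    id: u64,
) -> Result<MutexGuard<'a, u32>, &'static str> {
    peers.get(&id).ok_or("no such peer").map(|mtx| mtx.lock().unwrap())
}
```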
@@ -5211,38 +5213,45 @@ where
                let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();
                let mut handle_errors = Vec::new();
-               let per_peer_state = self.per_peer_state.read().unwrap();
-
-               for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
-                       'chan_loop: loop {
-                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
-                               for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
-                                       let counterparty_node_id = chan.get_counterparty_node_id();
-                                       let funding_txo = chan.get_funding_txo();
-                                       let (monitor_opt, holding_cell_failed_htlcs) =
-                                               chan.maybe_free_holding_cell_htlcs(&self.logger);
-                                       if !holding_cell_failed_htlcs.is_empty() {
-                                               failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
-                                       }
-                                       if let Some(monitor_update) = monitor_opt {
-                                               has_monitor_update = true;
 
-                                               let update_res = self.chain_monitor.update_channel(
-                                                       funding_txo.expect("channel is live"), monitor_update);
-                                               let update_id = monitor_update.update_id;
-                                               let channel_id: [u8; 32] = *channel_id;
-                                               let res = handle_new_monitor_update!(self, update_res, update_id,
-                                                       peer_state_lock, peer_state, chan, MANUALLY_REMOVING,
-                                                       peer_state.channel_by_id.remove(&channel_id));
-                                               if res.is_err() {
-                                                       handle_errors.push((counterparty_node_id, res));
+               // Walk our list of channels and find any that need to update. Note that when we do find an
+               // update, if it includes actions that must be taken afterwards, we have to drop the
+               // per-peer state lock as well as the top level per_peer_state lock. Thus, we loop until we
+               // manage to go through all our peers without finding a single channel to update.
+               'peer_loop: loop {
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+                               'chan_loop: loop {
+                                       let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                                       let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+                                       for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
+                                               let counterparty_node_id = chan.get_counterparty_node_id();
+                                               let funding_txo = chan.get_funding_txo();
+                                               let (monitor_opt, holding_cell_failed_htlcs) =
+                                                       chan.maybe_free_holding_cell_htlcs(&self.logger);
+                                               if !holding_cell_failed_htlcs.is_empty() {
+                                                       failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+                                               }
+                                               if let Some(monitor_update) = monitor_opt {
+                                                       has_monitor_update = true;
+
+                                                       let update_res = self.chain_monitor.update_channel(
+                                                               funding_txo.expect("channel is live"), monitor_update);
+                                                       let update_id = monitor_update.update_id;
+                                                       let channel_id: [u8; 32] = *channel_id;
+                                                       let res = handle_new_monitor_update!(self, update_res, update_id,
+                                                               peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING,
+                                                               peer_state.channel_by_id.remove(&channel_id));
+                                                       if res.is_err() {
+                                                               handle_errors.push((counterparty_node_id, res));
+                                                       }
+                                                       continue 'peer_loop;
                                                }
-                                               continue 'chan_loop;
                                        }
+                                       break 'chan_loop;
                                }
-                               break 'chan_loop;
                        }
+                       break 'peer_loop;
                }
 
                let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
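The comment in the hunk above describes the new control flow; its essential shape is a restartable scan, roughly like this standalone sketch (hypothetical channel type, plain std locks):

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

struct Chan { pending_update: bool }

// Restart pattern: handling one update consumes both lock guards, so we
// cannot keep iterating; `continue 'peer_loop` ends their scopes (dropping
// both locks) and the scan starts over until a full pass finds no work.
fn scan(per_peer_state: &RwLock<HashMap<u64, Mutex<HashMap<u64, Chan>>>>) {
    'peer_loop: loop {
        let peers = per_peer_state.read().unwrap();
        for (_peer_id, chans_mutex) in peers.iter() {
            let mut chans = chans_mutex.lock().unwrap();
            for (_chan_id, chan) in chans.iter_mut() {
                if chan.pending_update {
                    chan.pending_update = false;
                    // ...handle the update, which needs both locks dropped...
                    continue 'peer_loop;
                }
            }
        }
        break 'peer_loop;
    }
}
```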
@@ -6765,27 +6774,36 @@ impl Readable for HTLCSource {
                        0 => {
                                let mut session_priv: crate::util::ser::RequiredWrapper<SecretKey> = crate::util::ser::RequiredWrapper(None);
                                let mut first_hop_htlc_msat: u64 = 0;
-                               let mut path = Some(Vec::new());
+                               let mut path: Option<Vec<RouteHop>> = Some(Vec::new());
                                let mut payment_id = None;
                                let mut payment_secret = None;
-                               let mut payment_params = None;
+                               let mut payment_params: Option<PaymentParameters> = None;
                                read_tlv_fields!(reader, {
                                        (0, session_priv, required),
                                        (1, payment_id, option),
                                        (2, first_hop_htlc_msat, required),
                                        (3, payment_secret, option),
                                        (4, path, vec_type),
-                                       (5, payment_params, option),
+                                       (5, payment_params, (option: ReadableArgs, 0)),
                                });
                                if payment_id.is_none() {
                                        // For backwards compat, if there was no payment_id written, use the session_priv bytes
                                        // instead.
                                        payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref()));
                                }
+                               if path.is_none() || path.as_ref().unwrap().is_empty() {
+                                       return Err(DecodeError::InvalidValue);
+                               }
+                               let path = path.unwrap();
+                               if let Some(params) = payment_params.as_mut() {
+                                       if params.final_cltv_expiry_delta == 0 {
+                                               params.final_cltv_expiry_delta = path.last().unwrap().cltv_expiry_delta;
+                                       }
+                               }
                                Ok(HTLCSource::OutboundRoute {
                                        session_priv: session_priv.0.unwrap(),
                                        first_hop_htlc_msat,
-                                       path: path.unwrap(),
+                                       path,
                                        payment_id: payment_id.unwrap(),
                                        payment_secret,
                                        payment_params,
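Two compat rules are added on the read path here: an empty or missing path is now rejected outright, and a `final_cltv_expiry_delta` of 0 (written by versions that predate the field) is backfilled from the path's last hop. The same idea in isolation (hypothetical minimal types):

```rust
struct Hop { cltv_expiry_delta: u32 }
struct Params { final_cltv_expiry_delta: u32 }

// Older serializations effectively carry 0 for the then-nonexistent field;
// recover a sensible value from the last hop rather than keeping 0.
fn backfill(params: &mut Option<Params>, path: &[Hop]) -> Result<(), &'static str> {
    if path.is_empty() {
        return Err("path must be non-empty");
    }
    if let Some(p) = params.as_mut() {
        if p.final_cltv_expiry_delta == 0 {
            p.final_cltv_expiry_delta = path.last().unwrap().cltv_expiry_delta;
        }
    }
    Ok(())
}
```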
@@ -6932,7 +6950,10 @@ where
                let mut monitor_update_blocked_actions_per_peer = None;
                let mut peer_states = Vec::new();
                for (_, peer_state_mutex) in per_peer_state.iter() {
-                       peer_states.push(peer_state_mutex.lock().unwrap());
+                       // Because we're holding the owning `per_peer_state` write lock here there's no chance
+                       // of a lockorder violation deadlock - no other thread can be holding any
+                       // per_peer_state lock at all.
+                       peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
                }
 
                (serializable_peer_count).write(writer)?;
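`unsafe_well_ordered_double_lock_self` appears to be the debug-lockorder escape hatch for taking many same-class locks at once; the invariant the comment states is what makes it sound. With plain std types the situation looks like this (sketch, not the actual API):

```rust
use std::sync::{Mutex, MutexGuard, RwLock, RwLockWriteGuard};

struct PeerState { bytes_written: u64 }

// Sound only because the exclusive write guard is held: no other thread can
// be inside any inner Mutex, so taking the inner locks in arbitrary
// iteration order cannot deadlock.
fn lock_all<'a>(
    peers: &'a RwLockWriteGuard<'_, Vec<Mutex<PeerState>>>,
) -> Vec<MutexGuard<'a, PeerState>> {
    peers.iter().map(|m| m.lock().unwrap()).collect()
}

fn serialize_all(map: &RwLock<Vec<Mutex<PeerState>>>) {
    let write_guard = map.write().unwrap();
    let guards = lock_all(&write_guard);
    for g in &guards {
        let _ = g.bytes_written; // ...write each peer's state...
    }
}
```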
@@ -7520,7 +7541,10 @@ where
                        }
                }
 
-               let pending_outbounds = OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), retry_lock: Mutex::new(()) };
+               let pending_outbounds = OutboundPayments {
+                       pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
+                       retry_lock: Mutex::new(())
+               };
                if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
                        // If we have pending HTLCs to forward, assume we either dropped a
                        // `PendingHTLCsForwardable` or the user received it but never processed it as they
@@ -7983,7 +8007,6 @@ mod tests {
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(expected_route.last().unwrap().node.get_our_node_id(), TEST_FINAL_CLTV),
                        final_value_msat: 100_000,
-                       final_cltv_expiry_delta: TEST_FINAL_CLTV,
                };
                let route = find_route(
                        &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
@@ -8074,7 +8097,6 @@ mod tests {
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(payee_pubkey, 40),
                        final_value_msat: 10_000,
-                       final_cltv_expiry_delta: 40,
                };
                let network_graph = nodes[0].network_graph.clone();
                let first_hops = nodes[0].node.list_usable_channels();
@@ -8117,7 +8139,6 @@ mod tests {
                let route_params = RouteParameters {
                        payment_params: PaymentParameters::for_keysend(payee_pubkey, 40),
                        final_value_msat: 10_000,
-                       final_cltv_expiry_delta: 40,
                };
                let network_graph = nodes[0].network_graph.clone();
                let first_hops = nodes[0].node.list_usable_channels();
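These test hunks track an API change in this series: `RouteParameters` no longer carries a `final_cltv_expiry_delta` field, and the value rides along in `PaymentParameters` instead (here via the second argument to `for_keysend`). In outline, a simplified sketch of the field move (not the real definitions, which take a payee pubkey and more):

```rust
struct PaymentParameters { final_cltv_expiry_delta: u32 }

impl PaymentParameters {
    fn for_keysend(final_cltv_expiry_delta: u32) -> Self {
        PaymentParameters { final_cltv_expiry_delta }
    }
}

// After the change the delta is stated once, inside payment_params:
struct RouteParameters { payment_params: PaymentParameters, final_value_msat: u64 }

fn example() -> RouteParameters {
    RouteParameters { payment_params: PaymentParameters::for_keysend(40), final_value_msat: 10_000 }
}
```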
@@ -8271,10 +8292,10 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
-
-                       assert_eq!(nodes[1].node.id_to_peer.lock().unwrap().len(), 0);
                }
 
+               assert_eq!(nodes[1].node.id_to_peer.lock().unwrap().len(), 0);
+
                let funding_created_msg = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
 
                nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg);
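The remaining hunks re-scope the test's map assertions so the two nodes' `id_to_peer` guards are never held at the same time, presumably because the test-only lockorder checker groups these mutexes into one class and would flag the overlap. The scoping pattern, stripped down:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

fn check_maps(map_a: &Mutex<HashMap<u32, ()>>, map_b: &Mutex<HashMap<u32, ()>>) {
    {
        let a = map_a.lock().unwrap();
        assert_eq!(a.len(), 1);
    } // a's guard drops here, before b's is taken
    {
        let b = map_b.lock().unwrap();
        assert_eq!(b.len(), 0);
    }
}
```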
@@ -8282,7 +8303,9 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
+               }
 
+               {
                        // Assert that `nodes[1]`'s `id_to_peer` map is populated with the channel as soon as
                        // as it has the funding transaction.
                        let nodes_1_lock = nodes[1].node.id_to_peer.lock().unwrap();
@@ -8312,7 +8335,9 @@ mod tests {
                        let nodes_0_lock = nodes[0].node.id_to_peer.lock().unwrap();
                        assert_eq!(nodes_0_lock.len(), 1);
                        assert!(nodes_0_lock.contains_key(channel_id));
+               }
 
+               {
                        // At this stage, `nodes[1]` has proposed a fee for the closing transaction in the
                        // `handle_closing_signed` call above. As `nodes[1]` has not yet received the signature
                        // from `nodes[0]` for the closing transaction with the proposed fee, the channel is