X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=23b45077af02bafb0ea53df1f55455260283cf66;hb=11b228b7b0d0f4f8cf0ca4829f89a9b281359606;hp=29c0141539b1a1a76d919938b43abb2b2ecb7616;hpb=341163eeb599a8c5c79e06f0e6d6e8fb2a1e756d;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 29c01415..23b45077 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -2042,12 +2042,14 @@ macro_rules! handle_monitor_update_completion { } macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { { - // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in - // any case so that it won't deadlock. - debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); + ($self: ident, $update_res: expr, $chan: expr, _internal, $completed: expr) => { { debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); match $update_res { + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!($self.logger, "{}", err_str); + panic!("{}", err_str); + }, ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", &$chan.context.channel_id()); @@ -2059,23 +2061,11 @@ macro_rules! handle_new_monitor_update { }, } } }; - ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => { - handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, - $per_peer_state_lock, $chan, _internal, $remove, + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, INITIAL_MONITOR) => { + handle_new_monitor_update!($self, $update_res, $chan, _internal, handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) }; - ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => { - if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() { - handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state, - $per_peer_state_lock, chan, MANUALLY_REMOVING_INITIAL_MONITOR, { $chan_entry.remove() }) - } else { - // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to - // update). Throwing away a monitor update could be dangerous, so we assert even in - // release builds. - panic!("Initial Monitors should not exist for non-funded channels"); - } - }; - ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { + ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { { let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) .or_insert_with(Vec::new); // During startup, we push monitor updates as background events through to here in @@ -2087,8 +2077,7 @@ macro_rules! handle_new_monitor_update { in_flight_updates.len() - 1 }); let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]); - handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state, - $per_peer_state_lock, $chan, _internal, $remove, + handle_new_monitor_update!($self, update_res, $chan, _internal, { let _ = in_flight_updates.remove(idx); if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { @@ -2096,17 +2085,6 @@ macro_rules! handle_new_monitor_update { } }) } }; - ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => { - if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() { - handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state, - $per_peer_state_lock, chan, MANUALLY_REMOVING, { $chan_entry.remove() }) - } else { - // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to - // update). Throwing away a monitor update could be dangerous, so we assert even in - // release builds. - panic!("Monitor updates should not exist for non-funded channels"); - } - } } macro_rules! process_events_body { @@ -2551,7 +2529,7 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt.take() { handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_phase_entry); + peer_state_lock, peer_state, per_peer_state, chan); break; } @@ -2648,8 +2626,13 @@ where self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script) } - #[inline] fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) { + debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread); + #[cfg(debug_assertions)] + for (_, peer) in self.per_peer_state.read().unwrap().iter() { + debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread); + } + let (monitor_update_option, mut failed_htlcs) = shutdown_res; log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len()); for htlc_source in failed_htlcs.drain(..) { @@ -2675,8 +2658,7 @@ where let peer_state_mutex = per_peer_state.get(peer_node_id) .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?; let (update_opt, counterparty_node_id) = { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; + let mut peer_state = peer_state_mutex.lock().unwrap(); let closure_reason = if let Some(peer_msg) = peer_msg { ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) } } else { @@ -2686,6 +2668,8 @@ where log_error!(self.logger, "Force-closing channel {}", channel_id); self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason); let mut chan_phase = remove_channel_phase!(self, chan_phase_entry); + mem::drop(peer_state); + mem::drop(per_peer_state); match chan_phase { ChannelPhase::Funded(mut chan) => { self.finish_force_close_channel(chan.context.force_shutdown(broadcast)); @@ -2708,10 +2692,17 @@ where } }; if let Some(update) = update_opt { - let mut peer_state = peer_state_mutex.lock().unwrap(); - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if + // not try to broadcast it via whatever peer we have. + let per_peer_state = self.per_peer_state.read().unwrap(); + let a_peer_state_opt = per_peer_state.get(peer_node_id) + .ok_or(per_peer_state.values().next()); + if let Ok(a_peer_state_mutex) = a_peer_state_opt { + let mut a_peer_state = a_peer_state_mutex.lock().unwrap(); + a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } } Ok(counterparty_node_id) @@ -3325,7 +3316,7 @@ where }, onion_packet, None, &self.fee_estimator, &self.logger); match break_chan_phase_entry!(self, send_res, chan_phase_entry) { Some(monitor_update) => { - match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan_phase_entry) { + match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) { false => { // Note that MonitorUpdateInProgress here indicates (per function // docs) that we will resend the commitment update once monitor @@ -3397,9 +3388,8 @@ where /// In general, a path may raise: /// * [`APIError::InvalidRoute`] when an invalid route or forwarding parameter (cltv_delta, fee, /// node public key) is specified. - /// * [`APIError::ChannelUnavailable`] if the next-hop channel is not available for updates - /// (including due to previous monitor update failure or new permanent monitor update - /// failure). + /// * [`APIError::ChannelUnavailable`] if the next-hop channel is not available as it has been + /// closed, doesn't exist, or the peer is currently disconnected. /// * [`APIError::MonitorUpdateInProgress`] if a new monitor update failure prevented sending the /// relevant updates. /// @@ -4524,9 +4514,13 @@ where let peer_state = &mut *peer_state_lock; match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { hash_map::Entry::Occupied(mut chan_phase) => { - updated_chan = true; - handle_new_monitor_update!(self, funding_txo, update.clone(), - peer_state_lock, peer_state, per_peer_state, chan_phase); + if let ChannelPhase::Funded(chan) = chan_phase.get_mut() { + updated_chan = true; + handle_new_monitor_update!(self, funding_txo, update.clone(), + peer_state_lock, peer_state, per_peer_state, chan); + } else { + debug_assert!(false, "We shouldn't have an update for a non-funded channel"); + } }, hash_map::Entry::Vacant(_) => {}, } @@ -4645,8 +4639,9 @@ where let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); let mut pending_peers_awaiting_removal = Vec::new(); + let mut shutdown_channels = Vec::new(); - let process_unfunded_channel_tick = | + let mut process_unfunded_channel_tick = | chan_id: &ChannelId, context: &mut ChannelContext, unfunded_context: &mut UnfundedChannelContext, @@ -4659,7 +4654,7 @@ where "Force-closing pending channel with ID {} for not establishing in a timely manner", chan_id); update_maps_on_chan_removal!(self, &context); self.issue_channel_close_events(&context, ClosureReason::HolderForceClosed); - self.finish_force_close_channel(context.force_shutdown(false)); + shutdown_channels.push(context.force_shutdown(false)); pending_msg_events.push(MessageSendEvent::HandleError { node_id: counterparty_node_id, action: msgs::ErrorAction::SendErrorMessage { @@ -4852,6 +4847,10 @@ where let _ = handle_error!(self, err, counterparty_node_id); } + for shutdown_res in shutdown_channels { + self.finish_force_close_channel(shutdown_res); + } + self.pending_outbound_payments.remove_stale_payments(&self.pending_events); // Technically we don't need to do this here, but if we have holding cell entries in a @@ -5008,6 +5007,7 @@ where // This ensures that future code doesn't introduce a lock-order requirement for // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling // this function with any `per_peer_state` peer lock acquired would. + #[cfg(debug_assertions)] for (_, peer) in self.per_peer_state.read().unwrap().iter() { debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread); } @@ -5259,7 +5259,7 @@ where } if !during_init { handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock, - peer_state, per_peer_state, chan_phase_entry); + peer_state, per_peer_state, chan); } else { // If we're running during init we cannot update a monitor directly - // they probably haven't actually been loaded yet. Instead, push the @@ -5903,7 +5903,6 @@ where // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't // accepted payment from yet. We do, however, need to wait to send our channel_ready // until we have persisted our monitor. - let new_channel_id = funding_msg.channel_id; peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { node_id: counterparty_node_id.clone(), msg: funding_msg, @@ -5911,8 +5910,7 @@ where if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) { handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state, - per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR, - { peer_state.channel_by_id.remove(&new_channel_id) }); + per_peer_state, chan, INITIAL_MONITOR); } else { unreachable!("This must be a funded channel as we just inserted it."); } @@ -5947,7 +5945,7 @@ where let monitor = try_chan_phase_entry!(self, chan.funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan_phase_entry); if let Ok(persist_status) = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor) { - handle_new_monitor_update!(self, persist_status, peer_state_lock, peer_state, per_peer_state, chan_phase_entry, INITIAL_MONITOR); + handle_new_monitor_update!(self, persist_status, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR); Ok(()) } else { try_chan_phase_entry!(self, Err(ChannelError::Close("Channel funding outpoint was a duplicate".to_owned())), chan_phase_entry) @@ -6017,7 +6015,8 @@ where } fn internal_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> { - let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>; + let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)> = Vec::new(); + let mut finish_shutdown = None; { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -6054,7 +6053,7 @@ where // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_phase_entry); + peer_state_lock, peer_state, per_peer_state, chan); } }, ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) => { @@ -6062,8 +6061,7 @@ where log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); self.issue_channel_close_events(&context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); let mut chan = remove_channel_phase!(self, chan_phase_entry); - self.finish_force_close_channel(chan.context_mut().force_shutdown(false)); - return Ok(()); + finish_shutdown = Some(chan.context_mut().force_shutdown(false)); }, } } else { @@ -6075,6 +6073,9 @@ where let reason = HTLCFailReason::from_failure_code(0x4000 | 8); self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver); } + if let Some(shutdown_res) = finish_shutdown { + self.finish_force_close_channel(shutdown_res); + } Ok(()) } @@ -6306,7 +6307,7 @@ where let monitor_update_opt = try_chan_phase_entry!(self, chan.commitment_signed(&msg, &self.logger), chan_phase_entry); if let Some(monitor_update) = monitor_update_opt { handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock, - peer_state, per_peer_state, chan_phase_entry); + peer_state, per_peer_state, chan); } Ok(()) } else { @@ -6480,7 +6481,7 @@ where let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); handle_new_monitor_update!(self, funding_txo, monitor_update, - peer_state_lock, peer_state, per_peer_state, chan_phase_entry); + peer_state_lock, peer_state, per_peer_state, chan); } htlcs_to_fail } else { @@ -6777,10 +6778,8 @@ where if let Some(monitor_update) = monitor_opt { has_monitor_update = true; - let channel_id: ChannelId = *channel_id; handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, - peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING, - peer_state.channel_by_id.remove(&channel_id)); + peer_state_lock, peer_state, per_peer_state, chan); continue 'peer_loop; } } @@ -7089,7 +7088,6 @@ where /// operation. It will double-check that nothing *else* is also blocking the same channel from /// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly. fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option) { - let mut errors = Vec::new(); loop { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { @@ -7122,7 +7120,7 @@ where log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", channel_funding_outpoint.to_channel_id()); handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update, - peer_state_lck, peer_state, per_peer_state, chan_phase_entry); + peer_state_lck, peer_state, per_peer_state, chan); if further_update_exists { // If there are more `ChannelMonitorUpdate`s to process, restart at the // top of the loop. @@ -7141,10 +7139,6 @@ where } break; } - for (err, counterparty_node_id) in errors { - let res = Err::<(), _>(err); - let _ = handle_error!(self, res, counterparty_node_id); - } } fn handle_post_event_actions(&self, actions: Vec) { @@ -10132,7 +10126,7 @@ mod tests { TEST_FINAL_CLTV, false), 100_000); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -10166,7 +10160,7 @@ mod tests { let payment_preimage = PaymentPreimage([42; 32]); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -10223,7 +10217,7 @@ mod tests { ); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let payment_id_2 = PaymentId([45; 32]); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), @@ -10274,7 +10268,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &(), &random_seed_bytes + nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -10319,7 +10313,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &(), &random_seed_bytes + nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]);