/// called [`funding_transaction_generated`] for outbound channels) being closed.
///
/// Note that you can be a bit lazier about writing out `ChannelManager` than you can be with
-/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST write each monitor update out to disk before
-/// returning from [`chain::Watch::watch_channel`]/[`update_channel`], with ChannelManagers, writing updates
-/// happens out-of-band (and will prevent any other `ChannelManager` operations from occurring during
-/// the serialization process). If the deserialized version is out-of-date compared to the
-/// [`ChannelMonitor`] passed by reference to [`read`], those channels will be force-closed based on the
-/// `ChannelMonitor` state and no funds will be lost (mod on-chain transaction fees).
+/// [`ChannelMonitor`]. With [`ChannelMonitor`] you MUST durably write each
+/// [`ChannelMonitorUpdate`] before returning from
+/// [`chain::Watch::watch_channel`]/[`update_channel`] or before completing async writes. With
+/// `ChannelManager`s, writing updates happens out-of-band (and will prevent any other
+/// `ChannelManager` operations from occurring during the serialization process). If the
+/// deserialized version is out-of-date compared to the [`ChannelMonitor`] passed by reference to
+/// [`read`], those channels will be force-closed based on the `ChannelMonitor` state and no funds
+/// will be lost (modulo on-chain transaction fees).
///
/// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which
/// tells you the last block hash which was connected. You should get the best block tip before using the manager.
}
macro_rules! handle_new_monitor_update {
- ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, _internal, $remove: expr, $completed: expr) => { {
- // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
- // any case so that it won't deadlock.
- debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
+ ($self: ident, $update_res: expr, $chan: expr, _internal, $completed: expr) => { {
debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
match $update_res {
+ ChannelMonitorUpdateStatus::UnrecoverableError => {
+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+ log_error!($self.logger, "{}", err_str);
+ panic!("{}", err_str);
+ },
ChannelMonitorUpdateStatus::InProgress => {
log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
&$chan.context.channel_id());
- Ok(false)
- },
- ChannelMonitorUpdateStatus::PermanentFailure => {
- log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
- &$chan.context.channel_id());
- update_maps_on_chan_removal!($self, &$chan.context);
- let res = Err(MsgHandleErrInternal::from_finish_shutdown(
- "ChannelMonitor storage failure".to_owned(), $chan.context.channel_id(),
- $chan.context.get_user_id(), $chan.context.force_shutdown(false),
- $self.get_channel_update_for_broadcast(&$chan).ok(), $chan.context.get_value_satoshis()));
- $remove;
- res
+ false
},
ChannelMonitorUpdateStatus::Completed => {
$completed;
- Ok(true)
+ true
},
}
} };
- ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING_INITIAL_MONITOR, $remove: expr) => {
- handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state,
- $per_peer_state_lock, $chan, _internal, $remove,
+ ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, INITIAL_MONITOR) => {
+ handle_new_monitor_update!($self, $update_res, $chan, _internal,
handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan))
};
- ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr, INITIAL_MONITOR) => {
- if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() {
- handle_new_monitor_update!($self, $update_res, $peer_state_lock, $peer_state,
- $per_peer_state_lock, chan, MANUALLY_REMOVING_INITIAL_MONITOR, { $chan_entry.remove() })
- } else {
- // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to
- // update).
- debug_assert!(false);
- let channel_id = *$chan_entry.key();
- let (_, err) = convert_chan_phase_err!($self, ChannelError::Close(
- "Cannot update monitor for unfunded channels as they don't have monitors yet".into()),
- $chan_entry.get_mut(), &channel_id);
- $chan_entry.remove();
- Err(err)
- }
- };
- ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+ ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
let in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
.or_insert_with(Vec::new);
// During startup, we push monitor updates as background events through to here in
in_flight_updates.len() - 1
});
let update_res = $self.chain_monitor.update_channel($funding_txo, &in_flight_updates[idx]);
- handle_new_monitor_update!($self, update_res, $peer_state_lock, $peer_state,
- $per_peer_state_lock, $chan, _internal, $remove,
+ handle_new_monitor_update!($self, update_res, $chan, _internal,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
}
})
} };
- ($self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
- if let ChannelPhase::Funded(chan) = $chan_entry.get_mut() {
- handle_new_monitor_update!($self, $funding_txo, $update, $peer_state_lock, $peer_state,
- $per_peer_state_lock, chan, MANUALLY_REMOVING, { $chan_entry.remove() })
- } else {
- // We're not supposed to handle monitor updates for unfunded channels (they have no monitors to
- // update).
- debug_assert!(false);
- let channel_id = *$chan_entry.key();
- let (_, err) = convert_chan_phase_err!($self, ChannelError::Close(
- "Cannot update monitor for unfunded channels as they don't have monitors yet".into()),
- $chan_entry.get_mut(), &channel_id);
- $chan_entry.remove();
- Err(err)
- }
- }
}
macro_rules! process_events_body {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
- let result: Result<(), _> = loop {
- {
- let per_peer_state = self.per_peer_state.read().unwrap();
+ loop {
+ let per_peer_state = self.per_peer_state.read().unwrap();
- let peer_state_mutex = per_peer_state.get(counterparty_node_id)
- .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
+ let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+ .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
- match peer_state.channel_by_id.entry(channel_id.clone()) {
- hash_map::Entry::Occupied(mut chan_phase_entry) => {
- if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
- let funding_txo_opt = chan.context.get_funding_txo();
- let their_features = &peer_state.latest_features;
- let (shutdown_msg, mut monitor_update_opt, htlcs) =
- chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
- failed_htlcs = htlcs;
+ match peer_state.channel_by_id.entry(channel_id.clone()) {
+ hash_map::Entry::Occupied(mut chan_phase_entry) => {
+ if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
+ let funding_txo_opt = chan.context.get_funding_txo();
+ let their_features = &peer_state.latest_features;
+ let (shutdown_msg, mut monitor_update_opt, htlcs) =
+ chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
+ failed_htlcs = htlcs;
+
+ // We can send the `shutdown` message before updating the `ChannelMonitor`
+ // here as we don't need the monitor update to complete until we send a
+ // `shutdown_signed`, which we'll delay if we're pending a monitor update.
+ peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
+ node_id: *counterparty_node_id,
+ msg: shutdown_msg,
+ });
- // We can send the `shutdown` message before updating the `ChannelMonitor`
- // here as we don't need the monitor update to complete until we send a
- // `shutdown_signed`, which we'll delay if we're pending a monitor update.
- peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
- node_id: *counterparty_node_id,
- msg: shutdown_msg,
- });
+ debug_assert!(monitor_update_opt.is_none() || !chan.is_shutdown(),
+ "We can't both complete shutdown and generate a monitor update");
- // Update the monitor with the shutdown script if necessary.
- if let Some(monitor_update) = monitor_update_opt.take() {
- break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
- peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ());
- }
+ // Update the monitor with the shutdown script if necessary.
+ if let Some(monitor_update) = monitor_update_opt.take() {
+ handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+ peer_state_lock, peer_state, per_peer_state, chan);
+ break;
+ }
- if chan.is_shutdown() {
- if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) {
- if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) {
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: channel_update
- });
- }
- self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
+ if chan.is_shutdown() {
+ if let ChannelPhase::Funded(chan) = remove_channel_phase!(self, chan_phase_entry) {
+ if let Ok(channel_update) = self.get_channel_update_for_broadcast(&chan) {
+ peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: channel_update
+ });
}
+ self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
}
- break Ok(());
}
- },
- hash_map::Entry::Vacant(_) => (),
- }
+ break;
+ }
+ },
+ hash_map::Entry::Vacant(_) => {
+ // If we reach this point, it means that the channel_id either refers to an unfunded channel or
+ // it does not exist for this peer. Either way, we can attempt to force-close it.
+ //
+ // An appropriate error will be returned for non-existence of the channel if that's the case.
+ return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ())
+ },
}
- // If we reach this point, it means that the channel_id either refers to an unfunded channel or
- // it does not exist for this peer. Either way, we can attempt to force-close it.
- //
- // An appropriate error will be returned for non-existence of the channel if that's the case.
- return self.force_close_channel_with_peer(&channel_id, counterparty_node_id, None, false).map(|_| ())
- };
+ }
for htlc_source in failed_htlcs.drain(..) {
let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
}
- let _ = handle_error!(self, result, *counterparty_node_id);
Ok(())
}
self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script)
}
- #[inline]
fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
+ debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+ #[cfg(debug_assertions)]
+ for (_, peer) in self.per_peer_state.read().unwrap().iter() {
+ debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
+ }
+
let (monitor_update_option, mut failed_htlcs) = shutdown_res;
log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len());
for htlc_source in failed_htlcs.drain(..) {
let peer_state_mutex = per_peer_state.get(peer_node_id)
.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?;
let (update_opt, counterparty_node_id) = {
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
+ let mut peer_state = peer_state_mutex.lock().unwrap();
let closure_reason = if let Some(peer_msg) = peer_msg {
ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) }
} else {
log_error!(self.logger, "Force-closing channel {}", channel_id);
self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason);
let mut chan_phase = remove_channel_phase!(self, chan_phase_entry);
+ mem::drop(peer_state);
+ mem::drop(per_peer_state);
match chan_phase {
ChannelPhase::Funded(mut chan) => {
self.finish_force_close_channel(chan.context.force_shutdown(broadcast));
}
};
if let Some(update) = update_opt {
- let mut peer_state = peer_state_mutex.lock().unwrap();
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: update
- });
+ // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
+ // not try to broadcast it via whatever peer we have.
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ let a_peer_state_opt = per_peer_state.get(peer_node_id)
+ .ok_or(per_peer_state.values().next());
+ if let Ok(a_peer_state_mutex) = a_peer_state_opt {
+ let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
+ a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
}
Ok(counterparty_node_id)
}, onion_packet, None, &self.fee_estimator, &self.logger);
match break_chan_phase_entry!(self, send_res, chan_phase_entry) {
Some(monitor_update) => {
- match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan_phase_entry) {
- Err(e) => break Err(e),
- Ok(false) => {
+ match handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan) {
+ false => {
// Note that MonitorUpdateInProgress here indicates (per function
// docs) that we will resend the commitment update once monitor
// updating completes. Therefore, we must return an error
// MonitorUpdateInProgress, below.
return Err(APIError::MonitorUpdateInProgress);
},
- Ok(true) => {},
+ true => {},
}
},
None => {},
},
BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => {
let mut updated_chan = false;
- let res = {
+ {
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) {
hash_map::Entry::Occupied(mut chan_phase) => {
- updated_chan = true;
- handle_new_monitor_update!(self, funding_txo, update.clone(),
- peer_state_lock, peer_state, per_peer_state, chan_phase).map(|_| ())
+ if let ChannelPhase::Funded(chan) = chan_phase.get_mut() {
+ updated_chan = true;
+ handle_new_monitor_update!(self, funding_txo, update.clone(),
+ peer_state_lock, peer_state, per_peer_state, chan);
+ } else {
+ debug_assert!(false, "We shouldn't have an update for a non-funded channel");
+ }
},
- hash_map::Entry::Vacant(_) => Ok(()),
+ hash_map::Entry::Vacant(_) => {},
}
- } else { Ok(()) }
- };
+ }
+ }
if !updated_chan {
// TODO: Track this as in-flight even though the channel is closed.
let _ = self.chain_monitor.update_channel(funding_txo, &update);
}
- // TODO: If this channel has since closed, we're likely providing a payment
- // preimage update, which we must ensure is durable! We currently don't,
- // however, ensure that.
- if res.is_err() {
- log_error!(self.logger,
- "Failed to provide ChannelMonitorUpdate to closed channel! This likely lost us a payment preimage!");
- }
- let _ = handle_error!(self, res, counterparty_node_id);
},
BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
let per_peer_state = self.per_peer_state.read().unwrap();
let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
let mut timed_out_mpp_htlcs = Vec::new();
let mut pending_peers_awaiting_removal = Vec::new();
+ let mut shutdown_channels = Vec::new();
- let process_unfunded_channel_tick = |
+ let mut process_unfunded_channel_tick = |
chan_id: &ChannelId,
context: &mut ChannelContext<SP>,
unfunded_context: &mut UnfundedChannelContext,
"Force-closing pending channel with ID {} for not establishing in a timely manner", chan_id);
update_maps_on_chan_removal!(self, &context);
self.issue_channel_close_events(&context, ClosureReason::HolderForceClosed);
- self.finish_force_close_channel(context.force_shutdown(false));
+ shutdown_channels.push(context.force_shutdown(false));
pending_msg_events.push(MessageSendEvent::HandleError {
node_id: counterparty_node_id,
action: msgs::ErrorAction::SendErrorMessage {
let _ = handle_error!(self, err, counterparty_node_id);
}
+ for shutdown_res in shutdown_channels {
+ self.finish_force_close_channel(shutdown_res);
+ }
+
self.pending_outbound_payments.remove_stale_payments(&self.pending_events);
// Technically we don't need to do this here, but if we have holding cell entries in a
// This ensures that future code doesn't introduce a lock-order requirement for
// `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
// this function with any `per_peer_state` peer lock acquired would.
+ #[cfg(debug_assertions)]
for (_, peer) in self.per_peer_state.read().unwrap().iter() {
debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
}
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}
if !during_init {
- let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
- peer_state, per_peer_state, chan_phase_entry);
- if let Err(e) = res {
- // TODO: This is a *critical* error - we probably updated the outbound edge
- // of the HTLC's monitor with a preimage. We should retry this monitor
- // update over and over again until morale improves.
- log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
- return Err((counterparty_node_id, e));
- }
+ handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+ peer_state, per_peer_state, chan);
} else {
// If we're running during init we cannot update a monitor directly -
// they probably haven't actually been loaded yet. Instead, push the
Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
},
hash_map::Entry::Vacant(e) => {
- match self.id_to_peer.lock().unwrap().entry(chan.context.channel_id()) {
+ let mut id_to_peer_lock = self.id_to_peer.lock().unwrap();
+ match id_to_peer_lock.entry(chan.context.channel_id()) {
hash_map::Entry::Occupied(_) => {
return Err(MsgHandleErrInternal::send_err_msg_no_close(
"The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
funding_msg.channel_id))
},
hash_map::Entry::Vacant(i_e) => {
- i_e.insert(chan.context.get_counterparty_node_id());
- }
- }
-
- // There's no problem signing a counterparty's funding transaction if our monitor
- // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
- // accepted payment from yet. We do, however, need to wait to send our channel_ready
- // until we have persisted our monitor.
- let new_channel_id = funding_msg.channel_id;
- peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
- node_id: counterparty_node_id.clone(),
- msg: funding_msg,
- });
+ let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
+ if let Ok(persist_state) = monitor_res {
+ i_e.insert(chan.context.get_counterparty_node_id());
+ mem::drop(id_to_peer_lock);
+
+ // There's no problem signing a counterparty's funding transaction if our monitor
+ // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
+ // accepted payment from yet. We do, however, need to wait to send our channel_ready
+ // until we have persisted our monitor.
+ peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
+ node_id: counterparty_node_id.clone(),
+ msg: funding_msg,
+ });
- let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
-
- if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
- let mut res = handle_new_monitor_update!(self, monitor_res, peer_state_lock, peer_state,
- per_peer_state, chan, MANUALLY_REMOVING_INITIAL_MONITOR,
- { peer_state.channel_by_id.remove(&new_channel_id) });
-
- // Note that we reply with the new channel_id in error messages if we gave up on the
- // channel, not the temporary_channel_id. This is compatible with ourselves, but the
- // spec is somewhat ambiguous here. Not a huge deal since we'll send error messages for
- // any messages referencing a previously-closed channel anyway.
- // We do not propagate the monitor update to the user as it would be for a monitor
- // that we didn't manage to store (and that we don't care about - we don't respond
- // with the funding_signed so the channel can never go on chain).
- if let Err(MsgHandleErrInternal { shutdown_finish: Some((res, _)), .. }) = &mut res {
- res.0 = None;
+ if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
+ handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state,
+ per_peer_state, chan, INITIAL_MONITOR);
+ } else {
+ unreachable!("This must be a funded channel as we just inserted it.");
+ }
+ Ok(())
+ } else {
+ log_error!(self.logger, "Persisting initial ChannelMonitor failed, implying the funding outpoint was duplicated");
+ return Err(MsgHandleErrInternal::send_err_msg_no_close(
+ "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
+ funding_msg.channel_id));
+ }
}
- res.map(|_| ())
- } else {
- unreachable!("This must be a funded channel as we just inserted it.");
}
}
}
ChannelPhase::Funded(ref mut chan) => {
let monitor = try_chan_phase_entry!(self,
chan.funding_signed(&msg, best_block, &self.signer_provider, &self.logger), chan_phase_entry);
- let update_res = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor);
- let mut res = handle_new_monitor_update!(self, update_res, peer_state_lock, peer_state, per_peer_state, chan_phase_entry, INITIAL_MONITOR);
- if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
- // We weren't able to watch the channel to begin with, so no updates should be made on
- // it. Previously, full_stack_target found an (unreachable) panic when the
- // monitor update contained within `shutdown_finish` was applied.
- if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
- shutdown_finish.0.take();
- }
+ if let Ok(persist_status) = self.chain_monitor.watch_channel(chan.context.get_funding_txo().unwrap(), monitor) {
+ handle_new_monitor_update!(self, persist_status, peer_state_lock, peer_state, per_peer_state, chan, INITIAL_MONITOR);
+ Ok(())
+ } else {
+ try_chan_phase_entry!(self, Err(ChannelError::Close("Channel funding outpoint was a duplicate".to_owned())), chan_phase_entry)
}
- res.map(|_| ())
},
_ => {
return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id));
}
fn internal_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> {
- let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>;
- let result: Result<(), _> = loop {
+ let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)> = Vec::new();
+ let mut finish_shutdown = None;
+ {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
}
// Update the monitor with the shutdown script if necessary.
if let Some(monitor_update) = monitor_update_opt {
- break handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
- peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ());
+ handle_new_monitor_update!(self, funding_txo_opt.unwrap(), monitor_update,
+ peer_state_lock, peer_state, per_peer_state, chan);
}
- break Ok(());
},
ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) => {
let context = phase.context_mut();
log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id);
self.issue_channel_close_events(&context, ClosureReason::CounterpartyCoopClosedUnfundedChannel);
let mut chan = remove_channel_phase!(self, chan_phase_entry);
- self.finish_force_close_channel(chan.context_mut().force_shutdown(false));
- return Ok(());
+ finish_shutdown = Some(chan.context_mut().force_shutdown(false));
},
}
} else {
return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
- };
+ }
for htlc_source in dropped_htlcs.drain(..) {
let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id.clone()), channel_id: msg.channel_id };
let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
}
+ if let Some(shutdown_res) = finish_shutdown {
+ self.finish_force_close_channel(shutdown_res);
+ }
- result
+ Ok(())
}
fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
let monitor_update_opt = try_chan_phase_entry!(self, chan.commitment_signed(&msg, &self.logger), chan_phase_entry);
if let Some(monitor_update) = monitor_update_opt {
handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update, peer_state_lock,
- peer_state, per_peer_state, chan_phase_entry).map(|_| ())
- } else { Ok(()) }
+ peer_state, per_peer_state, chan);
+ }
+ Ok(())
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
"Got a commitment_signed message for an unfunded channel!".into())), chan_phase_entry);
}
fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
- let (htlcs_to_fail, res) = {
+ let htlcs_to_fail = {
let per_peer_state = self.per_peer_state.read().unwrap();
let mut peer_state_lock = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
} else { false };
let (htlcs_to_fail, monitor_update_opt) = try_chan_phase_entry!(self,
chan.revoke_and_ack(&msg, &self.fee_estimator, &self.logger, mon_update_blocked), chan_phase_entry);
- let res = if let Some(monitor_update) = monitor_update_opt {
+ if let Some(monitor_update) = monitor_update_opt {
let funding_txo = funding_txo_opt
.expect("Funding outpoint must have been set for RAA handling to succeed");
handle_new_monitor_update!(self, funding_txo, monitor_update,
- peer_state_lock, peer_state, per_peer_state, chan_phase_entry).map(|_| ())
- } else { Ok(()) };
- (htlcs_to_fail, res)
+ peer_state_lock, peer_state, per_peer_state, chan);
+ }
+ htlcs_to_fail
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
"Got a revoke_and_ack message for an unfunded channel!".into())), chan_phase_entry);
}
};
self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id, counterparty_node_id);
- res
+ Ok(())
}
fn internal_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> {
self.fail_htlc_backwards_internal(&htlc_update.source, &htlc_update.payment_hash, &reason, receiver);
}
},
- MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
- MonitorEvent::UpdateFailed(funding_outpoint) => {
+ MonitorEvent::HolderForceClosed(funding_outpoint) => {
let counterparty_node_id_opt = match counterparty_node_id {
Some(cp_id) => Some(cp_id),
None => {
msg: update
});
}
- let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
- ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
- } else {
- ClosureReason::CommitmentTxConfirmed
- };
- self.issue_channel_close_events(&chan.context, reason);
+ self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: chan.context.get_counterparty_node_id(),
action: msgs::ErrorAction::SendErrorMessage {
fn check_free_holding_cells(&self) -> bool {
let mut has_monitor_update = false;
let mut failed_htlcs = Vec::new();
- let mut handle_errors = Vec::new();
// Walk our list of channels and find any that need to update. Note that when we do find an
// update, if it includes actions that must be taken afterwards, we have to drop the
if let Some(monitor_update) = monitor_opt {
has_monitor_update = true;
- let channel_id: ChannelId = *channel_id;
- let res = handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
- peer_state_lock, peer_state, per_peer_state, chan, MANUALLY_REMOVING,
- peer_state.channel_by_id.remove(&channel_id));
- if res.is_err() {
- handle_errors.push((counterparty_node_id, res));
- }
+ handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
+ peer_state_lock, peer_state, per_peer_state, chan);
continue 'peer_loop;
}
}
break 'peer_loop;
}
- let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
+ let has_update = has_monitor_update || !failed_htlcs.is_empty();
for (failures, channel_id, counterparty_node_id) in failed_htlcs.drain(..) {
self.fail_holding_cell_htlcs(failures, channel_id, &counterparty_node_id);
}
- for (counterparty_node_id, err) in handle_errors.drain(..) {
- let _ = handle_error!(self, err, counterparty_node_id);
- }
-
has_update
}
/// operation. It will double-check that nothing *else* is also blocking the same channel from
/// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
- let mut errors = Vec::new();
loop {
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() {
log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
channel_funding_outpoint.to_channel_id());
- if let Err(e) = handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update,
- peer_state_lck, peer_state, per_peer_state, chan_phase_entry)
- {
- errors.push((e, counterparty_node_id));
- }
+ handle_new_monitor_update!(self, channel_funding_outpoint, monitor_update,
+ peer_state_lck, peer_state, per_peer_state, chan);
if further_update_exists {
// If there are more `ChannelMonitorUpdate`s to process, restart at the
// top of the loop.
}
break;
}
- for (err, counterparty_node_id) in errors {
- let res = Err::<(), _>(err);
- let _ = handle_error!(self, res, counterparty_node_id);
- }
}
fn handle_post_event_actions(&self, actions: Vec<EventCompletionAction>) {
TEST_FINAL_CLTV, false), 100_000);
let route = find_route(
&nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
- None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+ None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
let payment_preimage = PaymentPreimage([42; 32]);
let route = find_route(
&nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
- None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+ None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
);
let route = find_route(
&nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph,
- None, nodes[0].logger, &scorer, &(), &random_seed_bytes
+ None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
let payment_id_2 = PaymentId([45; 32]);
nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
let route = find_route(
&payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
- nodes[0].logger, &scorer, &(), &random_seed_bytes
+ nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
let test_preimage = PaymentPreimage([42; 32]);
let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes();
let route = find_route(
&payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::<Vec<_>>()),
- nodes[0].logger, &scorer, &(), &random_seed_bytes
+ nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes
).unwrap();
let test_preimage = PaymentPreimage([42; 32]);