+macro_rules! handle_monitor_update_completion {
+ ($self: ident, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
+ let mut updates = $chan.monitor_updating_restored(&$self.logger,
+ &$self.node_signer, $self.genesis_hash, &$self.default_configuration,
+ $self.best_block.read().unwrap().height());
+ let counterparty_node_id = $chan.get_counterparty_node_id();
+ let channel_update = if updates.channel_ready.is_some() && $chan.is_usable() {
+ // We only send a channel_update in the case where we are just now sending a
+ // channel_ready and the channel is in a usable state. We may re-send a
+ // channel_update later through the announcement_signatures process for public
+ // channels, but there's no reason not to just inform our counterparty of our fees
+ // now.
+ if let Ok(msg) = $self.get_channel_update_for_unicast($chan) {
+ Some(events::MessageSendEvent::SendChannelUpdate {
+ node_id: counterparty_node_id,
+ msg,
+ })
+ } else { None }
+ } else { None };
+
+ let update_actions = $peer_state.monitor_update_blocked_actions
+ .remove(&$chan.channel_id()).unwrap_or(Vec::new());
+
+ let htlc_forwards = $self.handle_channel_resumption(
+ &mut $peer_state.pending_msg_events, $chan, updates.raa,
+ updates.commitment_update, updates.order, updates.accepted_htlcs,
+ updates.funding_broadcastable, updates.channel_ready,
+ updates.announcement_sigs);
+ if let Some(upd) = channel_update {
+ $peer_state.pending_msg_events.push(upd);
+ }
+
+ let channel_id = $chan.channel_id();
+ core::mem::drop($peer_state_lock);
+ core::mem::drop($per_peer_state_lock);
+
+ $self.handle_monitor_update_completion_actions(update_actions);
+
+ if let Some(forwards) = htlc_forwards {
+ $self.forward_htlcs(&mut [forwards][..]);
+ }
+ $self.finalize_claims(updates.finalized_claimed_htlcs);
+ for failure in updates.failed_htlcs.drain(..) {
+ let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
+ $self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
+ }
+ } }
+}
+
+macro_rules! handle_new_monitor_update {
+ ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { {
+ // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
+ // any case so that it won't deadlock.
+ debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
+ match $update_res {
+ ChannelMonitorUpdateStatus::InProgress => {
+ log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
+ log_bytes!($chan.channel_id()[..]));
+ Ok(())
+ },
+ ChannelMonitorUpdateStatus::PermanentFailure => {
+ log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateStatus::PermanentFailure",
+ log_bytes!($chan.channel_id()[..]));
+ update_maps_on_chan_removal!($self, $chan);
+ let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown(
+ "ChannelMonitor storage failure".to_owned(), $chan.channel_id(),
+ $chan.get_user_id(), $chan.force_shutdown(false),
+ $self.get_channel_update_for_broadcast(&$chan).ok()));
+ $remove;
+ res
+ },
+ ChannelMonitorUpdateStatus::Completed => {
+ if ($update_id == 0 || $chan.get_next_monitor_update()
+ .expect("We can't be processing a monitor update if it isn't queued")
+ .update_id == $update_id) &&
+ $chan.get_latest_monitor_update_id() == $update_id
+ {
+ handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
+ }
+ Ok(())
+ },
+ }
+ } };
+ ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan_entry: expr) => {
+ handle_new_monitor_update!($self, $update_res, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan_entry.get_mut(), MANUALLY_REMOVING, $chan_entry.remove_entry())
+ }
+}
+
+macro_rules! process_events_body {
+ ($self: expr, $event_to_handle: expr, $handle_event: expr) => {
+ // We'll acquire our total consistency lock until the returned future completes so that
+ // we can be sure no other persists happen while processing events.
+ let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+ let mut result = NotifyOption::SkipPersist;
+
+ // TODO: This behavior should be documented. It's unintuitive that we query
+ // ChannelMonitors when clearing other events.
+ if $self.process_pending_monitor_events() {
+ result = NotifyOption::DoPersist;
+ }
+
+ let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
+ if !pending_events.is_empty() {
+ result = NotifyOption::DoPersist;
+ }
+
+ for event in pending_events {
+ $event_to_handle = event;
+ $handle_event;
+ }
+
+ if result == NotifyOption::DoPersist {
+ $self.persistence_notifier.notify();
+ }
+ }
+}
+
+impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>