{
let peers_lock = self.peers.read().unwrap();
- let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
- events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
+ let chan_events = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+ let route_events = self.message_handler.route_handler.get_and_clear_pending_msg_events();
let peers = &*peers_lock;
macro_rules! get_peer_for_forwarding {
{
if peers_to_disconnect.get($node_id).is_some() {
// If we've "disconnected" this peer, do not send to it.
- continue;
- }
- let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
- match descriptor_opt {
- Some(descriptor) => match peers.get(&descriptor) {
- Some(peer_mutex) => {
- let peer_lock = peer_mutex.lock().unwrap();
- if !peer_lock.handshake_complete() {
- continue;
+ None
+ } else {
+ let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
+ match descriptor_opt {
+ Some(descriptor) => match peers.get(&descriptor) {
+ Some(peer_mutex) => {
+ let peer_lock = peer_mutex.lock().unwrap();
+ if !peer_lock.handshake_complete() {
+ None
+ } else {
+ Some(peer_lock)
+ }
+ },
+ None => {
+ debug_assert!(false, "Inconsistent peers set state!");
+ None
}
- peer_lock
},
None => {
- debug_assert!(false, "Inconsistent peers set state!");
- continue;
- }
- },
- None => {
- continue;
- },
+ None
+ },
+ }
}
}
}
}
- for event in events_generated.drain(..) {
+ let mut handle_event = |event, from_chan_handler| {
match event {
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
// TODO: If the peer is gone we should generate a DiscardFunding event
// indicating to the wallet that they should just throw away this funding transaction
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendStfu { ref node_id, ref msg} => {
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendSpliceInit { ref node_id, ref msg} => {
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
update_fulfill_htlcs.len(),
update_fail_htlcs.len(),
&commitment_signed.channel_id);
- let mut peer = get_peer_for_forwarding!(node_id);
+ let mut peer = get_peer_for_forwarding!(node_id)?;
for msg in update_add_htlcs {
self.enqueue_message(&mut *peer, msg);
}
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
log_pubkey!(node_id),
msg.contents.short_channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, update_msg);
},
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
log_pubkey!(node_id), msg.contents.short_channel_id);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::HandleError { node_id, action } => {
let logger = WithContext::from(&self.logger, Some(node_id), None, None);
log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
- self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
},
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
- self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
},
}
},
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
msg.first_blocknum,
msg.number_of_blocks,
msg.sync_complete);
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
- self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
}
+ Some(())
+ };
+ for event in chan_events {
+ handle_event(event, true);
+ }
+ for event in route_events {
+ handle_event(event, false);
}
for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
if peers_to_disconnect.get(&node_id).is_some() { continue; }
- self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
+ self.enqueue_message(&mut *if let Some(peer) = get_peer_for_forwarding!(&node_id) { peer } else { continue; }, &msg);
}
for (descriptor, peer_mutex) in peers.iter() {