/// The peer is currently connected (i.e. we've seen a
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
/// [`ChannelMessageHandler::peer_disconnected`]).
- is_connected: bool,
+ pub is_connected: bool,
}
impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
+ /// Tracks the message events that are to be broadcast to all peers once we are connected to some peer.
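+ /// These are drained in `get_and_clear_pending_msg_events`, but only once at least one peer
+ /// is connected; until then they remain cached here.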
+ pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
+
entropy_source: ES,
node_signer: NS,
signer_provider: SP,
match $internal {
Ok(msg) => Ok(msg),
Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
- let mut msg_events = Vec::with_capacity(2);
+ let mut msg_event = None;
if let Some((shutdown_res, update_option)) = shutdown_finish {
let counterparty_node_id = shutdown_res.counterparty_node_id;
$self.finish_close_channel(shutdown_res);
if let Some(update) = update_option {
- msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
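+ // Cache the update so it is returned by `get_and_clear_pending_msg_events` once some peer is connected.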
+ let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
if let msgs::ErrorAction::IgnoreError = err.action {
} else {
- msg_events.push(events::MessageSendEvent::HandleError {
+ msg_event = Some(events::MessageSendEvent::HandleError {
node_id: $counterparty_node_id,
action: err.action.clone()
});
}
- if !msg_events.is_empty() {
+ if let Some(msg_event) = msg_event {
let per_peer_state = $self.per_peer_state.read().unwrap();
if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
let mut peer_state = peer_state_mutex.lock().unwrap();
- peer_state.pending_msg_events.append(&mut msg_events);
+ peer_state.pending_msg_events.push(msg_event);
}
}
funding_batch_states: Mutex::new(BTreeMap::new()),
pending_offers_messages: Mutex::new(Vec::new()),
+ pending_broadcast_messages: Mutex::new(Vec::new()),
entropy_source,
node_signer,
}
};
if let Some(update) = update_opt {
- // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
- // not try to broadcast it via whatever peer we have.
- let per_peer_state = self.per_peer_state.read().unwrap();
- let a_peer_state_opt = per_peer_state.get(peer_node_id)
- .ok_or(per_peer_state.values().next());
- if let Ok(a_peer_state_mutex) = a_peer_state_opt {
- let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
- a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: update
- });
- }
+ // If we have a `ChannelUpdate` to broadcast, cache it so it can be broadcast once we are connected to some peer.
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: update
+ });
}
Ok(counterparty_node_id)
.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
+
for channel_id in channel_ids {
if !peer_state.has_channel(channel_id) {
return Err(APIError::ChannelUnavailable {
}
if let ChannelPhase::Funded(channel) = channel_phase {
if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
} else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
node_id: channel.context.get_counterparty_node_id(),
if n >= DISABLE_GOSSIP_TICKS {
chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
if n >= ENABLE_GOSSIP_TICKS {
chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
}
if let Some(ChannelPhase::Funded(chan)) = chan_option {
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) {
failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed));
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
// We're done with this channel. We got a closing_signed and sent back
// a closing_signed with a closing transaction to broadcast.
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
/// will randomly be placed first or last in the returned array.
///
/// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
- /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
+ /// `MessageSendEvent`s are intended to be broadcast to all peers, they will be placed among
/// the `MessageSendEvent`s to the specific peer they were generated under.
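+ ///
+ /// Cached `BroadcastChannelUpdate`s from `pending_broadcast_messages` are appended to the
+ /// returned events, but only if we are connected to at least one peer.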
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let events = RefCell::new(Vec::new());
result = NotifyOption::DoPersist;
}
+ let mut is_any_peer_connected = false;
let mut pending_events = Vec::new();
let per_peer_state = self.per_peer_state.read().unwrap();
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
if peer_state.pending_msg_events.len() > 0 {
pending_events.append(&mut peer_state.pending_msg_events);
}
+ if peer_state.is_connected {
+ is_any_peer_connected = true;
+ }
+ }
+
+ // Only drain the cached broadcast messages if we are connected to at least one peer;
+ // otherwise they would be cleared here without ever reaching the network.
+ if is_any_peer_connected {
+ let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
+ pending_events.append(&mut broadcast_msgs);
}
if !pending_events.is_empty() {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
+
peer_state.channel_by_id.retain(|_, phase| {
match phase {
// Retain unfunded channels.
let reason_message = format!("{}", reason);
failed_channels.push(channel.context.force_shutdown(true, reason));
if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
// Gossip
&events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
- &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
+ // [`ChannelManager::pending_broadcast_messages`] holds the [`BroadcastChannelUpdate`]
+ // events; this arm only exists to keep the match exhaustive.
+ &events::MessageSendEvent::BroadcastChannelUpdate { .. } => {
+ debug_assert!(false, "`BroadcastChannelUpdate`s should be cached in `pending_broadcast_messages`");
+ false
+ },
&events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
&events::MessageSendEvent::SendChannelUpdate { .. } => false,
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
pending_offers_messages: Mutex::new(Vec::new()),
+ pending_broadcast_messages: Mutex::new(Vec::new()),
+
entropy_source: args.entropy_source,
node_signer: args.node_signer,
signer_provider: args.signer_provider,
}
}
+ #[test]
+ fn test_channel_update_cached() {
+ let chanmon_cfgs = create_chanmon_cfgs(3);
+ let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+ let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+ let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+ let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+ nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
+ check_added_monitors!(nodes[0], 1);
+ check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
+
+ // Confirm that the channel_update was not sent immediately to nodes[1] but was cached.
+ let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
+ assert_eq!(node_1_events.len(), 0);
+
+ {
+ // Assert that the ChannelUpdate message has been added to nodes[0]'s pending broadcast messages.
+ let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+ assert_eq!(pending_broadcast_messages.len(), 1);
+ }
+
+ // Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
+ nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+ nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+ nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+ nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+ let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+ assert_eq!(node_0_events.len(), 0);
+
+ // Now we reconnect to a peer
+ nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
+ features: nodes[2].node.init_features(), networks: None, remote_network_address: None
+ }, true).unwrap();
+ nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+ features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+ }, false).unwrap();
+
+ // Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
+ let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+ assert_eq!(node_0_events.len(), 1);
+ match &node_0_events[0] {
+ MessageSendEvent::BroadcastChannelUpdate { .. } => (),
+ _ => panic!("Unexpected event"),
+ }
+ {
+ // Assert that the ChannelUpdate message has been cleared from nodes[0]'s pending broadcast messages.
+ let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+ assert_eq!(pending_broadcast_messages.len(), 0);
+ }
+ }
+
#[test]
fn test_drop_disconnected_peers_when_removing_channels() {
let chanmon_cfgs = create_chanmon_cfgs(2);
}}
}
+/// Checks if at least one peer is connected.
+fn is_any_peer_connected(node: &Node) -> bool {
+ let peer_state = node.node.per_peer_state.read().unwrap();
+ for (_, peer_mutex) in peer_state.iter() {
+ let peer = peer_mutex.lock().unwrap();
+ if peer.is_connected { return true; }
+ }
+ false
+}
+
/// Check that a channel's closing channel update has been broadcasted, and optionally
/// check whether an error message event has occurred.
pub fn check_closed_broadcast(node: &Node, num_channels: usize, with_error_msg: bool) -> Vec<msgs::ErrorMessage> {
+ let mut dummy_connected = false;
+ if !is_any_peer_connected(node) {
+ connect_dummy_node(&node);
+ dummy_connected = true;
+ }
let msg_events = node.node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), if with_error_msg { num_channels * 2 } else { num_channels });
+ if dummy_connected {
+ disconnect_dummy_node(&node);
+ }
msg_events.into_iter().filter_map(|msg_event| {
match msg_event {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
nodes
}
+pub fn connect_dummy_node<'a, 'b: 'a, 'c: 'b>(node: &Node<'a, 'b, 'c>) {
+ let node_id_dummy = PublicKey::from_slice(&[2; 33]).unwrap();
+
+ let mut dummy_init_features = InitFeatures::empty();
+ dummy_init_features.set_static_remote_key_required();
+
+ let init_dummy = msgs::Init {
+ features: dummy_init_features,
+ networks: None,
+ remote_network_address: None
+ };
+
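+ // Register the dummy peer with both the `ChannelManager` and the `OnionMessenger` so the
+ // node sees at least one connected peer.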
+ node.node.peer_connected(&node_id_dummy, &init_dummy, true).unwrap();
+ node.onion_messenger.peer_connected(&node_id_dummy, &init_dummy, true).unwrap();
+}
+
+pub fn disconnect_dummy_node<'a, 'b: 'a, 'c: 'b>(node: &Node<'a, 'b, 'c>) {
+ let node_id_dummy = PublicKey::from_slice(&[2; 33]).unwrap();
+ node.node.peer_disconnected(&node_id_dummy);
+ node.onion_messenger.peer_disconnected(&node_id_dummy);
+}
+
// Note that the following only works for CLTV values up to 128
pub const ACCEPTED_HTLC_SCRIPT_WEIGHT: usize = 137; // Here we have a diff due to HTLC CLTV expiry being < 2^15 in test
pub const ACCEPTED_HTLC_SCRIPT_WEIGHT_ANCHORS: usize = 140; // Here we have a diff due to HTLC CLTV expiry being < 2^15 in test
}
pub fn handle_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec<Node<'a, 'b, 'c>>, a: usize, b: usize, needs_err_handle: bool, expected_error: &str) {
+ let mut dummy_connected = false;
+ if !is_any_peer_connected(&nodes[a]) {
+ connect_dummy_node(&nodes[a]);
+ dummy_connected = true;
+ }
+
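+ // Cached broadcast messages are appended after the per-peer events, so the `HandleError`
+ // event now precedes the `BroadcastChannelUpdate`.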
let events_1 = nodes[a].node.get_and_clear_pending_msg_events();
assert_eq!(events_1.len(), 2);
- let as_update = match events_1[0] {
+ let as_update = match events_1[1] {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
msg.clone()
},
_ => panic!("Unexpected event"),
};
- match events_1[1] {
+ match events_1[0] {
MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => {
assert_eq!(node_id, nodes[b].node.get_our_node_id());
assert_eq!(msg.data, expected_error);
},
_ => panic!("Unexpected event"),
}
-
+ if dummy_connected {
+ disconnect_dummy_node(&nodes[a]);
+ dummy_connected = false;
+ }
+ if !is_any_peer_connected(&nodes[b]) {
+ connect_dummy_node(&nodes[b]);
+ dummy_connected = true;
+ }
let events_2 = nodes[b].node.get_and_clear_pending_msg_events();
assert_eq!(events_2.len(), if needs_err_handle { 1 } else { 2 });
- let bs_update = match events_2[0] {
+ let bs_update = match events_2.last().unwrap() {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
msg.clone()
},
_ => panic!("Unexpected event"),
};
if !needs_err_handle {
- match events_2[1] {
+ match events_2[0] {
MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => {
assert_eq!(node_id, nodes[a].node.get_our_node_id());
assert_eq!(msg.data, expected_error);
_ => panic!("Unexpected event"),
}
}
-
+ if dummy_connected {
+ disconnect_dummy_node(&nodes[b]);
+ }
for node in nodes {
node.gossip_sync.handle_channel_update(&as_update).unwrap();
node.gossip_sync.handle_channel_update(&bs_update).unwrap();
connect_blocks(&nodes[3], TEST_FINAL_CLTV + LATENCY_GRACE_PERIOD_BLOCKS + 1);
let events = nodes[3].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 2);
- let close_chan_update_1 = match events[0] {
+ let close_chan_update_1 = match events[1] {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
msg.clone()
},
_ => panic!("Unexpected event"),
};
- match events[1] {
+ match events[0] {
MessageSendEvent::HandleError { action: ErrorAction::DisconnectPeer { .. }, node_id } => {
assert_eq!(node_id, nodes[4].node.get_our_node_id());
},
connect_blocks(&nodes[4], TEST_FINAL_CLTV - CLTV_CLAIM_BUFFER + 2);
let events = nodes[4].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 2);
- let close_chan_update_2 = match events[0] {
+ let close_chan_update_2 = match events[1] {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
msg.clone()
},
_ => panic!("Unexpected event"),
};
- match events[1] {
+ match events[0] {
MessageSendEvent::HandleError { action: ErrorAction::DisconnectPeer { .. }, node_id } => {
assert_eq!(node_id, nodes[3].node.get_our_node_id());
},
MessageSendEvent::UpdateHTLCs { .. } => {},
_ => panic!("Unexpected event"),
}
- match events[1] {
+ match events[2] {
MessageSendEvent::BroadcastChannelUpdate { .. } => {},
_ => panic!("Unexepected event"),
}
mine_transaction(&nodes[1], &commitment_tx[0]);
check_added_monitors!(nodes[1], 1);
let events = nodes[1].node.get_and_clear_pending_msg_events();
- match events[0] {
+ match events[1] {
MessageSendEvent::BroadcastChannelUpdate { .. } => {},
_ => panic!("Unexpected event"),
}
MessageSendEvent::UpdateHTLCs { .. } => {},
_ => panic!("Unexpected event"),
}
- match events[1] {
+ match events[2] {
MessageSendEvent::BroadcastChannelUpdate { .. } => {},
_ => panic!("Unexepected event"),
}
MessageSendEvent::UpdateHTLCs { .. } => {},
_ => panic!("Unexpected event"),
}
- match events[1] {
+ match events[2] {
MessageSendEvent::BroadcastChannelUpdate { .. } => {},
_ => panic!("Unexepected event"),
}
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+ // Connect a dummy node so that the broadcast events generated below can be retrieved.
+ connect_dummy_node(&nodes[0]);
+
create_announced_chan_between_nodes(&nodes, 0, 1);
create_announced_chan_between_nodes(&nodes, 1, 0);
create_announced_chan_between_nodes(&nodes, 0, 1);