+
+ // Release the buffered onion message when reconnected
+ connect_peers(&nodes[0], &nodes[1]);
+ assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_some());
+ assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none());
+}
+
+#[test]
+fn drops_buffered_messages_waiting_for_peer_connection() {
+ let nodes = create_nodes(3);
+ let message = TestCustomMessage::Request;
+ let secp_ctx = Secp256k1::new();
+ add_channel_to_graph(&nodes[0], &nodes[1], &secp_ctx, 42);
+
+ let blinded_path = BlindedPath::new_for_message(
+ &[nodes[1].node_id, nodes[2].node_id], &*nodes[0].entropy_source, &secp_ctx
+ ).unwrap();
+ let destination = Destination::BlindedPath(blinded_path);
+
+ // Buffer an onion message for a disconnected peer
+ disconnect_peers(&nodes[0], &nodes[1]);
+ nodes[0].messenger.send_onion_message(message, destination, None).unwrap();
+
+ // Release the event so the timer can start ticking
+ let events = release_events(&nodes[0]);
+ assert_eq!(events.len(), 1);
+ match &events[0] {
+ Event::ConnectionNeeded { node_id, .. } => assert_eq!(*node_id, nodes[1].node_id),
+ e => panic!("Unexpected event: {:?}", e),
+ }
+
+ // Drop buffered messages for a disconnected peer after some timer ticks
+ use crate::onion_message::messenger::MAX_TIMER_TICKS;
+ for _ in 0..=MAX_TIMER_TICKS {
+ nodes[0].messenger.timer_tick_occurred();
+ }
+ connect_peers(&nodes[0], &nodes[1]);
+ assert!(nodes[0].messenger.next_onion_message_for_peer(nodes[1].node_id).is_none());
+}
+
+#[test]
+fn intercept_offline_peer_oms() {
+ // Ensure that if OnionMessenger is initialized with
+ // new_with_offline_peer_interception, we will intercept OMs for offline
+ // peers, generate the right events, and forward OMs when they are re-injected
+ // by the user.
+ let node_cfgs = vec![MessengerCfg::new(), MessengerCfg::new().with_offline_peer_interception(), MessengerCfg::new()];
+ let mut nodes = create_nodes_using_cfgs(node_cfgs);
+
+ let peer_conn_evs = release_events(&nodes[1]);
+ assert_eq!(peer_conn_evs.len(), 2);
+ for (i, ev) in peer_conn_evs.iter().enumerate() {
+ match ev {
+ Event::OnionMessagePeerConnected { peer_node_id } => {
+ let node_idx = if i == 0 { 0 } else { 2 };
+ assert_eq!(peer_node_id, &nodes[node_idx].node_id);
+ },
+ _ => panic!()
+ }
+ }
+
+ let message = TestCustomMessage::Response;
+ let secp_ctx = Secp256k1::new();
+ let blinded_path = BlindedPath::new_for_message(
+ &[nodes[1].node_id, nodes[2].node_id], &*nodes[2].entropy_source, &secp_ctx
+ ).unwrap();
+ let destination = Destination::BlindedPath(blinded_path);
+
+ // Disconnect the peers to ensure we intercept the OM.
+ disconnect_peers(&nodes[1], &nodes[2]);
+ nodes[0].messenger.send_onion_message(message, destination, None).unwrap();
+ let mut final_node_vec = nodes.split_off(2);
+ pass_along_path(&nodes);
+
+ let mut events = release_events(&nodes[1]);
+ assert_eq!(events.len(), 1);
+ let onion_message = match events.remove(0) {
+ Event::OnionMessageIntercepted { peer_node_id, message } => {
+ assert_eq!(peer_node_id, final_node_vec[0].node_id);
+ message
+ },
+ _ => panic!()
+ };
+
+ // Ensure that we'll refuse to forward the re-injected OM until after the
+ // outbound peer comes back online.
+ let err = nodes[1].messenger.forward_onion_message(onion_message.clone(), &final_node_vec[0].node_id).unwrap_err();
+ assert_eq!(err, SendError::InvalidFirstHop(final_node_vec[0].node_id));
+
+ connect_peers(&nodes[1], &final_node_vec[0]);
+ let peer_conn_ev = release_events(&nodes[1]);
+ assert_eq!(peer_conn_ev.len(), 1);
+ match peer_conn_ev[0] {
+ Event::OnionMessagePeerConnected { peer_node_id } => {
+ assert_eq!(peer_node_id, final_node_vec[0].node_id);
+ },
+ _ => panic!()