Merge pull request #2996 from shaavan/issue2882.2 upstream/main
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Fri, 31 May 2024 21:57:11 +0000 (14:57 -0700)
committerGitHub <noreply@github.com>
Fri, 31 May 2024 21:57:11 +0000 (14:57 -0700)
Allow responding asynchronously to OnionMessage

1  2 
lightning/src/onion_message/functional_tests.rs

index 9850e53642c705ac21b950db1e6f68cf4e7822db,0c3c67d4483914c63beec442ab7efd7728cd9080..08be1b2c5027d64bdf599faca92f0b2ed510ef5c
@@@ -19,11 -19,11 +19,11 @@@ use crate::routing::test_utils::{add_ch
  use crate::sign::{NodeSigner, Recipient};
  use crate::util::ser::{FixedLengthReader, LengthReadable, Writeable, Writer};
  use crate::util::test_utils;
- use super::messenger::{CustomOnionMessageHandler, DefaultMessageRouter, Destination, OnionMessagePath, OnionMessenger, PendingOnionMessage, Responder, ResponseInstruction, SendError};
+ use super::messenger::{CustomOnionMessageHandler, DefaultMessageRouter, Destination, OnionMessagePath, OnionMessenger, PendingOnionMessage, Responder, ResponseInstruction, SendError, SendSuccess};
  use super::offers::{OffersMessage, OffersMessageHandler};
  use super::packet::{OnionMessageContents, Packet};
  
 -use bitcoin::network::constants::Network;
 +use bitcoin::network::Network;
  use bitcoin::hashes::hex::FromHex;
  use bitcoin::secp256k1::{All, PublicKey, Secp256k1, SecretKey};
  
@@@ -81,20 -81,20 +81,20 @@@ impl OffersMessageHandler for TestOffer
  
  #[derive(Clone, Debug, PartialEq)]
  enum TestCustomMessage {
-       Request,
-       Response,
+       Ping,
+       Pong,
  }
  
- const CUSTOM_REQUEST_MESSAGE_TYPE: u64 = 4242;
- const CUSTOM_RESPONSE_MESSAGE_TYPE: u64 = 4343;
- const CUSTOM_REQUEST_MESSAGE_CONTENTS: [u8; 32] = [42; 32];
- const CUSTOM_RESPONSE_MESSAGE_CONTENTS: [u8; 32] = [43; 32];
+ const CUSTOM_PING_MESSAGE_TYPE: u64 = 4242;
+ const CUSTOM_PONG_MESSAGE_TYPE: u64 = 4343;
+ const CUSTOM_PING_MESSAGE_CONTENTS: [u8; 32] = [42; 32];
+ const CUSTOM_PONG_MESSAGE_CONTENTS: [u8; 32] = [43; 32];
  
  impl OnionMessageContents for TestCustomMessage {
        fn tlv_type(&self) -> u64 {
                match self {
-                       TestCustomMessage::Request => CUSTOM_REQUEST_MESSAGE_TYPE,
-                       TestCustomMessage::Response => CUSTOM_RESPONSE_MESSAGE_TYPE,
+                       TestCustomMessage::Ping => CUSTOM_PING_MESSAGE_TYPE,
+                       TestCustomMessage::Pong => CUSTOM_PONG_MESSAGE_TYPE,
                }
        }
        fn msg_type(&self) -> &'static str {
  impl Writeable for TestCustomMessage {
        fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
                match self {
-                       TestCustomMessage::Request => Ok(CUSTOM_REQUEST_MESSAGE_CONTENTS.write(w)?),
-                       TestCustomMessage::Response => Ok(CUSTOM_RESPONSE_MESSAGE_CONTENTS.write(w)?),
+                       TestCustomMessage::Ping => Ok(CUSTOM_PING_MESSAGE_CONTENTS.write(w)?),
+                       TestCustomMessage::Pong => Ok(CUSTOM_PONG_MESSAGE_CONTENTS.write(w)?),
                }
        }
  }
  
  struct TestCustomMessageHandler {
-       expected_messages: Mutex<VecDeque<TestCustomMessage>>,
+       expectations: Mutex<VecDeque<OnHandleCustomMessage>>,
+ }
+ struct OnHandleCustomMessage {
+       expect: TestCustomMessage,
+       include_reply_path: bool,
  }
  
  impl TestCustomMessageHandler {
        fn new() -> Self {
-               Self { expected_messages: Mutex::new(VecDeque::new()) }
+               Self { expectations: Mutex::new(VecDeque::new()) }
        }
  
        fn expect_message(&self, message: TestCustomMessage) {
-               self.expected_messages.lock().unwrap().push_back(message);
+               self.expectations.lock().unwrap().push_back(
+                       OnHandleCustomMessage {
+                               expect: message,
+                               include_reply_path: false,
+                       }
+               );
+       }
+       fn expect_message_and_response(&self, message: TestCustomMessage) {
+               self.expectations.lock().unwrap().push_back(
+                       OnHandleCustomMessage {
+                               expect: message,
+                               include_reply_path: true,
+                       }
+               );
+       }
+       fn get_next_expectation(&self) -> OnHandleCustomMessage {
+               self.expectations.lock().unwrap().pop_front().expect("No expectations remaining")
        }
  }
  
@@@ -132,38 -155,45 +155,45 @@@ impl Drop for TestCustomMessageHandler 
                                return;
                        }
                }
-               assert!(self.expected_messages.lock().unwrap().is_empty());
+               assert!(self.expectations.lock().unwrap().is_empty());
        }
  }
  
  impl CustomOnionMessageHandler for TestCustomMessageHandler {
        type CustomMessage = TestCustomMessage;
        fn handle_custom_message(&self, msg: Self::CustomMessage, responder: Option<Responder>) -> ResponseInstruction<Self::CustomMessage> {
-               match self.expected_messages.lock().unwrap().pop_front() {
-                       Some(expected_msg) => assert_eq!(expected_msg, msg),
-                       None => panic!("Unexpected message: {:?}", msg),
-               }
-               let response_option = match msg {
-                       TestCustomMessage::Request => Some(TestCustomMessage::Response),
-                       TestCustomMessage::Response => None,
+               let expectation = self.get_next_expectation();
+               assert_eq!(msg, expectation.expect);
+               let response = match msg {
+                       TestCustomMessage::Ping => TestCustomMessage::Pong,
+                       TestCustomMessage::Pong => TestCustomMessage::Ping,
                };
-               if let (Some(response), Some(responder)) = (response_option, responder) {
-                       responder.respond(response)
-               } else {
-                       ResponseInstruction::NoResponse
+               // Sanity check: expecting to include reply path when responder is absent should panic.
+               if expectation.include_reply_path && responder.is_none() {
+                       panic!("Expected to include a reply_path, but the responder was absent.")
+               }
+               match responder {
+                       Some(responder) if expectation.include_reply_path => {
+                               responder.respond_with_reply_path(response)
+                       },
+                       Some(responder) => responder.respond(response),
+                       None => ResponseInstruction::NoResponse,
                }
        }
        fn read_custom_message<R: io::Read>(&self, message_type: u64, buffer: &mut R) -> Result<Option<Self::CustomMessage>, DecodeError> where Self: Sized {
                match message_type {
-                       CUSTOM_REQUEST_MESSAGE_TYPE => {
+                       CUSTOM_PING_MESSAGE_TYPE => {
                                let buf = read_to_end(buffer)?;
-                               assert_eq!(buf, CUSTOM_REQUEST_MESSAGE_CONTENTS);
-                               Ok(Some(TestCustomMessage::Request))
+                               assert_eq!(buf, CUSTOM_PING_MESSAGE_CONTENTS);
+                               Ok(Some(TestCustomMessage::Ping))
                        },
-                       CUSTOM_RESPONSE_MESSAGE_TYPE => {
+                       CUSTOM_PONG_MESSAGE_TYPE => {
                                let buf = read_to_end(buffer)?;
-                               assert_eq!(buf, CUSTOM_RESPONSE_MESSAGE_CONTENTS);
-                               Ok(Some(TestCustomMessage::Response))
+                               assert_eq!(buf, CUSTOM_PONG_MESSAGE_CONTENTS);
+                               Ok(Some(TestCustomMessage::Pong))
                        },
                        _ => Ok(None),
                }
@@@ -298,18 -328,18 +328,18 @@@ fn pass_along_path(path: &Vec<Messenger
  #[test]
  fn one_unblinded_hop() {
        let nodes = create_nodes(2);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let destination = Destination::Node(nodes[1].node_id);
        nodes[0].messenger.send_onion_message(test_msg, destination, None).unwrap();
-       nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[1].custom_message_handler.expect_message(TestCustomMessage::Pong);
        pass_along_path(&nodes);
  }
  
  #[test]
  fn two_unblinded_hops() {
        let nodes = create_nodes(3);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let path = OnionMessagePath {
                intermediate_nodes: vec![nodes[1].node_id],
        };
  
        nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
-       nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[2].custom_message_handler.expect_message(TestCustomMessage::Pong);
        pass_along_path(&nodes);
  }
  
  #[test]
  fn one_blinded_hop() {
        let nodes = create_nodes(2);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let secp_ctx = Secp256k1::new();
        let blinded_path = BlindedPath::new_for_message(&[], nodes[1].node_id, &*nodes[1].entropy_source, &secp_ctx).unwrap();
        let destination = Destination::BlindedPath(blinded_path);
        nodes[0].messenger.send_onion_message(test_msg, destination, None).unwrap();
-       nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[1].custom_message_handler.expect_message(TestCustomMessage::Pong);
        pass_along_path(&nodes);
  }
  
  #[test]
  fn two_unblinded_two_blinded() {
        let nodes = create_nodes(5);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let secp_ctx = Secp256k1::new();
        let intermediate_nodes = [ForwardNode { node_id: nodes[3].node_id, short_channel_id: None }];
        };
  
        nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
-       nodes[4].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[4].custom_message_handler.expect_message(TestCustomMessage::Pong);
        pass_along_path(&nodes);
  }
  
  #[test]
  fn three_blinded_hops() {
        let nodes = create_nodes(4);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let secp_ctx = Secp256k1::new();
        let intermediate_nodes = [
        let destination = Destination::BlindedPath(blinded_path);
  
        nodes[0].messenger.send_onion_message(test_msg, destination, None).unwrap();
-       nodes[3].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[3].custom_message_handler.expect_message(TestCustomMessage::Pong);
+       pass_along_path(&nodes);
+ }
+ #[test]
+ fn async_response_over_one_blinded_hop() {
+       // Simulate an asynchronous interaction between two nodes, Alice and Bob.
+       // 1. Set up the network with two nodes: Alice and Bob.
+       let nodes = create_nodes(2);
+       let alice = &nodes[0];
+       let bob = &nodes[1];
+       // 2. Define the message sent from Bob to Alice.
+       let message = TestCustomMessage::Ping;
+       let path_id = Some([2; 32]);
+       // 3. Simulate the creation of a Blinded Reply path provided by Bob.
+       let secp_ctx = Secp256k1::new();
+       let reply_path = BlindedPath::new_for_message(&[], nodes[1].node_id, &*nodes[1].entropy_source, &secp_ctx).unwrap();
+       // 4. Create a responder using the reply path for Alice.
+       let responder = Some(Responder::new(reply_path, path_id));
+       // 5. Expect Alice to receive the message and create a response instruction for it.
+       alice.custom_message_handler.expect_message(message.clone());
+       let response_instruction = nodes[0].custom_message_handler.handle_custom_message(message, responder);
+       // 6. Simulate Alice asynchronously responding back to Bob with a response.
+       assert_eq!(
+               nodes[0].messenger.handle_onion_message_response(response_instruction),
+               Ok(Some(SendSuccess::Buffered)),
+       );
+       bob.custom_message_handler.expect_message(TestCustomMessage::Pong);
+       pass_along_path(&nodes);
+ }
+ #[test]
+ fn async_response_with_reply_path_succeeds() {
+       // Simulate an asynchronous interaction between two nodes, Alice and Bob.
+       // Create a channel between the two nodes to establish them as announced nodes,
+       // which allows the creation of the reply_path for successful communication.
+       let mut nodes = create_nodes(2);
+       let alice = &nodes[0];
+       let bob = &nodes[1];
+       let secp_ctx = Secp256k1::new();
+       add_channel_to_graph(alice, bob, &secp_ctx, 24);
+       // Alice receives a message from Bob with an added reply_path for responding back.
+       let message = TestCustomMessage::Ping;
+       let path_id = Some([2; 32]);
+       let reply_path = BlindedPath::new_for_message(&[], bob.node_id, &*bob.entropy_source, &secp_ctx).unwrap();
+       // Alice asynchronously responds to Bob, expecting a response back from him.
+       let responder = Responder::new(reply_path, path_id);
+       alice.custom_message_handler.expect_message_and_response(message.clone());
+       let response_instruction = alice.custom_message_handler.handle_custom_message(message, Some(responder));
+       assert_eq!(
+               alice.messenger.handle_onion_message_response(response_instruction),
+               Ok(Some(SendSuccess::Buffered)),
+       );
+       // Set Bob's expectation and pass the Onion Message along the path.
+       bob.custom_message_handler.expect_message(TestCustomMessage::Pong);
+       pass_along_path(&nodes);
+       // Bob responds back to Alice using the reply_path she included with the OnionMessage.
+       // Set Alice's expectation and reverse the path for the response.
+       alice.custom_message_handler.expect_message(TestCustomMessage::Ping);
+       nodes.reverse();
        pass_along_path(&nodes);
  }
  
+ #[test]
+ fn async_response_with_reply_path_fails() {
+       // Simulate an asynchronous interaction between two unannounced nodes, Alice and Bob.
+       // Since the nodes are unannounced, attempting to respond using a reply_path
+       // will fail, leading to an expected failure in communication.
+       let nodes = create_nodes(2);
+       let alice = &nodes[0];
+       let bob = &nodes[1];
+       let secp_ctx = Secp256k1::new();
+       // Alice receives a message from Bob with an added reply_path for responding back.
+       let message = TestCustomMessage::Ping;
+       let path_id = Some([2; 32]);
+       let reply_path = BlindedPath::new_for_message(&[], bob.node_id, &*bob.entropy_source, &secp_ctx).unwrap();
+       // Alice tries to asynchronously respond to Bob, but fails because the nodes are unannounced.
+       // Therefore, the reply_path cannot be used for the response.
+       let responder = Responder::new(reply_path, path_id);
+       alice.custom_message_handler.expect_message_and_response(message.clone());
+       let response_instruction = alice.custom_message_handler.handle_custom_message(message, Some(responder));
+       assert_eq!(
+               alice.messenger.handle_onion_message_response(response_instruction),
+               Err(SendError::PathNotFound),
+       );
+ }
  #[test]
  fn too_big_packet_error() {
        // Make sure we error as expected if a packet is too big to send.
        let nodes = create_nodes(2);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let hop_node_id = nodes[1].node_id;
        let hops = vec![hop_node_id; 400];
@@@ -394,7 -526,7 +526,7 @@@ fn we_are_intro_node() 
        // If we are sending straight to a blinded path and we are the introduction node, we need to
        // advance the blinded path by 1 hop so the second hop is the new introduction node.
        let mut nodes = create_nodes(3);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let secp_ctx = Secp256k1::new();
        let intermediate_nodes = [
        let destination = Destination::BlindedPath(blinded_path);
  
        nodes[0].messenger.send_onion_message(test_msg.clone(), destination, None).unwrap();
-       nodes[2].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[2].custom_message_handler.expect_message(TestCustomMessage::Pong);
        pass_along_path(&nodes);
  
        // Try with a two-hop blinded path where we are the introduction node.
        let blinded_path = BlindedPath::new_for_message(&intermediate_nodes, nodes[1].node_id, &*nodes[1].entropy_source, &secp_ctx).unwrap();
        let destination = Destination::BlindedPath(blinded_path);
        nodes[0].messenger.send_onion_message(test_msg, destination, None).unwrap();
-       nodes[1].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[1].custom_message_handler.expect_message(TestCustomMessage::Pong);
        nodes.remove(2);
        pass_along_path(&nodes);
  }
  fn invalid_blinded_path_error() {
        // Make sure we error as expected if a provided blinded path has 0 hops.
        let nodes = create_nodes(3);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let secp_ctx = Secp256k1::new();
        let intermediate_nodes = [ForwardNode { node_id: nodes[1].node_id, short_channel_id: None }];
  #[test]
  fn reply_path() {
        let mut nodes = create_nodes(4);
-       let test_msg = TestCustomMessage::Request;
+       let test_msg = TestCustomMessage::Ping;
        let secp_ctx = Secp256k1::new();
  
        // Destination::Node
        ];
        let reply_path = BlindedPath::new_for_message(&intermediate_nodes, nodes[0].node_id, &*nodes[0].entropy_source, &secp_ctx).unwrap();
        nodes[0].messenger.send_onion_message_using_path(path, test_msg.clone(), Some(reply_path)).unwrap();
-       nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request);
+       nodes[3].custom_message_handler.expect_message(TestCustomMessage::Ping);
        pass_along_path(&nodes);
        // Make sure the last node successfully decoded the reply path.
-       nodes[0].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[0].custom_message_handler.expect_message(TestCustomMessage::Pong);
        nodes.reverse();
        pass_along_path(&nodes);
  
        let reply_path = BlindedPath::new_for_message(&intermediate_nodes, nodes[0].node_id, &*nodes[0].entropy_source, &secp_ctx).unwrap();
  
        nodes[0].messenger.send_onion_message(test_msg, destination, Some(reply_path)).unwrap();
-       nodes[3].custom_message_handler.expect_message(TestCustomMessage::Request);
+       nodes[3].custom_message_handler.expect_message(TestCustomMessage::Ping);
        pass_along_path(&nodes);
  
        // Make sure the last node successfully decoded the reply path.
-       nodes[0].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[0].custom_message_handler.expect_message(TestCustomMessage::Pong);
        nodes.reverse();
        pass_along_path(&nodes);
  }
@@@ -510,7 -642,7 +642,7 @@@ fn invalid_custom_message_type() 
  #[test]
  fn peer_buffer_full() {
        let nodes = create_nodes(2);
-       let test_msg = TestCustomMessage::Request;
+       let test_msg = TestCustomMessage::Ping;
        let destination = Destination::Node(nodes[1].node_id);
        for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger
                nodes[0].messenger.send_onion_message(test_msg.clone(), destination.clone(), None).unwrap();
@@@ -525,7 -657,7 +657,7 @@@ fn many_hops() 
        // of size [`crate::onion_message::packet::BIG_PACKET_HOP_DATA_LEN`].
        let num_nodes: usize = 25;
        let nodes = create_nodes(num_nodes as u8);
-       let test_msg = TestCustomMessage::Response;
+       let test_msg = TestCustomMessage::Pong;
  
        let mut intermediate_nodes = vec![];
        for i in 1..(num_nodes-1) {
                first_node_addresses: None,
        };
        nodes[0].messenger.send_onion_message_using_path(path, test_msg, None).unwrap();
-       nodes[num_nodes-1].custom_message_handler.expect_message(TestCustomMessage::Response);
+       nodes[num_nodes-1].custom_message_handler.expect_message(TestCustomMessage::Pong);
        pass_along_path(&nodes);
  }
  
  #[test]
  fn requests_peer_connection_for_buffered_messages() {
        let nodes = create_nodes(3);
-       let message = TestCustomMessage::Request;
+       let message = TestCustomMessage::Ping;
        let secp_ctx = Secp256k1::new();
        add_channel_to_graph(&nodes[0], &nodes[1], &secp_ctx, 42);
  
  #[test]
  fn drops_buffered_messages_waiting_for_peer_connection() {
        let nodes = create_nodes(3);
-       let message = TestCustomMessage::Request;
+       let message = TestCustomMessage::Ping;
        let secp_ctx = Secp256k1::new();
        add_channel_to_graph(&nodes[0], &nodes[1], &secp_ctx, 42);
  
@@@ -635,7 -767,7 +767,7 @@@ fn intercept_offline_peer_oms() 
                }
        }
  
-       let message = TestCustomMessage::Response;
+       let message = TestCustomMessage::Pong;
        let secp_ctx = Secp256k1::new();
        let intermediate_nodes = [ForwardNode { node_id: nodes[1].node_id, short_channel_id: None }];
        let blinded_path = BlindedPath::new_for_message(
        }
  
        nodes[1].messenger.forward_onion_message(onion_message, &final_node_vec[0].node_id).unwrap();
-       final_node_vec[0].custom_message_handler.expect_message(TestCustomMessage::Response);
+       final_node_vec[0].custom_message_handler.expect_message(TestCustomMessage::Pong);
        pass_along_path(&vec![nodes.remove(1), final_node_vec.remove(0)]);
  }