+#[cfg(test)]
+mod tests {
+ use lightning::ln::features::*;
+ use lightning::ln::msgs::*;
+ use lightning::ln::peer_handler::{MessageHandler, PeerManager};
+ use lightning::util::events::*;
+ use bitcoin::secp256k1::{Secp256k1, SecretKey, PublicKey};
+
+ use tokio::sync::mpsc;
+
+ use std::mem;
+ use std::sync::{Arc, Mutex};
+ use std::time::Duration;
+
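+ // A Logger implementation that simply prints every record to stdout so test output is visible.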
+ pub struct TestLogger();
+ impl lightning::util::logger::Logger for TestLogger {
+ fn log(&self, record: &lightning::util::logger::Record) {
+ println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
+ }
+ }
+
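+ // Stub message handler: ignores all channel and routing messages, and only signals over the
+ // mpsc senders when the peer with `expected_pubkey` connects or disconnects.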
+ struct MsgHandler{
+ expected_pubkey: PublicKey,
+ pubkey_connected: mpsc::Sender<()>,
+ pubkey_disconnected: mpsc::Sender<()>,
+ msg_events: Mutex<Vec<MessageSendEvent>>,
+ }
+ impl RoutingMessageHandler for MsgHandler {
+ fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
+ fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { }
+ fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
+ fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
+ fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
+ }
+ impl ChannelMessageHandler for MsgHandler {
+ fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}
+ fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &AcceptChannel) {}
+ fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &FundingCreated) {}
+ fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &FundingSigned) {}
+ fn handle_funding_locked(&self, _their_node_id: &PublicKey, _msg: &FundingLocked) {}
+ fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &Shutdown) {}
+ fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &ClosingSigned) {}
+ fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateAddHTLC) {}
+ fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFulfillHTLC) {}
+ fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailHTLC) {}
+ fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailMalformedHTLC) {}
+ fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &CommitmentSigned) {}
+ fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &RevokeAndACK) {}
+ fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &UpdateFee) {}
+ fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {}
+ fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) {
+ if *their_node_id == self.expected_pubkey {
+ self.pubkey_disconnected.clone().try_send(()).unwrap();
+ }
+ }
+ fn peer_connected(&self, their_node_id: &PublicKey, _msg: &Init) {
+ if *their_node_id == self.expected_pubkey {
+ self.pubkey_connected.clone().try_send(()).unwrap();
+ }
+ }
+ fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &ChannelReestablish) {}
+ fn handle_error(&self, _their_node_id: &PublicKey, _msg: &ErrorMessage) {}
+ }
+ impl MessageSendEventsProvider for MsgHandler {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+ let mut ret = Vec::new();
+ mem::swap(&mut *self.msg_events.lock().unwrap(), &mut ret);
+ ret
+ }
+ }
+
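+ // Connects two PeerManagers over a real TCP socket, waits for both sides to see the
+ // connection complete, then has A disconnect B via a HandleError event and waits for both
+ // sides to observe the disconnection before letting the connection futures finish.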
+ async fn do_basic_connection_test() {
+ let secp_ctx = Secp256k1::new();
+ let a_key = SecretKey::from_slice(&[1; 32]).unwrap();
+ let b_key = SecretKey::from_slice(&[2; 32]).unwrap();
+ let a_pub = PublicKey::from_secret_key(&secp_ctx, &a_key);
+ let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key);
+
+ let (a_connected_sender, mut a_connected) = mpsc::channel(1);
+ let (a_disconnected_sender, mut a_disconnected) = mpsc::channel(1);
+ let a_handler = Arc::new(MsgHandler {
+ expected_pubkey: b_pub,
+ pubkey_connected: a_connected_sender,
+ pubkey_disconnected: a_disconnected_sender,
+ msg_events: Mutex::new(Vec::new()),
+ });
+ let a_manager = Arc::new(PeerManager::new(MessageHandler {
+ chan_handler: Arc::clone(&a_handler),
+ route_handler: Arc::clone(&a_handler) as Arc<dyn RoutingMessageHandler>,
+ }, a_key.clone(), &[1; 32], Arc::new(TestLogger())));
+
+ let (b_connected_sender, mut b_connected) = mpsc::channel(1);
+ let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
+ let b_handler = Arc::new(MsgHandler {
+ expected_pubkey: a_pub,
+ pubkey_connected: b_connected_sender,
+ pubkey_disconnected: b_disconnected_sender,
+ msg_events: Mutex::new(Vec::new()),
+ });
+ let b_manager = Arc::new(PeerManager::new(MessageHandler {
+ chan_handler: Arc::clone(&b_handler),
+ route_handler: Arc::clone(&b_handler) as Arc<dyn RoutingMessageHandler>,
+ }, b_key.clone(), &[2; 32], Arc::new(TestLogger())));
+
+ // We bind on localhost, hoping the environment is properly configured with a local
+ // address. This may not always be the case in containers and the like, so if this test is
+ // failing for you check that you have a loopback interface and it is configured with
+ // 127.0.0.1.
+ let (conn_a, conn_b) = if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:9735") {
+ (std::net::TcpStream::connect("127.0.0.1:9735").unwrap(), listener.accept().unwrap().0)
+ } else if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:9999") {
+ (std::net::TcpStream::connect("127.0.0.1:9999").unwrap(), listener.accept().unwrap().0)
+ } else if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:46926") {
+ (std::net::TcpStream::connect("127.0.0.1:46926").unwrap(), listener.accept().unwrap().0)
+ } else { panic!("Failed to bind to v4 localhost on common ports"); };
+
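+ // Hand each end of the TCP connection to the tokio glue (the `sender` channel's receiver is
+ // never read in this test). The returned futures only resolve once their connection has been
+ // fully torn down, so we await them at the end.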
+ let (sender, _receiver) = mpsc::channel(2);
+ let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, tokio::net::TcpStream::from_std(conn_a).unwrap());
+ let fut_b = super::setup_inbound(b_manager, sender, tokio::net::TcpStream::from_std(conn_b).unwrap());
+
+ tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap();
+ tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap();
+
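+ // Queue an event telling A to disconnect from B. Nothing should happen until
+ // process_events() is called below.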
+ a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError {
+ node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None }
+ });
+ assert!(a_disconnected.try_recv().is_err());
+ assert!(b_disconnected.try_recv().is_err());
+
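+ // Flushing the pending events closes the connection; both handlers should now observe the
+ // disconnect.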
+ a_manager.process_events();
+ tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv()).await.unwrap();
+ tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv()).await.unwrap();
+
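+ // With the socket closed, both connection futures should complete.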
+ fut_a.await;
+ fut_b.await;
+ }
+
+ #[tokio::test(threaded_scheduler)]
+ async fn basic_threaded_connection_test() {
+ do_basic_connection_test().await;
+ }
+ #[tokio::test]
+ async fn basic_unthreaded_connection_test() {
+ do_basic_connection_test().await;
+ }
+}