From: Elias Rohrer Date: Thu, 18 Jul 2024 08:05:05 +0000 (+0200) Subject: `rustfmt`: Run on `lightning-net-tokio` X-Git-Tag: v0.0.124-beta~32^2~1 X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=abb3fc13b26323306c95e883add38eb3c48f8a26;p=rust-lightning `rustfmt`: Run on `lightning-net-tokio` --- diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 35124bab6..a537289c2 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -24,31 +24,30 @@ #![deny(rustdoc::broken_intra_doc_links)] #![deny(rustdoc::private_intra_doc_links)] - #![deny(missing_docs)] #![cfg_attr(docsrs, feature(doc_auto_cfg))] use bitcoin::secp256k1::PublicKey; use tokio::net::TcpStream; -use tokio::time; use tokio::sync::mpsc; +use tokio::time; +use lightning::ln::msgs::SocketAddress; use lightning::ln::peer_handler; -use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; use lightning::ln::peer_handler::APeerManager; -use lightning::ln::msgs::SocketAddress; +use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; -use std::ops::Deref; -use std::task::{self, Poll}; use std::future::Future; +use std::hash::Hash; use std::net::SocketAddr; use std::net::TcpStream as StdTcpStream; -use std::sync::{Arc, Mutex}; +use std::ops::Deref; +use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::task::{self, Poll}; use std::time::Duration; -use std::pin::Pin; -use std::hash::Hash; static ID_COUNTER: AtomicU64 = AtomicU64::new(0); @@ -57,27 +56,34 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0); // define a trivial two- and three- select macro with the specific types we need and just use that. pub(crate) enum SelectorOutput { - A(Option<()>), B(Option<()>), C(tokio::io::Result<()>), + A(Option<()>), + B(Option<()>), + C(tokio::io::Result<()>), } pub(crate) struct TwoSelector< - A: Future> + Unpin, B: Future> + Unpin + A: Future> + Unpin, + B: Future> + Unpin, > { pub a: A, pub b: B, } -impl< - A: Future> + Unpin, B: Future> + Unpin -> Future for TwoSelector { +impl> + Unpin, B: Future> + Unpin> Future + for TwoSelector +{ type Output = SelectorOutput; fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll { match Pin::new(&mut self.a).poll(ctx) { - Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); }, + Poll::Ready(res) => { + return Poll::Ready(SelectorOutput::A(res)); + }, Poll::Pending => {}, } match Pin::new(&mut self.b).poll(ctx) { - Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); }, + Poll::Ready(res) => { + return Poll::Ready(SelectorOutput::B(res)); + }, Poll::Pending => {}, } Poll::Pending @@ -85,7 +91,9 @@ impl< } pub(crate) struct ThreeSelector< - A: Future> + Unpin, B: Future> + Unpin, C: Future> + Unpin + A: Future> + Unpin, + B: Future> + Unpin, + C: Future> + Unpin, > { pub a: A, pub b: B, @@ -93,20 +101,29 @@ pub(crate) struct ThreeSelector< } impl< - A: Future> + Unpin, B: Future> + Unpin, C: Future> + Unpin -> Future for ThreeSelector { + A: Future> + Unpin, + B: Future> + Unpin, + C: Future> + Unpin, + > Future for ThreeSelector +{ type Output = SelectorOutput; fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll { match Pin::new(&mut self.a).poll(ctx) { - Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); }, + Poll::Ready(res) => { + return Poll::Ready(SelectorOutput::A(res)); + }, Poll::Pending => {}, } match Pin::new(&mut self.b).poll(ctx) { - Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); }, + Poll::Ready(res) => { + return Poll::Ready(SelectorOutput::B(res)); + }, Poll::Pending => {}, } match Pin::new(&mut self.c).poll(ctx) { - Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); }, + Poll::Ready(res) => { + return Poll::Ready(SelectorOutput::C(res)); + }, Poll::Pending => {}, } Poll::Pending @@ -140,9 +157,10 @@ struct Connection { } impl Connection { async fn poll_event_process( - peer_manager: PM, - mut event_receiver: mpsc::Receiver<()>, - ) where PM::Target: APeerManager { + peer_manager: PM, mut event_receiver: mpsc::Receiver<()>, + ) where + PM::Target: APeerManager, + { loop { if event_receiver.recv().await.is_none() { return; @@ -152,12 +170,11 @@ impl Connection { } async fn schedule_read( - peer_manager: PM, - us: Arc>, - reader: Arc, - mut read_wake_receiver: mpsc::Receiver<()>, - mut write_avail_receiver: mpsc::Receiver<()>, - ) where PM::Target: APeerManager { + peer_manager: PM, us: Arc>, reader: Arc, + mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>, + ) where + PM::Target: APeerManager, + { // Create a waker to wake up poll_event_process, above let (event_waker, event_receiver) = mpsc::channel(1); tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver)); @@ -178,7 +195,7 @@ impl Connection { // closed. // In this case, we do need to call peer_manager.socket_disconnected() to inform // Rust-Lightning that the socket is gone. - PeerDisconnected + PeerDisconnected, } let disconnect_type = loop { let read_paused = { @@ -193,18 +210,21 @@ impl Connection { TwoSelector { a: Box::pin(write_avail_receiver.recv()), b: Box::pin(read_wake_receiver.recv()), - }.await + } + .await } else { ThreeSelector { a: Box::pin(write_avail_receiver.recv()), b: Box::pin(read_wake_receiver.recv()), c: Box::pin(reader.readable()), - }.await + } + .await }; match select_result { SelectorOutput::A(v) => { assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc! - if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err() { + if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err() + { break Disconnect::CloseConnection; } }, @@ -215,11 +235,14 @@ impl Connection { debug_assert!(some.is_some()); }, SelectorOutput::C(res) => { - if res.is_err() { break Disconnect::PeerDisconnected; } + if res.is_err() { + break Disconnect::PeerDisconnected; + } match reader.try_read(&mut buf) { Ok(0) => break Disconnect::PeerDisconnected, Ok(len) => { - let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]); + let read_res = + peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]); let mut us_lock = us.lock().unwrap(); match read_res { Ok(pause_read) => { @@ -254,7 +277,9 @@ impl Connection { } } - fn new(stream: StdTcpStream) -> (Arc, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { + fn new( + stream: StdTcpStream, + ) -> (Arc, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { // We only ever need a channel of depth 1 here: if we returned a non-full write to the // PeerManager, we will eventually get notified that there is room in the socket to write // new bytes, which will generate an event. That event will be popped off the queue before @@ -276,7 +301,7 @@ impl Connection { read_waker, read_paused: false, rl_requested_disconnect: false, - id + id, })); (tokio_stream, write_receiver, read_receiver, conn) } @@ -284,14 +309,12 @@ impl Connection { fn get_addr_from_stream(stream: &StdTcpStream) -> Option { match stream.peer_addr() { - Ok(SocketAddr::V4(sockaddr)) => Some(SocketAddress::TcpIpV4 { - addr: sockaddr.ip().octets(), - port: sockaddr.port(), - }), - Ok(SocketAddr::V6(sockaddr)) => Some(SocketAddress::TcpIpV6 { - addr: sockaddr.ip().octets(), - port: sockaddr.port(), - }), + Ok(SocketAddr::V4(sockaddr)) => { + Some(SocketAddress::TcpIpV4 { addr: sockaddr.ip().octets(), port: sockaddr.port() }) + }, + Ok(SocketAddr::V6(sockaddr)) => { + Some(SocketAddress::TcpIpV6 { addr: sockaddr.ip().octets(), port: sockaddr.port() }) + }, Err(_) => None, } } @@ -303,17 +326,28 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option { /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. pub fn setup_inbound( - peer_manager: PM, - stream: StdTcpStream, -) -> impl std::future::Future -where PM::Target: APeerManager { + peer_manager: PM, stream: StdTcpStream, +) -> impl std::future::Future +where + PM::Target: APeerManager, +{ let remote_addr = get_addr_from_stream(&stream); let (reader, write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(test)] let last_us = Arc::clone(&us); - let handle_opt = if peer_manager.as_ref().new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() { - let handle = tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)); + let handle_opt = if peer_manager + .as_ref() + .new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr) + .is_ok() + { + let handle = tokio::spawn(Connection::schedule_read( + peer_manager, + us, + reader, + read_receiver, + write_receiver, + )); Some(handle) } else { // Note that we will skip socket_disconnected here, in accordance with the PeerManager @@ -346,16 +380,20 @@ where PM::Target: APeerManager { /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. pub fn setup_outbound( - peer_manager: PM, - their_node_id: PublicKey, - stream: StdTcpStream, -) -> impl std::future::Future -where PM::Target: APeerManager { + peer_manager: PM, their_node_id: PublicKey, stream: StdTcpStream, +) -> impl std::future::Future +where + PM::Target: APeerManager, +{ let remote_addr = get_addr_from_stream(&stream); let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(test)] let last_us = Arc::clone(&us); - let handle_opt = if let Ok(initial_send) = peer_manager.as_ref().new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) { + let handle_opt = if let Ok(initial_send) = peer_manager.as_ref().new_outbound_connection( + their_node_id, + SocketDescriptor::new(us.clone()), + remote_addr, + ) { let handle = tokio::spawn(async move { // We should essentially always have enough room in a TCP socket buffer to send the // initial 10s of bytes. However, tokio running in single-threaded mode will always @@ -374,15 +412,18 @@ where PM::Target: APeerManager { }, _ => { eprintln!("Failed to write first full message to socket!"); - peer_manager.as_ref().socket_disconnected(&SocketDescriptor::new(Arc::clone(&us))); + peer_manager + .as_ref() + .socket_disconnected(&SocketDescriptor::new(Arc::clone(&us))); break Err(()); - } + }, } } }; let timeout_send_fut = tokio::time::timeout(Duration::from_millis(100), send_fut); if let Ok(Ok(())) = timeout_send_fut.await { - Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver).await; + Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver) + .await; } }); Some(handle) @@ -421,19 +462,25 @@ where PM::Target: APeerManager { /// futures are spawned with tokio::spawn, you do not need to poll the second future in order to /// make progress. pub async fn connect_outbound( - peer_manager: PM, - their_node_id: PublicKey, - addr: SocketAddr, -) -> Option> -where PM::Target: APeerManager { + peer_manager: PM, their_node_id: PublicKey, addr: SocketAddr, +) -> Option> +where + PM::Target: APeerManager, +{ let connect_fut = async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }; if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await { Some(setup_outbound(peer_manager, their_node_id, stream)) - } else { None } + } else { + None + } } -const SOCK_WAKER_VTABLE: task::RawWakerVTable = - task::RawWakerVTable::new(clone_socket_waker, wake_socket_waker, wake_socket_waker_by_ref, drop_socket_waker); +const SOCK_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new( + clone_socket_waker, + wake_socket_waker, + wake_socket_waker_by_ref, + drop_socket_waker, +); fn clone_socket_waker(orig_ptr: *const ()) -> task::RawWaker { let new_waker = unsafe { Arc::from_raw(orig_ptr as *const mpsc::Sender<()>) }; @@ -504,8 +551,11 @@ impl peer_handler::SocketDescriptor for SocketDescriptor { us.read_paused = false; let _ = us.read_waker.try_send(()); } - if data.is_empty() { return 0; } - let waker = unsafe { task::Waker::from_raw(write_avail_to_waker(&self.write_avail_sender)) }; + if data.is_empty() { + return 0; + } + let waker = + unsafe { task::Waker::from_raw(write_avail_to_waker(&self.write_avail_sender)) }; let mut ctx = task::Context::from_waker(&waker); let mut written_len = 0; loop { @@ -515,11 +565,13 @@ impl peer_handler::SocketDescriptor for SocketDescriptor { Ok(res) => { debug_assert_ne!(res, 0); written_len += res; - if written_len == data.len() { return written_len; } + if written_len == data.len() { + return written_len; + } }, Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { continue; - } + }, Err(_) => return written_len, } }, @@ -568,15 +620,15 @@ impl Hash for SocketDescriptor { #[cfg(test)] mod tests { + use bitcoin::blockdata::constants::ChainHash; + use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; + use bitcoin::Network; + use lightning::events::*; use lightning::ln::features::*; use lightning::ln::msgs::*; - use lightning::ln::peer_handler::{MessageHandler, PeerManager, IgnoringMessageHandler}; + use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, PeerManager}; use lightning::routing::gossip::NodeId; - use lightning::events::*; use lightning::util::test_utils::TestNodeSigner; - use bitcoin::Network; - use bitcoin::blockdata::constants::ChainHash; - use bitcoin::secp256k1::{Secp256k1, SecretKey, PublicKey}; use tokio::sync::mpsc; @@ -588,11 +640,18 @@ mod tests { pub struct TestLogger(); impl lightning::util::logger::Logger for TestLogger { fn log(&self, record: lightning::util::logger::Record) { - println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args); + println!( + "{:<5} [{} : {}, {}] {}", + record.level.to_string(), + record.module_path, + record.file, + record.line, + record.args + ); } } - struct MsgHandler{ + struct MsgHandler { expected_pubkey: PublicKey, pubkey_connected: mpsc::Sender<()>, pubkey_disconnected: mpsc::Sender<()>, @@ -600,19 +659,63 @@ mod tests { msg_events: Mutex>, } impl RoutingMessageHandler for MsgHandler { - fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result { Ok(false) } - fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result { Ok(false) } - fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result { Ok(false) } - fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option, Option)> { None } - fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option { None } - fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init, _inbound: bool) -> Result<(), ()> { Ok(()) } - fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } - fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } - fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) } - fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } - fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } - fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } - fn processing_queue_high(&self) -> bool { false } + fn handle_node_announcement( + &self, _msg: &NodeAnnouncement, + ) -> Result { + Ok(false) + } + fn handle_channel_announcement( + &self, _msg: &ChannelAnnouncement, + ) -> Result { + Ok(false) + } + fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result { + Ok(false) + } + fn get_next_channel_announcement( + &self, _starting_point: u64, + ) -> Option<(ChannelAnnouncement, Option, Option)> { + None + } + fn get_next_node_announcement( + &self, _starting_point: Option<&NodeId>, + ) -> Option { + None + } + fn peer_connected( + &self, _their_node_id: &PublicKey, _init_msg: &Init, _inbound: bool, + ) -> Result<(), ()> { + Ok(()) + } + fn handle_reply_channel_range( + &self, _their_node_id: &PublicKey, _msg: ReplyChannelRange, + ) -> Result<(), LightningError> { + Ok(()) + } + fn handle_reply_short_channel_ids_end( + &self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd, + ) -> Result<(), LightningError> { + Ok(()) + } + fn handle_query_channel_range( + &self, _their_node_id: &PublicKey, _msg: QueryChannelRange, + ) -> Result<(), LightningError> { + Ok(()) + } + fn handle_query_short_channel_ids( + &self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds, + ) -> Result<(), LightningError> { + Ok(()) + } + fn provided_node_features(&self) -> NodeFeatures { + NodeFeatures::empty() + } + fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { + InitFeatures::empty() + } + fn processing_queue_high(&self) -> bool { + false + } } impl ChannelMessageHandler for MsgHandler { fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {} @@ -623,13 +726,20 @@ mod tests { fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &Shutdown) {} fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &ClosingSigned) {} fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateAddHTLC) {} - fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFulfillHTLC) {} + fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFulfillHTLC) { + } fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailHTLC) {} - fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailMalformedHTLC) {} + fn handle_update_fail_malformed_htlc( + &self, _their_node_id: &PublicKey, _msg: &UpdateFailMalformedHTLC, + ) { + } fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &CommitmentSigned) {} fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &RevokeAndACK) {} fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &UpdateFee) {} - fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {} + fn handle_announcement_signatures( + &self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures, + ) { + } fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &ChannelUpdate) {} fn handle_open_channel_v2(&self, _their_node_id: &PublicKey, _msg: &OpenChannelV2) {} fn handle_accept_channel_v2(&self, _their_node_id: &PublicKey, _msg: &AcceptChannelV2) {} @@ -655,16 +765,25 @@ mod tests { self.pubkey_disconnected.clone().try_send(()).unwrap(); } } - fn peer_connected(&self, their_node_id: &PublicKey, _init_msg: &Init, _inbound: bool) -> Result<(), ()> { + fn peer_connected( + &self, their_node_id: &PublicKey, _init_msg: &Init, _inbound: bool, + ) -> Result<(), ()> { if *their_node_id == self.expected_pubkey { self.pubkey_connected.clone().try_send(()).unwrap(); } Ok(()) } - fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &ChannelReestablish) {} + fn handle_channel_reestablish( + &self, _their_node_id: &PublicKey, _msg: &ChannelReestablish, + ) { + } fn handle_error(&self, _their_node_id: &PublicKey, _msg: &ErrorMessage) {} - fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } - fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } + fn provided_node_features(&self) -> NodeFeatures { + NodeFeatures::empty() + } + fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { + InitFeatures::empty() + } fn get_chain_hashes(&self) -> Option> { Some(vec![ChainHash::using_genesis_block(Network::Testnet)]) } @@ -690,7 +809,9 @@ mod tests { (std::net::TcpStream::connect("127.0.0.1:9999").unwrap(), listener.accept().unwrap().0) } else if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:46926") { (std::net::TcpStream::connect("127.0.0.1:46926").unwrap(), listener.accept().unwrap().0) - } else { panic!("Failed to bind to v4 localhost on common ports"); } + } else { + panic!("Failed to bind to v4 localhost on common ports"); + } } async fn do_basic_connection_test() { @@ -712,10 +833,16 @@ mod tests { let a_msg_handler = MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), - onion_message_handler: Arc::new(IgnoringMessageHandler{}), - custom_message_handler: Arc::new(IgnoringMessageHandler{}), + onion_message_handler: Arc::new(IgnoringMessageHandler {}), + custom_message_handler: Arc::new(IgnoringMessageHandler {}), }; - let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key)))); + let a_manager = Arc::new(PeerManager::new( + a_msg_handler, + 0, + &[1; 32], + Arc::new(TestLogger()), + Arc::new(TestNodeSigner::new(a_key)), + )); let (b_connected_sender, mut b_connected) = mpsc::channel(1); let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1); @@ -729,10 +856,16 @@ mod tests { let b_msg_handler = MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), - onion_message_handler: Arc::new(IgnoringMessageHandler{}), - custom_message_handler: Arc::new(IgnoringMessageHandler{}), + onion_message_handler: Arc::new(IgnoringMessageHandler {}), + custom_message_handler: Arc::new(IgnoringMessageHandler {}), }; - let b_manager = Arc::new(PeerManager::new(b_msg_handler, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key)))); + let b_manager = Arc::new(PeerManager::new( + b_msg_handler, + 0, + &[2; 32], + Arc::new(TestLogger()), + Arc::new(TestNodeSigner::new(b_key)), + )); // We bind on localhost, hoping the environment is properly configured with a local // address. This may not always be the case in containers and the like, so if this test is @@ -747,7 +880,8 @@ mod tests { tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap(); a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError { - node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None } + node_id: b_pub, + action: ErrorAction::DisconnectPeer { msg: None }, }); assert!(!a_handler.disconnected_flag.load(Ordering::SeqCst)); assert!(!b_handler.disconnected_flag.load(Ordering::SeqCst)); @@ -783,11 +917,17 @@ mod tests { let a_msg_handler = MessageHandler { chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()), - onion_message_handler: Arc::new(IgnoringMessageHandler{}), - route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), - custom_message_handler: Arc::new(IgnoringMessageHandler{}), + onion_message_handler: Arc::new(IgnoringMessageHandler {}), + route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler {}), + custom_message_handler: Arc::new(IgnoringMessageHandler {}), }; - let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key)))); + let a_manager = Arc::new(PeerManager::new( + a_msg_handler, + 0, + &[1; 32], + Arc::new(TestLogger()), + Arc::new(TestNodeSigner::new(a_key)), + )); // Make two connections, one for an inbound and one for an outbound connection let conn_a = { @@ -801,12 +941,8 @@ mod tests { // Call connection setup inside new tokio tasks. let manager_reference = Arc::clone(&a_manager); - tokio::spawn(async move { - super::setup_inbound(manager_reference, conn_a).await - }); - tokio::spawn(async move { - super::setup_outbound(a_manager, b_pub, conn_b).await - }); + tokio::spawn(async move { super::setup_inbound(manager_reference, conn_a).await }); + tokio::spawn(async move { super::setup_outbound(a_manager, b_pub, conn_b).await }); } #[tokio::test(flavor = "multi_thread")]