`rustfmt`: Run on `lightning-net-tokio`
author    Elias Rohrer <dev@tnull.de>
Thu, 18 Jul 2024 08:05:05 +0000 (10:05 +0200)
committer Elias Rohrer <dev@tnull.de>
Thu, 18 Jul 2024 08:23:09 +0000 (10:23 +0200)
lightning-net-tokio/src/lib.rs

index 35124bab6a260b42cf44f3d05cfca78b885560f3..a537289c2c464f231e62704bfccbd2a487992890 100644
 
 #![deny(rustdoc::broken_intra_doc_links)]
 #![deny(rustdoc::private_intra_doc_links)]
-
 #![deny(missing_docs)]
 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
 
 use bitcoin::secp256k1::PublicKey;
 
 use tokio::net::TcpStream;
-use tokio::time;
 use tokio::sync::mpsc;
+use tokio::time;
 
+use lightning::ln::msgs::SocketAddress;
 use lightning::ln::peer_handler;
-use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
 use lightning::ln::peer_handler::APeerManager;
-use lightning::ln::msgs::SocketAddress;
+use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
 
-use std::ops::Deref;
-use std::task::{self, Poll};
 use std::future::Future;
+use std::hash::Hash;
 use std::net::SocketAddr;
 use std::net::TcpStream as StdTcpStream;
-use std::sync::{Arc, Mutex};
+use std::ops::Deref;
+use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{self, Poll};
 use std::time::Duration;
-use std::pin::Pin;
-use std::hash::Hash;
 
 static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
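
`ID_COUNTER` hands out a process-unique id per `Connection`, giving the `impl Hash for SocketDescriptor` near the end of this diff a cheap, stable key for hashing and comparing descriptors. For illustration, taking such an id is a single atomic fetch-add; the memory ordering below is an assumption for the sketch, not quoted from this file:

    // Sketch only: reserve a unique connection id.
    let id = ID_COUNTER.fetch_add(1, Ordering::AcqRel);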
 
@@ -57,27 +56,34 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
 // define a trivial two- and three-select macro with the specific types we need and just use that.
 
 pub(crate) enum SelectorOutput {
-       A(Option<()>), B(Option<()>), C(tokio::io::Result<()>),
+       A(Option<()>),
+       B(Option<()>),
+       C(tokio::io::Result<()>),
 }
 
 pub(crate) struct TwoSelector<
-       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin
+       A: Future<Output = Option<()>> + Unpin,
+       B: Future<Output = Option<()>> + Unpin,
 > {
        pub a: A,
        pub b: B,
 }
 
-impl<
-       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin
-> Future for TwoSelector<A, B> {
+impl<A: Future<Output = Option<()>> + Unpin, B: Future<Output = Option<()>> + Unpin> Future
+       for TwoSelector<A, B>
+{
        type Output = SelectorOutput;
        fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
                match Pin::new(&mut self.a).poll(ctx) {
-                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); },
+                       Poll::Ready(res) => {
+                               return Poll::Ready(SelectorOutput::A(res));
+                       },
                        Poll::Pending => {},
                }
                match Pin::new(&mut self.b).poll(ctx) {
-                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+                       Poll::Ready(res) => {
+                               return Poll::Ready(SelectorOutput::B(res));
+                       },
                        Poll::Pending => {},
                }
                Poll::Pending
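
The comment truncated above motivates these hand-rolled selectors as a minimal stand-in for a general select macro, specialized to exactly the output types this crate needs. One behavioral detail: `poll` checks `a` before `b`, so the selector is biased toward `a` when both futures are ready. A minimal sketch (not part of this diff) demonstrating that bias:

    // Sketch only: both branches are immediately ready; the selector yields
    // A because `a` is polled first.
    async fn bias_demo() {
        use std::future::ready;
        let out = TwoSelector { a: ready(Some(())), b: ready(Some(())) }.await;
        assert!(matches!(out, SelectorOutput::A(Some(()))));
    }
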
@@ -85,7 +91,9 @@ impl<
 }
 
 pub(crate) struct ThreeSelector<
-       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
+       A: Future<Output = Option<()>> + Unpin,
+       B: Future<Output = Option<()>> + Unpin,
+       C: Future<Output = tokio::io::Result<()>> + Unpin,
 > {
        pub a: A,
        pub b: B,
@@ -93,20 +101,29 @@ pub(crate) struct ThreeSelector<
 }
 
 impl<
-       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
-> Future for ThreeSelector<A, B, C> {
+               A: Future<Output = Option<()>> + Unpin,
+               B: Future<Output = Option<()>> + Unpin,
+               C: Future<Output = tokio::io::Result<()>> + Unpin,
+       > Future for ThreeSelector<A, B, C>
+{
        type Output = SelectorOutput;
        fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
                match Pin::new(&mut self.a).poll(ctx) {
-                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); },
+                       Poll::Ready(res) => {
+                               return Poll::Ready(SelectorOutput::A(res));
+                       },
                        Poll::Pending => {},
                }
                match Pin::new(&mut self.b).poll(ctx) {
-                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+                       Poll::Ready(res) => {
+                               return Poll::Ready(SelectorOutput::B(res));
+                       },
                        Poll::Pending => {},
                }
                match Pin::new(&mut self.c).poll(ctx) {
-                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
+                       Poll::Ready(res) => {
+                               return Poll::Ready(SelectorOutput::C(res));
+                       },
                        Poll::Pending => {},
                }
                Poll::Pending
@@ -140,9 +157,10 @@ struct Connection {
 }
 impl Connection {
        async fn poll_event_process<PM: Deref + 'static + Send + Sync>(
-               peer_manager: PM,
-               mut event_receiver: mpsc::Receiver<()>,
-       ) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
+               peer_manager: PM, mut event_receiver: mpsc::Receiver<()>,
+       ) where
+               PM::Target: APeerManager<Descriptor = SocketDescriptor>,
+       {
                loop {
                        if event_receiver.recv().await.is_none() {
                                return;
@@ -152,12 +170,11 @@ impl Connection {
        }
 
        async fn schedule_read<PM: Deref + 'static + Send + Sync + Clone>(
-               peer_manager: PM,
-               us: Arc<Mutex<Self>>,
-               reader: Arc<TcpStream>,
-               mut read_wake_receiver: mpsc::Receiver<()>,
-               mut write_avail_receiver: mpsc::Receiver<()>,
-       ) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
+               peer_manager: PM, us: Arc<Mutex<Self>>, reader: Arc<TcpStream>,
+               mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>,
+       ) where
+               PM::Target: APeerManager<Descriptor = SocketDescriptor>,
+       {
                // Create a waker to wake up poll_event_process, above
                let (event_waker, event_receiver) = mpsc::channel(1);
                tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver));
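
The depth-1 channel created here wakes `poll_event_process` above: each received `()` triggers one event-processing pass, and because the buffer holds at most one pending wake, redundant wake-ups coalesce. A reduced sketch of the pattern, with illustrative names (the real senders in this file are `event_waker` and `read_waker`):

    // A depth-1 channel used as an edge-triggered wake signal: if try_send
    // finds the buffer full, a wake is already pending, so the error is
    // safely ignored.
    let (waker, mut receiver) = tokio::sync::mpsc::channel::<()>(1);
    tokio::spawn(async move {
        while receiver.recv().await.is_some() {
            // run one event-processing pass here
        }
    });
    let _ = waker.try_send(());
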
@@ -178,7 +195,7 @@ impl Connection {
                        // closed.
                        // In this case, we do need to call peer_manager.socket_disconnected() to inform
                        // Rust-Lightning that the socket is gone.
-                       PeerDisconnected
+                       PeerDisconnected,
                }
                let disconnect_type = loop {
                        let read_paused = {
@@ -193,18 +210,21 @@ impl Connection {
                                TwoSelector {
                                        a: Box::pin(write_avail_receiver.recv()),
                                        b: Box::pin(read_wake_receiver.recv()),
-                               }.await
+                               }
+                               .await
                        } else {
                                ThreeSelector {
                                        a: Box::pin(write_avail_receiver.recv()),
                                        b: Box::pin(read_wake_receiver.recv()),
                                        c: Box::pin(reader.readable()),
-                               }.await
+                               }
+                               .await
                        };
                        match select_result {
                                SelectorOutput::A(v) => {
                                         assert!(v.is_some()); // We can't have dropped the sending end, it's in the us Arc!
-                                       if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err() {
+                                       if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err()
+                                       {
                                                break Disconnect::CloseConnection;
                                        }
                                },
@@ -215,11 +235,14 @@ impl Connection {
                                        debug_assert!(some.is_some());
                                },
                                SelectorOutput::C(res) => {
-                                       if res.is_err() { break Disconnect::PeerDisconnected; }
+                                       if res.is_err() {
+                                               break Disconnect::PeerDisconnected;
+                                       }
                                        match reader.try_read(&mut buf) {
                                                Ok(0) => break Disconnect::PeerDisconnected,
                                                Ok(len) => {
-                                                       let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
+                                                       let read_res =
+                                                               peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
                                                        let mut us_lock = us.lock().unwrap();
                                                        match read_res {
                                                                Ok(pause_read) => {
@@ -254,7 +277,9 @@ impl Connection {
                }
        }
 
-       fn new(stream: StdTcpStream) -> (Arc<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
+       fn new(
+               stream: StdTcpStream,
+       ) -> (Arc<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
                // We only ever need a channel of depth 1 here: if we returned a non-full write to the
                // PeerManager, we will eventually get notified that there is room in the socket to write
                // new bytes, which will generate an event. That event will be popped off the queue before
@@ -276,7 +301,7 @@ impl Connection {
                        read_waker,
                        read_paused: false,
                        rl_requested_disconnect: false,
-                       id
+                       id,
                }));
                (tokio_stream, write_receiver, read_receiver, conn)
        }
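
The tuple returned by `Connection::new` is wired up by the setup functions below: the stream plus the two receivers feed `schedule_read`, while the `Arc<Mutex<Connection>>` backs the `SocketDescriptor` handed to the `PeerManager`. In sketch form, mirroring what `setup_inbound` does further down:

    // Sketch of the wiring (error handling omitted): create the connection
    // state, build a descriptor for the PeerManager, then spawn the read loop.
    let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
    let descriptor = SocketDescriptor::new(Arc::clone(&us));
    // ... register `descriptor` with the PeerManager (e.g. new_inbound_connection) ...
    tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver));
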
@@ -284,14 +309,12 @@ impl Connection {
 
 fn get_addr_from_stream(stream: &StdTcpStream) -> Option<SocketAddress> {
        match stream.peer_addr() {
-               Ok(SocketAddr::V4(sockaddr)) => Some(SocketAddress::TcpIpV4 {
-                       addr: sockaddr.ip().octets(),
-                       port: sockaddr.port(),
-               }),
-               Ok(SocketAddr::V6(sockaddr)) => Some(SocketAddress::TcpIpV6 {
-                       addr: sockaddr.ip().octets(),
-                       port: sockaddr.port(),
-               }),
+               Ok(SocketAddr::V4(sockaddr)) => {
+                       Some(SocketAddress::TcpIpV4 { addr: sockaddr.ip().octets(), port: sockaddr.port() })
+               },
+               Ok(SocketAddr::V6(sockaddr)) => {
+                       Some(SocketAddress::TcpIpV6 { addr: sockaddr.ip().octets(), port: sockaddr.port() })
+               },
                Err(_) => None,
        }
 }
@@ -303,17 +326,28 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<SocketAddress> {
 /// futures are freed. However, because all processing futures are spawned with tokio::spawn, you
 /// do not need to poll the provided future in order to make progress.
 pub fn setup_inbound<PM: Deref + 'static + Send + Sync + Clone>(
-       peer_manager: PM,
-       stream: StdTcpStream,
-) -> impl std::future::Future<Output=()>
-where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
+       peer_manager: PM, stream: StdTcpStream,
+) -> impl std::future::Future<Output = ()>
+where
+       PM::Target: APeerManager<Descriptor = SocketDescriptor>,
+{
        let remote_addr = get_addr_from_stream(&stream);
        let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
        #[cfg(test)]
        let last_us = Arc::clone(&us);
 
-       let handle_opt = if peer_manager.as_ref().new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() {
-               let handle = tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver));
+       let handle_opt = if peer_manager
+               .as_ref()
+               .new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr)
+               .is_ok()
+       {
+               let handle = tokio::spawn(Connection::schedule_read(
+                       peer_manager,
+                       us,
+                       reader,
+                       read_receiver,
+                       write_receiver,
+               ));
                Some(handle)
        } else {
                // Note that we will skip socket_disconnected here, in accordance with the PeerManager
@@ -346,16 +380,20 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
 /// futures are freed. However, because all processing futures are spawned with tokio::spawn, you
 /// do not need to poll the provided future in order to make progress.
 pub fn setup_outbound<PM: Deref + 'static + Send + Sync + Clone>(
-       peer_manager: PM,
-       their_node_id: PublicKey,
-       stream: StdTcpStream,
-) -> impl std::future::Future<Output=()>
-where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
+       peer_manager: PM, their_node_id: PublicKey, stream: StdTcpStream,
+) -> impl std::future::Future<Output = ()>
+where
+       PM::Target: APeerManager<Descriptor = SocketDescriptor>,
+{
        let remote_addr = get_addr_from_stream(&stream);
        let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
        #[cfg(test)]
        let last_us = Arc::clone(&us);
-       let handle_opt = if let Ok(initial_send) = peer_manager.as_ref().new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
+       let handle_opt = if let Ok(initial_send) = peer_manager.as_ref().new_outbound_connection(
+               their_node_id,
+               SocketDescriptor::new(us.clone()),
+               remote_addr,
+       ) {
                let handle = tokio::spawn(async move {
                        // We should essentially always have enough room in a TCP socket buffer to send the
                        // initial 10s of bytes. However, tokio running in single-threaded mode will always
@@ -374,15 +412,18 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
                                                },
                                                _ => {
                                                        eprintln!("Failed to write first full message to socket!");
-                                                       peer_manager.as_ref().socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
+                                                       peer_manager
+                                                               .as_ref()
+                                                               .socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
                                                        break Err(());
-                                               }
+                                               },
                                        }
                                }
                        };
                        let timeout_send_fut = tokio::time::timeout(Duration::from_millis(100), send_fut);
                        if let Ok(Ok(())) = timeout_send_fut.await {
-                               Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver).await;
+                               Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)
+                                       .await;
                        }
                });
                Some(handle)
@@ -421,19 +462,25 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
 /// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
 /// make progress.
 pub async fn connect_outbound<PM: Deref + 'static + Send + Sync + Clone>(
-       peer_manager: PM,
-       their_node_id: PublicKey,
-       addr: SocketAddr,
-) -> Option<impl std::future::Future<Output=()>>
-where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
+       peer_manager: PM, their_node_id: PublicKey, addr: SocketAddr,
+) -> Option<impl std::future::Future<Output = ()>>
+where
+       PM::Target: APeerManager<Descriptor = SocketDescriptor>,
+{
        let connect_fut = async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) };
        if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await {
                Some(setup_outbound(peer_manager, their_node_id, stream))
-       } else { None }
+       } else {
+               None
+       }
 }
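
Taken together, the public entry points are used roughly as follows. This is a hedged sketch: the bind address, `peer_manager`, `their_node_id`, and `addr` are assumptions, not taken from this file, and per the doc comments above you do not need to poll the returned futures for I/O to make progress:

    // Inbound: accept a TCP connection and hand it off; setup_inbound's
    // future resolves once the peer disconnects.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:9735").await?;
    let (stream, _addr) = listener.accept().await?;
    tokio::spawn(setup_inbound(Arc::clone(&peer_manager), stream.into_std()?));

    // Outbound: connect_outbound dials with its own 10-second TCP timeout
    // and returns None if the connection could not be established.
    if let Some(fut) = connect_outbound(Arc::clone(&peer_manager), their_node_id, addr).await {
        tokio::spawn(fut);
    }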
 
-const SOCK_WAKER_VTABLE: task::RawWakerVTable =
-       task::RawWakerVTable::new(clone_socket_waker, wake_socket_waker, wake_socket_waker_by_ref, drop_socket_waker);
+const SOCK_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new(
+       clone_socket_waker,
+       wake_socket_waker,
+       wake_socket_waker_by_ref,
+       drop_socket_waker,
+);
 
 fn clone_socket_waker(orig_ptr: *const ()) -> task::RawWaker {
        let new_waker = unsafe { Arc::from_raw(orig_ptr as *const mpsc::Sender<()>) };
@@ -504,8 +551,11 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
                        us.read_paused = false;
                        let _ = us.read_waker.try_send(());
                }
-               if data.is_empty() { return 0; }
-               let waker = unsafe { task::Waker::from_raw(write_avail_to_waker(&self.write_avail_sender)) };
+               if data.is_empty() {
+                       return 0;
+               }
+               let waker =
+                       unsafe { task::Waker::from_raw(write_avail_to_waker(&self.write_avail_sender)) };
                let mut ctx = task::Context::from_waker(&waker);
                let mut written_len = 0;
                loop {
@@ -515,11 +565,13 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
                                                Ok(res) => {
                                                        debug_assert_ne!(res, 0);
                                                        written_len += res;
-                                                       if written_len == data.len() { return written_len; }
+                                                       if written_len == data.len() {
+                                                               return written_len;
+                                                       }
                                                },
                                                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                                                        continue;
-                                               }
+                                               },
                                                Err(_) => return written_len,
                                        }
                                },
@@ -568,15 +620,15 @@ impl Hash for SocketDescriptor {
 
 #[cfg(test)]
 mod tests {
+       use bitcoin::blockdata::constants::ChainHash;
+       use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
+       use bitcoin::Network;
+       use lightning::events::*;
        use lightning::ln::features::*;
        use lightning::ln::msgs::*;
-       use lightning::ln::peer_handler::{MessageHandler, PeerManager, IgnoringMessageHandler};
+       use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, PeerManager};
        use lightning::routing::gossip::NodeId;
-       use lightning::events::*;
        use lightning::util::test_utils::TestNodeSigner;
-       use bitcoin::Network;
-       use bitcoin::blockdata::constants::ChainHash;
-       use bitcoin::secp256k1::{Secp256k1, SecretKey, PublicKey};
 
        use tokio::sync::mpsc;
 
@@ -588,11 +640,18 @@ mod tests {
        pub struct TestLogger();
        impl lightning::util::logger::Logger for TestLogger {
                fn log(&self, record: lightning::util::logger::Record) {
-                       println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
+                       println!(
+                               "{:<5} [{} : {}, {}] {}",
+                               record.level.to_string(),
+                               record.module_path,
+                               record.file,
+                               record.line,
+                               record.args
+                       );
                }
        }
 
-       struct MsgHandler{
+       struct MsgHandler {
                expected_pubkey: PublicKey,
                pubkey_connected: mpsc::Sender<()>,
                pubkey_disconnected: mpsc::Sender<()>,
@@ -600,19 +659,63 @@ mod tests {
                msg_events: Mutex<Vec<MessageSendEvent>>,
        }
        impl RoutingMessageHandler for MsgHandler {
-               fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
-               fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
-               fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-               fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { None }
-               fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> { None }
-               fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
-               fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
-               fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
-               fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
-               fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
-               fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
-               fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() }
-               fn processing_queue_high(&self) -> bool { false }
+               fn handle_node_announcement(
+                       &self, _msg: &NodeAnnouncement,
+               ) -> Result<bool, LightningError> {
+                       Ok(false)
+               }
+               fn handle_channel_announcement(
+                       &self, _msg: &ChannelAnnouncement,
+               ) -> Result<bool, LightningError> {
+                       Ok(false)
+               }
+               fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> {
+                       Ok(false)
+               }
+               fn get_next_channel_announcement(
+                       &self, _starting_point: u64,
+               ) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
+                       None
+               }
+               fn get_next_node_announcement(
+                       &self, _starting_point: Option<&NodeId>,
+               ) -> Option<NodeAnnouncement> {
+                       None
+               }
+               fn peer_connected(
+                       &self, _their_node_id: &PublicKey, _init_msg: &Init, _inbound: bool,
+               ) -> Result<(), ()> {
+                       Ok(())
+               }
+               fn handle_reply_channel_range(
+                       &self, _their_node_id: &PublicKey, _msg: ReplyChannelRange,
+               ) -> Result<(), LightningError> {
+                       Ok(())
+               }
+               fn handle_reply_short_channel_ids_end(
+                       &self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd,
+               ) -> Result<(), LightningError> {
+                       Ok(())
+               }
+               fn handle_query_channel_range(
+                       &self, _their_node_id: &PublicKey, _msg: QueryChannelRange,
+               ) -> Result<(), LightningError> {
+                       Ok(())
+               }
+               fn handle_query_short_channel_ids(
+                       &self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds,
+               ) -> Result<(), LightningError> {
+                       Ok(())
+               }
+               fn provided_node_features(&self) -> NodeFeatures {
+                       NodeFeatures::empty()
+               }
+               fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
+                       InitFeatures::empty()
+               }
+               fn processing_queue_high(&self) -> bool {
+                       false
+               }
        }
        impl ChannelMessageHandler for MsgHandler {
                fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {}
@@ -623,13 +726,20 @@ mod tests {
                fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &Shutdown) {}
                fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &ClosingSigned) {}
                fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateAddHTLC) {}
-               fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFulfillHTLC) {}
+               fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFulfillHTLC) {
+               }
                fn handle_update_fail_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailHTLC) {}
-               fn handle_update_fail_malformed_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFailMalformedHTLC) {}
+               fn handle_update_fail_malformed_htlc(
+                       &self, _their_node_id: &PublicKey, _msg: &UpdateFailMalformedHTLC,
+               ) {
+               }
                fn handle_commitment_signed(&self, _their_node_id: &PublicKey, _msg: &CommitmentSigned) {}
                fn handle_revoke_and_ack(&self, _their_node_id: &PublicKey, _msg: &RevokeAndACK) {}
                fn handle_update_fee(&self, _their_node_id: &PublicKey, _msg: &UpdateFee) {}
-               fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {}
+               fn handle_announcement_signatures(
+                       &self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures,
+               ) {
+               }
                fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &ChannelUpdate) {}
                fn handle_open_channel_v2(&self, _their_node_id: &PublicKey, _msg: &OpenChannelV2) {}
                fn handle_accept_channel_v2(&self, _their_node_id: &PublicKey, _msg: &AcceptChannelV2) {}
@@ -655,16 +765,25 @@ mod tests {
                                self.pubkey_disconnected.clone().try_send(()).unwrap();
                        }
                }
-               fn peer_connected(&self, their_node_id: &PublicKey, _init_msg: &Init, _inbound: bool) -> Result<(), ()> {
+               fn peer_connected(
+                       &self, their_node_id: &PublicKey, _init_msg: &Init, _inbound: bool,
+               ) -> Result<(), ()> {
                        if *their_node_id == self.expected_pubkey {
                                self.pubkey_connected.clone().try_send(()).unwrap();
                        }
                        Ok(())
                }
-               fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &ChannelReestablish) {}
+               fn handle_channel_reestablish(
+                       &self, _their_node_id: &PublicKey, _msg: &ChannelReestablish,
+               ) {
+               }
                fn handle_error(&self, _their_node_id: &PublicKey, _msg: &ErrorMessage) {}
-               fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
-               fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() }
+               fn provided_node_features(&self) -> NodeFeatures {
+                       NodeFeatures::empty()
+               }
+               fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
+                       InitFeatures::empty()
+               }
                fn get_chain_hashes(&self) -> Option<Vec<ChainHash>> {
                        Some(vec![ChainHash::using_genesis_block(Network::Testnet)])
                }
@@ -690,7 +809,9 @@ mod tests {
                        (std::net::TcpStream::connect("127.0.0.1:9999").unwrap(), listener.accept().unwrap().0)
                } else if let Ok(listener) = std::net::TcpListener::bind("127.0.0.1:46926") {
                        (std::net::TcpStream::connect("127.0.0.1:46926").unwrap(), listener.accept().unwrap().0)
-               } else { panic!("Failed to bind to v4 localhost on common ports"); }
+               } else {
+                       panic!("Failed to bind to v4 localhost on common ports");
+               }
        }
 
        async fn do_basic_connection_test() {
@@ -712,10 +833,16 @@ mod tests {
                let a_msg_handler = MessageHandler {
                        chan_handler: Arc::clone(&a_handler),
                        route_handler: Arc::clone(&a_handler),
-                       onion_message_handler: Arc::new(IgnoringMessageHandler{}),
-                       custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+                       onion_message_handler: Arc::new(IgnoringMessageHandler {}),
+                       custom_message_handler: Arc::new(IgnoringMessageHandler {}),
                };
-               let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
+               let a_manager = Arc::new(PeerManager::new(
+                       a_msg_handler,
+                       0,
+                       &[1; 32],
+                       Arc::new(TestLogger()),
+                       Arc::new(TestNodeSigner::new(a_key)),
+               ));
 
                let (b_connected_sender, mut b_connected) = mpsc::channel(1);
                let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
@@ -729,10 +856,16 @@ mod tests {
                let b_msg_handler = MessageHandler {
                        chan_handler: Arc::clone(&b_handler),
                        route_handler: Arc::clone(&b_handler),
-                       onion_message_handler: Arc::new(IgnoringMessageHandler{}),
-                       custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+                       onion_message_handler: Arc::new(IgnoringMessageHandler {}),
+                       custom_message_handler: Arc::new(IgnoringMessageHandler {}),
                };
-               let b_manager = Arc::new(PeerManager::new(b_msg_handler, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key))));
+               let b_manager = Arc::new(PeerManager::new(
+                       b_msg_handler,
+                       0,
+                       &[2; 32],
+                       Arc::new(TestLogger()),
+                       Arc::new(TestNodeSigner::new(b_key)),
+               ));
 
                // We bind on localhost, hoping the environment is properly configured with a local
                // address. This may not always be the case in containers and the like, so if this test is
@@ -747,7 +880,8 @@ mod tests {
                tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap();
 
                a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError {
-                       node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None }
+                       node_id: b_pub,
+                       action: ErrorAction::DisconnectPeer { msg: None },
                });
                assert!(!a_handler.disconnected_flag.load(Ordering::SeqCst));
                assert!(!b_handler.disconnected_flag.load(Ordering::SeqCst));
@@ -783,11 +917,17 @@ mod tests {
 
                let a_msg_handler = MessageHandler {
                        chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
-                       onion_message_handler: Arc::new(IgnoringMessageHandler{}),
-                       route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
-                       custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+                       onion_message_handler: Arc::new(IgnoringMessageHandler {}),
+                       route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler {}),
+                       custom_message_handler: Arc::new(IgnoringMessageHandler {}),
                };
-               let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
+               let a_manager = Arc::new(PeerManager::new(
+                       a_msg_handler,
+                       0,
+                       &[1; 32],
+                       Arc::new(TestLogger()),
+                       Arc::new(TestNodeSigner::new(a_key)),
+               ));
 
                // Make two connections, one for an inbound and one for an outbound connection
                let conn_a = {
@@ -801,12 +941,8 @@ mod tests {
 
                // Call connection setup inside new tokio tasks.
                let manager_reference = Arc::clone(&a_manager);
-               tokio::spawn(async move {
-                       super::setup_inbound(manager_reference, conn_a).await
-               });
-               tokio::spawn(async move {
-                       super::setup_outbound(a_manager, b_pub, conn_b).await
-               });
+               tokio::spawn(async move { super::setup_inbound(manager_reference, conn_a).await });
+               tokio::spawn(async move { super::setup_outbound(a_manager, b_pub, conn_b).await });
        }
 
        #[tokio::test(flavor = "multi_thread")]