stream.set_nonblocking(true).unwrap();
let tokio_stream = Arc::new(TcpStream::from_std(stream).unwrap());
- (Arc::clone(&tokio_stream), write_receiver, read_receiver,
- Arc::new(Mutex::new(Self {
- writer: Some(tokio_stream), write_avail, read_waker, read_paused: false,
+ let id = ID_COUNTER.fetch_add(1, Ordering::AcqRel);
+ let writer = Some(Arc::clone(&tokio_stream));
+ let conn = Arc::new(Mutex::new(Self {
+ writer,
+ write_avail,
+ read_waker,
+ read_paused: false,
rl_requested_disconnect: false,
- id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
- })))
+ id
+ }));
+ (tokio_stream, write_receiver, read_receiver, conn)
}
}
let last_us = Arc::clone(&us);
let handle_opt = if peer_manager.as_ref().new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() {
- Some(tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)))
+ let handle = tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver));
+ Some(handle)
} else {
// Note that we will skip socket_disconnected here, in accordance with the PeerManager
// requirements.
#[cfg(test)]
let last_us = Arc::clone(&us);
let handle_opt = if let Ok(initial_send) = peer_manager.as_ref().new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
- Some(tokio::spawn(async move {
+ let handle = tokio::spawn(async move {
// We should essentially always have enough room in a TCP socket buffer to send the
// initial 10s of bytes. However, tokio running in single-threaded mode will always
// fail writes and wake us back up later to write. Thus, we handle a single
// std::task::Poll::Pending but still expect to write the full set of bytes at once
// and use a relatively tight timeout.
- if let Ok(Ok(())) = tokio::time::timeout(Duration::from_millis(100), async {
+ let send_fut = async {
loop {
match SocketDescriptor::new(us.clone()).send_data(&initial_send, true) {
v if v == initial_send.len() => break Ok(()),
}
}
}
- }).await {
+ };
+ let timeout_send_fut = tokio::time::timeout(Duration::from_millis(100), send_fut);
+ if let Ok(Ok(())) = timeout_send_fut.await {
Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver).await;
}
- }))
+ });
+ Some(handle)
} else {
// Note that we will skip socket_disconnected here, in accordance with the PeerManager
// requirements.
addr: SocketAddr,
) -> Option<impl std::future::Future<Output=()>>
where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
- if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
+ let connect_fut = async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) };
+ if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await {
Some(setup_outbound(peer_manager, their_node_id, stream))
} else { None }
}
mod tests {
use lightning::ln::features::*;
use lightning::ln::msgs::*;
- use lightning::ln::peer_handler::{MessageHandler, PeerManager};
+ use lightning::ln::peer_handler::{MessageHandler, PeerManager, IgnoringMessageHandler};
use lightning::routing::gossip::NodeId;
use lightning::events::*;
use lightning::util::test_utils::TestNodeSigner;
disconnected_flag: AtomicBool::new(false),
msg_events: Mutex::new(Vec::new()),
});
- let a_manager = Arc::new(PeerManager::new(MessageHandler {
+ let a_msg_handler = MessageHandler {
chan_handler: Arc::clone(&a_handler),
route_handler: Arc::clone(&a_handler),
- onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
+ onion_message_handler: Arc::new(IgnoringMessageHandler{}),
+ custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+ };
+ let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
disconnected_flag: AtomicBool::new(false),
msg_events: Mutex::new(Vec::new()),
});
- let b_manager = Arc::new(PeerManager::new(MessageHandler {
+ let b_msg_handler = MessageHandler {
chan_handler: Arc::clone(&b_handler),
route_handler: Arc::clone(&b_handler),
- onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- }, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key))));
+ onion_message_handler: Arc::new(IgnoringMessageHandler{}),
+ custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+ };
+ let b_manager = Arc::new(PeerManager::new(b_msg_handler, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key))));
// We bind on localhost, hoping the environment is properly configured with a local
// address. This may not always be the case in containers and the like, so if this test is
let b_key = SecretKey::from_slice(&[2; 32]).unwrap();
let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key);
- let a_manager = Arc::new(PeerManager::new(MessageHandler {
+ let a_msg_handler = MessageHandler {
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
- onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
+ onion_message_handler: Arc::new(IgnoringMessageHandler{}),
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
+ custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+ };
+ let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
// Make two connections, one for an inbound and one for an outbound connection
let conn_a = {