use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
use lightning::ln::peer_handler::CustomMessageHandler;
-use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
+use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
use lightning::util::logger::Logger;
use std::task;
+use std::net::IpAddr;
use std::net::SocketAddr;
use std::net::TcpStream as StdTcpStream;
use std::sync::{Arc, Mutex};
id: u64,
}
impl Connection {
+ async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
+ CMH: ChannelMessageHandler + 'static + Send + Sync,
+ RMH: RoutingMessageHandler + 'static + Send + Sync,
+ L: Logger + 'static + ?Sized + Send + Sync,
+ UMH: CustomMessageHandler + 'static + Send + Sync {
+ loop {
+ if event_receiver.recv().await.is_none() {
+ return;
+ }
+ peer_manager.process_events();
+ }
+ }
+
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
- CMH: ChannelMessageHandler + 'static,
- RMH: RoutingMessageHandler + 'static,
- L: Logger + 'static + ?Sized,
- UMH: CustomMessageHandler + 'static {
+ CMH: ChannelMessageHandler + 'static + Send + Sync,
+ RMH: RoutingMessageHandler + 'static + Send + Sync,
+ L: Logger + 'static + ?Sized + Send + Sync,
+ UMH: CustomMessageHandler + 'static + Send + Sync {
+ // Create a waker to wake up poll_event_process, above
+ let (event_waker, event_receiver) = mpsc::channel(1);
+ tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
+
// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];
Err(_) => break Disconnect::PeerDisconnected,
},
}
- peer_manager.process_events();
+ let _ = event_waker.try_send(());
};
let writer_option = us.lock().unwrap().writer.take();
if let Some(mut writer) = writer_option {
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
+ let ip_addr = stream.peer_addr().unwrap();
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
let last_us = Arc::clone(&us);
- let handle_opt = if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone())) {
+ let handle_opt = if let Ok(_) = peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone()), match ip_addr.ip() {
+ IpAddr::V4(ip) => Some(NetAddress::IPv4 {
+ addr: ip.octets(),
+ port: ip_addr.port(),
+ }),
+ IpAddr::V6(ip) => Some(NetAddress::IPv6 {
+ addr: ip.octets(),
+ port: ip_addr.port(),
+ }),
+ }) {
Some(tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)))
} else {
// Note that we will skip socket_disconnected here, in accordance with the PeerManager
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync,
UMH: CustomMessageHandler + 'static + Send + Sync {
+ let ip_addr = stream.peer_addr().unwrap();
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
let last_us = Arc::clone(&us);
-
- let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
+ let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), match ip_addr.ip() {
+ IpAddr::V4(ip) => Some(NetAddress::IPv4 {
+ addr: ip.octets(),
+ port: ip_addr.port(),
+ }),
+ IpAddr::V6(ip) => Some(NetAddress::IPv6 {
+ addr: ip.octets(),
+ port: ip_addr.port(),
+ }),
+ }) {
Some(tokio::spawn(async move {
// We should essentially always have enough room in a TCP socket buffer to send the
// initial 10s of bytes. However, tokio running in single-threaded mode will always