use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
use lightning::util::logger::Logger;
+use std::ops::Deref;
use std::task;
use std::net::SocketAddr;
use std::net::TcpStream as StdTcpStream;
id: u64,
}
impl Connection {
- async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+ async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + Send + Sync,
+ RMH::Target: RoutingMessageHandler + Send + Sync,
+ L::Target: Logger + Send + Sync,
+ UMH::Target: CustomMessageHandler + Send + Sync,
+ {
loop {
if event_receiver.recv().await.is_none() {
return;
}
}
- async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+ async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
+ RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
+ L::Target: Logger + 'static + Send + Sync,
+ UMH::Target: CustomMessageHandler + 'static + Send + Sync,
+ {
// Create a waker to wake up poll_event_process, above
let (event_waker, event_receiver) = mpsc::channel(1);
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
-pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + Send + Sync,
+ RMH::Target: RoutingMessageHandler + Send + Sync,
+ L::Target: Logger + Send + Sync,
+ UMH::Target: CustomMessageHandler + Send + Sync,
+{
let remote_addr = get_addr_from_stream(&stream);
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
-pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + Send + Sync,
+ RMH::Target: RoutingMessageHandler + Send + Sync,
+ L::Target: Logger + Send + Sync,
+ UMH::Target: CustomMessageHandler + Send + Sync,
+{
let remote_addr = get_addr_from_stream(&stream);
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
/// disconnected and associated handling futures are freed, though, because all processing in said
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
/// make progress.
-pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
- CMH: ChannelMessageHandler + 'static + Send + Sync,
- RMH: RoutingMessageHandler + 'static + Send + Sync,
- L: Logger + 'static + ?Sized + Send + Sync,
- UMH: CustomMessageHandler + 'static + Send + Sync {
+pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+ CMH: Deref + 'static + Send + Sync,
+ RMH: Deref + 'static + Send + Sync,
+ L: Deref + 'static + Send + Sync,
+ UMH: Deref + 'static + Send + Sync,
+ CMH::Target: ChannelMessageHandler + Send + Sync,
+ RMH::Target: RoutingMessageHandler + Send + Sync,
+ L::Target: Logger + Send + Sync,
+ UMH::Target: CustomMessageHandler + Send + Sync,
+{
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
Some(setup_outbound(peer_manager, their_node_id, stream))
} else { None }