X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-net-tokio%2Fsrc%2Flib.rs;h=45fe9b12f61b17a7b078805de1e6344efa91fbf3;hb=fce03975076d067b33435aff2d23349c0ec70ab6;hp=d5feb9936527ce156b2bec62ccec6c2f241245eb;hpb=877a5fc9c0d7455bcbde8dbd9d314664bdcfba90;p=rust-lightning diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index d5feb993..45fe9b12 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -43,7 +43,7 @@ //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { //! lightning_net_tokio::connect_outbound(peer_manager, their_node_id, addr).await; //! loop { -//! let event_handler = |event: &Event| { +//! let event_handler = |event: Event| { //! // Handle the event! //! }; //! channel_manager.await_persistable_update(); @@ -56,7 +56,7 @@ //! async fn accept_socket(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, socket: TcpStream) { //! lightning_net_tokio::setup_inbound(peer_manager, socket); //! loop { -//! let event_handler = |event: &Event| { +//! let event_handler = |event: Event| { //! // Handle the event! //! }; //! channel_manager.await_persistable_update(); @@ -123,7 +123,11 @@ struct Connection { id: u64, } impl Connection { - async fn poll_event_process(peer_manager: Arc>, mut event_receiver: mpsc::Receiver<()>) where + async fn poll_event_process( + peer_manager: PM, + mut event_receiver: mpsc::Receiver<()>, + ) where + PM: Deref> + 'static + Send + Sync, CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, OMH: Deref + 'static + Send + Sync, @@ -134,7 +138,7 @@ impl Connection { OMH::Target: OnionMessageHandler + Send + Sync, L::Target: Logger + Send + Sync, UMH::Target: CustomMessageHandler + Send + Sync, - { + { loop { if event_receiver.recv().await.is_none() { return; @@ -143,7 +147,14 @@ impl Connection { } } - async fn schedule_read(peer_manager: Arc>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where + async fn schedule_read( + peer_manager: PM, + us: Arc>, + mut reader: io::ReadHalf, + mut read_wake_receiver: mpsc::Receiver<()>, + mut write_avail_receiver: mpsc::Receiver<()>, + ) where + PM: Deref> + 'static + Send + Sync + Clone, CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, OMH: Deref + 'static + Send + Sync, @@ -154,10 +165,10 @@ impl Connection { OMH::Target: OnionMessageHandler + 'static + Send + Sync, L::Target: Logger + 'static + Send + Sync, UMH::Target: CustomMessageHandler + 'static + Send + Sync, - { + { // Create a waker to wake up poll_event_process, above let (event_waker, event_receiver) = mpsc::channel(1); - tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver)); + tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver)); // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -272,7 +283,11 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -pub fn setup_inbound(peer_manager: Arc>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound( + peer_manager: PM, + stream: StdTcpStream, +) -> impl std::future::Future where + PM: Deref> + 'static + Send + Sync + Clone, CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, OMH: Deref + 'static + Send + Sync, @@ -321,7 +336,12 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound( + peer_manager: PM, + their_node_id: PublicKey, + stream: StdTcpStream, +) -> impl std::future::Future where + PM: Deref> + 'static + Send + Sync + Clone, CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, OMH: Deref + 'static + Send + Sync, @@ -399,7 +419,12 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound( + peer_manager: PM, + their_node_id: PublicKey, + addr: SocketAddr, +) -> Option> where + PM: Deref> + 'static + Send + Sync + Clone, CMH: Deref + 'static + Send + Sync, RMH: Deref + 'static + Send + Sync, OMH: Deref + 'static + Send + Sync, @@ -577,21 +602,21 @@ mod tests { fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result { Ok(false) } fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option, Option)> { None } fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option { None } - fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) { } + fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) -> Result<(), ()> { Ok(()) } fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } - fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::known() } - fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::known() } + fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } + fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } } impl ChannelMessageHandler for MsgHandler { - fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {} - fn handle_accept_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &AcceptChannel) {} + fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {} + fn handle_accept_channel(&self, _their_node_id: &PublicKey, _msg: &AcceptChannel) {} fn handle_funding_created(&self, _their_node_id: &PublicKey, _msg: &FundingCreated) {} fn handle_funding_signed(&self, _their_node_id: &PublicKey, _msg: &FundingSigned) {} fn handle_channel_ready(&self, _their_node_id: &PublicKey, _msg: &ChannelReady) {} - fn handle_shutdown(&self, _their_node_id: &PublicKey, _their_features: &InitFeatures, _msg: &Shutdown) {} + fn handle_shutdown(&self, _their_node_id: &PublicKey, _msg: &Shutdown) {} fn handle_closing_signed(&self, _their_node_id: &PublicKey, _msg: &ClosingSigned) {} fn handle_update_add_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateAddHTLC) {} fn handle_update_fulfill_htlc(&self, _their_node_id: &PublicKey, _msg: &UpdateFulfillHTLC) {} @@ -608,15 +633,16 @@ mod tests { self.pubkey_disconnected.clone().try_send(()).unwrap(); } } - fn peer_connected(&self, their_node_id: &PublicKey, _msg: &Init) { + fn peer_connected(&self, their_node_id: &PublicKey, _init_msg: &Init) -> Result<(), ()> { if *their_node_id == self.expected_pubkey { self.pubkey_connected.clone().try_send(()).unwrap(); } + Ok(()) } fn handle_channel_reestablish(&self, _their_node_id: &PublicKey, _msg: &ChannelReestablish) {} fn handle_error(&self, _their_node_id: &PublicKey, _msg: &ErrorMessage) {} - fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::known() } - fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::known() } + fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } + fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } } impl MessageSendEventsProvider for MsgHandler { fn get_and_clear_pending_msg_events(&self) -> Vec {