Move open_channel message handling into an event
[rust-lightning] / src / ln / peer_handler.rs
index 30900cab499a8407fb4c7029c76a2de75517276d..2472518d613a4199678a6dbe0c811d9fe42e38ea 100644 (file)
@@ -8,6 +8,7 @@ use util::events::{EventsProvider,Event};
 
 use std::collections::{HashMap,LinkedList};
 use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::{cmp,error,mem,hash,fmt};
 
 pub struct MessageHandler {
@@ -20,7 +21,9 @@ pub struct MessageHandler {
 /// implement Hash to meet the PeerManager API.
 /// For efficiency, Clone should be relatively cheap for this type.
 /// You probably want to just extend an int and put a file descriptor in a struct and implement
-/// send_data.
+/// send_data. Note that if you are using a higher-level net library that may close() itself, be
+/// careful to ensure you don't have races whereby you might register a new connection with an fd
+/// the same as a yet-to-be-disconnect_event()-ed.
 pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
        /// Attempts to send some data from the given Vec starting at the given offset to the peer.
        /// Returns the amount of data which was sent, possibly 0 if the socket has since disconnected.
@@ -86,6 +89,7 @@ pub struct PeerManager<Descriptor: SocketDescriptor> {
        peers: Mutex<PeerHolder<Descriptor>>,
        pending_events: Mutex<Vec<Event>>,
        our_node_secret: SecretKey,
+       initial_syncs_sent: AtomicUsize,
 }
 
 
@@ -101,6 +105,9 @@ macro_rules! encode_msg {
        }
 }
 
+//TODO: Really should do something smarter for this
+const INITIAL_SYNCS_TO_SEND: usize = 5;
+
 /// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds.
 /// PeerIds may repeat, but only after disconnect_event() has been called.
 impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
@@ -110,6 +117,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
                        pending_events: Mutex::new(Vec::new()),
                        our_node_secret: our_node_secret,
+                       initial_syncs_sent: AtomicUsize::new(0),
                }
        }
 
@@ -333,9 +341,14 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                        peer.pending_read_is_header = true;
 
                                                                        insert_node_id = Some(peer.their_node_id.unwrap());
+                                                                       let mut local_features = msgs::LocalFeatures::new();
+                                                                       if self.initial_syncs_sent.load(Ordering::Acquire) < INITIAL_SYNCS_TO_SEND {
+                                                                               self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
+                                                                               local_features.set_initial_routing_sync();
+                                                                       }
                                                                        encode_and_send_msg!(msgs::Init {
                                                                                global_features: msgs::GlobalFeatures::new(),
-                                                                               local_features: msgs::LocalFeatures::new(),
+                                                                               local_features,
                                                                        }, 16);
                                                                },
                                                                NextNoiseStep::ActThree => {
@@ -381,9 +394,14 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                peer.their_local_features = Some(msg.local_features);
 
                                                                                                if !peer.outbound {
+                                                                                                       let mut local_features = msgs::LocalFeatures::new();
+                                                                                                       if self.initial_syncs_sent.load(Ordering::Acquire) < INITIAL_SYNCS_TO_SEND {
+                                                                                                               self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
+                                                                                                               local_features.set_initial_routing_sync();
+                                                                                                       }
                                                                                                        encode_and_send_msg!(msgs::Init {
                                                                                                                global_features: msgs::GlobalFeatures::new(),
-                                                                                                               local_features: msgs::LocalFeatures::new(),
+                                                                                                               local_features,
                                                                                                        }, 16);
                                                                                                }
                                                                                        },
@@ -393,8 +411,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                                                                                        18 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::Ping::decode(&msg_data[2..]));
-                                                                                               let resp = msgs::Pong { byteslen: msg.ponglen };
-                                                                                               encode_and_send_msg!(resp, 19);
+                                                                                               if msg.ponglen < 65532 {
+                                                                                                       let resp = msgs::Pong { byteslen: msg.ponglen };
+                                                                                                       encode_and_send_msg!(resp, 19);
+                                                                                               }
                                                                                        },
                                                                                        19 => {
                                                                                                try_potential_decodeerror!(msgs::Pong::decode(&msg_data[2..]));
@@ -594,6 +614,14 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                //TODO: Handle upstream in some confused form so that upstream just knows
                                                //to call us somehow?
                                        },
+                                       Event::SendOpenChannel { ref node_id, ref msg } => {
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               continue;
+                                       },
                                        Event::SendFundingCreated { ref node_id, ref msg } => {
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: generate a DiscardFunding event indicating to the wallet that