changes
[rust-lightning] / lightning / src / ln / peer_handler.rs
index c0035f4f42c09692970b5d472af6df0d97a4193b..a1620502b29ea0d207a12e5412d0c48f2a139190 100644 (file)
@@ -20,6 +20,7 @@ use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::{cmp,error,hash,fmt};
 
+
 use bitcoin_hashes::sha256::Hash as Sha256;
 use bitcoin_hashes::sha256::HashEngine as Sha256Engine;
 use bitcoin_hashes::{HashEngine, Hash};
@@ -115,7 +116,9 @@ struct Peer {
        pending_read_is_header: bool,
 
        sync_status: InitSyncTracker,
-}
+
+       ping_tracker: u8,
+}      
 
 impl Peer {
        /// Returns true if the channel announcements/updates for the given channel should be
@@ -286,6 +289,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        pending_read_is_header: false,
 
                        sync_status: InitSyncTracker::NoSyncRequested,
+
+                       ping_tracker: 0,
                }).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -322,6 +327,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        pending_read_is_header: false,
 
                        sync_status: InitSyncTracker::NoSyncRequested,
+
+                       ping_tracker: 0,
                }).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -679,10 +686,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                        encode_and_send_msg!(resp, 19);
                                                                                                }
                                                                                        },
-                                                                                       19 => {
-                                                                                               try_potential_decodeerror!(msgs::Pong::read(&mut reader));
+                                                                                       19 => { 
+                                                                                               peer.ping_tracker = 0;
+                                                                                               try_potential_decodeerror!(msgs::Pong::read(&mut reader)); 
                                                                                        },
-
                                                                                        // Channel control:
                                                                                        32 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader));
@@ -1088,8 +1095,133 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        }
                };
        }
+
+
+
+/*tto
+get all the descriptors (this would take the lock)
+
+do_read_event on all the Descriptors
+
+action_if_no_ping (also ping peers) this would take the lock
+
+
+
+
+*/
+
+fn get_des(&self) -> Vec<Descriptor> {
+       let mut des_set: Vec<Descriptor> = Vec::new();
+       let mut peers_lock = self.peers.lock().unwrap();
+       let peers = peers_lock.borrow_parts();
+                       
+       for (Descriptor, mut Peer) in peers.peers.iter_mut() {
+
+                               //read all events that have been sent from the Peer
+                               let mut descriptor = Descriptor.clone();
+                               des_set.push(descriptor);
+       }
+       des_set
+
+}
+
+fn mass_do_read_event(&self, mut des_set: Vec<Descriptor>)
+{
+       for (mut Descriptor) in des_set.iter_mut(){
+               let data: Vec<u8> = Vec::new();
+                               
+               let res = match self.do_read_event(&mut Descriptor, data){
+                                       Ok(pause_read) => pause_read,
+                                       Err(e) => panic!("something is wrong"),
+               };
+       }
+
+}
+
+fn action_if_no_ping(&self){
+
+                       let mut peers_lock = self.peers.lock().unwrap();
+                       let peers = peers_lock.borrow_parts();
+                       
+                       for (Descriptor, mut Peer) in peers.peers.iter_mut() {
+
+                               // Disconect the Peer if there is an outstanding ping for which we have not been ponged
+                               if Peer.ping_tracker > 0 {
+                                       self.disconnect_event(Descriptor);
+                               }
+
+                       else {
+               
+                       let ping = msgs::Ping {
+                               ponglen: 64,
+                               byteslen: 64,
+                       };
+
+                       Peer.pending_outbound_buffer.push_back(Peer.channel_encryptor.encrypt_message(&encode_msg!(ping, 18)));
+
+                       let mut descriptor = Descriptor.clone();
+                       self.do_attempt_write_data(&mut descriptor, &mut Peer);
+                       Peer.ping_tracker += 1;
+               
+
+                       }
+               }
+
 }
 
+///s
+       pub fn tto(&self){
+               let mut des_set: Vec<Descriptor> = self.get_des();
+               self.mass_do_read_event(des_set);
+               self.action_if_no_ping();
+       }
+
+       /// insure we recieved pong message from all peers or disconnect the peers then ping all peers
+       pub fn timer_tick_occurred(&self){
+
+                       let mut peers_lock = self.peers.lock().unwrap();
+                       let peers = peers_lock.borrow_parts();
+                       
+                       for (Descriptor, mut Peer) in peers.peers.iter_mut() {
+
+                               //read all events that have been sent from the Peer
+                               let mut descriptor = Descriptor.clone();
+                               let data: Vec<u8> = Vec::new();
+                               
+                               let res = match self.do_read_event(&mut descriptor, data){
+                                       Ok(pause_read) => pause_read,
+                                       Err(e) => panic!("something is wrong"),
+                               };
+                               
+                               // Disconect the Peer if there is an outstanding ping for which we have not been ponged
+                               if Peer.ping_tracker > 0 {
+                                       self.disconnect_event(Descriptor);
+                               }
+
+
+                       else {
+               
+                       let ping = msgs::Ping {
+                               ponglen: 64,
+                               byteslen: 64,
+                       };
+
+                       Peer.pending_outbound_buffer.push_back(Peer.channel_encryptor.encrypt_message(&encode_msg!(ping, 18)));
+
+                       let mut descriptor = Descriptor.clone();
+                       self.do_attempt_write_data(&mut descriptor, &mut Peer);
+                       Peer.ping_tracker += 1;
+               
+
+                       }
+               }
+       }
+}
+
+
+
+
+
 #[cfg(test)]
 mod tests {
        use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
@@ -1097,6 +1229,9 @@ mod tests {
        use util::events;
        use util::test_utils;
        use util::logger::Logger;
+       use util::ser::{Writeable, Writer, Readable};
+
+       use std::collections::{HashMap,hash_map,HashSet,LinkedList};
 
        use secp256k1::Secp256k1;
        use secp256k1::key::{SecretKey, PublicKey};
@@ -1159,7 +1294,6 @@ mod tests {
 
                let secp_ctx = Secp256k1::new();
                let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
-
                let chan_handler = test_utils::TestChannelMessageHandler::new();
                chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
                        node_id: their_id,
@@ -1171,4 +1305,35 @@ mod tests {
                peers[0].process_events();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
        }
+
+       #[test]
+       fn ttt(){
+
+               let mut peers = create_network(2);
+               establish_connection(&peers[0], &peers[1]);
+               assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+
+
+
+               for (peer_manager) in peers.iter(){
+                       for (key, val) in  peer_manager.peers.lock().unwrap().peers.iter_mut(){
+                               let secp_ctx = Secp256k1::new();
+                               val.their_node_id = std::option::Option::Some(PublicKey::from_secret_key(&secp_ctx, &peer_manager.our_node_secret));
+
+                       }
+               }
+               //bring the state to noise::state::finished::
+               peers[0].tto();
+               //peers[1].process_events();
+
+               //assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+
+               //peers[0].timer_tick_occurred();
+               //peers[0].timer_tick_occurred();
+
+               //assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
+
+       }
+
 }
+       
\ No newline at end of file