X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=a1620502b29ea0d207a12e5412d0c48f2a139190;hb=547972966b932e46b73d07ccc30787d739f898cd;hp=c0035f4f42c09692970b5d472af6df0d97a4193b;hpb=06091cee0fd29549e5e24c673bf361ab3a562529;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index c0035f4f..a1620502 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -20,6 +20,7 @@ use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{cmp,error,hash,fmt}; + use bitcoin_hashes::sha256::Hash as Sha256; use bitcoin_hashes::sha256::HashEngine as Sha256Engine; use bitcoin_hashes::{HashEngine, Hash}; @@ -115,7 +116,9 @@ struct Peer { pending_read_is_header: bool, sync_status: InitSyncTracker, -} + + ping_tracker: u8, +} impl Peer { /// Returns true if the channel announcements/updates for the given channel should be @@ -286,6 +289,8 @@ impl PeerManager { pending_read_is_header: false, sync_status: InitSyncTracker::NoSyncRequested, + + ping_tracker: 0, }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -322,6 +327,8 @@ impl PeerManager { pending_read_is_header: false, sync_status: InitSyncTracker::NoSyncRequested, + + ping_tracker: 0, }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -679,10 +686,10 @@ impl PeerManager { encode_and_send_msg!(resp, 19); } }, - 19 => { - try_potential_decodeerror!(msgs::Pong::read(&mut reader)); + 19 => { + peer.ping_tracker = 0; + try_potential_decodeerror!(msgs::Pong::read(&mut reader)); }, - // Channel control: 32 => { let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader)); @@ -1088,8 +1095,133 @@ impl PeerManager { } }; } + + + +/*tto +get all the descriptors (this would take the lock) + +do_read_event on all the Descriptors + +action_if_no_ping (also ping peers) this would take the lock + + + + +*/ + +fn get_des(&self) -> Vec { + let mut des_set: Vec = Vec::new(); + let mut peers_lock = self.peers.lock().unwrap(); + let peers = peers_lock.borrow_parts(); + + for (Descriptor, mut Peer) in peers.peers.iter_mut() { + + //read all events that have been sent from the Peer + let mut descriptor = Descriptor.clone(); + des_set.push(descriptor); + } + des_set + +} + +fn mass_do_read_event(&self, mut des_set: Vec) +{ + for (mut Descriptor) in des_set.iter_mut(){ + let data: Vec = Vec::new(); + + let res = match self.do_read_event(&mut Descriptor, data){ + Ok(pause_read) => pause_read, + Err(e) => panic!("something is wrong"), + }; + } + +} + +fn action_if_no_ping(&self){ + + let mut peers_lock = self.peers.lock().unwrap(); + let peers = peers_lock.borrow_parts(); + + for (Descriptor, mut Peer) in peers.peers.iter_mut() { + + // Disconect the Peer if there is an outstanding ping for which we have not been ponged + if Peer.ping_tracker > 0 { + self.disconnect_event(Descriptor); + } + + else { + + let ping = msgs::Ping { + ponglen: 64, + byteslen: 64, + }; + + Peer.pending_outbound_buffer.push_back(Peer.channel_encryptor.encrypt_message(&encode_msg!(ping, 18))); + + let mut descriptor = Descriptor.clone(); + self.do_attempt_write_data(&mut descriptor, &mut Peer); + Peer.ping_tracker += 1; + + + } + } + } +///s + pub fn tto(&self){ + let mut des_set: Vec = self.get_des(); + self.mass_do_read_event(des_set); + self.action_if_no_ping(); + } + + /// insure we recieved pong message from all peers or disconnect the peers then ping all peers + pub fn timer_tick_occurred(&self){ + + let mut peers_lock = self.peers.lock().unwrap(); + let peers = peers_lock.borrow_parts(); + + for (Descriptor, mut Peer) in peers.peers.iter_mut() { + + //read all events that have been sent from the Peer + let mut descriptor = Descriptor.clone(); + let data: Vec = Vec::new(); + + let res = match self.do_read_event(&mut descriptor, data){ + Ok(pause_read) => pause_read, + Err(e) => panic!("something is wrong"), + }; + + // Disconect the Peer if there is an outstanding ping for which we have not been ponged + if Peer.ping_tracker > 0 { + self.disconnect_event(Descriptor); + } + + + else { + + let ping = msgs::Ping { + ponglen: 64, + byteslen: 64, + }; + + Peer.pending_outbound_buffer.push_back(Peer.channel_encryptor.encrypt_message(&encode_msg!(ping, 18))); + + let mut descriptor = Descriptor.clone(); + self.do_attempt_write_data(&mut descriptor, &mut Peer); + Peer.ping_tracker += 1; + + + } + } + } +} + + + + + #[cfg(test)] mod tests { use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; @@ -1097,6 +1229,9 @@ mod tests { use util::events; use util::test_utils; use util::logger::Logger; + use util::ser::{Writeable, Writer, Readable}; + + use std::collections::{HashMap,hash_map,HashSet,LinkedList}; use secp256k1::Secp256k1; use secp256k1::key::{SecretKey, PublicKey}; @@ -1159,7 +1294,6 @@ mod tests { let secp_ctx = Secp256k1::new(); let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); - let chan_handler = test_utils::TestChannelMessageHandler::new(); chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError { node_id: their_id, @@ -1171,4 +1305,35 @@ mod tests { peers[0].process_events(); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); } + + #[test] + fn ttt(){ + + let mut peers = create_network(2); + establish_connection(&peers[0], &peers[1]); + assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + + + + for (peer_manager) in peers.iter(){ + for (key, val) in peer_manager.peers.lock().unwrap().peers.iter_mut(){ + let secp_ctx = Secp256k1::new(); + val.their_node_id = std::option::Option::Some(PublicKey::from_secret_key(&secp_ctx, &peer_manager.our_node_secret)); + + } + } + //bring the state to noise::state::finished:: + peers[0].tto(); + //peers[1].process_events(); + + //assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); + + //peers[0].timer_tick_occurred(); + //peers[0].timer_tick_occurred(); + + //assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); + + } + } + \ No newline at end of file