From 1ad3d257518100cc938b4dabfff8636414131ce0 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 20 May 2019 14:51:09 -0400 Subject: [PATCH] Total runtime timeout, fix saves --- src/datastore.rs | 8 +++++--- src/main.rs | 17 ++++++++++------- src/peer.rs | 3 +-- src/printer.rs | 6 +++--- src/reader.rs | 4 ++-- src/timeout_stream.rs | 18 ++++++++++++++++-- 6 files changed, 37 insertions(+), 19 deletions(-) diff --git a/src/datastore.rs b/src/datastore.rs index 3e9a1b3..3628ac2 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -116,7 +116,7 @@ impl Store { future::ok((u64s, try_read!(l, Regex))) }).or_else(|_| -> future::FutureResult<(HashMap, Regex), ()> { let mut u64s = HashMap::with_capacity(15); - u64s.insert(U64Setting::ConnsPerSec, 50); + u64s.insert(U64Setting::ConnsPerSec, 10); u64s.insert(U64Setting::RunTimeout, 120); u64s.insert(U64Setting::WasGoodTimeout, 21600); u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0); @@ -299,7 +299,7 @@ impl Store { pub fn save_data(&'static self) -> impl Future { let settings_file = self.store.clone() + "/settings"; let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| { - let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", + let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", self.get_u64(U64Setting::ConnsPerSec), self.get_u64(U64Setting::RunTimeout), self.get_u64(U64Setting::WasGoodTimeout), @@ -314,7 +314,8 @@ impl Store { self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)), self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)), self.get_u64(U64Setting::RescanInterval(AddressState::Good)), - self.get_u64(U64Setting::RescanInterval(AddressState::WasGood))); + self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)), + self.get_regex(RegexSetting::SubverRegex).as_str()); write_all(f, settings_string).and_then(|(mut f, _)| { f.poll_sync_all() }).and_then(|_| { @@ -344,6 +345,7 @@ impl Store { AddressState::Good => 9u8, AddressState::WasGood => 10u8, }.to_string(); + nodes_buff += ","; nodes_buff += &node.last_services.to_string(); nodes_buff += "\n"; } diff --git a/src/main.rs b/src/main.rs index e03dd61..7f94090 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ use bitcoin::util::hash::BitcoinHash; use printer::{Printer, Stat}; use peer::Peer; use datastore::{AddressState, Store, U64Setting, RegexSetting}; +use timeout_stream::TimeoutStream; use tokio::prelude::*; use tokio::timer::Delay; @@ -62,15 +63,14 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr) { let peer = Delay::new(scan_time).then(move |_| { printer.set_stat(Stat::NewConnection); let timeout = store.get_u64(U64Setting::RunTimeout); - Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run + Peer::new(node.clone(), Duration::from_secs(timeout), printer) }); - tokio::spawn(peer.and_then(move |conn_split| { + tokio::spawn(peer.and_then(move |(mut write, read)| { let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008; let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone(); peer_state.lock().unwrap().request = (requested_height, requested_block); - let (mut write, read) = conn_split; - read.map_err(|_| { () }).for_each(move |msg| { + TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| { let mut state_lock = peer_state.lock().unwrap(); macro_rules! check_set_flag { ($recvd_flag: ident, $msg: expr) => { { @@ -196,11 +196,10 @@ fn scan_net() { fn make_trusted_conn(trusted_sockaddr: SocketAddr) { let printer = unsafe { PRINTER.as_ref().unwrap() }; let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer); - tokio::spawn(trusted_peer.and_then(move |trusted_split| { + tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| { printer.add_line("Connected to local peer".to_string(), false); - let (mut trusted_write, trusted_read) = trusted_split; let mut starting_height = 0; - trusted_read.map_err(|_| { () }).for_each(move |msg| { + TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|_| { () }).for_each(move |msg| { if START_SHUTDOWN.load(Ordering::Relaxed) { return future::err(()); } @@ -313,4 +312,8 @@ fn main() { future::err(()) }) })); + + tokio::run(future::lazy(|| { + unsafe { DATA_STORE.as_ref().unwrap() }.save_data() + })); } diff --git a/src/peer.rs b/src/peer.rs index 0d0fc3f..9276d08 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -18,7 +18,6 @@ use tokio::timer::Delay; use futures::sync::mpsc; use crate::printer::Printer; -use crate::timeout_stream::TimeoutStream; struct BytesCoder<'a>(&'a mut bytes::BytesMut); impl<'a> std::io::Write for BytesCoder<'a> { @@ -121,7 +120,7 @@ impl Peer { start_height: 0, relay: true, })); - future::ok((sender, TimeoutStream::new(read, timeout))) + future::ok((sender, read)) }) } } diff --git a/src/printer.rs b/src/printer.rs index f51fee5..4818d9a 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -34,14 +34,14 @@ impl Printer { std::thread::spawn(move || { loop { std::thread::sleep(std::time::Duration::from_secs(1)); - if START_SHUTDOWN.load(Ordering::Relaxed) { - break; - } let stdout = std::io::stdout(); let mut out = stdout.lock(); let stats = thread_arc.lock().unwrap(); + if START_SHUTDOWN.load(Ordering::Relaxed) && stats.connection_count == 0 { + break; + } out.write_all(b"\x1b[2J\x1b[;H\n").expect("stdout broken?"); for line in stats.lines.iter() { diff --git a/src/reader.rs b/src/reader.rs index a6a1fb0..8250aae 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -68,8 +68,8 @@ pub fn read(store: &'static Store, printer: &'static Printer) { _ => err!(), } future::ok(()) - }).then(|_| { - println!("Exiting"); + }).then(move |_| { + printer.add_line("Shutting down...".to_string(), true); future::ok(()) })); } diff --git a/src/timeout_stream.rs b/src/timeout_stream.rs index 57664f8..0d0caea 100644 --- a/src/timeout_stream.rs +++ b/src/timeout_stream.rs @@ -6,18 +6,30 @@ use std::time::{Duration, Instant}; pub struct TimeoutStream where S : Stream { stream: S, next_deadline: Delay, + extend_on_recv: bool, timeout: Duration, } impl TimeoutStream where S : Stream { - pub fn new(stream: S, timeout: Duration) -> Self { + pub fn new_persistent(stream: S, timeout: Duration) -> Self { let next_deadline = Delay::new(Instant::now() + timeout); Self { stream, next_deadline, + extend_on_recv: true, timeout, } } + + pub fn new_timeout(stream: S, timeout: Instant) -> Self { + let next_deadline = Delay::new(timeout); + Self { + stream, + next_deadline, + extend_on_recv: false, + timeout: Duration::from_secs(0), + } + } } impl Stream for TimeoutStream where S : Stream { @@ -29,7 +41,9 @@ impl Stream for TimeoutStream where S : Stream { Ok(Async::NotReady) => { match self.stream.poll() { Ok(Async::Ready(v)) => { - self.next_deadline.reset(Instant::now() + self.timeout); + if self.extend_on_recv { + self.next_deadline.reset(Instant::now() + self.timeout); + } Ok(Async::Ready(v)) }, Ok(Async::NotReady) => Ok(Async::NotReady), -- 2.39.5