Total runtime timeout, fix saves
authorMatt Corallo <git@bluematt.me>
Mon, 20 May 2019 18:51:09 +0000 (14:51 -0400)
committerMatt Corallo <git@bluematt.me>
Mon, 20 May 2019 18:51:09 +0000 (14:51 -0400)
src/datastore.rs
src/main.rs
src/peer.rs
src/printer.rs
src/reader.rs
src/timeout_stream.rs

index 3e9a1b31714f8423a2a3cd81eb61cae5c20ce117..3628ac202ec5045e6c36f74d6333db56889dd2ea 100644 (file)
@@ -116,7 +116,7 @@ impl Store {
                        future::ok((u64s, try_read!(l, Regex)))
                }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
                        let mut u64s = HashMap::with_capacity(15);
-                       u64s.insert(U64Setting::ConnsPerSec, 50);
+                       u64s.insert(U64Setting::ConnsPerSec, 10);
                        u64s.insert(U64Setting::RunTimeout, 120);
                        u64s.insert(U64Setting::WasGoodTimeout, 21600);
                        u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
@@ -299,7 +299,7 @@ impl Store {
        pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
                let settings_file = self.store.clone() + "/settings";
                let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
-                       let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
+                       let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
                                self.get_u64(U64Setting::ConnsPerSec),
                                self.get_u64(U64Setting::RunTimeout),
                                self.get_u64(U64Setting::WasGoodTimeout),
@@ -314,7 +314,8 @@ impl Store {
                                self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
                                self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
                                self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
-                               self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)));
+                               self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
+                               self.get_regex(RegexSetting::SubverRegex).as_str());
                        write_all(f, settings_string).and_then(|(mut f, _)| {
                                f.poll_sync_all()
                        }).and_then(|_| {
@@ -344,6 +345,7 @@ impl Store {
                                                AddressState::Good => 9u8,
                                                AddressState::WasGood => 10u8,
                                        }.to_string();
+                                       nodes_buff += ",";
                                        nodes_buff += &node.last_services.to_string();
                                        nodes_buff += "\n";
                                }
index e03dd615b9c0318cab50bb96d5f7d920ede7e3e0..7f9409055c6102fca15108528acf8db37ab5c660 100644 (file)
@@ -22,6 +22,7 @@ use bitcoin::util::hash::BitcoinHash;
 use printer::{Printer, Stat};
 use peer::Peer;
 use datastore::{AddressState, Store, U64Setting, RegexSetting};
+use timeout_stream::TimeoutStream;
 
 use tokio::prelude::*;
 use tokio::timer::Delay;
@@ -62,15 +63,14 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr) {
        let peer = Delay::new(scan_time).then(move |_| {
                printer.set_stat(Stat::NewConnection);
                let timeout = store.get_u64(U64Setting::RunTimeout);
-               Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run
+               Peer::new(node.clone(), Duration::from_secs(timeout), printer)
        });
-       tokio::spawn(peer.and_then(move |conn_split| {
+       tokio::spawn(peer.and_then(move |(mut write, read)| {
                let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008;
                let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone();
                peer_state.lock().unwrap().request = (requested_height, requested_block);
 
-               let (mut write, read) = conn_split;
-               read.map_err(|_| { () }).for_each(move |msg| {
+               TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| {
                        let mut state_lock = peer_state.lock().unwrap();
                        macro_rules! check_set_flag {
                                ($recvd_flag: ident, $msg: expr) => { {
@@ -196,11 +196,10 @@ fn scan_net() {
 fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
        let printer = unsafe { PRINTER.as_ref().unwrap() };
        let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
-       tokio::spawn(trusted_peer.and_then(move |trusted_split| {
+       tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
                printer.add_line("Connected to local peer".to_string(), false);
-               let (mut trusted_write, trusted_read) = trusted_split;
                let mut starting_height = 0;
-               trusted_read.map_err(|_| { () }).for_each(move |msg| {
+               TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|_| { () }).for_each(move |msg| {
                        if START_SHUTDOWN.load(Ordering::Relaxed) {
                                return future::err(());
                        }
@@ -313,4 +312,8 @@ fn main() {
                        future::err(())
                })
        }));
+
+       tokio::run(future::lazy(|| {
+               unsafe { DATA_STORE.as_ref().unwrap() }.save_data()
+       }));
 }
index 0d0fc3f9689be897d89b1d422c14cd269b30beff..9276d082d17bd8fe48281a9dc7d464e82dfc75f5 100644 (file)
@@ -18,7 +18,6 @@ use tokio::timer::Delay;
 use futures::sync::mpsc;
 
 use crate::printer::Printer;
-use crate::timeout_stream::TimeoutStream;
 
 struct BytesCoder<'a>(&'a mut bytes::BytesMut);
 impl<'a> std::io::Write for BytesCoder<'a> {
@@ -121,7 +120,7 @@ impl Peer {
                                        start_height: 0,
                                        relay: true,
                                }));
-                               future::ok((sender, TimeoutStream::new(read, timeout)))
+                               future::ok((sender, read))
                        })
        }
 }
index f51fee55bb116b9e49dd9c5b4f9273061b9a291e..4818d9af61ce4076dddc53de9a349e7cc3051c9f 100644 (file)
@@ -34,14 +34,14 @@ impl Printer {
                std::thread::spawn(move || {
                        loop {
                                std::thread::sleep(std::time::Duration::from_secs(1));
-                               if START_SHUTDOWN.load(Ordering::Relaxed) {
-                                       break;
-                               }
 
                                let stdout = std::io::stdout();
                                let mut out = stdout.lock();
 
                                let stats = thread_arc.lock().unwrap();
+                               if START_SHUTDOWN.load(Ordering::Relaxed) && stats.connection_count == 0 {
+                                       break;
+                               }
 
                                out.write_all(b"\x1b[2J\x1b[;H\n").expect("stdout broken?");
                                for line in stats.lines.iter() {
index a6a1fb0a0280ddbbe33c02b9df928cbd639de1ff..8250aaed871c44b49d794b3be394ec36c213c95d 100644 (file)
@@ -68,8 +68,8 @@ pub fn read(store: &'static Store, printer: &'static Printer) {
                        _ => err!(),
                }
                future::ok(())
-       }).then(|_| {
-               println!("Exiting");
+       }).then(move |_| {
+               printer.add_line("Shutting down...".to_string(), true);
                future::ok(())
        }));
 }
index 57664f8f5de7fe0fb89b11d4b44180747be15fbd..0d0caea3d1829fb1c5adc4c8b1c364c01698535c 100644 (file)
@@ -6,18 +6,30 @@ use std::time::{Duration, Instant};
 pub struct TimeoutStream<S> where S : Stream {
        stream: S,
        next_deadline: Delay,
+       extend_on_recv: bool,
        timeout: Duration,
 }
 
 impl<S> TimeoutStream<S> where S : Stream {
-       pub fn new(stream: S, timeout: Duration) -> Self {
+       pub fn new_persistent(stream: S, timeout: Duration) -> Self {
                let next_deadline = Delay::new(Instant::now() + timeout);
                Self {
                        stream,
                        next_deadline,
+                       extend_on_recv: true,
                        timeout,
                }
        }
+
+       pub fn new_timeout(stream: S, timeout: Instant) -> Self {
+               let next_deadline = Delay::new(timeout);
+               Self {
+                       stream,
+                       next_deadline,
+                       extend_on_recv: false,
+                       timeout: Duration::from_secs(0),
+               }
+       }
 }
 
 impl<S> Stream for TimeoutStream<S> where S : Stream {
@@ -29,7 +41,9 @@ impl<S> Stream for TimeoutStream<S> where S : Stream {
                        Ok(Async::NotReady) => {
                                match self.stream.poll() {
                                        Ok(Async::Ready(v)) => {
-                                               self.next_deadline.reset(Instant::now() + self.timeout);
+                                               if self.extend_on_recv {
+                                                       self.next_deadline.reset(Instant::now() + self.timeout);
+                                               }
                                                Ok(Async::Ready(v))
                                        },
                                        Ok(Async::NotReady) => Ok(Async::NotReady),