Total runtime timeout, fix saves
[dnsseed-rust] / src / main.rs
index e03dd615b9c0318cab50bb96d5f7d920ede7e3e0..7f9409055c6102fca15108528acf8db37ab5c660 100644 (file)
@@ -22,6 +22,7 @@ use bitcoin::util::hash::BitcoinHash;
 use printer::{Printer, Stat};
 use peer::Peer;
 use datastore::{AddressState, Store, U64Setting, RegexSetting};
+use timeout_stream::TimeoutStream;
 
 use tokio::prelude::*;
 use tokio::timer::Delay;
@@ -62,15 +63,14 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr) {
        let peer = Delay::new(scan_time).then(move |_| {
                printer.set_stat(Stat::NewConnection);
                let timeout = store.get_u64(U64Setting::RunTimeout);
-               Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run
+               Peer::new(node.clone(), Duration::from_secs(timeout), printer)
        });
-       tokio::spawn(peer.and_then(move |conn_split| {
+       tokio::spawn(peer.and_then(move |(mut write, read)| {
                let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008;
                let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone();
                peer_state.lock().unwrap().request = (requested_height, requested_block);
 
-               let (mut write, read) = conn_split;
-               read.map_err(|_| { () }).for_each(move |msg| {
+               TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| {
                        let mut state_lock = peer_state.lock().unwrap();
                        macro_rules! check_set_flag {
                                ($recvd_flag: ident, $msg: expr) => { {
@@ -196,11 +196,10 @@ fn scan_net() {
 fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
        let printer = unsafe { PRINTER.as_ref().unwrap() };
        let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
-       tokio::spawn(trusted_peer.and_then(move |trusted_split| {
+       tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
                printer.add_line("Connected to local peer".to_string(), false);
-               let (mut trusted_write, trusted_read) = trusted_split;
                let mut starting_height = 0;
-               trusted_read.map_err(|_| { () }).for_each(move |msg| {
+               TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|_| { () }).for_each(move |msg| {
                        if START_SHUTDOWN.load(Ordering::Relaxed) {
                                return future::err(());
                        }
@@ -313,4 +312,8 @@ fn main() {
                        future::err(())
                })
        }));
+
+       tokio::run(future::lazy(|| {
+               unsafe { DATA_STORE.as_ref().unwrap() }.save_data()
+       }));
 }