use printer::{Printer, Stat};
use peer::Peer;
use datastore::{AddressState, Store, U64Setting, RegexSetting};
+use timeout_stream::TimeoutStream;
use tokio::prelude::*;
use tokio::timer::Delay;
let peer = Delay::new(scan_time).then(move |_| {
printer.set_stat(Stat::NewConnection);
let timeout = store.get_u64(U64Setting::RunTimeout);
- Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run
+ Peer::new(node.clone(), Duration::from_secs(timeout), printer)
});
- tokio::spawn(peer.and_then(move |conn_split| {
+ tokio::spawn(peer.and_then(move |(mut write, read)| {
let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008;
let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone();
peer_state.lock().unwrap().request = (requested_height, requested_block);
- let (mut write, read) = conn_split;
- read.map_err(|_| { () }).for_each(move |msg| {
+ TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| {
let mut state_lock = peer_state.lock().unwrap();
macro_rules! check_set_flag {
($recvd_flag: ident, $msg: expr) => { {
fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
let printer = unsafe { PRINTER.as_ref().unwrap() };
let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
- tokio::spawn(trusted_peer.and_then(move |trusted_split| {
+ tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
printer.add_line("Connected to local peer".to_string(), false);
- let (mut trusted_write, trusted_read) = trusted_split;
let mut starting_height = 0;
- trusted_read.map_err(|_| { () }).for_each(move |msg| {
+ TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|_| { () }).for_each(move |msg| {
if START_SHUTDOWN.load(Ordering::Relaxed) {
return future::err(());
}
future::err(())
})
}));
+
+ tokio::run(future::lazy(|| {
+ unsafe { DATA_STORE.as_ref().unwrap() }.save_data()
+ }));
}