future::ok((u64s, try_read!(l, Regex)))
}).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
let mut u64s = HashMap::with_capacity(15);
- u64s.insert(U64Setting::ConnsPerSec, 50);
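+ // lower the default from 50 to 10 new connections per second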
+ u64s.insert(U64Setting::ConnsPerSec, 10);
u64s.insert(U64Setting::RunTimeout, 120);
u64s.insert(U64Setting::WasGoodTimeout, 21600);
u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
let settings_file = self.store.clone() + "/settings";
let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
- let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
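+ // one extra {} slot: the subver regex is written as the final settings line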
+ let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
self.get_u64(U64Setting::ConnsPerSec),
self.get_u64(U64Setting::RunTimeout),
self.get_u64(U64Setting::WasGoodTimeout),
self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
- self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)));
+ self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
+ self.get_regex(RegexSetting::SubverRegex).as_str());
write_all(f, settings_string).and_then(|(mut f, _)| {
f.poll_sync_all()
}).and_then(|_| {
AddressState::Good => 9u8,
AddressState::WasGood => 10u8,
}.to_string();
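+ // comma-separate the address state value from the services field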
+ nodes_buff += ",";
nodes_buff += &node.last_services.to_string();
nodes_buff += "\n";
}
use printer::{Printer, Stat};
use peer::Peer;
use datastore::{AddressState, Store, U64Setting, RegexSetting};
+use timeout_stream::TimeoutStream;
use tokio::prelude::*;
use tokio::timer::Delay;
let peer = Delay::new(scan_time).then(move |_| {
printer.set_stat(Stat::NewConnection);
let timeout = store.get_u64(U64Setting::RunTimeout);
- Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run
+ Peer::new(node.clone(), Duration::from_secs(timeout), printer)
});
- tokio::spawn(peer.and_then(move |conn_split| {
+ tokio::spawn(peer.and_then(move |(mut write, read)| {
let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008;
let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone();
peer_state.lock().unwrap().request = (requested_height, requested_block);
- let (mut write, read) = conn_split;
- read.map_err(|_| { () }).for_each(move |msg| {
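+ // cap the total scan: the read stream gets a fixed deadline of scan_time plus the RunTimeout setting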
+ TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| {
let mut state_lock = peer_state.lock().unwrap();
macro_rules! check_set_flag {
($recvd_flag: ident, $msg: expr) => { {
fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
let printer = unsafe { PRINTER.as_ref().unwrap() };
let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
- tokio::spawn(trusted_peer.and_then(move |trusted_split| {
+ tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
printer.add_line("Connected to local peer".to_string(), false);
- let (mut trusted_write, trusted_read) = trusted_split;
let mut starting_height = 0;
- trusted_read.map_err(|_| { () }).for_each(move |msg| {
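+ // reset the 600-second timeout each time the trusted peer sends a message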
+ TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|_| { () }).for_each(move |msg| {
if START_SHUTDOWN.load(Ordering::Relaxed) {
return future::err(());
}
future::err(())
})
}));
+
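+ // block on a final save_data so the datastore is persisted before returning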
+ tokio::run(future::lazy(|| {
+ unsafe { DATA_STORE.as_ref().unwrap() }.save_data()
+ }));
}
use futures::sync::mpsc;
use crate::printer::Printer;
-use crate::timeout_stream::TimeoutStream;
struct BytesCoder<'a>(&'a mut bytes::BytesMut);
impl<'a> std::io::Write for BytesCoder<'a> {
start_height: 0,
relay: true,
}));
- future::ok((sender, TimeoutStream::new(read, timeout)))
+ future::ok((sender, read))
})
}
}
std::thread::spawn(move || {
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
- if START_SHUTDOWN.load(Ordering::Relaxed) {
- break;
- }
let stdout = std::io::stdout();
let mut out = stdout.lock();
let stats = thread_arc.lock().unwrap();
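+ // keep the status loop alive until shutdown has started and all connections have drained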
+ if START_SHUTDOWN.load(Ordering::Relaxed) && stats.connection_count == 0 {
+ break;
+ }
out.write_all(b"\x1b[2J\x1b[;H\n").expect("stdout broken?");
for line in stats.lines.iter() {
_ => err!(),
}
future::ok(())
- }).then(|_| {
- println!("Exiting");
+ }).then(move |_| {
+ printer.add_line("Shutting down...".to_string(), true);
future::ok(())
}));
}
pub struct TimeoutStream<S> where S : Stream {
stream: S,
next_deadline: Delay,
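+ /// When true, each received item pushes the deadline back by `timeout`; when false the deadline stays fixed.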
+ extend_on_recv: bool,
timeout: Duration,
}
impl<S> TimeoutStream<S> where S : Stream {
- pub fn new(stream: S, timeout: Duration) -> Self {
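+ /// Wraps `stream` with a rolling deadline that is pushed back by `timeout` whenever an item arrives.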
+ pub fn new_persistent(stream: S, timeout: Duration) -> Self {
let next_deadline = Delay::new(Instant::now() + timeout);
Self {
stream,
next_deadline,
+ extend_on_recv: true,
timeout,
}
}
+
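+ /// Wraps `stream` with a fixed deadline at `timeout`; receiving items does not extend it.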
+ pub fn new_timeout(stream: S, timeout: Instant) -> Self {
+ let next_deadline = Delay::new(timeout);
+ Self {
+ stream,
+ next_deadline,
+ extend_on_recv: false,
+ timeout: Duration::from_secs(0), // unused: the deadline is never extended
+ }
+ }
}
impl<S> Stream for TimeoutStream<S> where S : Stream {
Ok(Async::NotReady) => {
match self.stream.poll() {
Ok(Async::Ready(v)) => {
- self.next_deadline.reset(Instant::now() + self.timeout);
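+ // only persistent streams push the deadline back when an item arrives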
+ if self.extend_on_recv {
+ self.next_deadline.reset(Instant::now() + self.timeout);
+ }
Ok(Async::Ready(v))
},
Ok(Async::NotReady) => Ok(Async::NotReady),