mod printer;
+mod reader;
mod peer;
mod timeout_stream;
mod datastore;
use std::env;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
+use std::sync::atomic::{Ordering, AtomicBool};
use std::time::{Duration, Instant};
use std::net::SocketAddr;
static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, sha256d::Hash>>>> = None;
static mut DATA_STORE: Option<Box<Store>> = None;
static mut PRINTER: Option<Box<Printer>> = None;
+pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false);
struct PeerState {
recvd_version: bool,
request: (u64, sha256d::Hash),
}
-fn scan_node(scan_time: Instant, node: SocketAddr) {
+pub fn scan_node(scan_time: Instant, node: SocketAddr) {
+ if START_SHUTDOWN.load(Ordering::Relaxed) { return; }
let printer = unsafe { PRINTER.as_ref().unwrap() };
let store = unsafe { DATA_STORE.as_ref().unwrap() };
Delay::new(iter_time).then(|_| {
let store = unsafe { DATA_STORE.as_ref().unwrap() };
store.save_data().then(|_| {
- scan_net();
+ if !START_SHUTDOWN.load(Ordering::Relaxed) {
+ scan_net();
+ }
future::ok(())
})
})
let (mut trusted_write, trusted_read) = trusted_split;
let mut starting_height = 0;
trusted_read.map_err(|_| { () }).for_each(move |msg| {
+ if START_SHUTDOWN.load(Ordering::Relaxed) {
+ return future::err(());
+ }
match msg {
NetworkMessage::Version(ver) => {
if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) {
future::err(())
})
}).then(move |_: Result<(), ()>| {
- printer.add_line("Lost connection from trusted peer".to_string(), true);
- make_trusted_conn(trusted_sockaddr);
+ if !START_SHUTDOWN.load(Ordering::Relaxed) {
+ printer.add_line("Lost connection from trusted peer".to_string(), true);
+ make_trusted_conn(trusted_sockaddr);
+ }
future::ok(())
}));
}
Store::new(path).and_then(move |store| {
unsafe { DATA_STORE = Some(Box::new(store)) };
- unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) };
+ let store = unsafe { DATA_STORE.as_ref().unwrap() };
+ unsafe { PRINTER = Some(Box::new(Printer::new(store))) };
let trusted_sockaddr: SocketAddr = addr.parse().unwrap();
make_trusted_conn(trusted_sockaddr);
+ reader::read(store, unsafe { PRINTER.as_ref().unwrap() });
+
future::ok(())
}).or_else(|_| {
future::err(())
+use std::sync::atomic::Ordering;
use std::collections::LinkedList;
use std::sync::{Arc, Mutex};
use std::io::Write;
use crate::datastore::{Store, AddressState, U64Setting, RegexSetting};
+use crate::START_SHUTDOWN;
+
pub enum Stat {
HeaderCount(u64),
NewConnection,
std::thread::spawn(move || {
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
+ if START_SHUTDOWN.load(Ordering::Relaxed) {
+ break;
+ }
let stdout = std::io::stdout();
let mut out = stdout.lock();
--- /dev/null
+use std::sync::atomic::Ordering;
+use std::io::BufReader;
+use std::net::SocketAddr;
+use std::time::Instant;
+
+use tokio::prelude::*;
+use tokio::io::{stdin, lines};
+
+use crate::printer::Printer;
+use crate::datastore::{Store, AddressState, U64Setting, RegexSetting};
+
+use crate::{START_SHUTDOWN, scan_node};
+
+use regex::Regex;
+
+pub fn read(store: &'static Store, printer: &'static Printer) {
+ tokio::spawn(lines(BufReader::new(stdin())).for_each(move |line| {
+ macro_rules! err {
+ () => { {
+ printer.add_line(format!("Unparsable input: \"{}\"", line), true);
+ return future::ok(());
+ } }
+ }
+ let mut line_iter = line.split(' ');
+ macro_rules! get_next_chunk {
+ () => { {
+ match line_iter.next() {
+ Some(c) => c,
+ None => err!(),
+ }
+ } }
+ }
+ macro_rules! try_parse_next_chunk {
+ ($type: ty) => { {
+ match get_next_chunk!().parse::<$type>() {
+ Ok(res) => res,
+ Err(_) => err!(),
+ }
+ } }
+ }
+ match get_next_chunk!() {
+ "c" => store.set_u64(U64Setting::ConnsPerSec, try_parse_next_chunk!(u64)),
+ "t" => store.set_u64(U64Setting::RunTimeout, try_parse_next_chunk!(u64)),
+ "v" => store.set_u64(U64Setting::MinProtocolVersion, try_parse_next_chunk!(u64)),
+ "w" => store.set_u64(U64Setting::WasGoodTimeout, try_parse_next_chunk!(u64)),
+ "s" => store.set_regex(RegexSetting::SubverRegex, try_parse_next_chunk!(Regex)),
+ "a" => scan_node(Instant::now(), try_parse_next_chunk!(SocketAddr)),
+ "r" => {
+ match try_parse_next_chunk!(u8) {
+ 0 => store.set_u64(U64Setting::RescanInterval(AddressState::Untested), try_parse_next_chunk!(u64)),
+ 1 => store.set_u64(U64Setting::RescanInterval(AddressState::LowBlockCount), try_parse_next_chunk!(u64)),
+ 2 => store.set_u64(U64Setting::RescanInterval(AddressState::HighBlockCount), try_parse_next_chunk!(u64)),
+ 3 => store.set_u64(U64Setting::RescanInterval(AddressState::LowVersion), try_parse_next_chunk!(u64)),
+ 4 => store.set_u64(U64Setting::RescanInterval(AddressState::BadVersion), try_parse_next_chunk!(u64)),
+ 5 => store.set_u64(U64Setting::RescanInterval(AddressState::NotFullNode), try_parse_next_chunk!(u64)),
+ 6 => store.set_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_parse_next_chunk!(u64)),
+ 7 => store.set_u64(U64Setting::RescanInterval(AddressState::Timeout), try_parse_next_chunk!(u64)),
+ 8 => store.set_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_parse_next_chunk!(u64)),
+ 9 => store.set_u64(U64Setting::RescanInterval(AddressState::Good), try_parse_next_chunk!(u64)),
+ 10 => store.set_u64(U64Setting::RescanInterval(AddressState::WasGood), try_parse_next_chunk!(u64)),
+ _ => err!(),
+ }
+ },
+ "q" => {
+ START_SHUTDOWN.store(true, Ordering::SeqCst);
+ return future::err(std::io::Error::new(std::io::ErrorKind::Other, ""));
+ },
+ _ => err!(),
+ }
+ future::ok(())
+ }).then(|_| {
+ println!("Exiting");
+ future::ok(())
+ }));
+}