From 8808d87877909d7d4f97d7ff320d8d165ece5800 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 19 May 2019 15:58:27 -0400 Subject: [PATCH] Implement shutdown, input reading --- src/main.rs | 24 ++++++++++++---- src/printer.rs | 6 ++++ src/reader.rs | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 src/reader.rs diff --git a/src/main.rs b/src/main.rs index ac1b0d4..e03dd61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ mod printer; +mod reader; mod peer; mod timeout_stream; mod datastore; @@ -6,6 +7,7 @@ mod datastore; use std::env; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use std::sync::atomic::{Ordering, AtomicBool}; use std::time::{Duration, Instant}; use std::net::SocketAddr; @@ -29,6 +31,7 @@ static mut HEADER_MAP: Option>>> = None; static mut HEIGHT_MAP: Option>>> = None; static mut DATA_STORE: Option> = None; static mut PRINTER: Option> = None; +pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false); struct PeerState { recvd_version: bool, @@ -40,7 +43,8 @@ struct PeerState { request: (u64, sha256d::Hash), } -fn scan_node(scan_time: Instant, node: SocketAddr) { +pub fn scan_node(scan_time: Instant, node: SocketAddr) { + if START_SHUTDOWN.load(Ordering::Relaxed) { return; } let printer = unsafe { PRINTER.as_ref().unwrap() }; let store = unsafe { DATA_STORE.as_ref().unwrap() }; @@ -180,7 +184,9 @@ fn scan_net() { Delay::new(iter_time).then(|_| { let store = unsafe { DATA_STORE.as_ref().unwrap() }; store.save_data().then(|_| { - scan_net(); + if !START_SHUTDOWN.load(Ordering::Relaxed) { + scan_net(); + } future::ok(()) }) }) @@ -195,6 +201,9 @@ fn make_trusted_conn(trusted_sockaddr: SocketAddr) { let (mut trusted_write, trusted_read) = trusted_split; let mut starting_height = 0; trusted_read.map_err(|_| { () }).for_each(move |msg| { + if START_SHUTDOWN.load(Ordering::Relaxed) { + return future::err(()); + } match msg { NetworkMessage::Version(ver) => { if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) { @@ -263,8 +272,10 @@ fn make_trusted_conn(trusted_sockaddr: SocketAddr) { future::err(()) }) }).then(move |_: Result<(), ()>| { - printer.add_line("Lost connection from trusted peer".to_string(), true); - make_trusted_conn(trusted_sockaddr); + if !START_SHUTDOWN.load(Ordering::Relaxed) { + printer.add_line("Lost connection from trusted peer".to_string(), true); + make_trusted_conn(trusted_sockaddr); + } future::ok(()) })); } @@ -289,11 +300,14 @@ fn main() { Store::new(path).and_then(move |store| { unsafe { DATA_STORE = Some(Box::new(store)) }; - unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) }; + let store = unsafe { DATA_STORE.as_ref().unwrap() }; + unsafe { PRINTER = Some(Box::new(Printer::new(store))) }; let trusted_sockaddr: SocketAddr = addr.parse().unwrap(); make_trusted_conn(trusted_sockaddr); + reader::read(store, unsafe { PRINTER.as_ref().unwrap() }); + future::ok(()) }).or_else(|_| { future::err(()) diff --git a/src/printer.rs b/src/printer.rs index a23d023..f51fee5 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -1,9 +1,12 @@ +use std::sync::atomic::Ordering; use std::collections::LinkedList; use std::sync::{Arc, Mutex}; use std::io::Write; use crate::datastore::{Store, AddressState, U64Setting, RegexSetting}; +use crate::START_SHUTDOWN; + pub enum Stat { HeaderCount(u64), NewConnection, @@ -31,6 +34,9 @@ impl Printer { std::thread::spawn(move || { loop { std::thread::sleep(std::time::Duration::from_secs(1)); + if START_SHUTDOWN.load(Ordering::Relaxed) { + break; + } let stdout = std::io::stdout(); let mut out = stdout.lock(); diff --git a/src/reader.rs b/src/reader.rs new file mode 100644 index 0000000..a6a1fb0 --- /dev/null +++ b/src/reader.rs @@ -0,0 +1,75 @@ +use std::sync::atomic::Ordering; +use std::io::BufReader; +use std::net::SocketAddr; +use std::time::Instant; + +use tokio::prelude::*; +use tokio::io::{stdin, lines}; + +use crate::printer::Printer; +use crate::datastore::{Store, AddressState, U64Setting, RegexSetting}; + +use crate::{START_SHUTDOWN, scan_node}; + +use regex::Regex; + +pub fn read(store: &'static Store, printer: &'static Printer) { + tokio::spawn(lines(BufReader::new(stdin())).for_each(move |line| { + macro_rules! err { + () => { { + printer.add_line(format!("Unparsable input: \"{}\"", line), true); + return future::ok(()); + } } + } + let mut line_iter = line.split(' '); + macro_rules! get_next_chunk { + () => { { + match line_iter.next() { + Some(c) => c, + None => err!(), + } + } } + } + macro_rules! try_parse_next_chunk { + ($type: ty) => { { + match get_next_chunk!().parse::<$type>() { + Ok(res) => res, + Err(_) => err!(), + } + } } + } + match get_next_chunk!() { + "c" => store.set_u64(U64Setting::ConnsPerSec, try_parse_next_chunk!(u64)), + "t" => store.set_u64(U64Setting::RunTimeout, try_parse_next_chunk!(u64)), + "v" => store.set_u64(U64Setting::MinProtocolVersion, try_parse_next_chunk!(u64)), + "w" => store.set_u64(U64Setting::WasGoodTimeout, try_parse_next_chunk!(u64)), + "s" => store.set_regex(RegexSetting::SubverRegex, try_parse_next_chunk!(Regex)), + "a" => scan_node(Instant::now(), try_parse_next_chunk!(SocketAddr)), + "r" => { + match try_parse_next_chunk!(u8) { + 0 => store.set_u64(U64Setting::RescanInterval(AddressState::Untested), try_parse_next_chunk!(u64)), + 1 => store.set_u64(U64Setting::RescanInterval(AddressState::LowBlockCount), try_parse_next_chunk!(u64)), + 2 => store.set_u64(U64Setting::RescanInterval(AddressState::HighBlockCount), try_parse_next_chunk!(u64)), + 3 => store.set_u64(U64Setting::RescanInterval(AddressState::LowVersion), try_parse_next_chunk!(u64)), + 4 => store.set_u64(U64Setting::RescanInterval(AddressState::BadVersion), try_parse_next_chunk!(u64)), + 5 => store.set_u64(U64Setting::RescanInterval(AddressState::NotFullNode), try_parse_next_chunk!(u64)), + 6 => store.set_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_parse_next_chunk!(u64)), + 7 => store.set_u64(U64Setting::RescanInterval(AddressState::Timeout), try_parse_next_chunk!(u64)), + 8 => store.set_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_parse_next_chunk!(u64)), + 9 => store.set_u64(U64Setting::RescanInterval(AddressState::Good), try_parse_next_chunk!(u64)), + 10 => store.set_u64(U64Setting::RescanInterval(AddressState::WasGood), try_parse_next_chunk!(u64)), + _ => err!(), + } + }, + "q" => { + START_SHUTDOWN.store(true, Ordering::SeqCst); + return future::err(std::io::Error::new(std::io::ErrorKind::Other, "")); + }, + _ => err!(), + } + future::ok(()) + }).then(|_| { + println!("Exiting"); + future::ok(()) + })); +} -- 2.39.5