Implement shutdown, input reading
authorMatt Corallo <git@bluematt.me>
Sun, 19 May 2019 19:58:27 +0000 (15:58 -0400)
committerMatt Corallo <git@bluematt.me>
Sun, 19 May 2019 19:58:27 +0000 (15:58 -0400)
src/main.rs
src/printer.rs
src/reader.rs [new file with mode: 0644]

index ac1b0d4cdb458943f84a1edb46e144ae6000123c..e03dd615b9c0318cab50bb96d5f7d920ede7e3e0 100644 (file)
@@ -1,4 +1,5 @@
 mod printer;
+mod reader;
 mod peer;
 mod timeout_stream;
 mod datastore;
@@ -6,6 +7,7 @@ mod datastore;
 use std::env;
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex};
+use std::sync::atomic::{Ordering, AtomicBool};
 use std::time::{Duration, Instant};
 use std::net::SocketAddr;
 
@@ -29,6 +31,7 @@ static mut HEADER_MAP: Option<Box<Mutex<HashMap<sha256d::Hash, u64>>>> = None;
 static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, sha256d::Hash>>>> = None;
 static mut DATA_STORE: Option<Box<Store>> = None;
 static mut PRINTER: Option<Box<Printer>> = None;
+pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false);
 
 struct PeerState {
        recvd_version: bool,
@@ -40,7 +43,8 @@ struct PeerState {
        request: (u64, sha256d::Hash),
 }
 
-fn scan_node(scan_time: Instant, node: SocketAddr) {
+pub fn scan_node(scan_time: Instant, node: SocketAddr) {
+       if START_SHUTDOWN.load(Ordering::Relaxed) { return; }
        let printer = unsafe { PRINTER.as_ref().unwrap() };
        let store = unsafe { DATA_STORE.as_ref().unwrap() };
 
@@ -180,7 +184,9 @@ fn scan_net() {
                Delay::new(iter_time).then(|_| {
                        let store = unsafe { DATA_STORE.as_ref().unwrap() };
                        store.save_data().then(|_| {
-                               scan_net();
+                               if !START_SHUTDOWN.load(Ordering::Relaxed) {
+                                       scan_net();
+                               }
                                future::ok(())
                        })
                })
@@ -195,6 +201,9 @@ fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
                let (mut trusted_write, trusted_read) = trusted_split;
                let mut starting_height = 0;
                trusted_read.map_err(|_| { () }).for_each(move |msg| {
+                       if START_SHUTDOWN.load(Ordering::Relaxed) {
+                               return future::err(());
+                       }
                        match msg {
                                NetworkMessage::Version(ver) => {
                                        if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) {
@@ -263,8 +272,10 @@ fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
                        future::err(())
                })
        }).then(move |_: Result<(), ()>| {
-               printer.add_line("Lost connection from trusted peer".to_string(), true);
-               make_trusted_conn(trusted_sockaddr);
+               if !START_SHUTDOWN.load(Ordering::Relaxed) {
+                       printer.add_line("Lost connection from trusted peer".to_string(), true);
+                       make_trusted_conn(trusted_sockaddr);
+               }
                future::ok(())
        }));
 }
@@ -289,11 +300,14 @@ fn main() {
 
                Store::new(path).and_then(move |store| {
                        unsafe { DATA_STORE = Some(Box::new(store)) };
-                       unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) };
+                       let store = unsafe { DATA_STORE.as_ref().unwrap() };
+                       unsafe { PRINTER = Some(Box::new(Printer::new(store))) };
 
                        let trusted_sockaddr: SocketAddr = addr.parse().unwrap();
                        make_trusted_conn(trusted_sockaddr);
 
+                       reader::read(store, unsafe { PRINTER.as_ref().unwrap() });
+
                        future::ok(())
                }).or_else(|_| {
                        future::err(())
index a23d023cb2d2457ce895643c6a0f84471e16a163..f51fee55bb116b9e49dd9c5b4f9273061b9a291e 100644 (file)
@@ -1,9 +1,12 @@
+use std::sync::atomic::Ordering;
 use std::collections::LinkedList;
 use std::sync::{Arc, Mutex};
 use std::io::Write;
 
 use crate::datastore::{Store, AddressState, U64Setting, RegexSetting};
 
+use crate::START_SHUTDOWN;
+
 pub enum Stat {
        HeaderCount(u64),
        NewConnection,
@@ -31,6 +34,9 @@ impl Printer {
                std::thread::spawn(move || {
                        loop {
                                std::thread::sleep(std::time::Duration::from_secs(1));
+                               if START_SHUTDOWN.load(Ordering::Relaxed) {
+                                       break;
+                               }
 
                                let stdout = std::io::stdout();
                                let mut out = stdout.lock();
diff --git a/src/reader.rs b/src/reader.rs
new file mode 100644 (file)
index 0000000..a6a1fb0
--- /dev/null
@@ -0,0 +1,75 @@
+use std::sync::atomic::Ordering;
+use std::io::BufReader;
+use std::net::SocketAddr;
+use std::time::Instant;
+
+use tokio::prelude::*;
+use tokio::io::{stdin, lines};
+
+use crate::printer::Printer;
+use crate::datastore::{Store, AddressState, U64Setting, RegexSetting};
+
+use crate::{START_SHUTDOWN, scan_node};
+
+use regex::Regex;
+
+pub fn read(store: &'static Store, printer: &'static Printer) {
+       tokio::spawn(lines(BufReader::new(stdin())).for_each(move |line| {
+               macro_rules! err {
+                       () => { {
+                               printer.add_line(format!("Unparsable input: \"{}\"", line), true);
+                               return future::ok(());
+                       } }
+               }
+               let mut line_iter = line.split(' ');
+               macro_rules! get_next_chunk {
+                       () => { {
+                               match line_iter.next() {
+                                       Some(c) => c,
+                                       None => err!(),
+                               }
+                       } }
+               }
+               macro_rules! try_parse_next_chunk {
+                       ($type: ty) => { {
+                               match get_next_chunk!().parse::<$type>() {
+                                       Ok(res) => res,
+                                       Err(_) => err!(),
+                               }
+                       } }
+               }
+               match get_next_chunk!() {
+                       "c" => store.set_u64(U64Setting::ConnsPerSec, try_parse_next_chunk!(u64)),
+                       "t" => store.set_u64(U64Setting::RunTimeout, try_parse_next_chunk!(u64)),
+                       "v" => store.set_u64(U64Setting::MinProtocolVersion, try_parse_next_chunk!(u64)),
+                       "w" => store.set_u64(U64Setting::WasGoodTimeout, try_parse_next_chunk!(u64)),
+                       "s" => store.set_regex(RegexSetting::SubverRegex, try_parse_next_chunk!(Regex)),
+                       "a" => scan_node(Instant::now(), try_parse_next_chunk!(SocketAddr)),
+                       "r" => {
+                               match try_parse_next_chunk!(u8) {
+                                       0 => store.set_u64(U64Setting::RescanInterval(AddressState::Untested), try_parse_next_chunk!(u64)),
+                                       1 => store.set_u64(U64Setting::RescanInterval(AddressState::LowBlockCount), try_parse_next_chunk!(u64)),
+                                       2 => store.set_u64(U64Setting::RescanInterval(AddressState::HighBlockCount), try_parse_next_chunk!(u64)),
+                                       3 => store.set_u64(U64Setting::RescanInterval(AddressState::LowVersion), try_parse_next_chunk!(u64)),
+                                       4 => store.set_u64(U64Setting::RescanInterval(AddressState::BadVersion), try_parse_next_chunk!(u64)),
+                                       5 => store.set_u64(U64Setting::RescanInterval(AddressState::NotFullNode), try_parse_next_chunk!(u64)),
+                                       6 => store.set_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_parse_next_chunk!(u64)),
+                                       7 => store.set_u64(U64Setting::RescanInterval(AddressState::Timeout), try_parse_next_chunk!(u64)),
+                                       8 => store.set_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_parse_next_chunk!(u64)),
+                                       9 => store.set_u64(U64Setting::RescanInterval(AddressState::Good), try_parse_next_chunk!(u64)),
+                                       10 => store.set_u64(U64Setting::RescanInterval(AddressState::WasGood), try_parse_next_chunk!(u64)),
+                                       _ => err!(),
+                               }
+                       },
+                       "q" => {
+                               START_SHUTDOWN.store(true, Ordering::SeqCst);
+                               return future::err(std::io::Error::new(std::io::ErrorKind::Other, ""));
+                       },
+                       _ => err!(),
+               }
+               future::ok(())
+       }).then(|_| {
+               println!("Exiting");
+               future::ok(())
+       }));
+}