From 1e90008ac857dbf642addc27cf92ef4d7c76b51d Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 18 May 2019 15:36:04 -0400 Subject: [PATCH] Initial checkin --- Cargo.toml | 13 ++ src/datastore.rs | 160 ++++++++++++++++++++++++ src/main.rs | 285 ++++++++++++++++++++++++++++++++++++++++++ src/peer.rs | 130 +++++++++++++++++++ src/printer.rs | 162 ++++++++++++++++++++++++ src/timeout_stream.rs | 42 +++++++ 6 files changed, 792 insertions(+) create mode 100644 Cargo.toml create mode 100644 src/datastore.rs create mode 100644 src/main.rs create mode 100644 src/peer.rs create mode 100644 src/printer.rs create mode 100644 src/timeout_stream.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..88b3deb --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "dnsseed-rust" +version = "0.1.0" +authors = ["matt"] +edition = "2018" + +[dependencies] +bitcoin = { path = "../Bitcoin/rust-bitcoin" } +bitcoin_hashes = "0.3" +tokio = "0.1" +bytes = "0.4" +futures = "0.1" +rand = "0.6" diff --git a/src/datastore.rs b/src/datastore.rs new file mode 100644 index 0000000..1013a9e --- /dev/null +++ b/src/datastore.rs @@ -0,0 +1,160 @@ +use std::{cmp, mem}; +use std::collections::{HashMap, hash_map}; +use std::sync::RwLock; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use bitcoin::network::address::Address; + +use rand::thread_rng; +use rand::seq::SliceRandom; + +#[derive(Clone, Copy, Hash, PartialEq, Eq)] +pub enum AddressState { + Untested, + LowBlockCount, + HighBlockCount, + LowVersion, + BadVersion, + NotFullNode, + ProtocolViolation, + Timeout, + TimeoutDuringRequest, + Good, + WasGood, +} + +#[derive(Hash, PartialEq, Eq)] +pub enum U64Setting { + ConnsPerSec, + RunTimeout, + WasGoodTimeout, + RescanInterval(AddressState), + MinProtocolVersion, +} + +#[derive(Hash, PartialEq, Eq)] +pub enum StringSetting { + SubverRegex, +} + +struct Nodes { + good_node_services: HashMap>, + nodes_to_state: HashMap, + state_next_scan: HashMap>, +} + +pub struct Store { + u64_settings: RwLock>, + subver_regex: RwLock, + nodes: RwLock, +} + +impl Store { + pub fn new() -> Store { + let mut u64s = HashMap::with_capacity(15); + u64s.insert(U64Setting::ConnsPerSec, 50); + u64s.insert(U64Setting::RunTimeout, 120); + u64s.insert(U64Setting::WasGoodTimeout, 21600); + u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0); + u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600); + u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200); + u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600); + u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600); + u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400); + u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400); + u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400); + u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600); + u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800); + u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800); + u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX + let mut state_vecs = HashMap::with_capacity(11); + state_vecs.insert(AddressState::Untested, Vec::new()); + state_vecs.insert(AddressState::LowBlockCount, Vec::new()); + state_vecs.insert(AddressState::HighBlockCount, Vec::new()); + state_vecs.insert(AddressState::LowVersion, Vec::new()); + state_vecs.insert(AddressState::BadVersion, Vec::new()); + state_vecs.insert(AddressState::NotFullNode, Vec::new()); + state_vecs.insert(AddressState::ProtocolViolation, Vec::new()); + state_vecs.insert(AddressState::Timeout, Vec::new()); + state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new()); + state_vecs.insert(AddressState::Good, Vec::new()); + state_vecs.insert(AddressState::WasGood, Vec::new()); + let mut good_node_services = HashMap::with_capacity(64); + for i in 0..64 { + good_node_services.insert(i, Vec::new()); + } + Store { + u64_settings: RwLock::new(u64s), + subver_regex: RwLock::new(".*".to_string()), + nodes: RwLock::new(Nodes { + good_node_services, + nodes_to_state: HashMap::new(), + state_next_scan: state_vecs, + }), + } + } + + pub fn get_u64(&self, setting: U64Setting) -> u64 { + *self.u64_settings.read().unwrap().get(&setting).unwrap() + } + + pub fn get_node_count(&self, state: AddressState) -> usize { + self.nodes.read().unwrap().state_next_scan.get(&state).unwrap().len() + } + + pub fn get_string(&self, _setting: StringSetting) -> String { + self.subver_regex.read().unwrap().clone() + } + + pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) { + let mut nodes = self.nodes.write().unwrap(); + let cur_time = Instant::now(); + for &(_, ref addr) in addresses { + if let Ok(socketaddr) = addr.socket_addr() { + match nodes.nodes_to_state.entry(socketaddr.clone()) { + hash_map::Entry::Vacant(e) => { + e.insert(AddressState::Untested); + nodes.state_next_scan.get_mut(&AddressState::Untested).unwrap().push((cur_time, socketaddr)); + }, + hash_map::Entry::Occupied(_) => {}, + } + } else { + //TODO: Handle onions + } + } + } + + pub fn set_node_state(&self, addr: SocketAddr, state: AddressState, services: u64) { + let mut nodes = self.nodes.write().unwrap(); + let state_ref = nodes.nodes_to_state.get_mut(&addr).unwrap(); + if *state_ref == AddressState::Good && state != AddressState::Good { + *state_ref = AddressState::WasGood; + nodes.state_next_scan.get_mut(&AddressState::WasGood).unwrap().push((Instant::now(), addr)); + } else { + *state_ref = state; + nodes.state_next_scan.get_mut(&state).unwrap().push((Instant::now(), addr)); + } + if state == AddressState::Good { + + } + } + + pub fn get_next_scan_nodes(&self) -> Vec { + let mut res = Vec::with_capacity(600); + let cur_time = Instant::now(); + let mut nodes = self.nodes.write().unwrap(); + for (state, state_nodes) in nodes.state_next_scan.iter_mut() { + let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(*state))); + let split_point = cmp::min(cmp::min(600 - res.len(), 60), + state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx)); + let mut new_nodes = state_nodes.split_off(split_point); + mem::swap(&mut new_nodes, state_nodes); + for (_, node) in new_nodes.drain(..) { + res.push(node); + } + } + res.shuffle(&mut thread_rng()); + res + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..d58b1ad --- /dev/null +++ b/src/main.rs @@ -0,0 +1,285 @@ +mod printer; +mod peer; +mod timeout_stream; +mod datastore; + +use std::env; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use std::net::SocketAddr; + +use bitcoin_hashes::sha256d; + +use bitcoin::blockdata::constants::genesis_block; +use bitcoin::network::constants::Network; +use bitcoin::network::message::NetworkMessage; +use bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory, InvType}; +use bitcoin::util::hash::BitcoinHash; + +use printer::{Printer, Stat}; +use peer::Peer; +use datastore::{AddressState, Store, U64Setting}; + +use tokio::prelude::*; +use tokio::timer::Delay; + +static mut HIGHEST_HEADER: Option>> = None; +static mut HEADER_MAP: Option>>> = None; +static mut HEIGHT_MAP: Option>>> = None; +static mut DATA_STORE: Option> = None; +static mut PRINTER: Option> = None; + +struct PeerState { + recvd_version: bool, + recvd_verack: bool, + recvd_addrs: bool, + recvd_block: bool, + node_services: u64, + fail_reason: AddressState, + request: (u64, sha256d::Hash), +} + +fn scan_net() { + tokio::spawn(future::lazy(|| { + let store = unsafe { DATA_STORE.as_ref().unwrap() }; + let printer = unsafe { PRINTER.as_ref().unwrap() }; + + let mut scan_nodes = store.get_next_scan_nodes(); + let timeout = store.get_u64(U64Setting::RunTimeout); + let per_iter_time = Duration::from_millis(1000 / store.get_u64(U64Setting::ConnsPerSec)); + let mut iter_time = Instant::now(); + let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008; + let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone(); + + for node in scan_nodes.drain(..) { + let peer_state = Arc::new(Mutex::new(PeerState { + recvd_version: false, + recvd_verack: false, + recvd_addrs: false, + recvd_block: false, + node_services: 0, + fail_reason: AddressState::Timeout, + request: (requested_height, requested_block), + })); + let final_peer_state = Arc::clone(&peer_state); + let peer = Delay::new(iter_time).then(move |_| { + printer.set_stat(Stat::NewConnection); + Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run + }); + iter_time += per_iter_time; + tokio::spawn(peer.and_then(move |conn_split| { + let (mut write, read) = conn_split; + read.map_err(|_| { () }).for_each(move |msg| { + let mut state_lock = peer_state.lock().unwrap(); + macro_rules! check_set_flag { + ($recvd_flag: ident, $msg: expr) => { { + if state_lock.$recvd_flag { + state_lock.fail_reason = AddressState::ProtocolViolation; + printer.add_line(format!("Updating {} to ProtocolViolation due to dup {}", node, $msg), true); + state_lock.$recvd_flag = false; + return future::err(()); + } + state_lock.$recvd_flag = true; + } } + } + state_lock.fail_reason = AddressState::TimeoutDuringRequest; + match msg { + NetworkMessage::Version(ver) => { + if ver.start_height < 0 || ver.start_height as u64 > state_lock.request.0 + 1008*2 { + state_lock.fail_reason = AddressState::HighBlockCount; + return future::err(()); + } + if (ver.start_height as u64) < state_lock.request.0 { + printer.add_line(format!("Updating {} to LowBlockCount ({} < {})", node, ver.start_height, state_lock.request.0), true); + state_lock.fail_reason = AddressState::LowBlockCount; + return future::err(()); + } + let min_version = store.get_u64(U64Setting::MinProtocolVersion); + if (ver.version as u64) < min_version { + printer.add_line(format!("Updating {} to LowVersion ({} < {})", node, ver.version, min_version), true); + state_lock.fail_reason = AddressState::LowVersion; + return future::err(()); + } + if ver.services & 1 != 1 { + printer.add_line(format!("Updating {} to NotFullNode (services {:x})", node, ver.services), true); + state_lock.fail_reason = AddressState::NotFullNode; + return future::err(()); + } + check_set_flag!(recvd_version, "version"); + state_lock.node_services = ver.services; + if let Err(_) = write.try_send(NetworkMessage::Verack) { + return future::err(()); + } + }, + NetworkMessage::Verack => { + check_set_flag!(recvd_verack, "verack"); + if let Err(_) = write.try_send(NetworkMessage::GetAddr) { + return future::err(()); + } + if let Err(_) = write.try_send(NetworkMessage::GetData(vec![Inventory { + inv_type: InvType::WitnessBlock, + hash: state_lock.request.1, + }])) { + return future::err(()); + } + }, + NetworkMessage::Ping(v) => { + if let Err(_) = write.try_send(NetworkMessage::Pong(v)) { + return future::err(()) + } + }, + NetworkMessage::Addr(addrs) => { + if addrs.len() > 1 { + check_set_flag!(recvd_addrs, "addr"); + unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs); + } + }, + NetworkMessage::Block(block) => { + if block.header.bitcoin_hash() != state_lock.request.1 || + !block.check_merkle_root() || !block.check_witness_commitment() { + state_lock.fail_reason = AddressState::ProtocolViolation; + printer.add_line(format!("Updating {} to ProtocolViolation due to bad block", node), true); + return future::err(()); + } + check_set_flag!(recvd_block, "block"); + }, + _ => {}, + } + future::ok(()) + }).then(|_| { + future::err(()) + }) + }).then(move |_: Result<(), ()>| { + let printer = unsafe { PRINTER.as_ref().unwrap() }; + let store = unsafe { DATA_STORE.as_ref().unwrap() }; + + printer.set_stat(Stat::ConnectionClosed); + + let state_lock = final_peer_state.lock().unwrap(); + if state_lock.recvd_version && state_lock.recvd_verack && + state_lock.recvd_addrs && state_lock.recvd_block { + store.set_node_state(node, AddressState::Good, state_lock.node_services); + } else { + if state_lock.fail_reason == AddressState::Timeout || state_lock.fail_reason == AddressState::TimeoutDuringRequest { + printer.add_line(format!("Updating {} to Timeout[DuringRequest]", node), true); + } + assert!(state_lock.fail_reason != AddressState::Good); + store.set_node_state(node, state_lock.fail_reason, 0); + } + future::ok(()) + })); + } + Delay::new(iter_time).then(|_| { + scan_net(); + future::ok(()) + }) + })); +} + +fn make_trusted_conn(trusted_sockaddr: SocketAddr) { + let printer = unsafe { PRINTER.as_ref().unwrap() }; + let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer); + tokio::spawn(trusted_peer.and_then(move |trusted_split| { + printer.add_line("Connected to local peer".to_string(), false); + let (mut trusted_write, trusted_read) = trusted_split; + let mut starting_height = 0; + trusted_read.map_err(|_| { () }).for_each(move |msg| { + match msg { + NetworkMessage::Version(ver) => { + if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) { + return future::err(()) + } + starting_height = ver.start_height; + }, + NetworkMessage::Verack => { + if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage { + version: 70015, + locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()], + stop_hash: Default::default(), + })) { + return future::err(()); + } + if let Err(_) = trusted_write.try_send(NetworkMessage::GetAddr) { + return future::err(()); + } + }, + NetworkMessage::Addr(addrs) => { + unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs); + }, + NetworkMessage::Headers(headers) => { + if headers.is_empty() { + return future::ok(()); + } + let mut header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap(); + let mut height_map = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap(); + if let Some(height) = header_map.get(&headers[0].prev_blockhash).cloned() { + for i in 0..headers.len() { + let hash = headers[i].bitcoin_hash(); + if i < headers.len() - 1 && headers[i + 1].prev_blockhash != hash { + return future::err(()); + } + header_map.insert(headers[i].bitcoin_hash(), height + 1 + (i as u64)); + height_map.insert(height + 1 + (i as u64), headers[i].bitcoin_hash()); + } + let top_height = height + headers.len() as u64; + *unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap() + = (headers.last().unwrap().bitcoin_hash(), top_height); + printer.set_stat(printer::Stat::HeaderCount(top_height)); + if top_height >= starting_height as u64 { + scan_net(); + } + } else { + // Wat? Lets start again... + printer.add_line("Got unconnected headers message from local trusted peer".to_string(), true); + } + if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage { + version: 70015, + locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()], + stop_hash: Default::default(), + })) { + return future::err(()) + } + }, + NetworkMessage::Ping(v) => { + if let Err(_) = trusted_write.try_send(NetworkMessage::Pong(v)) { + return future::err(()) + } + }, + _ => {}, + } + future::ok(()) + }).then(|_| { + future::err(()) + }) + }).then(move |_: Result<(), ()>| { + printer.add_line("Lost connection from trusted peer".to_string(), true); + make_trusted_conn(trusted_sockaddr); + future::ok(()) + })); +} + +fn main() { + if env::args().len() != 3 { + println!("USAGE: dnsseed-rust datastore localPeerAddress"); + return; + } + + unsafe { HEADER_MAP = Some(Box::new(Mutex::new(HashMap::new()))) }; + unsafe { HEIGHT_MAP = Some(Box::new(Mutex::new(HashMap::new()))) }; + unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap().insert(genesis_block(Network::Bitcoin).bitcoin_hash(), 0); + unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().insert(0, genesis_block(Network::Bitcoin).bitcoin_hash()); + unsafe { HIGHEST_HEADER = Some(Box::new(Mutex::new((genesis_block(Network::Bitcoin).bitcoin_hash(), 0)))) }; + + unsafe { DATA_STORE = Some(Box::new(Store::new())) }; + unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) }; + + tokio::run(future::lazy(|| { + let mut args = env::args(); + args.next(); + let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); + make_trusted_conn(trusted_sockaddr); + + future::ok(()) + })); +} diff --git a/src/peer.rs b/src/peer.rs new file mode 100644 index 0000000..95028ed --- /dev/null +++ b/src/peer.rs @@ -0,0 +1,130 @@ +use std::cmp; +use std::net::SocketAddr; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use bitcoin::consensus::encode; +use bitcoin::consensus::encode::{Decodable, Encodable}; +use bitcoin::network::address::Address; +use bitcoin::network::constants::Network; +use bitcoin::network::message::{RawNetworkMessage, NetworkMessage}; +use bitcoin::network::message_network::VersionMessage; + +use tokio::prelude::*; +use tokio::codec; +use tokio::codec::Framed; +use tokio::net::TcpStream; +use tokio::timer::Delay; + +use futures::sync::mpsc; + +use crate::printer::Printer; +use crate::timeout_stream::TimeoutStream; + +struct BytesCoder<'a>(&'a mut bytes::BytesMut); +impl<'a> std::io::Write for BytesCoder<'a> { + fn write(&mut self, b: &[u8]) -> Result { + self.0.extend_from_slice(&b); + Ok(b.len()) + } + fn flush(&mut self) -> Result<(), std::io::Error> { + Ok(()) + } +} +struct BytesDecoder<'a> { + buf: &'a mut bytes::BytesMut, + pos: usize, +} +impl<'a> std::io::Read for BytesDecoder<'a> { + fn read(&mut self, b: &mut [u8]) -> Result { + let copy_len = cmp::min(b.len(), self.buf.len() - self.pos); + b[..copy_len].copy_from_slice(&self.buf[self.pos..self.pos + copy_len]); + self.pos += copy_len; + Ok(copy_len) + } +} + +struct MsgCoder<'a>(&'a Printer); +impl<'a> codec::Decoder for MsgCoder<'a> { + type Item = NetworkMessage; + type Error = std::io::Error; + + fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result, std::io::Error> { + let mut decoder = BytesDecoder { + buf: bytes, + pos: 0 + }; + match RawNetworkMessage::consensus_decode(&mut decoder) { + Ok(res) => { + decoder.buf.advance(decoder.pos); + if res.magic == Network::Bitcoin.magic() { + Ok(Some(res.payload)) + } else { + Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bad net magic")) + } + }, + Err(e) => match e { + encode::Error::Io(_) => Ok(None), + encode::Error::UnrecognizedNetworkCommand(_msg) => { + decoder.buf.advance(decoder.pos); + //XXX(fixthese): self.0.add_line(format!("rust-bitcoin doesn't support {}!", msg), true); + Ok(None) + }, + _ => { + self.0.add_line(format!("Error decoding message: {:?}", e), true); + Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)) + }, + } + } + } +} +impl<'a> codec::Encoder for MsgCoder<'a> { + type Item = NetworkMessage; + type Error = std::io::Error; + + fn encode(&mut self, msg: NetworkMessage, res: &mut bytes::BytesMut) -> Result<(), std::io::Error> { + if let Err(_) = (RawNetworkMessage { + magic: Network::Bitcoin.magic(), + payload: msg, + }.consensus_encode(&mut BytesCoder(res))) { + //XXX + } + Ok(()) + } +} + +pub struct Peer { + +} + +impl Peer { + pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future, impl Stream)> { + let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| { + future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached")) + }); + TcpStream::connect(&addr).select(connect_timeout) + .or_else(move |_| { + Delay::new(Instant::now() + timeout / 10).then(|_| { + future::err(()) + }) + }).and_then(move |stream| { + let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split(); + let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb + tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver) + .then(|_| { + future::err(()) + })); + let _ = sender.try_send(NetworkMessage::Version(VersionMessage { + version: 70015, + services: (1 << 3), // NODE_WITNESS + timestamp: SystemTime::now().duration_since(UNIX_EPOCH).expect("time > 1970").as_secs() as i64, + receiver: Address::new(&addr, 0), + sender: Address::new(&"0.0.0.0:0".parse().unwrap(), 0), + nonce: 0xdeadbeef, + user_agent: "/rust-bitcoin:0.18/bluematt-tokio-client:0.1/".to_string(), + start_height: 0, + relay: true, + })); + future::ok((sender, TimeoutStream::new(read, timeout))) + }) + } +} diff --git a/src/printer.rs b/src/printer.rs new file mode 100644 index 0000000..10d2150 --- /dev/null +++ b/src/printer.rs @@ -0,0 +1,162 @@ +use std::collections::LinkedList; +use std::sync::{Arc, Mutex}; +use std::io::Write; + +use crate::datastore::{Store, AddressState, U64Setting, StringSetting}; + +pub enum Stat { + HeaderCount(u64), + NewConnection, + ConnectionClosed, +} + +struct Stats { + lines: LinkedList, + header_count: u64, + connection_count: u64, +} + +pub struct Printer { + stats: Arc>, +} + +impl Printer { + pub fn new(store: &'static Store) -> Printer { + let stats: Arc> = Arc::new(Mutex::new(Stats { + lines: LinkedList::new(), + header_count: 0, + connection_count: 0, + })); + let thread_arc = Arc::clone(&stats); + std::thread::spawn(move || { + loop { + std::thread::sleep(std::time::Duration::from_secs(1)); + + let stdout = std::io::stdout(); + let mut out = stdout.lock(); + + let stats = thread_arc.lock().unwrap(); + + out.write_all(b"\x1b[2J\x1b[;H\n").expect("stdout broken?"); + for line in stats.lines.iter() { + out.write_all(line.as_bytes()).expect("stdout broken?"); + out.write_all(b"\n").expect("stdout broken?"); + } + + out.write_all(b"\nNode counts by status:\n").expect("stdout broken?"); + out.write_all(format!("Untested: {}\n", store.get_node_count(AddressState::Untested) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("Low Block Count: {}\n", store.get_node_count(AddressState::LowBlockCount) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("High Block Count: {}\n", store.get_node_count(AddressState::HighBlockCount) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("Low Version: {}\n", store.get_node_count(AddressState::LowVersion) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("Bad Version: {}\n", store.get_node_count(AddressState::BadVersion) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("Not Full Node: {}\n", store.get_node_count(AddressState::NotFullNode) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("Protocol Violation: {}\n", store.get_node_count(AddressState::ProtocolViolation) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("Timeout: {}\n", store.get_node_count(AddressState::Timeout) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("Timeout During Request: {}\n", store.get_node_count(AddressState::TimeoutDuringRequest) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("Good: {}\n", store.get_node_count(AddressState::Good) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!("WasGood: {}\n", store.get_node_count(AddressState::WasGood) + ).as_bytes()).expect("stdout broken?"); + + out.write_all(format!( + "\nCurrent connections open/in progress: {}\n", stats.connection_count).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Connections opened each second: {} (\"c x\" to change to x seconds)\n", store.get_u64(U64Setting::ConnsPerSec) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Current block count: {}\n", stats.header_count).as_bytes()).expect("stdout broken?"); + + out.write_all(format!( + "Timeout for full run (in seconds): {} (\"t x\" to change to x seconds)\n", store.get_u64(U64Setting::RunTimeout) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Minimum protocol version: {} (\"v x\" to change value to x)\n", store.get_u64(U64Setting::MinProtocolVersion) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Subversion match regex: {} (\"s x\" to change value to x)\n", store.get_string(StringSetting::SubverRegex) + ).as_bytes()).expect("stdout broken?"); + + out.write_all(b"\nRetry times (in seconds):\n").expect("stdout broken?"); + out.write_all(format!( + "Untested: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Untested)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Low Block Count: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "High Block Count {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Low Version: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Bad Version: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Not Full Node: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Protocol Violation: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Timeout: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Timeout)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Timeout During Request: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Good: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Good)) + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "Was Good: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::WasGood)) + ).as_bytes()).expect("stdout broken?"); + + out.write_all(b"\nCommands:\n").expect("stdout broken?"); + out.write_all(b"q: quit\n").expect("stdout broken?"); + out.write_all(format!( + "r x y: Change retry time for status x (int value, see retry times section for name mappings) to y (in seconds)\n" + ).as_bytes()).expect("stdout broken?"); + out.write_all(format!( + "w x: Change the amount of time a node is considered WAS_GOOD after it fails to x from {} (in seconds)\n", + store.get_u64(U64Setting::WasGoodTimeout) + ).as_bytes()).expect("stdout broken?"); + out.write_all(b"p: Enable/disable updating these stats\n").expect("stdout broken?"); + out.write_all(b"a x: Scan node x\n").expect("stdout broken?"); + out.write_all(b"\x1b[s").expect("stdout broken?"); // Save cursor position and provide a blank line before cursor + out.write_all(b"\x1b[;H\x1b[2K").expect("stdout broken?"); + out.write_all(b"Most recent log:\n").expect("stdout broken?"); + out.write_all(b"\x1b[u").expect("stdout broken?"); // Restore cursor position and go up one line + + out.flush().expect("stdout broken?"); + } + }); + Printer { + stats, + } + } + + pub fn add_line(&self, line: String, _err: bool) { + let mut stats = self.stats.lock().unwrap(); + stats.lines.push_back(line); + if stats.lines.len() > 50 { + stats.lines.pop_front(); + } + } + + pub fn set_stat(&self, s: Stat) { + match s { + Stat::HeaderCount(c) => self.stats.lock().unwrap().header_count = c, + Stat::NewConnection => self.stats.lock().unwrap().connection_count += 1, + Stat::ConnectionClosed => self.stats.lock().unwrap().connection_count -= 1, + } + } +} diff --git a/src/timeout_stream.rs b/src/timeout_stream.rs new file mode 100644 index 0000000..57664f8 --- /dev/null +++ b/src/timeout_stream.rs @@ -0,0 +1,42 @@ +use tokio::prelude::*; +use tokio::timer::Delay; + +use std::time::{Duration, Instant}; + +pub struct TimeoutStream where S : Stream { + stream: S, + next_deadline: Delay, + timeout: Duration, +} + +impl TimeoutStream where S : Stream { + pub fn new(stream: S, timeout: Duration) -> Self { + let next_deadline = Delay::new(Instant::now() + timeout); + Self { + stream, + next_deadline, + timeout, + } + } +} + +impl Stream for TimeoutStream where S : Stream { + type Item = S::Item; + type Error = S::Error; + fn poll(&mut self) -> Result>, S::Error> { + match self.next_deadline.poll() { + Ok(Async::Ready(_)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => { + match self.stream.poll() { + Ok(Async::Ready(v)) => { + self.next_deadline.reset(Instant::now() + self.timeout); + Ok(Async::Ready(v)) + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => Err(e), + } + }, + Err(_) => Ok(Async::Ready(None)), // TODO: If I want to upstream TimeoutStream this is gonna need some love + } + } +} -- 2.39.5