--- /dev/null
+[package]
+name = "dnsseed-rust"
+version = "0.1.0"
+authors = ["matt"]
+edition = "2018"
+
+[dependencies]
+bitcoin = { path = "../Bitcoin/rust-bitcoin" }
+bitcoin_hashes = "0.3"
+tokio = "0.1"
+bytes = "0.4"
+futures = "0.1"
+rand = "0.6"
--- /dev/null
+use std::{cmp, mem};
+use std::collections::{HashMap, hash_map};
+use std::sync::RwLock;
+use std::net::SocketAddr;
+use std::time::{Duration, Instant};
+
+use bitcoin::network::address::Address;
+
+use rand::thread_rng;
+use rand::seq::SliceRandom;
+
+#[derive(Clone, Copy, Hash, PartialEq, Eq)]
+pub enum AddressState {
+ Untested,
+ LowBlockCount,
+ HighBlockCount,
+ LowVersion,
+ BadVersion,
+ NotFullNode,
+ ProtocolViolation,
+ Timeout,
+ TimeoutDuringRequest,
+ Good,
+ WasGood,
+}
+
+#[derive(Hash, PartialEq, Eq)]
+pub enum U64Setting {
+ ConnsPerSec,
+ RunTimeout,
+ WasGoodTimeout,
+ RescanInterval(AddressState),
+ MinProtocolVersion,
+}
+
+#[derive(Hash, PartialEq, Eq)]
+pub enum StringSetting {
+ SubverRegex,
+}
+
+struct Nodes {
+ good_node_services: HashMap<u8, Vec<SocketAddr>>,
+ nodes_to_state: HashMap<SocketAddr, AddressState>,
+ state_next_scan: HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
+}
+
+pub struct Store {
+ u64_settings: RwLock<HashMap<U64Setting, u64>>,
+ subver_regex: RwLock<String>,
+ nodes: RwLock<Nodes>,
+}
+
+impl Store {
+ pub fn new() -> Store {
+ let mut u64s = HashMap::with_capacity(15);
+ u64s.insert(U64Setting::ConnsPerSec, 50);
+ u64s.insert(U64Setting::RunTimeout, 120);
+ u64s.insert(U64Setting::WasGoodTimeout, 21600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
+ u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
+ u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
+ u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
+ u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
+ u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
+ u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
+ u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX
+ let mut state_vecs = HashMap::with_capacity(11);
+ state_vecs.insert(AddressState::Untested, Vec::new());
+ state_vecs.insert(AddressState::LowBlockCount, Vec::new());
+ state_vecs.insert(AddressState::HighBlockCount, Vec::new());
+ state_vecs.insert(AddressState::LowVersion, Vec::new());
+ state_vecs.insert(AddressState::BadVersion, Vec::new());
+ state_vecs.insert(AddressState::NotFullNode, Vec::new());
+ state_vecs.insert(AddressState::ProtocolViolation, Vec::new());
+ state_vecs.insert(AddressState::Timeout, Vec::new());
+ state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new());
+ state_vecs.insert(AddressState::Good, Vec::new());
+ state_vecs.insert(AddressState::WasGood, Vec::new());
+ let mut good_node_services = HashMap::with_capacity(64);
+ for i in 0..64 {
+ good_node_services.insert(i, Vec::new());
+ }
+ Store {
+ u64_settings: RwLock::new(u64s),
+ subver_regex: RwLock::new(".*".to_string()),
+ nodes: RwLock::new(Nodes {
+ good_node_services,
+ nodes_to_state: HashMap::new(),
+ state_next_scan: state_vecs,
+ }),
+ }
+ }
+
+ pub fn get_u64(&self, setting: U64Setting) -> u64 {
+ *self.u64_settings.read().unwrap().get(&setting).unwrap()
+ }
+
+ pub fn get_node_count(&self, state: AddressState) -> usize {
+ self.nodes.read().unwrap().state_next_scan.get(&state).unwrap().len()
+ }
+
+ pub fn get_string(&self, _setting: StringSetting) -> String {
+ self.subver_regex.read().unwrap().clone()
+ }
+
+ pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
+ let mut nodes = self.nodes.write().unwrap();
+ let cur_time = Instant::now();
+ for &(_, ref addr) in addresses {
+ if let Ok(socketaddr) = addr.socket_addr() {
+ match nodes.nodes_to_state.entry(socketaddr.clone()) {
+ hash_map::Entry::Vacant(e) => {
+ e.insert(AddressState::Untested);
+ nodes.state_next_scan.get_mut(&AddressState::Untested).unwrap().push((cur_time, socketaddr));
+ },
+ hash_map::Entry::Occupied(_) => {},
+ }
+ } else {
+ //TODO: Handle onions
+ }
+ }
+ }
+
+ pub fn set_node_state(&self, addr: SocketAddr, state: AddressState, services: u64) {
+ let mut nodes = self.nodes.write().unwrap();
+ let state_ref = nodes.nodes_to_state.get_mut(&addr).unwrap();
+ if *state_ref == AddressState::Good && state != AddressState::Good {
+ *state_ref = AddressState::WasGood;
+ nodes.state_next_scan.get_mut(&AddressState::WasGood).unwrap().push((Instant::now(), addr));
+ } else {
+ *state_ref = state;
+ nodes.state_next_scan.get_mut(&state).unwrap().push((Instant::now(), addr));
+ }
+ if state == AddressState::Good {
+
+ }
+ }
+
+ pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
+ let mut res = Vec::with_capacity(600);
+ let cur_time = Instant::now();
+ let mut nodes = self.nodes.write().unwrap();
+ for (state, state_nodes) in nodes.state_next_scan.iter_mut() {
+ let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(*state)));
+ let split_point = cmp::min(cmp::min(600 - res.len(), 60),
+ state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx));
+ let mut new_nodes = state_nodes.split_off(split_point);
+ mem::swap(&mut new_nodes, state_nodes);
+ for (_, node) in new_nodes.drain(..) {
+ res.push(node);
+ }
+ }
+ res.shuffle(&mut thread_rng());
+ res
+ }
+}
--- /dev/null
+mod printer;
+mod peer;
+mod timeout_stream;
+mod datastore;
+
+use std::env;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+use std::net::SocketAddr;
+
+use bitcoin_hashes::sha256d;
+
+use bitcoin::blockdata::constants::genesis_block;
+use bitcoin::network::constants::Network;
+use bitcoin::network::message::NetworkMessage;
+use bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory, InvType};
+use bitcoin::util::hash::BitcoinHash;
+
+use printer::{Printer, Stat};
+use peer::Peer;
+use datastore::{AddressState, Store, U64Setting};
+
+use tokio::prelude::*;
+use tokio::timer::Delay;
+
+static mut HIGHEST_HEADER: Option<Box<Mutex<(sha256d::Hash, u64)>>> = None;
+static mut HEADER_MAP: Option<Box<Mutex<HashMap<sha256d::Hash, u64>>>> = None;
+static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, sha256d::Hash>>>> = None;
+static mut DATA_STORE: Option<Box<Store>> = None;
+static mut PRINTER: Option<Box<Printer>> = None;
+
+struct PeerState {
+ recvd_version: bool,
+ recvd_verack: bool,
+ recvd_addrs: bool,
+ recvd_block: bool,
+ node_services: u64,
+ fail_reason: AddressState,
+ request: (u64, sha256d::Hash),
+}
+
+fn scan_net() {
+ tokio::spawn(future::lazy(|| {
+ let store = unsafe { DATA_STORE.as_ref().unwrap() };
+ let printer = unsafe { PRINTER.as_ref().unwrap() };
+
+ let mut scan_nodes = store.get_next_scan_nodes();
+ let timeout = store.get_u64(U64Setting::RunTimeout);
+ let per_iter_time = Duration::from_millis(1000 / store.get_u64(U64Setting::ConnsPerSec));
+ let mut iter_time = Instant::now();
+ let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008;
+ let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone();
+
+ for node in scan_nodes.drain(..) {
+ let peer_state = Arc::new(Mutex::new(PeerState {
+ recvd_version: false,
+ recvd_verack: false,
+ recvd_addrs: false,
+ recvd_block: false,
+ node_services: 0,
+ fail_reason: AddressState::Timeout,
+ request: (requested_height, requested_block),
+ }));
+ let final_peer_state = Arc::clone(&peer_state);
+ let peer = Delay::new(iter_time).then(move |_| {
+ printer.set_stat(Stat::NewConnection);
+ Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run
+ });
+ iter_time += per_iter_time;
+ tokio::spawn(peer.and_then(move |conn_split| {
+ let (mut write, read) = conn_split;
+ read.map_err(|_| { () }).for_each(move |msg| {
+ let mut state_lock = peer_state.lock().unwrap();
+ macro_rules! check_set_flag {
+ ($recvd_flag: ident, $msg: expr) => { {
+ if state_lock.$recvd_flag {
+ state_lock.fail_reason = AddressState::ProtocolViolation;
+ printer.add_line(format!("Updating {} to ProtocolViolation due to dup {}", node, $msg), true);
+ state_lock.$recvd_flag = false;
+ return future::err(());
+ }
+ state_lock.$recvd_flag = true;
+ } }
+ }
+ state_lock.fail_reason = AddressState::TimeoutDuringRequest;
+ match msg {
+ NetworkMessage::Version(ver) => {
+ if ver.start_height < 0 || ver.start_height as u64 > state_lock.request.0 + 1008*2 {
+ state_lock.fail_reason = AddressState::HighBlockCount;
+ return future::err(());
+ }
+ if (ver.start_height as u64) < state_lock.request.0 {
+ printer.add_line(format!("Updating {} to LowBlockCount ({} < {})", node, ver.start_height, state_lock.request.0), true);
+ state_lock.fail_reason = AddressState::LowBlockCount;
+ return future::err(());
+ }
+ let min_version = store.get_u64(U64Setting::MinProtocolVersion);
+ if (ver.version as u64) < min_version {
+ printer.add_line(format!("Updating {} to LowVersion ({} < {})", node, ver.version, min_version), true);
+ state_lock.fail_reason = AddressState::LowVersion;
+ return future::err(());
+ }
+ if ver.services & 1 != 1 {
+ printer.add_line(format!("Updating {} to NotFullNode (services {:x})", node, ver.services), true);
+ state_lock.fail_reason = AddressState::NotFullNode;
+ return future::err(());
+ }
+ check_set_flag!(recvd_version, "version");
+ state_lock.node_services = ver.services;
+ if let Err(_) = write.try_send(NetworkMessage::Verack) {
+ return future::err(());
+ }
+ },
+ NetworkMessage::Verack => {
+ check_set_flag!(recvd_verack, "verack");
+ if let Err(_) = write.try_send(NetworkMessage::GetAddr) {
+ return future::err(());
+ }
+ if let Err(_) = write.try_send(NetworkMessage::GetData(vec![Inventory {
+ inv_type: InvType::WitnessBlock,
+ hash: state_lock.request.1,
+ }])) {
+ return future::err(());
+ }
+ },
+ NetworkMessage::Ping(v) => {
+ if let Err(_) = write.try_send(NetworkMessage::Pong(v)) {
+ return future::err(())
+ }
+ },
+ NetworkMessage::Addr(addrs) => {
+ if addrs.len() > 1 {
+ check_set_flag!(recvd_addrs, "addr");
+ unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
+ }
+ },
+ NetworkMessage::Block(block) => {
+ if block.header.bitcoin_hash() != state_lock.request.1 ||
+ !block.check_merkle_root() || !block.check_witness_commitment() {
+ state_lock.fail_reason = AddressState::ProtocolViolation;
+ printer.add_line(format!("Updating {} to ProtocolViolation due to bad block", node), true);
+ return future::err(());
+ }
+ check_set_flag!(recvd_block, "block");
+ },
+ _ => {},
+ }
+ future::ok(())
+ }).then(|_| {
+ future::err(())
+ })
+ }).then(move |_: Result<(), ()>| {
+ let printer = unsafe { PRINTER.as_ref().unwrap() };
+ let store = unsafe { DATA_STORE.as_ref().unwrap() };
+
+ printer.set_stat(Stat::ConnectionClosed);
+
+ let state_lock = final_peer_state.lock().unwrap();
+ if state_lock.recvd_version && state_lock.recvd_verack &&
+ state_lock.recvd_addrs && state_lock.recvd_block {
+ store.set_node_state(node, AddressState::Good, state_lock.node_services);
+ } else {
+ if state_lock.fail_reason == AddressState::Timeout || state_lock.fail_reason == AddressState::TimeoutDuringRequest {
+ printer.add_line(format!("Updating {} to Timeout[DuringRequest]", node), true);
+ }
+ assert!(state_lock.fail_reason != AddressState::Good);
+ store.set_node_state(node, state_lock.fail_reason, 0);
+ }
+ future::ok(())
+ }));
+ }
+ Delay::new(iter_time).then(|_| {
+ scan_net();
+ future::ok(())
+ })
+ }));
+}
+
+fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
+ let printer = unsafe { PRINTER.as_ref().unwrap() };
+ let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
+ tokio::spawn(trusted_peer.and_then(move |trusted_split| {
+ printer.add_line("Connected to local peer".to_string(), false);
+ let (mut trusted_write, trusted_read) = trusted_split;
+ let mut starting_height = 0;
+ trusted_read.map_err(|_| { () }).for_each(move |msg| {
+ match msg {
+ NetworkMessage::Version(ver) => {
+ if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) {
+ return future::err(())
+ }
+ starting_height = ver.start_height;
+ },
+ NetworkMessage::Verack => {
+ if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
+ version: 70015,
+ locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
+ stop_hash: Default::default(),
+ })) {
+ return future::err(());
+ }
+ if let Err(_) = trusted_write.try_send(NetworkMessage::GetAddr) {
+ return future::err(());
+ }
+ },
+ NetworkMessage::Addr(addrs) => {
+ unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
+ },
+ NetworkMessage::Headers(headers) => {
+ if headers.is_empty() {
+ return future::ok(());
+ }
+ let mut header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap();
+ let mut height_map = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap();
+ if let Some(height) = header_map.get(&headers[0].prev_blockhash).cloned() {
+ for i in 0..headers.len() {
+ let hash = headers[i].bitcoin_hash();
+ if i < headers.len() - 1 && headers[i + 1].prev_blockhash != hash {
+ return future::err(());
+ }
+ header_map.insert(headers[i].bitcoin_hash(), height + 1 + (i as u64));
+ height_map.insert(height + 1 + (i as u64), headers[i].bitcoin_hash());
+ }
+ let top_height = height + headers.len() as u64;
+ *unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap()
+ = (headers.last().unwrap().bitcoin_hash(), top_height);
+ printer.set_stat(printer::Stat::HeaderCount(top_height));
+ if top_height >= starting_height as u64 {
+ scan_net();
+ }
+ } else {
+ // Wat? Lets start again...
+ printer.add_line("Got unconnected headers message from local trusted peer".to_string(), true);
+ }
+ if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
+ version: 70015,
+ locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
+ stop_hash: Default::default(),
+ })) {
+ return future::err(())
+ }
+ },
+ NetworkMessage::Ping(v) => {
+ if let Err(_) = trusted_write.try_send(NetworkMessage::Pong(v)) {
+ return future::err(())
+ }
+ },
+ _ => {},
+ }
+ future::ok(())
+ }).then(|_| {
+ future::err(())
+ })
+ }).then(move |_: Result<(), ()>| {
+ printer.add_line("Lost connection from trusted peer".to_string(), true);
+ make_trusted_conn(trusted_sockaddr);
+ future::ok(())
+ }));
+}
+
+fn main() {
+ if env::args().len() != 3 {
+ println!("USAGE: dnsseed-rust datastore localPeerAddress");
+ return;
+ }
+
+ unsafe { HEADER_MAP = Some(Box::new(Mutex::new(HashMap::new()))) };
+ unsafe { HEIGHT_MAP = Some(Box::new(Mutex::new(HashMap::new()))) };
+ unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap().insert(genesis_block(Network::Bitcoin).bitcoin_hash(), 0);
+ unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().insert(0, genesis_block(Network::Bitcoin).bitcoin_hash());
+ unsafe { HIGHEST_HEADER = Some(Box::new(Mutex::new((genesis_block(Network::Bitcoin).bitcoin_hash(), 0)))) };
+
+ unsafe { DATA_STORE = Some(Box::new(Store::new())) };
+ unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) };
+
+ tokio::run(future::lazy(|| {
+ let mut args = env::args();
+ args.next();
+ let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
+ make_trusted_conn(trusted_sockaddr);
+
+ future::ok(())
+ }));
+}
--- /dev/null
+use std::cmp;
+use std::net::SocketAddr;
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+
+use bitcoin::consensus::encode;
+use bitcoin::consensus::encode::{Decodable, Encodable};
+use bitcoin::network::address::Address;
+use bitcoin::network::constants::Network;
+use bitcoin::network::message::{RawNetworkMessage, NetworkMessage};
+use bitcoin::network::message_network::VersionMessage;
+
+use tokio::prelude::*;
+use tokio::codec;
+use tokio::codec::Framed;
+use tokio::net::TcpStream;
+use tokio::timer::Delay;
+
+use futures::sync::mpsc;
+
+use crate::printer::Printer;
+use crate::timeout_stream::TimeoutStream;
+
+struct BytesCoder<'a>(&'a mut bytes::BytesMut);
+impl<'a> std::io::Write for BytesCoder<'a> {
+ fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
+ self.0.extend_from_slice(&b);
+ Ok(b.len())
+ }
+ fn flush(&mut self) -> Result<(), std::io::Error> {
+ Ok(())
+ }
+}
+struct BytesDecoder<'a> {
+ buf: &'a mut bytes::BytesMut,
+ pos: usize,
+}
+impl<'a> std::io::Read for BytesDecoder<'a> {
+ fn read(&mut self, b: &mut [u8]) -> Result<usize, std::io::Error> {
+ let copy_len = cmp::min(b.len(), self.buf.len() - self.pos);
+ b[..copy_len].copy_from_slice(&self.buf[self.pos..self.pos + copy_len]);
+ self.pos += copy_len;
+ Ok(copy_len)
+ }
+}
+
+struct MsgCoder<'a>(&'a Printer);
+impl<'a> codec::Decoder for MsgCoder<'a> {
+ type Item = NetworkMessage;
+ type Error = std::io::Error;
+
+ fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result<Option<NetworkMessage>, std::io::Error> {
+ let mut decoder = BytesDecoder {
+ buf: bytes,
+ pos: 0
+ };
+ match RawNetworkMessage::consensus_decode(&mut decoder) {
+ Ok(res) => {
+ decoder.buf.advance(decoder.pos);
+ if res.magic == Network::Bitcoin.magic() {
+ Ok(Some(res.payload))
+ } else {
+ Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bad net magic"))
+ }
+ },
+ Err(e) => match e {
+ encode::Error::Io(_) => Ok(None),
+ encode::Error::UnrecognizedNetworkCommand(_msg) => {
+ decoder.buf.advance(decoder.pos);
+ //XXX(fixthese): self.0.add_line(format!("rust-bitcoin doesn't support {}!", msg), true);
+ Ok(None)
+ },
+ _ => {
+ self.0.add_line(format!("Error decoding message: {:?}", e), true);
+ Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
+ },
+ }
+ }
+ }
+}
+impl<'a> codec::Encoder for MsgCoder<'a> {
+ type Item = NetworkMessage;
+ type Error = std::io::Error;
+
+ fn encode(&mut self, msg: NetworkMessage, res: &mut bytes::BytesMut) -> Result<(), std::io::Error> {
+ if let Err(_) = (RawNetworkMessage {
+ magic: Network::Bitcoin.magic(),
+ payload: msg,
+ }.consensus_encode(&mut BytesCoder(res))) {
+ //XXX
+ }
+ Ok(())
+ }
+}
+
+pub struct Peer {
+
+}
+
+impl Peer {
+ pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=NetworkMessage, Error=std::io::Error>)> {
+ let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
+ future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
+ });
+ TcpStream::connect(&addr).select(connect_timeout)
+ .or_else(move |_| {
+ Delay::new(Instant::now() + timeout / 10).then(|_| {
+ future::err(())
+ })
+ }).and_then(move |stream| {
+ let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split();
+ let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
+ tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
+ .then(|_| {
+ future::err(())
+ }));
+ let _ = sender.try_send(NetworkMessage::Version(VersionMessage {
+ version: 70015,
+ services: (1 << 3), // NODE_WITNESS
+ timestamp: SystemTime::now().duration_since(UNIX_EPOCH).expect("time > 1970").as_secs() as i64,
+ receiver: Address::new(&addr, 0),
+ sender: Address::new(&"0.0.0.0:0".parse().unwrap(), 0),
+ nonce: 0xdeadbeef,
+ user_agent: "/rust-bitcoin:0.18/bluematt-tokio-client:0.1/".to_string(),
+ start_height: 0,
+ relay: true,
+ }));
+ future::ok((sender, TimeoutStream::new(read, timeout)))
+ })
+ }
+}
--- /dev/null
+use std::collections::LinkedList;
+use std::sync::{Arc, Mutex};
+use std::io::Write;
+
+use crate::datastore::{Store, AddressState, U64Setting, StringSetting};
+
+pub enum Stat {
+ HeaderCount(u64),
+ NewConnection,
+ ConnectionClosed,
+}
+
+struct Stats {
+ lines: LinkedList<String>,
+ header_count: u64,
+ connection_count: u64,
+}
+
+pub struct Printer {
+ stats: Arc<Mutex<Stats>>,
+}
+
+impl Printer {
+ pub fn new(store: &'static Store) -> Printer {
+ let stats: Arc<Mutex<Stats>> = Arc::new(Mutex::new(Stats {
+ lines: LinkedList::new(),
+ header_count: 0,
+ connection_count: 0,
+ }));
+ let thread_arc = Arc::clone(&stats);
+ std::thread::spawn(move || {
+ loop {
+ std::thread::sleep(std::time::Duration::from_secs(1));
+
+ let stdout = std::io::stdout();
+ let mut out = stdout.lock();
+
+ let stats = thread_arc.lock().unwrap();
+
+ out.write_all(b"\x1b[2J\x1b[;H\n").expect("stdout broken?");
+ for line in stats.lines.iter() {
+ out.write_all(line.as_bytes()).expect("stdout broken?");
+ out.write_all(b"\n").expect("stdout broken?");
+ }
+
+ out.write_all(b"\nNode counts by status:\n").expect("stdout broken?");
+ out.write_all(format!("Untested: {}\n", store.get_node_count(AddressState::Untested)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("Low Block Count: {}\n", store.get_node_count(AddressState::LowBlockCount)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("High Block Count: {}\n", store.get_node_count(AddressState::HighBlockCount)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("Low Version: {}\n", store.get_node_count(AddressState::LowVersion)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("Bad Version: {}\n", store.get_node_count(AddressState::BadVersion)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("Not Full Node: {}\n", store.get_node_count(AddressState::NotFullNode)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("Protocol Violation: {}\n", store.get_node_count(AddressState::ProtocolViolation)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("Timeout: {}\n", store.get_node_count(AddressState::Timeout)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("Timeout During Request: {}\n", store.get_node_count(AddressState::TimeoutDuringRequest)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("Good: {}\n", store.get_node_count(AddressState::Good)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!("WasGood: {}\n", store.get_node_count(AddressState::WasGood)
+ ).as_bytes()).expect("stdout broken?");
+
+ out.write_all(format!(
+ "\nCurrent connections open/in progress: {}\n", stats.connection_count).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Connections opened each second: {} (\"c x\" to change to x seconds)\n", store.get_u64(U64Setting::ConnsPerSec)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Current block count: {}\n", stats.header_count).as_bytes()).expect("stdout broken?");
+
+ out.write_all(format!(
+ "Timeout for full run (in seconds): {} (\"t x\" to change to x seconds)\n", store.get_u64(U64Setting::RunTimeout)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Minimum protocol version: {} (\"v x\" to change value to x)\n", store.get_u64(U64Setting::MinProtocolVersion)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Subversion match regex: {} (\"s x\" to change value to x)\n", store.get_string(StringSetting::SubverRegex)
+ ).as_bytes()).expect("stdout broken?");
+
+ out.write_all(b"\nRetry times (in seconds):\n").expect("stdout broken?");
+ out.write_all(format!(
+ "Untested: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Untested))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Low Block Count: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "High Block Count {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Low Version: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::LowVersion))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Bad Version: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::BadVersion))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Not Full Node: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Protocol Violation: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Timeout: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Timeout))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Timeout During Request: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Good: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Good))
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "Was Good: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::WasGood))
+ ).as_bytes()).expect("stdout broken?");
+
+ out.write_all(b"\nCommands:\n").expect("stdout broken?");
+ out.write_all(b"q: quit\n").expect("stdout broken?");
+ out.write_all(format!(
+ "r x y: Change retry time for status x (int value, see retry times section for name mappings) to y (in seconds)\n"
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(format!(
+ "w x: Change the amount of time a node is considered WAS_GOOD after it fails to x from {} (in seconds)\n",
+ store.get_u64(U64Setting::WasGoodTimeout)
+ ).as_bytes()).expect("stdout broken?");
+ out.write_all(b"p: Enable/disable updating these stats\n").expect("stdout broken?");
+ out.write_all(b"a x: Scan node x\n").expect("stdout broken?");
+ out.write_all(b"\x1b[s").expect("stdout broken?"); // Save cursor position and provide a blank line before cursor
+ out.write_all(b"\x1b[;H\x1b[2K").expect("stdout broken?");
+ out.write_all(b"Most recent log:\n").expect("stdout broken?");
+ out.write_all(b"\x1b[u").expect("stdout broken?"); // Restore cursor position and go up one line
+
+ out.flush().expect("stdout broken?");
+ }
+ });
+ Printer {
+ stats,
+ }
+ }
+
+ pub fn add_line(&self, line: String, _err: bool) {
+ let mut stats = self.stats.lock().unwrap();
+ stats.lines.push_back(line);
+ if stats.lines.len() > 50 {
+ stats.lines.pop_front();
+ }
+ }
+
+ pub fn set_stat(&self, s: Stat) {
+ match s {
+ Stat::HeaderCount(c) => self.stats.lock().unwrap().header_count = c,
+ Stat::NewConnection => self.stats.lock().unwrap().connection_count += 1,
+ Stat::ConnectionClosed => self.stats.lock().unwrap().connection_count -= 1,
+ }
+ }
+}
--- /dev/null
+use tokio::prelude::*;
+use tokio::timer::Delay;
+
+use std::time::{Duration, Instant};
+
+pub struct TimeoutStream<S> where S : Stream {
+ stream: S,
+ next_deadline: Delay,
+ timeout: Duration,
+}
+
+impl<S> TimeoutStream<S> where S : Stream {
+ pub fn new(stream: S, timeout: Duration) -> Self {
+ let next_deadline = Delay::new(Instant::now() + timeout);
+ Self {
+ stream,
+ next_deadline,
+ timeout,
+ }
+ }
+}
+
+impl<S> Stream for TimeoutStream<S> where S : Stream {
+ type Item = S::Item;
+ type Error = S::Error;
+ fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
+ match self.next_deadline.poll() {
+ Ok(Async::Ready(_)) => Ok(Async::Ready(None)),
+ Ok(Async::NotReady) => {
+ match self.stream.poll() {
+ Ok(Async::Ready(v)) => {
+ self.next_deadline.reset(Instant::now() + self.timeout);
+ Ok(Async::Ready(v))
+ },
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Err(e) => Err(e),
+ }
+ },
+ Err(_) => Ok(Async::Ready(None)), // TODO: If I want to upstream TimeoutStream this is gonna need some love
+ }
+ }
+}