Initial checkin
authorMatt Corallo <git@bluematt.me>
Sat, 18 May 2019 19:36:04 +0000 (15:36 -0400)
committerMatt Corallo <git@bluematt.me>
Sat, 18 May 2019 19:36:04 +0000 (15:36 -0400)
Cargo.toml [new file with mode: 0644]
src/datastore.rs [new file with mode: 0644]
src/main.rs [new file with mode: 0644]
src/peer.rs [new file with mode: 0644]
src/printer.rs [new file with mode: 0644]
src/timeout_stream.rs [new file with mode: 0644]

diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644 (file)
index 0000000..88b3deb
--- /dev/null
@@ -0,0 +1,13 @@
+[package]
+name = "dnsseed-rust"
+version = "0.1.0"
+authors = ["matt"]
+edition = "2018"
+
+[dependencies]
+bitcoin = { path = "../Bitcoin/rust-bitcoin" }
+bitcoin_hashes = "0.3"
+tokio = "0.1"
+bytes = "0.4"
+futures = "0.1"
+rand = "0.6"
diff --git a/src/datastore.rs b/src/datastore.rs
new file mode 100644 (file)
index 0000000..1013a9e
--- /dev/null
@@ -0,0 +1,160 @@
+use std::{cmp, mem};
+use std::collections::{HashMap, hash_map};
+use std::sync::RwLock;
+use std::net::SocketAddr;
+use std::time::{Duration, Instant};
+
+use bitcoin::network::address::Address;
+
+use rand::thread_rng;
+use rand::seq::SliceRandom;
+
+#[derive(Clone, Copy, Hash, PartialEq, Eq)]
+pub enum AddressState {
+       Untested,
+       LowBlockCount,
+       HighBlockCount,
+       LowVersion,
+       BadVersion,
+       NotFullNode,
+       ProtocolViolation,
+       Timeout,
+       TimeoutDuringRequest,
+       Good,
+       WasGood,
+}
+
+#[derive(Hash, PartialEq, Eq)]
+pub enum U64Setting {
+       ConnsPerSec,
+       RunTimeout,
+       WasGoodTimeout,
+       RescanInterval(AddressState),
+       MinProtocolVersion,
+}
+
+#[derive(Hash, PartialEq, Eq)]
+pub enum StringSetting {
+       SubverRegex,
+}
+
+struct Nodes {
+       good_node_services: HashMap<u8, Vec<SocketAddr>>,
+       nodes_to_state: HashMap<SocketAddr, AddressState>,
+       state_next_scan: HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
+}
+
+pub struct Store {
+       u64_settings: RwLock<HashMap<U64Setting, u64>>,
+       subver_regex: RwLock<String>,
+       nodes: RwLock<Nodes>,
+}
+
+impl Store {
+       pub fn new() -> Store {
+               let mut u64s = HashMap::with_capacity(15);
+               u64s.insert(U64Setting::ConnsPerSec, 50);
+               u64s.insert(U64Setting::RunTimeout, 120);
+               u64s.insert(U64Setting::WasGoodTimeout, 21600);
+               u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
+               u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
+               u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
+               u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
+               u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
+               u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
+               u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
+               u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
+               u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
+               u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
+               u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
+               u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX
+               let mut state_vecs = HashMap::with_capacity(11);
+               state_vecs.insert(AddressState::Untested, Vec::new());
+               state_vecs.insert(AddressState::LowBlockCount, Vec::new());
+               state_vecs.insert(AddressState::HighBlockCount, Vec::new());
+               state_vecs.insert(AddressState::LowVersion, Vec::new());
+               state_vecs.insert(AddressState::BadVersion, Vec::new());
+               state_vecs.insert(AddressState::NotFullNode, Vec::new());
+               state_vecs.insert(AddressState::ProtocolViolation, Vec::new());
+               state_vecs.insert(AddressState::Timeout, Vec::new());
+               state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new());
+               state_vecs.insert(AddressState::Good, Vec::new());
+               state_vecs.insert(AddressState::WasGood, Vec::new());
+               let mut good_node_services = HashMap::with_capacity(64);
+               for i in 0..64 {
+                       good_node_services.insert(i, Vec::new());
+               }
+               Store {
+                       u64_settings: RwLock::new(u64s),
+                       subver_regex: RwLock::new(".*".to_string()),
+                       nodes: RwLock::new(Nodes {
+                               good_node_services,
+                               nodes_to_state: HashMap::new(),
+                               state_next_scan: state_vecs,
+                       }),
+               }
+       }
+
+       pub fn get_u64(&self, setting: U64Setting) -> u64 {
+               *self.u64_settings.read().unwrap().get(&setting).unwrap()
+       }
+
+       pub fn get_node_count(&self, state: AddressState) -> usize {
+               self.nodes.read().unwrap().state_next_scan.get(&state).unwrap().len()
+       }
+
+       pub fn get_string(&self, _setting: StringSetting) -> String {
+               self.subver_regex.read().unwrap().clone()
+       }
+
+       pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
+               let mut nodes = self.nodes.write().unwrap();
+               let cur_time = Instant::now();
+               for &(_, ref addr) in addresses {
+                       if let Ok(socketaddr) = addr.socket_addr() {
+                               match nodes.nodes_to_state.entry(socketaddr.clone()) {
+                                       hash_map::Entry::Vacant(e) => {
+                                               e.insert(AddressState::Untested);
+                                               nodes.state_next_scan.get_mut(&AddressState::Untested).unwrap().push((cur_time, socketaddr));
+                                       },
+                                       hash_map::Entry::Occupied(_) => {},
+                               }
+                       } else {
+                               //TODO: Handle onions
+                       }
+               }
+       }
+
+       pub fn set_node_state(&self, addr: SocketAddr, state: AddressState, services: u64) {
+               let mut nodes = self.nodes.write().unwrap();
+               let state_ref = nodes.nodes_to_state.get_mut(&addr).unwrap();
+               if *state_ref == AddressState::Good && state != AddressState::Good {
+                       *state_ref = AddressState::WasGood;
+                       nodes.state_next_scan.get_mut(&AddressState::WasGood).unwrap().push((Instant::now(), addr));
+               } else {
+                       *state_ref = state;
+                       nodes.state_next_scan.get_mut(&state).unwrap().push((Instant::now(), addr));
+               }
+               if state == AddressState::Good {
+
+               }
+       }
+
+       pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
+               let mut res = Vec::with_capacity(600);
+               let cur_time = Instant::now();
+               let mut nodes = self.nodes.write().unwrap();
+               for (state, state_nodes) in nodes.state_next_scan.iter_mut() {
+                       let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(*state)));
+                       let split_point = cmp::min(cmp::min(600 - res.len(), 60),
+                                       state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx));
+                       let mut new_nodes = state_nodes.split_off(split_point);
+                       mem::swap(&mut new_nodes, state_nodes);
+                       for (_, node) in new_nodes.drain(..) {
+                               res.push(node);
+                       }
+               }
+               res.shuffle(&mut thread_rng());
+               res
+       }
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644 (file)
index 0000000..d58b1ad
--- /dev/null
@@ -0,0 +1,285 @@
+mod printer;
+mod peer;
+mod timeout_stream;
+mod datastore;
+
+use std::env;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+use std::net::SocketAddr;
+
+use bitcoin_hashes::sha256d;
+
+use bitcoin::blockdata::constants::genesis_block;
+use bitcoin::network::constants::Network;
+use bitcoin::network::message::NetworkMessage;
+use bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory, InvType};
+use bitcoin::util::hash::BitcoinHash;
+
+use printer::{Printer, Stat};
+use peer::Peer;
+use datastore::{AddressState, Store, U64Setting};
+
+use tokio::prelude::*;
+use tokio::timer::Delay;
+
+static mut HIGHEST_HEADER: Option<Box<Mutex<(sha256d::Hash, u64)>>> = None;
+static mut HEADER_MAP: Option<Box<Mutex<HashMap<sha256d::Hash, u64>>>> = None;
+static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, sha256d::Hash>>>> = None;
+static mut DATA_STORE: Option<Box<Store>> = None;
+static mut PRINTER: Option<Box<Printer>> = None;
+
+struct PeerState {
+       recvd_version: bool,
+       recvd_verack: bool,
+       recvd_addrs: bool,
+       recvd_block: bool,
+       node_services: u64,
+       fail_reason: AddressState,
+       request: (u64, sha256d::Hash),
+}
+
+fn scan_net() {
+       tokio::spawn(future::lazy(|| {
+               let store = unsafe { DATA_STORE.as_ref().unwrap() };
+               let printer = unsafe { PRINTER.as_ref().unwrap() };
+
+               let mut scan_nodes = store.get_next_scan_nodes();
+               let timeout = store.get_u64(U64Setting::RunTimeout);
+               let per_iter_time = Duration::from_millis(1000 / store.get_u64(U64Setting::ConnsPerSec));
+               let mut iter_time = Instant::now();
+               let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008;
+               let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone();
+
+               for node in scan_nodes.drain(..) {
+                       let peer_state = Arc::new(Mutex::new(PeerState {
+                               recvd_version: false,
+                               recvd_verack: false,
+                               recvd_addrs: false,
+                               recvd_block: false,
+                               node_services: 0,
+                               fail_reason: AddressState::Timeout,
+                               request: (requested_height, requested_block),
+                       }));
+                       let final_peer_state = Arc::clone(&peer_state);
+                       let peer = Delay::new(iter_time).then(move |_| {
+                               printer.set_stat(Stat::NewConnection);
+                               Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run
+                       });
+                       iter_time += per_iter_time;
+                       tokio::spawn(peer.and_then(move |conn_split| {
+                               let (mut write, read) = conn_split;
+                               read.map_err(|_| { () }).for_each(move |msg| {
+                                       let mut state_lock = peer_state.lock().unwrap();
+                                       macro_rules! check_set_flag {
+                                               ($recvd_flag: ident, $msg: expr) => { {
+                                                       if state_lock.$recvd_flag {
+                                                               state_lock.fail_reason = AddressState::ProtocolViolation;
+                                                               printer.add_line(format!("Updating {} to ProtocolViolation due to dup {}", node, $msg), true);
+                                                               state_lock.$recvd_flag = false;
+                                                               return future::err(());
+                                                       }
+                                                       state_lock.$recvd_flag = true;
+                                               } }
+                                       }
+                                       state_lock.fail_reason = AddressState::TimeoutDuringRequest;
+                                       match msg {
+                                               NetworkMessage::Version(ver) => {
+                                                       if ver.start_height < 0 || ver.start_height as u64 > state_lock.request.0 + 1008*2 {
+                                                               state_lock.fail_reason = AddressState::HighBlockCount;
+                                                               return future::err(());
+                                                       }
+                                                       if (ver.start_height as u64) < state_lock.request.0 {
+                                                               printer.add_line(format!("Updating {} to LowBlockCount ({} < {})", node, ver.start_height, state_lock.request.0), true);
+                                                               state_lock.fail_reason = AddressState::LowBlockCount;
+                                                               return future::err(());
+                                                       }
+                                                       let min_version = store.get_u64(U64Setting::MinProtocolVersion);
+                                                       if (ver.version as u64) < min_version {
+                                                               printer.add_line(format!("Updating {} to LowVersion ({} < {})", node, ver.version, min_version), true);
+                                                               state_lock.fail_reason = AddressState::LowVersion;
+                                                               return future::err(());
+                                                       }
+                                                       if ver.services & 1 != 1 {
+                                                               printer.add_line(format!("Updating {} to NotFullNode (services {:x})", node, ver.services), true);
+                                                               state_lock.fail_reason = AddressState::NotFullNode;
+                                                               return future::err(());
+                                                       }
+                                                       check_set_flag!(recvd_version, "version");
+                                                       state_lock.node_services = ver.services;
+                                                       if let Err(_) = write.try_send(NetworkMessage::Verack) {
+                                                               return future::err(());
+                                                       }
+                                               },
+                                               NetworkMessage::Verack => {
+                                                       check_set_flag!(recvd_verack, "verack");
+                                                       if let Err(_) = write.try_send(NetworkMessage::GetAddr) {
+                                                               return future::err(());
+                                                       }
+                                                       if let Err(_) = write.try_send(NetworkMessage::GetData(vec![Inventory {
+                                                               inv_type: InvType::WitnessBlock,
+                                                               hash: state_lock.request.1,
+                                                       }])) {
+                                                               return future::err(());
+                                                       }
+                                               },
+                                               NetworkMessage::Ping(v) => {
+                                                       if let Err(_) = write.try_send(NetworkMessage::Pong(v)) {
+                                                               return future::err(())
+                                                       }
+                                               },
+                                               NetworkMessage::Addr(addrs) => {
+                                                       if addrs.len() > 1 {
+                                                               check_set_flag!(recvd_addrs, "addr");
+                                                               unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
+                                                       }
+                                               },
+                                               NetworkMessage::Block(block) => {
+                                                       if block.header.bitcoin_hash() != state_lock.request.1 ||
+                                                                       !block.check_merkle_root() || !block.check_witness_commitment() {
+                                                               state_lock.fail_reason = AddressState::ProtocolViolation;
+                                                               printer.add_line(format!("Updating {} to ProtocolViolation due to bad block", node), true);
+                                                               return future::err(());
+                                                       }
+                                                       check_set_flag!(recvd_block, "block");
+                                               },
+                                               _ => {},
+                                       }
+                                       future::ok(())
+                               }).then(|_| {
+                                       future::err(())
+                               })
+                       }).then(move |_: Result<(), ()>| {
+                               let printer = unsafe { PRINTER.as_ref().unwrap() };
+                               let store = unsafe { DATA_STORE.as_ref().unwrap() };
+
+                               printer.set_stat(Stat::ConnectionClosed);
+
+                               let state_lock = final_peer_state.lock().unwrap();
+                               if state_lock.recvd_version && state_lock.recvd_verack &&
+                                               state_lock.recvd_addrs && state_lock.recvd_block {
+                                       store.set_node_state(node, AddressState::Good, state_lock.node_services);
+                               } else {
+                                       if state_lock.fail_reason == AddressState::Timeout || state_lock.fail_reason == AddressState::TimeoutDuringRequest {
+                                               printer.add_line(format!("Updating {} to Timeout[DuringRequest]", node), true);
+                                       }
+                                       assert!(state_lock.fail_reason != AddressState::Good);
+                                       store.set_node_state(node, state_lock.fail_reason, 0);
+                               }
+                               future::ok(())
+                       }));
+               }
+               Delay::new(iter_time).then(|_| {
+                       scan_net();
+                       future::ok(())
+               })
+       }));
+}
+
+fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
+       let printer = unsafe { PRINTER.as_ref().unwrap() };
+       let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
+       tokio::spawn(trusted_peer.and_then(move |trusted_split| {
+               printer.add_line("Connected to local peer".to_string(), false);
+               let (mut trusted_write, trusted_read) = trusted_split;
+               let mut starting_height = 0;
+               trusted_read.map_err(|_| { () }).for_each(move |msg| {
+                       match msg {
+                               NetworkMessage::Version(ver) => {
+                                       if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) {
+                                               return future::err(())
+                                       }
+                                       starting_height = ver.start_height;
+                               },
+                               NetworkMessage::Verack => {
+                                       if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
+                                               version: 70015,
+                                               locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
+                                               stop_hash: Default::default(),
+                                       })) {
+                                               return future::err(());
+                                       }
+                                       if let Err(_) = trusted_write.try_send(NetworkMessage::GetAddr) {
+                                               return future::err(());
+                                       }
+                               },
+                               NetworkMessage::Addr(addrs) => {
+                                       unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
+                               },
+                               NetworkMessage::Headers(headers) => {
+                                       if headers.is_empty() {
+                                               return future::ok(());
+                                       }
+                                       let mut header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap();
+                                       let mut height_map = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap();
+                                       if let Some(height) = header_map.get(&headers[0].prev_blockhash).cloned() {
+                                               for i in 0..headers.len() {
+                                                       let hash = headers[i].bitcoin_hash();
+                                                       if i < headers.len() - 1 && headers[i + 1].prev_blockhash != hash {
+                                                               return future::err(());
+                                                       }
+                                                       header_map.insert(headers[i].bitcoin_hash(), height + 1 + (i as u64));
+                                                       height_map.insert(height + 1 + (i as u64), headers[i].bitcoin_hash());
+                                               }
+                                               let top_height = height + headers.len() as u64;
+                                               *unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap()
+                                                       = (headers.last().unwrap().bitcoin_hash(), top_height);
+                                               printer.set_stat(printer::Stat::HeaderCount(top_height));
+                                               if top_height >= starting_height as u64 {
+                                                       scan_net();
+                                               }
+                                       } else {
+                                               // Wat? Lets start again...
+                                               printer.add_line("Got unconnected headers message from local trusted peer".to_string(), true);
+                                       }
+                                       if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
+                                               version: 70015,
+                                               locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
+                                               stop_hash: Default::default(),
+                                       })) {
+                                               return future::err(())
+                                       }
+                               },
+                               NetworkMessage::Ping(v) => {
+                                       if let Err(_) = trusted_write.try_send(NetworkMessage::Pong(v)) {
+                                               return future::err(())
+                                       }
+                               },
+                               _ => {},
+                       }
+                       future::ok(())
+               }).then(|_| {
+                       future::err(())
+               })
+       }).then(move |_: Result<(), ()>| {
+               printer.add_line("Lost connection from trusted peer".to_string(), true);
+               make_trusted_conn(trusted_sockaddr);
+               future::ok(())
+       }));
+}
+
+fn main() {
+       if env::args().len() != 3 {
+               println!("USAGE: dnsseed-rust datastore localPeerAddress");
+               return;
+       }
+
+       unsafe { HEADER_MAP = Some(Box::new(Mutex::new(HashMap::new()))) };
+       unsafe { HEIGHT_MAP = Some(Box::new(Mutex::new(HashMap::new()))) };
+       unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap().insert(genesis_block(Network::Bitcoin).bitcoin_hash(), 0);
+       unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().insert(0, genesis_block(Network::Bitcoin).bitcoin_hash());
+       unsafe { HIGHEST_HEADER = Some(Box::new(Mutex::new((genesis_block(Network::Bitcoin).bitcoin_hash(), 0)))) };
+
+       unsafe { DATA_STORE = Some(Box::new(Store::new())) };
+       unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) };
+
+       tokio::run(future::lazy(|| {
+               let mut args = env::args();
+               args.next();
+               let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
+               make_trusted_conn(trusted_sockaddr);
+
+               future::ok(())
+       }));
+}
diff --git a/src/peer.rs b/src/peer.rs
new file mode 100644 (file)
index 0000000..95028ed
--- /dev/null
@@ -0,0 +1,130 @@
+use std::cmp;
+use std::net::SocketAddr;
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+
+use bitcoin::consensus::encode;
+use bitcoin::consensus::encode::{Decodable, Encodable};
+use bitcoin::network::address::Address;
+use bitcoin::network::constants::Network;
+use bitcoin::network::message::{RawNetworkMessage, NetworkMessage};
+use bitcoin::network::message_network::VersionMessage;
+
+use tokio::prelude::*;
+use tokio::codec;
+use tokio::codec::Framed;
+use tokio::net::TcpStream;
+use tokio::timer::Delay;
+
+use futures::sync::mpsc;
+
+use crate::printer::Printer;
+use crate::timeout_stream::TimeoutStream;
+
+struct BytesCoder<'a>(&'a mut bytes::BytesMut);
+impl<'a> std::io::Write for BytesCoder<'a> {
+       fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
+               self.0.extend_from_slice(&b);
+               Ok(b.len())
+       }
+       fn flush(&mut self) -> Result<(), std::io::Error> {
+               Ok(())
+       }
+}
+struct BytesDecoder<'a> {
+       buf: &'a mut bytes::BytesMut,
+       pos: usize,
+}
+impl<'a> std::io::Read for BytesDecoder<'a> {
+       fn read(&mut self, b: &mut [u8]) -> Result<usize, std::io::Error> {
+               let copy_len = cmp::min(b.len(), self.buf.len() - self.pos);
+               b[..copy_len].copy_from_slice(&self.buf[self.pos..self.pos + copy_len]);
+               self.pos += copy_len;
+               Ok(copy_len)
+       }
+}
+
+struct MsgCoder<'a>(&'a Printer);
+impl<'a> codec::Decoder for MsgCoder<'a> {
+       type Item = NetworkMessage;
+       type Error = std::io::Error;
+
+       fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result<Option<NetworkMessage>, std::io::Error> {
+               let mut decoder = BytesDecoder {
+                       buf: bytes,
+                       pos: 0
+               };
+               match RawNetworkMessage::consensus_decode(&mut decoder) {
+                       Ok(res) => {
+                               decoder.buf.advance(decoder.pos);
+                               if res.magic == Network::Bitcoin.magic() {
+                                       Ok(Some(res.payload))
+                               } else {
+                                       Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bad net magic"))
+                               }
+                       },
+                       Err(e) => match e {
+                               encode::Error::Io(_) => Ok(None),
+                               encode::Error::UnrecognizedNetworkCommand(_msg) => {
+                                       decoder.buf.advance(decoder.pos);
+                                       //XXX(fixthese): self.0.add_line(format!("rust-bitcoin doesn't support {}!", msg), true);
+                                       Ok(None)
+                               },
+                               _ => {
+                                       self.0.add_line(format!("Error decoding message: {:?}", e), true);
+                                       Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
+                               },
+                       }
+               }
+       }
+}
+impl<'a> codec::Encoder for MsgCoder<'a> {
+       type Item = NetworkMessage;
+       type Error = std::io::Error;
+
+       fn encode(&mut self, msg: NetworkMessage, res: &mut bytes::BytesMut) -> Result<(), std::io::Error> {
+               if let Err(_) = (RawNetworkMessage {
+                       magic: Network::Bitcoin.magic(),
+                       payload: msg,
+               }.consensus_encode(&mut BytesCoder(res))) {
+                       //XXX
+               }
+               Ok(())
+       }
+}
+
+pub struct Peer {
+       
+}
+
+impl Peer {
+       pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=NetworkMessage, Error=std::io::Error>)> {
+               let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
+                       future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
+               });
+               TcpStream::connect(&addr).select(connect_timeout)
+                       .or_else(move |_| {
+                               Delay::new(Instant::now() + timeout / 10).then(|_| {
+                                       future::err(())
+                               })
+                       }).and_then(move |stream| {
+                               let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split();
+                               let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
+                               tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
+                                       .then(|_| {
+                                               future::err(())
+                                       }));
+                               let _ = sender.try_send(NetworkMessage::Version(VersionMessage {
+                                       version: 70015,
+                                       services: (1 << 3), // NODE_WITNESS
+                                       timestamp: SystemTime::now().duration_since(UNIX_EPOCH).expect("time > 1970").as_secs() as i64,
+                                       receiver: Address::new(&addr, 0),
+                                       sender: Address::new(&"0.0.0.0:0".parse().unwrap(), 0),
+                                       nonce: 0xdeadbeef,
+                                       user_agent: "/rust-bitcoin:0.18/bluematt-tokio-client:0.1/".to_string(),
+                                       start_height: 0,
+                                       relay: true,
+                               }));
+                               future::ok((sender, TimeoutStream::new(read, timeout)))
+                       })
+       }
+}
diff --git a/src/printer.rs b/src/printer.rs
new file mode 100644 (file)
index 0000000..10d2150
--- /dev/null
@@ -0,0 +1,162 @@
+use std::collections::LinkedList;
+use std::sync::{Arc, Mutex};
+use std::io::Write;
+
+use crate::datastore::{Store, AddressState, U64Setting, StringSetting};
+
+pub enum Stat {
+       HeaderCount(u64),
+       NewConnection,
+       ConnectionClosed,
+}
+
+struct Stats {
+       lines: LinkedList<String>,
+       header_count: u64,
+       connection_count: u64,
+}
+
+pub struct Printer {
+       stats: Arc<Mutex<Stats>>,
+}
+
+impl Printer {
+       pub fn new(store: &'static Store) -> Printer {
+               let stats: Arc<Mutex<Stats>> = Arc::new(Mutex::new(Stats {
+                       lines: LinkedList::new(),
+                       header_count: 0,
+                       connection_count: 0,
+               }));
+               let thread_arc = Arc::clone(&stats);
+               std::thread::spawn(move || {
+                       loop {
+                               std::thread::sleep(std::time::Duration::from_secs(1));
+
+                               let stdout = std::io::stdout();
+                               let mut out = stdout.lock();
+
+                               let stats = thread_arc.lock().unwrap();
+
+                               out.write_all(b"\x1b[2J\x1b[;H\n").expect("stdout broken?");
+                               for line in stats.lines.iter() {
+                                       out.write_all(line.as_bytes()).expect("stdout broken?");
+                                       out.write_all(b"\n").expect("stdout broken?");
+                               }
+
+                               out.write_all(b"\nNode counts by status:\n").expect("stdout broken?");
+                               out.write_all(format!("Untested:               {}\n", store.get_node_count(AddressState::Untested)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("Low Block Count:        {}\n", store.get_node_count(AddressState::LowBlockCount)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("High Block Count:       {}\n", store.get_node_count(AddressState::HighBlockCount)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("Low Version:            {}\n", store.get_node_count(AddressState::LowVersion)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("Bad Version:            {}\n", store.get_node_count(AddressState::BadVersion)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("Not Full Node:          {}\n", store.get_node_count(AddressState::NotFullNode)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("Protocol Violation:     {}\n", store.get_node_count(AddressState::ProtocolViolation)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("Timeout:                {}\n", store.get_node_count(AddressState::Timeout)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("Timeout During Request: {}\n", store.get_node_count(AddressState::TimeoutDuringRequest)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("Good:                   {}\n", store.get_node_count(AddressState::Good)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!("WasGood:                {}\n", store.get_node_count(AddressState::WasGood)
+                                               ).as_bytes()).expect("stdout broken?");
+
+                               out.write_all(format!(
+                                               "\nCurrent connections open/in progress: {}\n", stats.connection_count).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Connections opened each second: {} (\"c x\" to change to x seconds)\n", store.get_u64(U64Setting::ConnsPerSec)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Current block count: {}\n", stats.header_count).as_bytes()).expect("stdout broken?");
+
+                               out.write_all(format!(
+                                               "Timeout for full run (in seconds): {} (\"t x\" to change to x seconds)\n", store.get_u64(U64Setting::RunTimeout)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Minimum protocol version: {} (\"v x\" to change value to x)\n", store.get_u64(U64Setting::MinProtocolVersion)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Subversion match regex: {} (\"s x\" to change value to x)\n", store.get_string(StringSetting::SubverRegex)
+                                               ).as_bytes()).expect("stdout broken?");
+
+                               out.write_all(b"\nRetry times (in seconds):\n").expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Untested:               {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Untested))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Low Block Count:        {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "High Block Count        {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Low Version:            {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::LowVersion))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Bad Version:            {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::BadVersion))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Not Full Node:          {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Protocol Violation:     {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Timeout:                {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Timeout))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Timeout During Request: {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Good:                   {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::Good))
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "Was Good:               {}\n", store.get_u64(U64Setting::RescanInterval(AddressState::WasGood))
+                                               ).as_bytes()).expect("stdout broken?");
+
+                               out.write_all(b"\nCommands:\n").expect("stdout broken?");
+                               out.write_all(b"q: quit\n").expect("stdout broken?");
+                               out.write_all(format!(
+                                               "r x y: Change retry time for status x (int value, see retry times section for name mappings) to y (in seconds)\n"
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(format!(
+                                               "w x: Change the amount of time a node is considered WAS_GOOD after it fails to x from {} (in seconds)\n",
+                                               store.get_u64(U64Setting::WasGoodTimeout)
+                                               ).as_bytes()).expect("stdout broken?");
+                               out.write_all(b"p: Enable/disable updating these stats\n").expect("stdout broken?");
+                               out.write_all(b"a x: Scan node x\n").expect("stdout broken?");
+                               out.write_all(b"\x1b[s").expect("stdout broken?"); // Save cursor position and provide a blank line before cursor
+                               out.write_all(b"\x1b[;H\x1b[2K").expect("stdout broken?");
+                               out.write_all(b"Most recent log:\n").expect("stdout broken?");
+                               out.write_all(b"\x1b[u").expect("stdout broken?"); // Restore cursor position and go up one line
+
+                               out.flush().expect("stdout broken?");
+                       }
+               });
+               Printer {
+                       stats,
+               }
+       }
+
+       pub fn add_line(&self, line: String, _err: bool) {
+               let mut stats = self.stats.lock().unwrap();
+               stats.lines.push_back(line);
+               if stats.lines.len() > 50 {
+                       stats.lines.pop_front();
+               }
+       }
+
+       pub fn set_stat(&self, s: Stat) {
+               match s {
+                       Stat::HeaderCount(c) => self.stats.lock().unwrap().header_count = c,
+                       Stat::NewConnection => self.stats.lock().unwrap().connection_count += 1,
+                       Stat::ConnectionClosed => self.stats.lock().unwrap().connection_count -= 1,
+               }
+       }
+}
diff --git a/src/timeout_stream.rs b/src/timeout_stream.rs
new file mode 100644 (file)
index 0000000..57664f8
--- /dev/null
@@ -0,0 +1,42 @@
+use tokio::prelude::*;
+use tokio::timer::Delay;
+
+use std::time::{Duration, Instant};
+
+pub struct TimeoutStream<S> where S : Stream {
+       stream: S,
+       next_deadline: Delay,
+       timeout: Duration,
+}
+
+impl<S> TimeoutStream<S> where S : Stream {
+       pub fn new(stream: S, timeout: Duration) -> Self {
+               let next_deadline = Delay::new(Instant::now() + timeout);
+               Self {
+                       stream,
+                       next_deadline,
+                       timeout,
+               }
+       }
+}
+
+impl<S> Stream for TimeoutStream<S> where S : Stream {
+       type Item = S::Item;
+       type Error = S::Error;
+       fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
+               match self.next_deadline.poll() {
+                       Ok(Async::Ready(_)) => Ok(Async::Ready(None)),
+                       Ok(Async::NotReady) => {
+                               match self.stream.poll() {
+                                       Ok(Async::Ready(v)) => {
+                                               self.next_deadline.reset(Instant::now() + self.timeout);
+                                               Ok(Async::Ready(v))
+                                       },
+                                       Ok(Async::NotReady) => Ok(Async::NotReady),
+                                       Err(e) => Err(e),
+                               }
+                       },
+                       Err(_) => Ok(Async::Ready(None)), // TODO: If I want to upstream TimeoutStream this is gonna need some love
+               }
+       }
+}