mod printer;
+mod reader;
mod peer;
mod timeout_stream;
mod datastore;
use std::env;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
+use std::sync::atomic::{Ordering, AtomicBool};
use std::time::{Duration, Instant};
use std::net::SocketAddr;
use printer::{Printer, Stat};
use peer::Peer;
-use datastore::{AddressState, Store, U64Setting};
+use datastore::{AddressState, Store, U64Setting, RegexSetting};
+use timeout_stream::TimeoutStream;
use tokio::prelude::*;
use tokio::timer::Delay;
static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, sha256d::Hash>>>> = None;
static mut DATA_STORE: Option<Box<Store>> = None;
static mut PRINTER: Option<Box<Printer>> = None;
+pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false);
struct PeerState {
recvd_version: bool,
request: (u64, sha256d::Hash),
}
-fn scan_node(scan_time: Instant, node: SocketAddr) {
+pub fn scan_node(scan_time: Instant, node: SocketAddr) {
+ if START_SHUTDOWN.load(Ordering::Relaxed) { return; }
let printer = unsafe { PRINTER.as_ref().unwrap() };
let store = unsafe { DATA_STORE.as_ref().unwrap() };
let peer = Delay::new(scan_time).then(move |_| {
printer.set_stat(Stat::NewConnection);
let timeout = store.get_u64(U64Setting::RunTimeout);
- Peer::new(node.clone(), Duration::from_secs(timeout), printer) //TODO: timeout for total run
+ Peer::new(node.clone(), Duration::from_secs(timeout), printer)
});
- tokio::spawn(peer.and_then(move |conn_split| {
+ tokio::spawn(peer.and_then(move |(mut write, read)| {
let requested_height = unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 1008;
let requested_block = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().get(&requested_height).unwrap().clone();
peer_state.lock().unwrap().request = (requested_height, requested_block);
- let (mut write, read) = conn_split;
- read.map_err(|_| { () }).for_each(move |msg| {
+ TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| {
let mut state_lock = peer_state.lock().unwrap();
macro_rules! check_set_flag {
($recvd_flag: ident, $msg: expr) => { {
state_lock.fail_reason = AddressState::LowVersion;
return future::err(());
}
- if ver.services & 1 != 1 {
+ if ver.services & (1 | (1 << 10)) == 0 {
printer.add_line(format!("Updating {} to NotFullNode (services {:x})", node, ver.services), true);
state_lock.fail_reason = AddressState::NotFullNode;
return future::err(());
}
+ if !store.get_regex(RegexSetting::SubverRegex).is_match(&ver.user_agent) {
+ printer.add_line(format!("Updating {} to BadVersion subver {}", node, ver.user_agent.replace(|c: char| !c.is_ascii() || c < ' ' || c > '~', "")), true);
+ state_lock.fail_reason = AddressState::BadVersion;
+ return future::err(());
+ }
check_set_flag!(recvd_version, "version");
state_lock.node_services = ver.services;
if let Err(_) = write.try_send(NetworkMessage::Verack) {
}
},
NetworkMessage::Addr(addrs) => {
- if addrs.len() > 1 {
- check_set_flag!(recvd_addrs, "addr");
- unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
+ if addrs.len() > 1000 {
+ state_lock.fail_reason = AddressState::ProtocolViolation;
+ printer.add_line(format!("Updating {} to ProtocolViolation due to oversized addr: {}", node, addrs.len()), true);
+ state_lock.recvd_addrs = false;
+ return future::err(());
}
+ state_lock.recvd_addrs = true;
+ unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
},
NetworkMessage::Block(block) => {
if block.header.bitcoin_hash() != state_lock.request.1 ||
Delay::new(iter_time).then(|_| {
let store = unsafe { DATA_STORE.as_ref().unwrap() };
store.save_data().then(|_| {
- scan_net();
+ if !START_SHUTDOWN.load(Ordering::Relaxed) {
+ scan_net();
+ }
future::ok(())
})
})
fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
let printer = unsafe { PRINTER.as_ref().unwrap() };
let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
- tokio::spawn(trusted_peer.and_then(move |trusted_split| {
+ tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
printer.add_line("Connected to local peer".to_string(), false);
- let (mut trusted_write, trusted_read) = trusted_split;
let mut starting_height = 0;
- trusted_read.map_err(|_| { () }).for_each(move |msg| {
+ TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|_| { () }).for_each(move |msg| {
+ if START_SHUTDOWN.load(Ordering::Relaxed) {
+ return future::err(());
+ }
match msg {
NetworkMessage::Version(ver) => {
if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) {
future::err(())
})
}).then(move |_: Result<(), ()>| {
- printer.add_line("Lost connection from trusted peer".to_string(), true);
- make_trusted_conn(trusted_sockaddr);
+ if !START_SHUTDOWN.load(Ordering::Relaxed) {
+ printer.add_line("Lost connection from trusted peer".to_string(), true);
+ make_trusted_conn(trusted_sockaddr);
+ }
future::ok(())
}));
}
Store::new(path).and_then(move |store| {
unsafe { DATA_STORE = Some(Box::new(store)) };
- unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) };
+ let store = unsafe { DATA_STORE.as_ref().unwrap() };
+ unsafe { PRINTER = Some(Box::new(Printer::new(store))) };
let trusted_sockaddr: SocketAddr = addr.parse().unwrap();
make_trusted_conn(trusted_sockaddr);
+ reader::read(store, unsafe { PRINTER.as_ref().unwrap() });
+
future::ok(())
}).or_else(|_| {
future::err(())
})
}));
+
+ tokio::run(future::lazy(|| {
+ unsafe { DATA_STORE.as_ref().unwrap() }.save_data()
+ }));
}