use std::sync::RwLock;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
+use std::io::{BufRead, BufReader};
use bitcoin::network::address::Address;
use rand::thread_rng;
use rand::seq::SliceRandom;
+use tokio::prelude::*;
+use tokio::fs::File;
+use tokio::io::write_all;
+
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub enum AddressState {
Untested,
u64_settings: RwLock<HashMap<U64Setting, u64>>,
subver_regex: RwLock<String>,
nodes: RwLock<Nodes>,
+ store: String,
}
impl Store {
- pub fn new() -> Store {
- let mut u64s = HashMap::with_capacity(15);
- u64s.insert(U64Setting::ConnsPerSec, 50);
- u64s.insert(U64Setting::RunTimeout, 120);
- u64s.insert(U64Setting::WasGoodTimeout, 21600);
- u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
- u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
- u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
- u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
- u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
- u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
- u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
- u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
- u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
- u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
- u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
- u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX
- let mut state_vecs = HashMap::with_capacity(11);
- state_vecs.insert(AddressState::Untested, Vec::new());
- state_vecs.insert(AddressState::LowBlockCount, Vec::new());
- state_vecs.insert(AddressState::HighBlockCount, Vec::new());
- state_vecs.insert(AddressState::LowVersion, Vec::new());
- state_vecs.insert(AddressState::BadVersion, Vec::new());
- state_vecs.insert(AddressState::NotFullNode, Vec::new());
- state_vecs.insert(AddressState::ProtocolViolation, Vec::new());
- state_vecs.insert(AddressState::Timeout, Vec::new());
- state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new());
- state_vecs.insert(AddressState::Good, Vec::new());
- state_vecs.insert(AddressState::WasGood, Vec::new());
- let mut good_node_services = HashMap::with_capacity(64);
- for i in 0..64 {
- good_node_services.insert(i, HashSet::new());
- }
- Store {
- u64_settings: RwLock::new(u64s),
- subver_regex: RwLock::new(".*".to_string()),
- nodes: RwLock::new(Nodes {
- good_node_services,
- nodes_to_state: HashMap::new(),
- state_next_scan: state_vecs,
- }),
+ pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
+ let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
+ let mut l = BufReader::new(f).lines();
+ macro_rules! try_read {
+ ($lines: expr, $ty: ty) => { {
+ match $lines.next() {
+ Some(line) => match line {
+ Ok(line) => match line.parse::<$ty>() {
+ Ok(res) => res,
+ Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
+ },
+ Err(e) => return future::err(e),
+ },
+ None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
+ }
+ } }
+ }
+ let mut u64s = HashMap::with_capacity(15);
+ u64s.insert(U64Setting::ConnsPerSec, try_read!(l, u64));
+ u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
+ u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
+ u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
+ u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
+ future::ok((u64s, try_read!(l, String)))
+ }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, String), ()> {
+ let mut u64s = HashMap::with_capacity(15);
+ u64s.insert(U64Setting::ConnsPerSec, 50);
+ u64s.insert(U64Setting::RunTimeout, 120);
+ u64s.insert(U64Setting::WasGoodTimeout, 21600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
+ u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
+ u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
+ u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
+ u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
+ u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
+ u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
+ u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
+ u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX
+ future::ok((u64s, ".*".to_string()))
+ });
+
+ macro_rules! nodes_uninitd {
+ () => { {
+ let mut state_vecs = HashMap::with_capacity(11);
+ state_vecs.insert(AddressState::Untested, Vec::new());
+ state_vecs.insert(AddressState::LowBlockCount, Vec::new());
+ state_vecs.insert(AddressState::HighBlockCount, Vec::new());
+ state_vecs.insert(AddressState::LowVersion, Vec::new());
+ state_vecs.insert(AddressState::BadVersion, Vec::new());
+ state_vecs.insert(AddressState::NotFullNode, Vec::new());
+ state_vecs.insert(AddressState::ProtocolViolation, Vec::new());
+ state_vecs.insert(AddressState::Timeout, Vec::new());
+ state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new());
+ state_vecs.insert(AddressState::Good, Vec::new());
+ state_vecs.insert(AddressState::WasGood, Vec::new());
+ let mut good_node_services = HashMap::with_capacity(64);
+ for i in 0..64 {
+ good_node_services.insert(i, HashSet::new());
+ }
+ Nodes {
+ good_node_services,
+ nodes_to_state: HashMap::new(),
+ state_next_scan: state_vecs,
+ }
+ } }
}
+
+ let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
+ let mut res = nodes_uninitd!();
+ let l = BufReader::new(f).lines();
+ for line_res in l {
+ let line = match line_res {
+ Ok(l) => l,
+ Err(_) => return future::ok(res),
+ };
+ let mut line_iter = line.split(',');
+ macro_rules! try_read {
+ ($lines: expr, $ty: ty) => { {
+ match $lines.next() {
+ Some(line) => match line.parse::<$ty>() {
+ Ok(res) => res,
+ Err(_) => return future::ok(res),
+ },
+ None => return future::ok(res),
+ }
+ } }
+ }
+ let sockaddr = try_read!(line_iter, SocketAddr);
+ let state = try_read!(line_iter, u8);
+ let last_services = try_read!(line_iter, u64);
+ let node = Node {
+ state: match state {
+ 0x0 => AddressState::Untested,
+ 0x1 => AddressState::LowBlockCount,
+ 0x2 => AddressState::HighBlockCount,
+ 0x3 => AddressState::LowVersion,
+ 0x4 => AddressState::BadVersion,
+ 0x5 => AddressState::NotFullNode,
+ 0x6 => AddressState::ProtocolViolation,
+ 0x7 => AddressState::Timeout,
+ 0x8 => AddressState::TimeoutDuringRequest,
+ 0x9 => AddressState::Good,
+ 0xa => AddressState::WasGood,
+ _ => return future::ok(res),
+ },
+ last_services,
+ last_update: Instant::now(),
+ };
+ if node.state == AddressState::Good {
+ for i in 0..64 {
+ if node.last_services & (1 << i) != 0 {
+ res.good_node_services.get_mut(&i).unwrap().insert(sockaddr);
+ }
+ }
+ }
+ res.state_next_scan.get_mut(&node.state).unwrap().push((Instant::now(), sockaddr));
+ res.nodes_to_state.insert(sockaddr, node);
+ }
+ future::ok(res)
+ }).or_else(|_| -> future::FutureResult<Nodes, ()> {
+ future::ok(nodes_uninitd!())
+ });
+ settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
+ future::ok(Store {
+ u64_settings: RwLock::new(u64_settings),
+ subver_regex: RwLock::new(regex),
+ nodes: RwLock::new(nodes),
+ store,
+ })
+ })
}
pub fn get_u64(&self, setting: U64Setting) -> u64 {
}
}
+ pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
+ let settings_file = self.store.clone() + "/settings";
+ let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
+ let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
+ self.get_u64(U64Setting::ConnsPerSec),
+ self.get_u64(U64Setting::RunTimeout),
+ self.get_u64(U64Setting::WasGoodTimeout),
+ self.get_u64(U64Setting::MinProtocolVersion),
+ self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
+ self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)));
+ write_all(f, settings_string).and_then(|(mut f, _)| {
+ f.poll_sync_all()
+ }).and_then(|_| {
+ tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
+ })
+ });
+
+ let nodes_file = self.store.clone() + "/nodes";
+ let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
+ let mut nodes_buff = String::new();
+ {
+ let nodes = self.nodes.read().unwrap();
+ nodes_buff.reserve(nodes.nodes_to_state.len() * 20);
+ for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
+ nodes_buff += &sockaddr.to_string();
+ nodes_buff += ",";
+ nodes_buff += &match node.state {
+ AddressState::Untested => 0u8,
+ AddressState::LowBlockCount => 1u8,
+ AddressState::HighBlockCount => 2u8,
+ AddressState::LowVersion => 3u8,
+ AddressState::BadVersion => 4u8,
+ AddressState::NotFullNode => 5u8,
+ AddressState::ProtocolViolation => 6u8,
+ AddressState::Timeout => 7u8,
+ AddressState::TimeoutDuringRequest => 8u8,
+ AddressState::Good => 9u8,
+ AddressState::WasGood => 10u8,
+ }.to_string();
+ nodes_buff += &node.last_services.to_string();
+ nodes_buff += "\n";
+ }
+ }
+ write_all(f, nodes_buff)
+ }).and_then(|(mut f, _)| {
+ f.poll_sync_all()
+ }).and_then(|_| {
+ tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
+ });
+ settings_future.join(nodes_future).then(|_| { future::ok(()) })
+ }
+
pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
let mut res = Vec::with_capacity(600);
let cur_time = Instant::now();