From b322239c1f9352cf5b58c4cd82c3544edd024ef7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 19 May 2019 14:41:49 -0400 Subject: [PATCH] implement storing/reading data in human-readable data files --- src/datastore.rs | 250 +++++++++++++++++++++++++++++++++++++++-------- src/main.rs | 26 +++-- 2 files changed, 227 insertions(+), 49 deletions(-) diff --git a/src/datastore.rs b/src/datastore.rs index 4bab7ef..886f32f 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -3,12 +3,17 @@ use std::collections::{HashSet, HashMap, hash_map}; use std::sync::RwLock; use std::net::SocketAddr; use std::time::{Duration, Instant}; +use std::io::{BufRead, BufReader}; use bitcoin::network::address::Address; use rand::thread_rng; use rand::seq::SliceRandom; +use tokio::prelude::*; +use tokio::fs::File; +use tokio::io::write_all; + #[derive(Clone, Copy, Hash, PartialEq, Eq)] pub enum AddressState { Untested, @@ -69,51 +74,153 @@ pub struct Store { u64_settings: RwLock>, subver_regex: RwLock, nodes: RwLock, + store: String, } impl Store { - pub fn new() -> Store { - let mut u64s = HashMap::with_capacity(15); - u64s.insert(U64Setting::ConnsPerSec, 50); - u64s.insert(U64Setting::RunTimeout, 120); - u64s.insert(U64Setting::WasGoodTimeout, 21600); - u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0); - u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600); - u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200); - u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600); - u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600); - u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400); - u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400); - u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400); - u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600); - u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800); - u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800); - u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX - let mut state_vecs = HashMap::with_capacity(11); - state_vecs.insert(AddressState::Untested, Vec::new()); - state_vecs.insert(AddressState::LowBlockCount, Vec::new()); - state_vecs.insert(AddressState::HighBlockCount, Vec::new()); - state_vecs.insert(AddressState::LowVersion, Vec::new()); - state_vecs.insert(AddressState::BadVersion, Vec::new()); - state_vecs.insert(AddressState::NotFullNode, Vec::new()); - state_vecs.insert(AddressState::ProtocolViolation, Vec::new()); - state_vecs.insert(AddressState::Timeout, Vec::new()); - state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new()); - state_vecs.insert(AddressState::Good, Vec::new()); - state_vecs.insert(AddressState::WasGood, Vec::new()); - let mut good_node_services = HashMap::with_capacity(64); - for i in 0..64 { - good_node_services.insert(i, HashSet::new()); - } - Store { - u64_settings: RwLock::new(u64s), - subver_regex: RwLock::new(".*".to_string()), - nodes: RwLock::new(Nodes { - good_node_services, - nodes_to_state: HashMap::new(), - state_next_scan: state_vecs, - }), + pub fn new(store: String) -> impl Future { + let settings_future = File::open(store.clone() + "/settings").and_then(|f| { + let mut l = BufReader::new(f).lines(); + macro_rules! try_read { + ($lines: expr, $ty: ty) => { { + match $lines.next() { + Some(line) => match line { + Ok(line) => match line.parse::<$ty>() { + Ok(res) => res, + Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), + }, + Err(e) => return future::err(e), + }, + None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")), + } + } } + } + let mut u64s = HashMap::with_capacity(15); + u64s.insert(U64Setting::ConnsPerSec, try_read!(l, u64)); + u64s.insert(U64Setting::RunTimeout, try_read!(l, u64)); + u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64)); + u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64)); + future::ok((u64s, try_read!(l, String))) + }).or_else(|_| -> future::FutureResult<(HashMap, String), ()> { + let mut u64s = HashMap::with_capacity(15); + u64s.insert(U64Setting::ConnsPerSec, 50); + u64s.insert(U64Setting::RunTimeout, 120); + u64s.insert(U64Setting::WasGoodTimeout, 21600); + u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0); + u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600); + u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200); + u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600); + u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600); + u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400); + u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400); + u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400); + u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600); + u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800); + u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800); + u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX + future::ok((u64s, ".*".to_string())) + }); + + macro_rules! nodes_uninitd { + () => { { + let mut state_vecs = HashMap::with_capacity(11); + state_vecs.insert(AddressState::Untested, Vec::new()); + state_vecs.insert(AddressState::LowBlockCount, Vec::new()); + state_vecs.insert(AddressState::HighBlockCount, Vec::new()); + state_vecs.insert(AddressState::LowVersion, Vec::new()); + state_vecs.insert(AddressState::BadVersion, Vec::new()); + state_vecs.insert(AddressState::NotFullNode, Vec::new()); + state_vecs.insert(AddressState::ProtocolViolation, Vec::new()); + state_vecs.insert(AddressState::Timeout, Vec::new()); + state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new()); + state_vecs.insert(AddressState::Good, Vec::new()); + state_vecs.insert(AddressState::WasGood, Vec::new()); + let mut good_node_services = HashMap::with_capacity(64); + for i in 0..64 { + good_node_services.insert(i, HashSet::new()); + } + Nodes { + good_node_services, + nodes_to_state: HashMap::new(), + state_next_scan: state_vecs, + } + } } } + + let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| { + let mut res = nodes_uninitd!(); + let l = BufReader::new(f).lines(); + for line_res in l { + let line = match line_res { + Ok(l) => l, + Err(_) => return future::ok(res), + }; + let mut line_iter = line.split(','); + macro_rules! try_read { + ($lines: expr, $ty: ty) => { { + match $lines.next() { + Some(line) => match line.parse::<$ty>() { + Ok(res) => res, + Err(_) => return future::ok(res), + }, + None => return future::ok(res), + } + } } + } + let sockaddr = try_read!(line_iter, SocketAddr); + let state = try_read!(line_iter, u8); + let last_services = try_read!(line_iter, u64); + let node = Node { + state: match state { + 0x0 => AddressState::Untested, + 0x1 => AddressState::LowBlockCount, + 0x2 => AddressState::HighBlockCount, + 0x3 => AddressState::LowVersion, + 0x4 => AddressState::BadVersion, + 0x5 => AddressState::NotFullNode, + 0x6 => AddressState::ProtocolViolation, + 0x7 => AddressState::Timeout, + 0x8 => AddressState::TimeoutDuringRequest, + 0x9 => AddressState::Good, + 0xa => AddressState::WasGood, + _ => return future::ok(res), + }, + last_services, + last_update: Instant::now(), + }; + if node.state == AddressState::Good { + for i in 0..64 { + if node.last_services & (1 << i) != 0 { + res.good_node_services.get_mut(&i).unwrap().insert(sockaddr); + } + } + } + res.state_next_scan.get_mut(&node.state).unwrap().push((Instant::now(), sockaddr)); + res.nodes_to_state.insert(sockaddr, node); + } + future::ok(res) + }).or_else(|_| -> future::FutureResult { + future::ok(nodes_uninitd!()) + }); + settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| { + future::ok(Store { + u64_settings: RwLock::new(u64_settings), + subver_regex: RwLock::new(regex), + nodes: RwLock::new(nodes), + store, + }) + }) } pub fn get_u64(&self, setting: U64Setting) -> u64 { @@ -179,6 +286,67 @@ impl Store { } } + pub fn save_data(&'static self) -> impl Future { + let settings_file = self.store.clone() + "/settings"; + let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| { + let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", + self.get_u64(U64Setting::ConnsPerSec), + self.get_u64(U64Setting::RunTimeout), + self.get_u64(U64Setting::WasGoodTimeout), + self.get_u64(U64Setting::MinProtocolVersion), + self.get_u64(U64Setting::RescanInterval(AddressState::Untested)), + self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)), + self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)), + self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)), + self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)), + self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)), + self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)), + self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)), + self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)), + self.get_u64(U64Setting::RescanInterval(AddressState::Good)), + self.get_u64(U64Setting::RescanInterval(AddressState::WasGood))); + write_all(f, settings_string).and_then(|(mut f, _)| { + f.poll_sync_all() + }).and_then(|_| { + tokio::fs::rename(settings_file.clone() + ".tmp", settings_file) + }) + }); + + let nodes_file = self.store.clone() + "/nodes"; + let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| { + let mut nodes_buff = String::new(); + { + let nodes = self.nodes.read().unwrap(); + nodes_buff.reserve(nodes.nodes_to_state.len() * 20); + for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() { + nodes_buff += &sockaddr.to_string(); + nodes_buff += ","; + nodes_buff += &match node.state { + AddressState::Untested => 0u8, + AddressState::LowBlockCount => 1u8, + AddressState::HighBlockCount => 2u8, + AddressState::LowVersion => 3u8, + AddressState::BadVersion => 4u8, + AddressState::NotFullNode => 5u8, + AddressState::ProtocolViolation => 6u8, + AddressState::Timeout => 7u8, + AddressState::TimeoutDuringRequest => 8u8, + AddressState::Good => 9u8, + AddressState::WasGood => 10u8, + }.to_string(); + nodes_buff += &node.last_services.to_string(); + nodes_buff += "\n"; + } + } + write_all(f, nodes_buff) + }).and_then(|(mut f, _)| { + f.poll_sync_all() + }).and_then(|_| { + tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file) + }); + settings_future.join(nodes_future).then(|_| { future::ok(()) }) + } + pub fn get_next_scan_nodes(&self) -> Vec { let mut res = Vec::with_capacity(600); let cur_time = Instant::now(); diff --git a/src/main.rs b/src/main.rs index c6a86a6..438d40f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -175,8 +175,11 @@ fn scan_net() { iter_time += per_iter_time; } Delay::new(iter_time).then(|_| { - scan_net(); - future::ok(()) + let store = unsafe { DATA_STORE.as_ref().unwrap() }; + store.save_data().then(|_| { + scan_net(); + future::ok(()) + }) }) })); } @@ -275,15 +278,22 @@ fn main() { unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().insert(0, genesis_block(Network::Bitcoin).bitcoin_hash()); unsafe { HIGHEST_HEADER = Some(Box::new(Mutex::new((genesis_block(Network::Bitcoin).bitcoin_hash(), 0)))) }; - unsafe { DATA_STORE = Some(Box::new(Store::new())) }; - unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) }; - tokio::run(future::lazy(|| { let mut args = env::args(); args.next(); - let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); - make_trusted_conn(trusted_sockaddr); + let path = args.next().unwrap(); + let addr = args.next().unwrap(); - future::ok(()) + Store::new(path).and_then(move |store| { + unsafe { DATA_STORE = Some(Box::new(store)) }; + unsafe { PRINTER = Some(Box::new(Printer::new(DATA_STORE.as_ref().unwrap()))) }; + + let trusted_sockaddr: SocketAddr = addr.parse().unwrap(); + make_trusted_conn(trusted_sockaddr); + + future::ok(()) + }).or_else(|_| { + future::err(()) + }) })); } -- 2.39.5