2 use std::collections::{HashSet, HashMap, hash_map};
4 use std::net::SocketAddr;
5 use std::time::{Duration, Instant};
6 use std::io::{BufRead, BufReader};
8 use bitcoin::network::address::Address;
11 use rand::seq::SliceRandom;
13 use tokio::prelude::*;
15 use tokio::io::write_all;
// Classification of a tracked node address. It derives Hash/Eq because it is used as the
// key of the per-state scan queues (`state_next_scan`), and Copy so it can be passed by value.
// The variant list is not visible in this excerpt; the variants referenced elsewhere in this
// file are Untested, LowBlockCount, HighBlockCount, LowVersion, BadVersion, NotFullNode,
// ProtocolViolation, Timeout, TimeoutDuringRequest, Good and WasGood.
17 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
18 pub enum AddressState {
// NOTE(review): the enum header that belongs between these two lines is not visible in this
// excerpt; judging by the `U64Setting::RescanInterval(..)` uses below it is presumably
// `U64Setting`. Hash/Eq are derived because the type keys the `u64_settings` map.
32 #[derive(Hash, PartialEq, Eq)]
// Rescan interval for nodes in the given state. get_next_scan_nodes passes the stored value
// to Duration::from_secs, i.e. it is interpreted as a number of seconds.
37 RescanInterval(AddressState),
// Key type for string-valued settings. The variants are not visible in this excerpt.
// Note that get_string currently ignores the key it is given and always returns the
// subver regex string.
41 #[derive(Hash, PartialEq, Eq)]
42 pub enum StringSetting {
// NOTE(review): struct header not visible here; from the `FutureResult<Nodes, ()>` use in
// Store::new these are presumably the fields of `Nodes`.
//
// Service-bit index -> addresses that are currently in the Good state and whose service
// bitfield has that bit set (membership is tested with `services & (1 << i)`).
53 good_node_services: HashMap<u8, HashSet<SocketAddr>>,
// Per-address record (state, last_services, last_update) for every known node.
54 nodes_to_state: HashMap<SocketAddr, Node>,
// Per-state scan queue of (time queued, address). Entries are only ever appended with the
// current time, and get_next_scan_nodes binary-searches by the Instant, so each Vec is
// expected to stay ordered by ascending time.
55 state_next_scan: HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
// View of the node tables as three separate mutable borrows, one per map. Having the maps
// as distinct references lets a caller hold a `&mut` into one map (e.g. a Node record from
// nodes_to_state) while mutating the others, as set_node_state does.
57 struct NodesMutRef<'a> {
58 good_node_services: &'a mut HashMap<u8, HashSet<SocketAddr>>,
59 nodes_to_state: &'a mut HashMap<SocketAddr, Node>,
60 state_next_scan: &'a mut HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
// Splits `&mut self` into a NodesMutRef holding an independent mutable borrow of each map.
// The returned view borrows `self` for 'a, so `self` cannot be used directly while it lives.
64 fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
66 good_node_services: &mut self.good_node_services,
67 nodes_to_state: &mut self.nodes_to_state,
68 state_next_scan: &mut self.state_next_scan,
// NOTE(review): struct header not visible here; these are presumably fields of `Store`,
// which (per Store::new and save_data) also has `nodes` and `store` fields not shown here.
//
// Numeric settings, read through get_u64. Each is behind its own RwLock.
74 u64_settings: RwLock<HashMap<U64Setting, u64>>,
// String loaded from the last line of the settings file (default ".*"); returned by get_string.
75 subver_regex: RwLock<String>,
// Builds a future that resolves to a Store loaded from the directory `store`.
//
// Two files are read: `<store>/settings` and `<store>/nodes`. Neither is required:
//  * if the settings file cannot be opened, read or fully parsed, the built-in defaults
//    in the `or_else` below are used instead (nothing from a partially-read file is kept);
//  * if the nodes file cannot be opened, an empty node table is used; if it stops being
//    readable/parsable part-way through, the nodes loaded up to that point are kept.
// As a result the errors are swallowed here rather than surfaced to the caller.
81 pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
82 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
83 let mut l = BufReader::new(f).lines();
// try_read!(lines, T): take the next line and parse it as T, or return early from this
// closure with an io::Error (InvalidData on a parse failure, the underlying error on a
// read failure, UnexpectedEof when the file has no more lines).
84 macro_rules! try_read {
85 ($lines: expr, $ty: ty) => { {
87 Some(line) => match line {
88 Ok(line) => match line.parse::<$ty>() {
90 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
92 Err(e) => return future::err(e),
94 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
// Settings file layout: one value per line. The order of the inserts below IS the on-disk
// line order (15 u64 lines), and must match the order save_data writes them in.
98 let mut u64s = HashMap::with_capacity(15);
99 u64s.insert(U64Setting::ConnsPerSec, try_read!(l, u64));
100 u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
101 u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
102 u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
103 u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
104 u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
105 u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
106 u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
107 u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
108 u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
109 u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
110 u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
111 u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
112 u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
113 u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
// A 16th line is required and kept verbatim; it ends up in `subver_regex`.
// NOTE(review): the format string in save_data has only the 15 numeric placeholders, so a
// settings file written by save_data appears to lack this line. Reading it back would then
// hit the UnexpectedEof arm and silently fall back to ALL defaults below. Confirm against
// the lines of save_data not visible in this excerpt.
114 future::ok((u64s, try_read!(l, String)))
// Fallback: any failure above replaces the whole settings set with these defaults.
// The RescanInterval values are used as seconds (see get_next_scan_nodes).
115 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, String), ()> {
116 let mut u64s = HashMap::with_capacity(15);
117 u64s.insert(U64Setting::ConnsPerSec, 50);
118 u64s.insert(U64Setting::RunTimeout, 120);
119 u64s.insert(U64Setting::WasGoodTimeout, 21600);
120 u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
121 u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
122 u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
123 u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
124 u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
125 u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
126 u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
127 u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
128 u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
129 u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
130 u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
131 u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX
132 future::ok((u64s, ".*".to_string()))
// nodes_uninitd!(): an empty node table. Every AddressState gets an (empty) scan queue and
// every service-bit index `i` gets an (empty) address set up front; the rest of the file
// relies on that when it calls `.get_mut(..).unwrap()` on these maps.
// (The loop producing `i` is not visible here; the capacity suggests indices 0..64.)
135 macro_rules! nodes_uninitd {
137 let mut state_vecs = HashMap::with_capacity(11);
138 state_vecs.insert(AddressState::Untested, Vec::new());
139 state_vecs.insert(AddressState::LowBlockCount, Vec::new());
140 state_vecs.insert(AddressState::HighBlockCount, Vec::new());
141 state_vecs.insert(AddressState::LowVersion, Vec::new());
142 state_vecs.insert(AddressState::BadVersion, Vec::new());
143 state_vecs.insert(AddressState::NotFullNode, Vec::new());
144 state_vecs.insert(AddressState::ProtocolViolation, Vec::new());
145 state_vecs.insert(AddressState::Timeout, Vec::new());
146 state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new());
147 state_vecs.insert(AddressState::Good, Vec::new());
148 state_vecs.insert(AddressState::WasGood, Vec::new());
149 let mut good_node_services = HashMap::with_capacity(64);
151 good_node_services.insert(i, HashSet::new());
155 nodes_to_state: HashMap::new(),
156 state_next_scan: state_vecs,
// Nodes file layout: one node per line, comma-separated `sockaddr,state,last_services`,
// where `state` is the numeric code mapped to AddressState below.
161 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
162 let mut res = nodes_uninitd!();
163 let l = BufReader::new(f).lines();
165 let line = match line_res {
// A read error ends loading; whatever was parsed so far is kept.
167 Err(_) => return future::ok(res),
169 let mut line_iter = line.split(',');
// Unlike the settings variant above, this try_read! never fails the future: a missing or
// unparsable field ends loading and resolves with the nodes read so far.
170 macro_rules! try_read {
171 ($lines: expr, $ty: ty) => { {
172 match $lines.next() {
173 Some(line) => match line.parse::<$ty>() {
175 Err(_) => return future::ok(res),
177 None => return future::ok(res),
181 let sockaddr = try_read!(line_iter, SocketAddr);
182 let state = try_read!(line_iter, u8);
183 let last_services = try_read!(line_iter, u64);
// Numeric state codes; these must stay in sync with the encoding in save_data.
186 0x0 => AddressState::Untested,
187 0x1 => AddressState::LowBlockCount,
188 0x2 => AddressState::HighBlockCount,
189 0x3 => AddressState::LowVersion,
190 0x4 => AddressState::BadVersion,
191 0x5 => AddressState::NotFullNode,
192 0x6 => AddressState::ProtocolViolation,
193 0x7 => AddressState::Timeout,
194 0x8 => AddressState::TimeoutDuringRequest,
195 0x9 => AddressState::Good,
196 0xa => AddressState::WasGood,
// An unknown state code also ends loading at this line.
197 _ => return future::ok(res),
// Timestamps are not persisted: every loaded node is stamped with the load time.
200 last_update: Instant::now(),
// Good nodes are indexed under every service bit set in their saved bitfield.
202 if node.state == AddressState::Good {
204 if node.last_services & (1 << i) != 0 {
205 res.good_node_services.get_mut(&i).unwrap().insert(sockaddr);
// Queue the node for scanning under its saved state, timestamped "now".
209 res.state_next_scan.get_mut(&node.state).unwrap().push((Instant::now(), sockaddr));
210 res.nodes_to_state.insert(sockaddr, node);
// Nodes file could not be opened (or the future otherwise failed): start with an empty table.
213 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
214 future::ok(nodes_uninitd!())
// Both halves are infallible by now; combine them into the Store.
216 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
218 u64_settings: RwLock::new(u64_settings),
219 subver_regex: RwLock::new(regex),
220 nodes: RwLock::new(nodes),
// Returns the current value of a numeric setting.
// Panics if the settings lock is poisoned or if `setting` has no entry in the map
// (Store::new inserts a value for every key it reads or defaults).
226 pub fn get_u64(&self, setting: U64Setting) -> u64 {
227 *self.u64_settings.read().unwrap().get(&setting).unwrap()
// Returns the number of entries currently in the scan queue for `state`.
// This is the length of the `state_next_scan` Vec, not a count over `nodes_to_state`, so
// entries that get_next_scan_nodes has removed from the queue are not included.
// Panics if the nodes lock is poisoned or the state has no queue.
230 pub fn get_node_count(&self, state: AddressState) -> usize {
231 self.nodes.read().unwrap().state_next_scan.get(&state).unwrap().len()
// Returns a copy of the subver regex string. The `_setting` argument is currently ignored:
// whatever key is passed, the same string is returned.
// Panics if the lock is poisoned.
234 pub fn get_string(&self, _setting: StringSetting) -> String {
235 self.subver_regex.read().unwrap().clone()
// Registers newly learned addresses. For each entry whose Address converts to a SocketAddr
// and is not already known, a record in the Untested state is created and the address is
// queued for scanning, both stamped with the same `cur_time`. Addresses that are already
// known are left untouched, and ones that fail the conversion are skipped.
// The u32 in each tuple is ignored here.
// Takes the nodes write lock for the whole call; panics if it is poisoned.
238 pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
239 let mut nodes = self.nodes.write().unwrap();
240 let cur_time = Instant::now();
241 for &(_, ref addr) in addresses {
242 if let Ok(socketaddr) = addr.socket_addr() {
243 match nodes.nodes_to_state.entry(socketaddr.clone()) {
244 hash_map::Entry::Vacant(e) => {
246 state: AddressState::Untested,
248 last_update: cur_time,
250 nodes.state_next_scan.get_mut(&AddressState::Untested).unwrap().push((cur_time, socketaddr));
// Already tracked: keep the existing state and queue position.
252 hash_map::Entry::Occupied(_) => {},
255 //TODO: Handle onions
// Records the outcome of scanning `addr` and re-queues it for its next scan.
//
// * If the node was Good and the new `state` is not Good, the node is recorded as WasGood
//   (the requested `state` itself is not stored), it is removed from the good-node index
//   for every service bit it had, its saved services are cleared, and it is queued under WasGood.
// * Otherwise the node takes `state`; if that is Good, the good-node index is reconciled
//   bit-by-bit against the previously saved services, and the node is queued under `state`.
//   (The `else` joining the two cases is on a line not visible in this excerpt.)
//
// Panics if `addr` is not a known node, or if the nodes lock is poisoned.
260 pub fn set_node_state(&self, addr: SocketAddr, state: AddressState, services: u64) {
261 let mut nodes_lock = self.nodes.write().unwrap();
// Split borrow so `state_ref` can stay alive while the other maps are mutated.
262 let nodes = nodes_lock.borrow_mut();
263 let state_ref = nodes.nodes_to_state.get_mut(&addr).unwrap();
264 state_ref.last_update = Instant::now();
265 if state_ref.state == AddressState::Good && state != AddressState::Good {
266 state_ref.state = AddressState::WasGood;
268 if state_ref.last_services & (1 << i) != 0 {
269 nodes.good_node_services.get_mut(&i).unwrap().remove(&addr);
272 state_ref.last_services = 0;
273 nodes.state_next_scan.get_mut(&AddressState::WasGood).unwrap().push((state_ref.last_update, addr));
275 state_ref.state = state;
276 if state == AddressState::Good {
// Only touch the index for bits that changed: newly set -> insert, newly cleared -> remove.
278 if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
279 nodes.good_node_services.get_mut(&i).unwrap().insert(addr);
280 } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
281 nodes.good_node_services.get_mut(&i).unwrap().remove(&addr);
285 nodes.state_next_scan.get_mut(&state).unwrap().push((state_ref.last_update, addr));
// Builds a future that writes the settings and the node table back to the store directory.
// Each file is first written to `<name>.tmp` and then renamed over `<name>`, so the
// previous file is only replaced once the new content has been written.
//
// `self` must be `'static` because the `move` closures below capture it.
// The returned future never fails: the final `.then(|_| ok(()))` discards the result of
// both writes, so I/O errors are not reported to the caller.
289 pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
290 let settings_file = self.store.clone() + "/settings";
291 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
// One value per line, in exactly the order Store::new reads them.
// NOTE(review): Store::new also expects a 16th line holding the subver regex string, and
// nothing visible here writes it. Unless the lines not shown in this excerpt add it, a
// file saved here will fail to load and Store::new will fall back to default settings.
292 let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
293 self.get_u64(U64Setting::ConnsPerSec),
294 self.get_u64(U64Setting::RunTimeout),
295 self.get_u64(U64Setting::WasGoodTimeout),
296 self.get_u64(U64Setting::MinProtocolVersion),
297 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
298 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
299 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
300 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
301 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
302 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
303 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
304 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
305 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
306 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
307 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)));
308 write_all(f, settings_string).and_then(|(mut f, _)| {
311 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
315 let nodes_file = self.store.clone() + "/nodes";
316 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
// The whole node table is serialised into one in-memory string while the read lock is held.
317 let mut nodes_buff = String::new();
319 let nodes = self.nodes.read().unwrap();
// Rough size guess of 20 bytes per node to limit reallocations; not a hard bound.
320 nodes_buff.reserve(nodes.nodes_to_state.len() * 20);
// Per node: socket address, numeric state code, saved services. The separators are
// appended on lines not visible here; presumably "," and "\n" to match the parser in Store::new.
321 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
322 nodes_buff += &sockaddr.to_string();
// Numeric state codes; these must stay in sync with the decoding table in Store::new.
324 nodes_buff += &match node.state {
325 AddressState::Untested => 0u8,
326 AddressState::LowBlockCount => 1u8,
327 AddressState::HighBlockCount => 2u8,
328 AddressState::LowVersion => 3u8,
329 AddressState::BadVersion => 4u8,
330 AddressState::NotFullNode => 5u8,
331 AddressState::ProtocolViolation => 6u8,
332 AddressState::Timeout => 7u8,
333 AddressState::TimeoutDuringRequest => 8u8,
334 AddressState::Good => 9u8,
335 AddressState::WasGood => 10u8,
337 nodes_buff += &node.last_services.to_string();
341 write_all(f, nodes_buff)
342 }).and_then(|(mut f, _)| {
345 tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
// Run both saves and discard the outcome of each (success or error).
347 settings_future.join(nodes_future).then(|_| { future::ok(()) })
// Picks the next batch of addresses to scan: at most 600 in total and at most 60 per
// state, taken from the front of each state's queue among the entries that were queued at
// least that state's RescanInterval (in seconds) ago. The selected entries are removed
// from the queue and the result is shuffled before being returned.
// States are visited in HashMap iteration order, so which states share the 600 budget when
// it runs short is not deterministic.
// NOTE(review): the end of this function is not visible in this excerpt.
350 pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
351 let mut res = Vec::with_capacity(600);
352 let cur_time = Instant::now();
353 let mut nodes = self.nodes.write().unwrap();
354 for (state, state_nodes) in nodes.state_next_scan.iter_mut() {
// Entries stamped before `cmp_time` are due for a rescan.
// NOTE(review): `Instant - Duration` panics if the result is not representable; a very
// large configured interval could trigger that. Consider checked_sub; confirm first.
355 let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(*state)));
// The binary search relies on the queue being ordered by time; a hit or the insertion
// point both give the number of leading entries that are due. Cap that by the per-state
// limit (60) and by what is left of the overall budget (600).
356 let split_point = cmp::min(cmp::min(600 - res.len(), 60),
357 state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx));
// split_off returns the tail; after the swap the queue keeps the tail and `new_nodes`
// holds the due entries from the front.
358 let mut new_nodes = state_nodes.split_off(split_point);
359 mem::swap(&mut new_nodes, state_nodes);
// Loop body not visible here; presumably moves each address into `res`.
360 for (_, node) in new_nodes.drain(..) {
364 res.shuffle(&mut thread_rng());