Store nodes that timed out in a rolling bloom filter
[dnsseed-rust] / src / datastore.rs
1 use std::cmp;
2 use std::convert::TryInto;
3 use std::collections::{HashSet, HashMap, hash_map};
4 use std::sync::{Arc, RwLock};
5 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
6 use std::time::{Duration, Instant};
7 use std::io::{BufRead, BufReader};
8
9 use bitcoin::network::address::{Address, AddrV2Message};
10
11 use rand::thread_rng;
12 use rand::seq::{SliceRandom, IteratorRandom};
13
14 use tokio::prelude::*;
15 use tokio::fs::File;
16 use tokio::io::write_all;
17
18 use regex::Regex;
19
20 use crate::bloom::RollingBloomFilter;
21 use crate::bgp_client::BGPClient;
22
/// Number of seconds associated with one batch of scan results.
/// NOTE(review): meaning inferred from the name; the code that consumes this constant is not
/// visible in this part of the file — confirm against its users.
pub const SECS_PER_SCAN_RESULTS: u64 = 15;
/// Upper bound on connections per second for each address state.
/// NOTE(review): meaning inferred from the name; the code that consumes this constant is not
/// visible in this part of the file — confirm against its users.
const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 500;
25
/// Result of the most recent scan of a node address.
///
/// The discriminants are spelled out because they are part of the on-disk format: `to_num` is
/// what gets written to the nodes file and `from_num` is what reads it back. They are also used
/// as indices into the per-state scan queues, so they must stay dense and start at zero.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum AddressState {
        Untested = 0,
        LowBlockCount = 1,
        HighBlockCount = 2,
        LowVersion = 3,
        BadVersion = 4,
        NotFullNode = 5,
        ProtocolViolation = 6,
        Timeout = 7,
        TimeoutDuringRequest = 8,
        TimeoutAwaitingPong = 9,
        TimeoutAwaitingAddr = 10,
        TimeoutAwaitingBlock = 11,
        Good = 12,
        WasGood = 13,
        // Must remain the last variant: `get_count` is derived from it.
        EvilNode = 14,
}

impl AddressState {
        /// Converts a stored numeric code back into a state.
        ///
        /// Returns `None` for any value that is not the discriminant of a variant.
        pub fn from_num(num: u8) -> Option<AddressState> {
                match num {
                        0 => Some(AddressState::Untested),
                        1 => Some(AddressState::LowBlockCount),
                        2 => Some(AddressState::HighBlockCount),
                        3 => Some(AddressState::LowVersion),
                        4 => Some(AddressState::BadVersion),
                        5 => Some(AddressState::NotFullNode),
                        6 => Some(AddressState::ProtocolViolation),
                        7 => Some(AddressState::Timeout),
                        8 => Some(AddressState::TimeoutDuringRequest),
                        9 => Some(AddressState::TimeoutAwaitingPong),
                        10 => Some(AddressState::TimeoutAwaitingAddr),
                        11 => Some(AddressState::TimeoutAwaitingBlock),
                        12 => Some(AddressState::Good),
                        13 => Some(AddressState::WasGood),
                        14 => Some(AddressState::EvilNode),
                        _ => None,
                }
        }

        /// Returns the numeric code of this state (its discriminant).
        pub fn to_num(&self) -> u8 {
                // The enum is fieldless with explicit discriminants in 0..=14, so the cast is exact.
                *self as u8
        }

        /// Returns the human-readable label of this state.
        pub fn to_str(&self) -> &'static str {
                match *self {
                        AddressState::Untested => "Untested",
                        AddressState::LowBlockCount => "Low Block Count",
                        AddressState::HighBlockCount => "High Block Count",
                        AddressState::LowVersion => "Low Version",
                        AddressState::BadVersion => "Bad Version",
                        AddressState::NotFullNode => "Not Full Node",
                        AddressState::ProtocolViolation => "Protocol Violation",
                        AddressState::Timeout => "Timeout",
                        AddressState::TimeoutDuringRequest => "Timeout During Request",
                        AddressState::TimeoutAwaitingPong => "Timeout Awaiting Pong",
                        AddressState::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
                        AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
                        AddressState::Good => "Good",
                        AddressState::WasGood => "Was Good",
                        AddressState::EvilNode => "Evil Node",
                }
        }

        /// Returns the number of states, i.e. one more than the largest discriminant.
        ///
        /// This is a `const fn` so it can be used as an array length.
        pub const fn get_count() -> u8 {
                AddressState::EvilNode as u8 + 1
        }
}
111
112 #[derive(Hash, PartialEq, Eq)]
113 pub enum U64Setting {
114         RunTimeout,
115         WasGoodTimeout,
116         RescanInterval(AddressState),
117         MinProtocolVersion,
118 }
119
/// Keys of the regex-valued settings held by the store.
#[derive(PartialEq, Eq, Hash)]
pub enum RegexSetting {
        SubverRegex,
}
124
125 struct Node {
126         // Times in seconds-since-startup
127         last_good: u32, // Ignored unless state is Good or WasGood
128         // Since everything is is 4-byte aligned, using a u64 for services blows up our size
129         // substantially. Instead, use a u32 pair and bit shift as needed.
130         last_services: (u32, u32),
131         state: AddressState,
132         queued: bool,
133 }
134 impl Node {
135         #[inline]
136         fn last_services(&self) -> u64 {
137                 ((self.last_services.0 as u64) << 32) |
138                 ((self.last_services.1 as u64)      )
139         }
140         #[inline]
141         fn services(inp: u64) -> (u32, u32) {
142                 (
143                         ((inp & 0xffffffff00000000) >> 32) as u32,
144                         ((inp & 0x00000000ffffffff)      ) as u32
145                 )
146         }
147 }
148
/// Splitting a services value into halves and joining it again must give back the input.
#[test]
fn services_test() {
        let services: u64 = 0x1badcafedeadbeef;
        let node = Node {
                last_good: 0,
                state: AddressState::Good,
                queued: false,
                last_services: Node::services(services),
        };
        assert_eq!(node.last_services(), services);
}
156
/// Essentially SocketAddr but without a traffic class or scope
///
/// IPv6 addresses are stored as their eight 16-bit segments plus the port, so two addresses
/// that differ only in flow info or scope id compare (and hash) as equal.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum SockAddr {
        V4(SocketAddrV4),
        V6(([u16; 8], u16)),
}

/// Rebuilds an `Ipv6Addr` from its eight 16-bit segments.
#[inline]
fn segs_to_ip6(segs: &[u16; 8]) -> Ipv6Addr {
        Ipv6Addr::from(*segs)
}

impl From<SocketAddr> for SockAddr {
        /// Drops the flow info and scope id of IPv6 addresses; IPv4 addresses are kept as-is.
        fn from(addr: SocketAddr) -> SockAddr {
                match addr {
                        SocketAddr::V4(sa) => SockAddr::V4(sa),
                        SocketAddr::V6(sa) => SockAddr::V6((sa.ip().segments(), sa.port())),
                }
        }
}

// Implemented as `From` rather than `Into`: the standard blanket impl still provides
// `Into<SocketAddr> for &SockAddr`, so existing `.into()` call sites keep working.
impl From<&SockAddr> for SocketAddr {
        /// IPv6 addresses come back with flow info and scope id set to zero.
        fn from(addr: &SockAddr) -> SocketAddr {
                match *addr {
                        SockAddr::V4(sa) => SocketAddr::V4(sa),
                        SockAddr::V6((ref segs, port)) => SocketAddr::V6(SocketAddrV6::new(segs_to_ip6(segs), port, 0, 0)),
                }
        }
}

// Implemented as `Display` rather than `ToString`: `to_string()` is still available through
// the standard blanket impl and produces the same text.
impl std::fmt::Display for SockAddr {
        /// Formats exactly like the equivalent `SocketAddr`.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Display::fmt(&SocketAddr::from(self), f)
        }
}

impl SockAddr {
        /// Returns the port number.
        pub fn port(&self) -> u16 {
                match *self {
                        SockAddr::V4(sa) => sa.port(),
                        SockAddr::V6((_, port)) => port,
                }
        }
        /// Returns the IP address without the port.
        pub fn ip(&self) -> IpAddr {
                match *self {
                        SockAddr::V4(sa) => IpAddr::V4(*sa.ip()),
                        SockAddr::V6((ref segs, _)) => IpAddr::V6(segs_to_ip6(segs)),
                }
        }
}
203
204 struct Nodes {
205         good_node_services: [HashSet<SockAddr>; 64],
206         nodes_to_state: HashMap<SockAddr, Node>,
207         timeout_nodes: RollingBloomFilter<SockAddr>,
208         state_next_scan: [Vec<SockAddr>; AddressState::get_count() as usize],
209 }
210 struct NodesMutRef<'a> {
211         good_node_services: &'a mut [HashSet<SockAddr>; 64],
212         nodes_to_state: &'a mut HashMap<SockAddr, Node>,
213         timeout_nodes: &'a mut RollingBloomFilter<SockAddr>,
214         state_next_scan: &'a mut [Vec<SockAddr>; AddressState::get_count() as usize],
215 }
216
217 impl Nodes {
218         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
219                 NodesMutRef {
220                         good_node_services: &mut self.good_node_services,
221                         nodes_to_state: &mut self.nodes_to_state,
222                         timeout_nodes: &mut self.timeout_nodes,
223                         state_next_scan: &mut self.state_next_scan,
224                 }
225         }
226 }
227
228 pub struct Store {
229         u64_settings: RwLock<HashMap<U64Setting, u64>>,
230         subver_regex: RwLock<Arc<Regex>>,
231         nodes: RwLock<Nodes>,
232         start_time: Instant,
233         store: String,
234 }
235
236 impl Store {
237         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
238                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
239                         let mut l = BufReader::new(f).lines();
240                         macro_rules! try_read {
241                                 ($lines: expr, $ty: ty) => { {
242                                         match $lines.next() {
243                                                 Some(line) => match line {
244                                                         Ok(line) => match line.parse::<$ty>() {
245                                                                 Ok(res) => res,
246                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
247                                                         },
248                                                         Err(e) => return future::err(e),
249                                                 },
250                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
251                                         }
252                                 } }
253                         }
254                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
255                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
256                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
257                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
258                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
259                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
260                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
261                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
262                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
263                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
264                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
265                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
266                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
267                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
268                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
269                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
270                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
271                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
272                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
273                         future::ok((u64s, try_read!(l, Regex)))
274                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
275                         let mut u64s = HashMap::with_capacity(15);
276                         u64s.insert(U64Setting::RunTimeout, 120);
277                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
278                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
279                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
280                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
281                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
282                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
283                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
284                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
285                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
286                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
287                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
288                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
289                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
290                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
291                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
292                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
293                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
294                         future::ok((u64s, Regex::new(".*").unwrap()))
295                 });
296
297                 macro_rules! nodes_uninitd {
298                         () => { {
299                                 let state_vecs = [Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()];
300                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
301                                 Nodes {
302                                         good_node_services,
303                                         nodes_to_state: HashMap::new(),
304                                         timeout_nodes: RollingBloomFilter::new(),
305                                         state_next_scan: state_vecs,
306                                 }
307                         } }
308                 }
309
310                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
311                         let mut res = nodes_uninitd!();
312                         let l = BufReader::new(f).lines();
313                         for line_res in l {
314                                 let line = match line_res {
315                                         Ok(l) => l,
316                                         Err(_) => return future::ok(res),
317                                 };
318                                 let mut line_iter = line.split(',');
319                                 macro_rules! try_read {
320                                         ($lines: expr, $ty: ty) => { {
321                                                 match $lines.next() {
322                                                         Some(line) => match line.parse::<$ty>() {
323                                                                 Ok(res) => res,
324                                                                 Err(_) => return future::ok(res),
325                                                         },
326                                                         None => return future::ok(res),
327                                                 }
328                                         } }
329                                 }
330                                 let sockaddr = try_read!(line_iter, SocketAddr);
331                                 let state = try_read!(line_iter, u8);
332                                 let last_services = try_read!(line_iter, u64);
333                                 let node = Node {
334                                         state: match AddressState::from_num(state) {
335                                                 Some(v) => v,
336                                                 None => return future::ok(res),
337                                         },
338                                         last_services: Node::services(last_services),
339                                         last_good: 0,
340                                         queued: true,
341                                 };
342                                 if node.state == AddressState::Good {
343                                         for i in 0..64 {
344                                                 if node.last_services() & (1 << i) != 0 {
345                                                         res.good_node_services[i].insert(sockaddr.into());
346                                                 }
347                                         }
348                                 }
349                                 res.state_next_scan[node.state.to_num() as usize].push(sockaddr.into());
350                                 res.nodes_to_state.insert(sockaddr.into(), node);
351                         }
352                         future::ok(res)
353                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
354                         future::ok(nodes_uninitd!())
355                 });
356                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
357                         future::ok(Store {
358                                 u64_settings: RwLock::new(u64_settings),
359                                 subver_regex: RwLock::new(Arc::new(regex)),
360                                 nodes: RwLock::new(nodes),
361                                 store,
362                                 start_time: Instant::now(),
363                         })
364                 })
365         }
366
367         pub fn get_u64(&self, setting: U64Setting) -> u64 {
368                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
369         }
370
371         pub fn set_u64(&self, setting: U64Setting, value: u64) {
372                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
373         }
374
375         pub fn get_node_count(&self, state: AddressState) -> usize {
376                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
377         }
378         pub fn get_bloom_node_count(&self) -> [usize; crate::bloom::GENERATION_COUNT] {
379                 self.nodes.read().unwrap().timeout_nodes.get_element_count()
380         }
381
382         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
383                 Arc::clone(&*self.subver_regex.read().unwrap())
384         }
385
386         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
387                 *self.subver_regex.write().unwrap() = Arc::new(value);
388         }
389
390         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
391                 let mut res = 0;
392                 let cur_time = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
393                 let mut nodes = self.nodes.write().unwrap();
394                 for addr in addresses {
395                         match nodes.nodes_to_state.entry(addr.into()) {
396                                 hash_map::Entry::Vacant(e) => {
397                                         e.insert(Node {
398                                                 state: AddressState::Untested,
399                                                 last_services: (0, 0),
400                                                 last_good: cur_time,
401                                                 queued: true,
402                                         });
403                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push(addr.into());
404                                         res += 1;
405                                 },
406                                 hash_map::Entry::Occupied(_) => {},
407                         }
408                 }
409                 res
410         }
411
412         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
413                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
414                         match addr.socket_addr() {
415                                 Ok(socketaddr) => Some(socketaddr),
416                                 Err(_) => None, // TODO: Handle onions
417                         }
418                 }));
419         }
420         pub fn add_fresh_nodes_v2(&self, addresses: &Vec<AddrV2Message>) {
421                 self.add_fresh_addrs(addresses.iter().filter_map(|addr| {
422                         match addr.socket_addr() {
423                                 Ok(socketaddr) => Some(socketaddr),
424                                 Err(_) => None, // TODO: Handle onions
425                         }
426                 }));
427         }
428
429         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
430                 let addr: SockAddr = sockaddr.into();
431
432                 let now = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
433
434                 let mut nodes_lock = self.nodes.write().unwrap();
435                 let nodes = nodes_lock.borrow_mut();
436
437                 let node_entry = nodes.nodes_to_state.entry(addr.clone());
438                 match node_entry {
439                         hash_map::Entry::Occupied(entry)
440                                         if entry.get().state == AddressState::Untested &&
441                                            entry.get().last_services() == 0 &&
442                                            state == AddressState::Timeout => {
443                                 entry.remove_entry();
444                                 nodes.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout))));
445                                 return AddressState::Untested;
446                         },
447                         hash_map::Entry::Vacant(_) if state == AddressState::Timeout => {
448                                 nodes.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout))));
449                                 return AddressState::Untested;
450                         },
451                         hash_map::Entry::Vacant(_) if nodes.timeout_nodes.contains(&addr) => {
452                                 return AddressState::Timeout;
453                         },
454                         _ => {},
455                 }
456
457                 let state_ref = node_entry.or_insert(Node {
458                         state: AddressState::Untested,
459                         last_services: (0, 0),
460                         last_good: now,
461                         queued: false,
462                 });
463                 let ret = state_ref.state;
464                 let was_good_timeout: u32 = self.get_u64(U64Setting::WasGoodTimeout)
465                         .try_into().expect("Need WasGood timeout that fits in a u32");
466                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
467                                 && state != AddressState::Good
468                                 && state_ref.last_good >= now - was_good_timeout {
469                         state_ref.state = AddressState::WasGood;
470                         for i in 0..64 {
471                                 if state_ref.last_services() & (1 << i) != 0 {
472                                         nodes.good_node_services[i].remove(&addr);
473                                 }
474                         }
475                         if !state_ref.queued {
476                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
477                                 state_ref.queued = true;
478                         }
479                 } else {
480                         state_ref.state = state;
481                         if state == AddressState::Good {
482                                 for i in 0..64 {
483                                         if services & (1 << i) != 0 && state_ref.last_services() & (1 << i) == 0 {
484                                                 nodes.good_node_services[i].insert(addr.clone());
485                                         } else if services & (1 << i) == 0 && state_ref.last_services() & (1 << i) != 0 {
486                                                 nodes.good_node_services[i].remove(&addr);
487                                         }
488                                 }
489                                 state_ref.last_services = Node::services(services);
490                                 state_ref.last_good = now;
491                         }
492                         if !state_ref.queued {
493                                 nodes.state_next_scan[state.to_num() as usize].push(addr);
494                                 state_ref.queued = true;
495                         }
496                 }
497                 ret
498         }
499
500         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
501                 let settings_file = self.store.clone() + "/settings";
502                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
503                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
504                                 self.get_u64(U64Setting::RunTimeout),
505                                 self.get_u64(U64Setting::WasGoodTimeout),
506                                 self.get_u64(U64Setting::MinProtocolVersion),
507                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
508                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
509                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
510                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
511                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
512                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
513                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
514                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
515                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
516                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
517                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
518                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
519                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
520                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
521                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
522                                 self.get_regex(RegexSetting::SubverRegex).as_str());
523                         write_all(f, settings_string).and_then(|(mut f, _)| {
524                                 f.poll_sync_all()
525                         }).and_then(|_| {
526                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
527                         })
528                 });
529
530                 let nodes_file = self.store.clone() + "/nodes";
531                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
532                         let mut nodes_buff = String::new();
533                         {
534                                 let nodes = self.nodes.read().unwrap();
535                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 32);
536                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
537                                         nodes_buff += &sockaddr.to_string();
538                                         nodes_buff += ",";
539                                         nodes_buff += &node.state.to_num().to_string();
540                                         nodes_buff += ",";
541                                         nodes_buff += &node.last_services().to_string();
542                                         nodes_buff += "\n";
543                                 }
544                         }
545                         write_all(f, nodes_buff)
546                 }).and_then(|(mut f, _)| {
547                         f.poll_sync_all()
548                 }).and_then(|_| {
549                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
550                 });
551
552                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
553         }
554
555         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
556                 let dns_file = self.store.clone() + "/nodes.dump";
557                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
558                         let mut dns_buff = String::new();
559                         {
560                                 let mut rng = thread_rng();
561                                 for i in &[ 0b00000000001u64,
562                                             0b00000000100,
563                                             0b00000000101,
564                                             0b00000001000,
565                                             0b00000001001,
566                                             0b00000001100,
567                                             0b00000001101,
568                                             0b00001001001,
569                                             0b10000000000,
570                                             0b10000000001,
571                                             0b10000000100,
572                                             0b10000000101,
573                                             0b10000001000,
574                                             0b10000001001,
575                                             0b10000001100,
576                                             0b10000001101,
577                                             0b10001001000] {
578                                 //            ^ NODE_NETWORK_LIIMTED
579                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
580                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
581                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
582                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
583                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
584                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
585                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
586                                         macro_rules! add_addr { ($addr: expr) => {
587                                                 match $addr.ip() {
588                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
589                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
590                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
591                                                 }
592                                         } }
593                                         {
594                                                 let nodes = self.nodes.read().unwrap();
595                                                 if i.count_ones() == 1 {
596                                                         for j in 0..64 {
597                                                                 if i & (1 << j) != 0 {
598                                                                         let set_ref = &nodes.good_node_services[j];
599                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
600                                                                                 add_addr!(a);
601                                                                         }
602                                                                         break;
603                                                                 }
604                                                         }
605                                                 } else if i.count_ones() == 2 {
606                                                         let mut first_set = None;
607                                                         let mut second_set = None;
608                                                         for j in 0..64 {
609                                                                 if i & (1 << j) != 0 {
610                                                                         if first_set == None {
611                                                                                 first_set = Some(&nodes.good_node_services[j]);
612                                                                         } else {
613                                                                                 second_set = Some(&nodes.good_node_services[j]);
614                                                                                 break;
615                                                                         }
616                                                                 }
617                                                         }
618                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
619                                                                 add_addr!(a);
620                                                         }
621                                                 } else {
622                                                         //TODO: Could optimize this one a bit
623                                                         let mut intersection;
624                                                         let mut intersection_set_ref = None;
625                                                         for j in 0..64 {
626                                                                 if i & (1 << j) != 0 {
627                                                                         if intersection_set_ref == None {
628                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
629                                                                         } else {
630                                                                                 let new_intersection = intersection_set_ref.unwrap()
631                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
632                                                                                 intersection = Some(new_intersection);
633                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
634                                                                         }
635                                                                 }
636                                                         }
637                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
638                                                                 add_addr!(a);
639                                                         }
640                                                 }
641                                         }
642                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
643                                         asn_set.insert(0);
644                                         for (a, asn) in v4_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V4(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 21) {
645                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{} ; AS{}\n", i, a, asn);
646                                         }
647                                         asn_set.clear();
648                                         asn_set.insert(0);
649                                         for (a, asn) in v6_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V6(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 10) {
650                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; AS{}\n", i, a, asn);
651                                         }
652                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
653                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; Tor Onionv2\n", i, a);
654                                         }
655                                 }
656                         }
657                         write_all(f, dns_buff)
658                 }).and_then(|(mut f, _)| {
659                         f.poll_sync_all()
660                 }).and_then(|_| {
661                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
662                 }).then(|_| { future::ok(()) })
663         }
664
665         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
666                 let mut res = Vec::with_capacity(128);
667
668                 {
669                         let mut nodes_lock = self.nodes.write().unwrap();
670                         let nodes = nodes_lock.borrow_mut();
671                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
672                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
673                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
674                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
675                                                 state_nodes.len() as u64);
676                                 for node in state_nodes.drain(..split_point as usize) {
677                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
678                                         res.push((&node).into());
679                                 }
680                         }
681                 }
682                 res.shuffle(&mut thread_rng());
683                 res
684         }
685 }