Drop memory limit. It was useful to debug OOMs but is now unnecessary
[dnsseed-rust] / src / datastore.rs
1 use std::cmp;
2 use std::convert::TryInto;
3 use std::collections::{HashSet, HashMap, hash_map};
4 use std::sync::{Arc, RwLock};
5 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
6 use std::time::{Duration, Instant};
7 use std::io::{BufRead, BufReader};
8
9 use bitcoin::network::address::{Address, AddrV2Message};
10
11 use rand::thread_rng;
12 use rand::seq::{SliceRandom, IteratorRandom};
13
14 use tokio::prelude::*;
15 use tokio::fs::File;
16 use tokio::io::write_all;
17
18 use regex::Regex;
19
20 use crate::bloom::RollingBloomFilter;
21 use crate::bgp_client::BGPClient;
22
// NOTE(review): not referenced in the visible part of this file; the name suggests the length, in
// seconds, of one scan-results reporting period -- confirm against the code that reads it.
pub const SECS_PER_SCAN_RESULTS: u64 = 15;
// NOTE(review): not referenced in the visible part of this file; the name suggests a cap on new
// connections per second for each address state -- confirm against the code that reads it.
const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 2500;
25
/// State recorded for a node address in the store.
///
/// Every variant has a numeric code (see `from_num`/`to_num`). That code is what `Store::save_data`
/// writes to the `nodes` file, what `Store::new` reads back, and what indexes the per-state
/// `state_next_scan` lists. Adding a variant requires updating `from_num`, `to_num`, `to_str` and
/// `get_count` together.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub enum AddressState {
        // State given to addresses that were just learned and have not been scanned yet.
        Untested,
        LowBlockCount,
        HighBlockCount,
        LowVersion,
        BadVersion,
        NotFullNode,
        ProtocolViolation,
        Timeout,
        TimeoutDuringRequest,
        TimeoutAwaitingPong,
        TimeoutAwaitingAddr,
        TimeoutAwaitingBlock,
        // Only nodes in this state are indexed in the per-service-bit `good_node_services` sets.
        Good,
        // Assigned by `Store::set_node_state` when a Good/WasGood node gets a non-Good result while
        // its `last_good` timestamp is still within the WasGoodTimeout setting.
        WasGood,
        EvilNode,
}
44
45 impl AddressState {
46         pub fn from_num(num: u8) -> Option<AddressState> {
47                 match num {
48                         0x0 => Some(AddressState::Untested),
49                         0x1 => Some(AddressState::LowBlockCount),
50                         0x2 => Some(AddressState::HighBlockCount),
51                         0x3 => Some(AddressState::LowVersion),
52                         0x4 => Some(AddressState::BadVersion),
53                         0x5 => Some(AddressState::NotFullNode),
54                         0x6 => Some(AddressState::ProtocolViolation),
55                         0x7 => Some(AddressState::Timeout),
56                         0x8 => Some(AddressState::TimeoutDuringRequest),
57                         0x9 => Some(AddressState::TimeoutAwaitingPong),
58                         0xa => Some(AddressState::TimeoutAwaitingAddr),
59                         0xb => Some(AddressState::TimeoutAwaitingBlock),
60                         0xc => Some(AddressState::Good),
61                         0xd => Some(AddressState::WasGood),
62                         0xe => Some(AddressState::EvilNode),
63                         _   => None,
64                 }
65         }
66
67         pub fn to_num(&self) -> u8 {
68                 match *self {
69                         AddressState::Untested => 0,
70                         AddressState::LowBlockCount => 1,
71                         AddressState::HighBlockCount => 2,
72                         AddressState::LowVersion => 3,
73                         AddressState::BadVersion => 4,
74                         AddressState::NotFullNode => 5,
75                         AddressState::ProtocolViolation => 6,
76                         AddressState::Timeout => 7,
77                         AddressState::TimeoutDuringRequest => 8,
78                         AddressState::TimeoutAwaitingPong => 9,
79                         AddressState::TimeoutAwaitingAddr => 10,
80                         AddressState::TimeoutAwaitingBlock => 11,
81                         AddressState::Good => 12,
82                         AddressState::WasGood => 13,
83                         AddressState::EvilNode => 14,
84                 }
85         }
86
87         pub fn to_str(&self) -> &'static str {
88                 match *self {
89                         AddressState::Untested => "Untested",
90                         AddressState::LowBlockCount => "Low Block Count",
91                         AddressState::HighBlockCount => "High Block Count",
92                         AddressState::LowVersion => "Low Version",
93                         AddressState::BadVersion => "Bad Version",
94                         AddressState::NotFullNode => "Not Full Node",
95                         AddressState::ProtocolViolation => "Protocol Violation",
96                         AddressState::Timeout => "Timeout",
97                         AddressState::TimeoutDuringRequest => "Timeout During Request",
98                         AddressState::TimeoutAwaitingPong => "Timeout Awaiting Pong",
99                         AddressState::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
100                         AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
101                         AddressState::Good => "Good",
102                         AddressState::WasGood => "Was Good",
103                         AddressState::EvilNode => "Evil Node",
104                 }
105         }
106
107         pub const fn get_count() -> u8 {
108                 15
109         }
110 }
111
/// Keys for the `u64`-valued settings kept in `Store::u64_settings`
/// (read with `Store::get_u64`, changed with `Store::set_u64`).
#[derive(Hash, PartialEq, Eq)]
pub enum U64Setting {
        // NOTE(review): not read in the visible part of this file; units/meaning to be confirmed
        // against the scanner code.
        RunTimeout,
        // Compared against seconds-since-startup timestamps in `Store::set_node_state`, so in seconds.
        // Must fit in a u32 (set_node_state panics otherwise).
        WasGoodTimeout,
        // One interval per address state. The `Timeout` entry is used as a number of seconds for the
        // timeout bloom filter; presumably the others are seconds too -- confirm.
        RescanInterval(AddressState),
        // NOTE(review): not read in the visible part of this file.
        MinProtocolVersion,
}
119
/// Keys for regex-valued settings.
///
/// `Store` currently holds a single regex, and `Store::get_regex`/`Store::set_regex` ignore which
/// key they are given.
#[derive(Hash, PartialEq, Eq)]
pub enum RegexSetting {
        SubverRegex,
}
124
/// Per-address bookkeeping, stored as the value type of `Nodes::nodes_to_state`.
struct Node {
        // Times in seconds-since-startup
        last_good: u32, // Ignored unless state is Good or WasGood
        // Since everything is 4-byte aligned, using a u64 for services blows up our size
        // substantially. Instead, use a u32 pair and bit shift as needed.
        // Layout is (high 32 bits, low 32 bits); see `Node::services` and `Node::last_services`.
        last_services: (u32, u32),
        state: AddressState,
        // Set to true whenever the address is pushed onto one of the `state_next_scan` lists.
        // NOTE(review): presumably cleared by the code that pops addresses for scanning (not
        // visible here) -- confirm.
        queued: bool,
}
134 impl Node {
135         #[inline]
136         fn last_services(&self) -> u64 {
137                 ((self.last_services.0 as u64) << 32) |
138                 ((self.last_services.1 as u64)      )
139         }
140         #[inline]
141         fn services(inp: u64) -> (u32, u32) {
142                 (
143                         ((inp & 0xffffffff00000000) >> 32) as u32,
144                         ((inp & 0x00000000ffffffff)      ) as u32
145                 )
146         }
147 }
148
#[test]
fn services_test() {
        // Splitting the flags into two halves and joining them again must round-trip.
        let node = Node {
                last_good: 0,
                last_services: Node::services(0x1badcafedeadbeef),
                state: AddressState::Good,
                queued: false,
        };
        let rejoined = node.last_services();
        assert_eq!(rejoined, 0x1badcafedeadbeef);
}
156
/// Essentially SocketAddr but without a traffic class or scope
///
/// For IPv6 only the eight address segments and the port are kept; converting back to a
/// `SocketAddr` uses 0 for both the flow info and the scope id.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SockAddr {
        V4(SocketAddrV4),
        // (address segments, port)
        V6(([u16; 8], u16)),
}
/// Builds an `Ipv6Addr` from its eight 16-bit segments.
#[inline]
fn segs_to_ip6(segs: &[u16; 8]) -> Ipv6Addr {
        Ipv6Addr::from(*segs)
}
167 impl From<SocketAddr> for SockAddr {
168         fn from(addr: SocketAddr) -> SockAddr {
169                 match addr {
170                         SocketAddr::V4(sa) => SockAddr::V4(sa),
171                         SocketAddr::V6(sa) => SockAddr::V6((sa.ip().segments(), sa.port())),
172                 }
173         }
174 }
175 impl Into<SocketAddr> for &SockAddr {
176         fn into(self) -> SocketAddr {
177                 match self {
178                         &SockAddr::V4(sa) => SocketAddr::V4(sa),
179                         &SockAddr::V6(sa) => SocketAddr::V6(SocketAddrV6::new(segs_to_ip6(&sa.0), sa.1, 0, 0))
180                 }
181         }
182 }
183 impl ToString for SockAddr {
184         fn to_string(&self) -> String {
185                 let sa: SocketAddr = self.into();
186                 sa.to_string()
187         }
188 }
189 impl SockAddr {
190         pub fn port(&self) -> u16 {
191                 match *self {
192                         SockAddr::V4(sa) => sa.port(),
193                         SockAddr::V6((_, port)) => port,
194                 }
195         }
196         pub fn ip(&self) -> IpAddr {
197                 match *self {
198                         SockAddr::V4(sa) => IpAddr::V4(sa.ip().clone()),
199                         SockAddr::V6((ip, _)) => IpAddr::V6(segs_to_ip6(&ip)),
200                 }
201         }
202 }
203
/// All node bookkeeping guarded by the `Store::nodes` lock.
struct Nodes {
        // One set per service bit (0..64): addresses of Good nodes whose services have that bit set.
        good_node_services: [HashSet<SockAddr>; 64],
        // Every known address and its current bookkeeping.
        nodes_to_state: HashMap<SockAddr, Node>,
        // One list per address state, indexed by `AddressState::to_num()`, holding the addresses
        // queued under that state (an address is pushed when its `Node::queued` flag is set).
        state_next_scan: [Vec<SockAddr>; AddressState::get_count() as usize],
}
/// Mutable borrows of each field of `Nodes`, produced by `Nodes::borrow_mut`.
///
/// `Store::set_node_state` uses this to hold an entry of `nodes_to_state` while also mutating
/// `good_node_services` and `state_next_scan`.
struct NodesMutRef<'a> {
        good_node_services: &'a mut [HashSet<SockAddr>; 64],
        nodes_to_state: &'a mut HashMap<SockAddr, Node>,
        state_next_scan: &'a mut [Vec<SockAddr>; AddressState::get_count() as usize],
}
214
215 impl Nodes {
216         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
217                 NodesMutRef {
218                         good_node_services: &mut self.good_node_services,
219                         nodes_to_state: &mut self.nodes_to_state,
220                         state_next_scan: &mut self.state_next_scan,
221                 }
222         }
223 }
224
/// The seeder's data store: settings plus everything known about node addresses.
pub struct Store {
        // Current value of every `U64Setting`.
        u64_settings: RwLock<HashMap<U64Setting, u64>>,
        // The single regex setting (`RegexSetting::SubverRegex`).
        subver_regex: RwLock<Arc<Regex>>,
        nodes: RwLock<Nodes>,
        // Addresses that timed out while still untested are dropped from `nodes` and remembered
        // here instead (see `set_node_state`).
        timeout_nodes: RollingBloomFilter<SockAddr>,
        // Reference point for all "seconds since startup" timestamps.
        start_time: Instant,
        // Path prefix under which the `settings`, `nodes` and `nodes.dump` files live.
        store: String,
}
233
234 impl Store {
235         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
236                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
237                         let mut l = BufReader::new(f).lines();
238                         macro_rules! try_read {
239                                 ($lines: expr, $ty: ty) => { {
240                                         match $lines.next() {
241                                                 Some(line) => match line {
242                                                         Ok(line) => match line.parse::<$ty>() {
243                                                                 Ok(res) => res,
244                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
245                                                         },
246                                                         Err(e) => return future::err(e),
247                                                 },
248                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
249                                         }
250                                 } }
251                         }
252                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
253                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
254                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
255                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
256                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
257                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
258                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
259                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
260                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
261                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
262                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
263                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
264                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
265                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
266                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
267                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
268                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
269                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
270                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
271                         future::ok((u64s, try_read!(l, Regex)))
272                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
273                         let mut u64s = HashMap::with_capacity(15);
274                         u64s.insert(U64Setting::RunTimeout, 120);
275                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
276                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 3600);
277                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
278                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
279                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
280                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
281                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
282                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
283                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 604800);
284                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
285                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
286                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
287                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
288                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
289                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
290                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
291                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
292                         future::ok((u64s, Regex::new(".*").unwrap()))
293                 });
294
295                 macro_rules! nodes_uninitd {
296                         () => { {
297                                 let state_vecs = [Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()];
298                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
299                                 Nodes {
300                                         good_node_services,
301                                         nodes_to_state: HashMap::new(),
302                                         state_next_scan: state_vecs,
303                                 }
304                         } }
305                 }
306
307                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
308                         let mut res = nodes_uninitd!();
309                         let l = BufReader::new(f).lines();
310                         for line_res in l {
311                                 let line = match line_res {
312                                         Ok(l) => l,
313                                         Err(_) => return future::ok(res),
314                                 };
315                                 let mut line_iter = line.split(',');
316                                 macro_rules! try_read {
317                                         ($lines: expr, $ty: ty) => { {
318                                                 match $lines.next() {
319                                                         Some(line) => match line.parse::<$ty>() {
320                                                                 Ok(res) => res,
321                                                                 Err(_) => return future::ok(res),
322                                                         },
323                                                         None => return future::ok(res),
324                                                 }
325                                         } }
326                                 }
327                                 let sockaddr = try_read!(line_iter, SocketAddr);
328                                 let state = try_read!(line_iter, u8);
329                                 let last_services = try_read!(line_iter, u64);
330                                 let node = Node {
331                                         state: match AddressState::from_num(state) {
332                                                 Some(v) => v,
333                                                 None => return future::ok(res),
334                                         },
335                                         last_services: Node::services(last_services),
336                                         last_good: 0,
337                                         queued: true,
338                                 };
339                                 if node.state == AddressState::Good {
340                                         for i in 0..64 {
341                                                 if node.last_services() & (1 << i) != 0 {
342                                                         res.good_node_services[i].insert(sockaddr.into());
343                                                 }
344                                         }
345                                 }
346                                 res.state_next_scan[node.state.to_num() as usize].push(sockaddr.into());
347                                 res.nodes_to_state.insert(sockaddr.into(), node);
348                         }
349                         future::ok(res)
350                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
351                         future::ok(nodes_uninitd!())
352                 });
353                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
354                         future::ok(Store {
355                                 u64_settings: RwLock::new(u64_settings),
356                                 subver_regex: RwLock::new(Arc::new(regex)),
357                                 nodes: RwLock::new(nodes),
358                                 timeout_nodes: RollingBloomFilter::new(),
359                                 store,
360                                 start_time: Instant::now(),
361                         })
362                 })
363         }
364
365         pub fn get_u64(&self, setting: U64Setting) -> u64 {
366                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
367         }
368
369         pub fn set_u64(&self, setting: U64Setting, value: u64) {
370                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
371         }
372
373         pub fn get_node_count(&self, state: AddressState) -> usize {
374                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
375         }
376         pub fn get_bloom_node_count(&self) -> [usize; crate::bloom::GENERATION_COUNT] {
377                 self.timeout_nodes.get_element_count()
378         }
379
380         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
381                 Arc::clone(&*self.subver_regex.read().unwrap())
382         }
383
384         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
385                 *self.subver_regex.write().unwrap() = Arc::new(value);
386         }
387
388         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
389                 let mut res = 0;
390                 let cur_time = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
391                 let mut nodes = self.nodes.write().unwrap();
392                 for addr in addresses {
393                         match nodes.nodes_to_state.entry(addr.into()) {
394                                 hash_map::Entry::Vacant(e) => {
395                                         e.insert(Node {
396                                                 state: AddressState::Untested,
397                                                 last_services: (0, 0),
398                                                 last_good: cur_time,
399                                                 queued: true,
400                                         });
401                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push(addr.into());
402                                         res += 1;
403                                 },
404                                 hash_map::Entry::Occupied(_) => {},
405                         }
406                 }
407                 res
408         }
409
410         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
411                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
412                         match addr.socket_addr() {
413                                 Ok(socketaddr) => Some(socketaddr),
414                                 Err(_) => None, // TODO: Handle onions
415                         }
416                 }));
417         }
418         pub fn add_fresh_nodes_v2(&self, addresses: &Vec<AddrV2Message>) {
419                 self.add_fresh_addrs(addresses.iter().filter_map(|addr| {
420                         match addr.socket_addr() {
421                                 Ok(socketaddr) => Some(socketaddr),
422                                 Err(_) => None, // TODO: Handle onions
423                         }
424                 }));
425         }
426
427         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
428                 let addr: SockAddr = sockaddr.into();
429
430                 if state == AddressState::Untested && self.timeout_nodes.contains(&addr) {
431                         return AddressState::Timeout;
432                 }
433
434                 let now = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
435
436                 let mut nodes_lock = self.nodes.write().unwrap();
437                 let nodes = nodes_lock.borrow_mut();
438
439                 let node_entry = nodes.nodes_to_state.entry(addr.clone());
440                 match node_entry {
441                         hash_map::Entry::Occupied(entry)
442                                         if entry.get().state == AddressState::Untested &&
443                                            entry.get().last_services() == 0 &&
444                                            state == AddressState::Timeout => {
445                                 entry.remove_entry();
446                                 self.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout))));
447                                 return AddressState::Untested;
448                         },
449                         hash_map::Entry::Vacant(_) if state == AddressState::Timeout => {
450                                 self.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout))));
451                                 return AddressState::Untested;
452                         },
453                         _ => {},
454                 }
455
456                 let state_ref = node_entry.or_insert(Node {
457                         state: AddressState::Untested,
458                         last_services: (0, 0),
459                         last_good: now,
460                         queued: false,
461                 });
462                 let ret = state_ref.state;
463                 let was_good_timeout: u32 = self.get_u64(U64Setting::WasGoodTimeout)
464                         .try_into().expect("Need WasGood timeout that fits in a u32");
465                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
466                                 && state != AddressState::Good
467                                 && state_ref.last_good >= now - was_good_timeout {
468                         state_ref.state = AddressState::WasGood;
469                         for i in 0..64 {
470                                 if state_ref.last_services() & (1 << i) != 0 {
471                                         nodes.good_node_services[i].remove(&addr);
472                                 }
473                         }
474                         if !state_ref.queued {
475                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
476                                 state_ref.queued = true;
477                         }
478                 } else {
479                         state_ref.state = state;
480                         if state == AddressState::Good {
481                                 for i in 0..64 {
482                                         if services & (1 << i) != 0 && state_ref.last_services() & (1 << i) == 0 {
483                                                 nodes.good_node_services[i].insert(addr.clone());
484                                         } else if services & (1 << i) == 0 && state_ref.last_services() & (1 << i) != 0 {
485                                                 nodes.good_node_services[i].remove(&addr);
486                                         }
487                                 }
488                                 state_ref.last_services = Node::services(services);
489                                 state_ref.last_good = now;
490                         }
491                         if !state_ref.queued {
492                                 nodes.state_next_scan[state.to_num() as usize].push(addr);
493                                 state_ref.queued = true;
494                         }
495                 }
496                 ret
497         }
498
499         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
500                 let settings_file = self.store.clone() + "/settings";
501                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
502                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
503                                 self.get_u64(U64Setting::RunTimeout),
504                                 self.get_u64(U64Setting::WasGoodTimeout),
505                                 self.get_u64(U64Setting::MinProtocolVersion),
506                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
507                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
508                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
509                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
510                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
511                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
512                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
513                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
514                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
515                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
516                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
517                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
518                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
519                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
520                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
521                                 self.get_regex(RegexSetting::SubverRegex).as_str());
522                         write_all(f, settings_string).and_then(|(mut f, _)| {
523                                 f.poll_sync_all()
524                         }).and_then(|_| {
525                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
526                         })
527                 });
528
529                 let nodes_file = self.store.clone() + "/nodes";
530                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
531                         let mut nodes_buff = String::new();
532                         {
533                                 let nodes = self.nodes.read().unwrap();
534                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 32);
535                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
536                                         nodes_buff += &sockaddr.to_string();
537                                         nodes_buff += ",";
538                                         nodes_buff += &node.state.to_num().to_string();
539                                         nodes_buff += ",";
540                                         nodes_buff += &node.last_services().to_string();
541                                         nodes_buff += "\n";
542                                 }
543                         }
544                         write_all(f, nodes_buff)
545                 }).and_then(|(mut f, _)| {
546                         f.poll_sync_all()
547                 }).and_then(|_| {
548                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
549                 });
550
551                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
552         }
553
554         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
                    // Builds the DNS record dump and publishes it at `<store>/nodes.dump`.
                    //
                    // The records are written to `<store>/nodes.dump.tmp`, synced, and only then renamed
                    // over the final path. `bgp_client` is used to look up the ASN of every candidate
                    // address so that at most one address per ASN is emitted for each record name.
                    // Any I/O error is discarded: the returned future always resolves to `Ok(())`.
555                 let dns_file = self.store.clone() + "/nodes.dump";
556                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
557                         let mut dns_buff = String::new();
558                         {
559                                 let mut rng = thread_rng();
                                    // Each entry is a service-bit mask. One group of records named
                                    // `x<mask in hex>.dnsseed` is generated per mask, from the nodes found in
                                    // the `good_node_services` set of every bit that is set in the mask.
560                                 for i in &[ 0b00000000001u64,
561                                             0b00000000100,
562                                             0b00000000101,
563                                             0b00000001000,
564                                             0b00000001001,
565                                             0b00000001100,
566                                             0b00000001101,
567                                             0b00001001001,
568                                             0b10000000000,
569                                             0b10000000001,
570                                             0b10000000100,
571                                             0b10000000101,
572                                             0b10000001000,
573                                             0b10000001001,
574                                             0b10000001100,
575                                             0b10000001101,
576                                             0b10001001000] {
577                                 //            ^ NODE_NETWORK_LIMITED
578                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
579                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
580                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
581                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIMITED.
582                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
583                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
584                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
                                            // Sorts one socket address into the IPv4, IPv6 or Tor list. IPv6 addresses
                                            // starting with fd87:d87e:eb43 go to the Tor list (they are emitted below
                                            // with the "Tor Onionv2" label); the port is dropped.
585                                         macro_rules! add_addr { ($addr: expr) => {
586                                                 match $addr.ip() {
587                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
588                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
589                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
590                                                 }
591                                         } }
592                                         {
                                                    // The read lock lives only inside this block, so it is released
                                                    // before the ASN lookups below. Only port-8333 nodes are considered.
593                                                 let nodes = self.nodes.read().unwrap();
594                                                 if i.count_ones() == 1 {
                                                            // Single service bit: walk that bit's set directly.
595                                                         for j in 0..64 {
596                                                                 if i & (1 << j) != 0 {
597                                                                         let set_ref = &nodes.good_node_services[j];
598                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
599                                                                                 add_addr!(a);
600                                                                         }
601                                                                         break;
602                                                                 }
603                                                         }
604                                                 } else if i.count_ones() == 2 {
                                                            // Exactly two bits: find both sets and iterate their intersection
                                                            // lazily, without building an intermediate set.
605                                                         let mut first_set = None;
606                                                         let mut second_set = None;
607                                                         for j in 0..64 {
608                                                                 if i & (1 << j) != 0 {
609                                                                         if first_set == None {
610                                                                                 first_set = Some(&nodes.good_node_services[j]);
611                                                                         } else {
612                                                                                 second_set = Some(&nodes.good_node_services[j]);
613                                                                                 break;
614                                                                         }
615                                                                 }
616                                                         }
617                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
618                                                                 add_addr!(a);
619                                                         }
620                                                 } else {
                                                            // Three or more bits: intersect the sets one bit at a time.
                                                            // `intersection` owns the running result (a cloned, collected set)
                                                            // so that `intersection_set_ref` can point at it on the next round.
621                                                         //TODO: Could optimize this one a bit
622                                                         let mut intersection;
623                                                         let mut intersection_set_ref = None;
624                                                         for j in 0..64 {
625                                                                 if i & (1 << j) != 0 {
626                                                                         if intersection_set_ref == None {
627                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
628                                                                         } else {
629                                                                                 let new_intersection = intersection_set_ref.unwrap()
630                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
631                                                                                 intersection = Some(new_intersection);
632                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
633                                                                         }
634                                                                 }
635                                                         }
636                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
637                                                                 add_addr!(a);
638                                                         }
639                                                 }
640                                         }
                                            // `asn_set` records the ASNs already used for this mask. It is sized for the
                                            // larger of the two lists because it is cleared and reused for IPv6.
                                            // Pre-inserting 0 makes `insert` return false for any address whose lookup
                                            // yields ASN 0, so those are skipped (presumably 0 means "no ASN found";
                                            // confirm against BGPClient::get_asn).
641                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
642                                         asn_set.insert(0);
                                            // The filter keeps only the first address seen for each ASN; up to 21 of
                                            // the survivors are then chosen at random and written as A records.
643                                         for (a, asn) in v4_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V4(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 21) {
644                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{} ; AS{}\n", i, a, asn);
645                                         }
646                                         asn_set.clear();
647                                         asn_set.insert(0);
                                            // Same per-ASN de-duplication for IPv6, limited to 10 AAAA records.
648                                         for (a, asn) in v6_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V6(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 10) {
649                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; AS{}\n", i, a, asn);
650                                         }
                                            // Tor addresses get no ASN lookup; up to 2 are chosen at random.
651                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
652                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; Tor Onionv2\n", i, a);
653                                         }
654                                 }
655                         }
656                         write_all(f, dns_buff)
657                 }).and_then(|(mut f, _)| {
                            // Sync the temporary file before it is renamed over the final path.
658                         f.poll_sync_all()
659                 }).and_then(|_| {
660                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
                    // Whatever the outcome of the chain above, report success to the caller.
661                 }).then(|_| { future::ok(()) })
662         }
663
664         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
665                 let mut res = Vec::with_capacity(128);
666
667                 {
668                         let mut nodes_lock = self.nodes.write().unwrap();
669                         let nodes = nodes_lock.borrow_mut();
670                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
671                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
672                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
673                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
674                                                 state_nodes.len() as u64);
675                                 for node in state_nodes.drain(..split_point as usize) {
676                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
677                                         res.push((&node).into());
678                                 }
679                         }
680                 }
681                 res.shuffle(&mut thread_rng());
682                 res
683         }
684 }