Bump rust-bitcoin, add addrv2
[dnsseed-rust] / src / datastore.rs
1 use std::cmp;
2 use std::collections::{HashSet, HashMap, hash_map};
3 use std::sync::{Arc, RwLock};
4 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
5 use std::time::{Duration, Instant};
6 use std::io::{BufRead, BufReader};
7
8 use bitcoin::network::address::{Address, AddrV2Message};
9
10 use rand::thread_rng;
11 use rand::seq::{SliceRandom, IteratorRandom};
12
13 use tokio::prelude::*;
14 use tokio::fs::File;
15 use tokio::io::write_all;
16
17 use regex::Regex;
18
19 use crate::bgp_client::BGPClient;
20
/// Length, in seconds, of one scan-results period, judging by the name.
/// NOTE(review): the code that consumes this constant is outside this view — confirm.
21 pub const SECS_PER_SCAN_RESULTS: u64 = 15;
/// Upper bound on connections per second per node status, judging by the name.
/// NOTE(review): not referenced in the visible part of the file — confirm how it is applied.
22 const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 30;
23
/// Classification stored for every known node address.
///
/// Each variant has a fixed numeric code (its discriminant). The code is what gets written to
/// the nodes file and is also the index of the variant's queue in `Nodes::state_next_scan`,
/// so variants must never be renumbered.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub enum AddressState {
        Untested = 0,
        LowBlockCount = 1,
        HighBlockCount = 2,
        LowVersion = 3,
        BadVersion = 4,
        NotFullNode = 5,
        ProtocolViolation = 6,
        Timeout = 7,
        TimeoutDuringRequest = 8,
        TimeoutAwaitingPong = 9,
        TimeoutAwaitingAddr = 10,
        TimeoutAwaitingBlock = 11,
        Good = 12,
        WasGood = 13,
        EvilNode = 14,
}

impl AddressState {
        /// Every state, stored at the index equal to its numeric code.
        const BY_CODE: [AddressState; AddressState::get_count() as usize] = [
                AddressState::Untested,
                AddressState::LowBlockCount,
                AddressState::HighBlockCount,
                AddressState::LowVersion,
                AddressState::BadVersion,
                AddressState::NotFullNode,
                AddressState::ProtocolViolation,
                AddressState::Timeout,
                AddressState::TimeoutDuringRequest,
                AddressState::TimeoutAwaitingPong,
                AddressState::TimeoutAwaitingAddr,
                AddressState::TimeoutAwaitingBlock,
                AddressState::Good,
                AddressState::WasGood,
                AddressState::EvilNode,
        ];

        /// Decodes a numeric code; returns `None` for codes that name no state.
        pub fn from_num(num: u8) -> Option<AddressState> {
                Self::BY_CODE.get(usize::from(num)).cloned()
        }

        /// Returns the numeric code of this state (the inverse of `from_num`).
        pub fn to_num(&self) -> u8 {
                *self as u8
        }

        /// Returns the human-readable label of this state.
        pub fn to_str(&self) -> &'static str {
                match self {
                        Self::Untested => "Untested",
                        Self::LowBlockCount => "Low Block Count",
                        Self::HighBlockCount => "High Block Count",
                        Self::LowVersion => "Low Version",
                        Self::BadVersion => "Bad Version",
                        Self::NotFullNode => "Not Full Node",
                        Self::ProtocolViolation => "Protocol Violation",
                        Self::Timeout => "Timeout",
                        Self::TimeoutDuringRequest => "Timeout During Request",
                        Self::TimeoutAwaitingPong => "Timeout Awaiting Pong",
                        Self::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
                        Self::TimeoutAwaitingBlock => "Timeout Awaiting Block",
                        Self::Good => "Good",
                        Self::WasGood => "Was Good",
                        Self::EvilNode => "Evil Node",
                }
        }

        /// Number of states; usable as an array length because it is a `const fn`.
        pub const fn get_count() -> u8 {
                // Codes are dense and start at 0, so the count is the highest code plus one.
                AddressState::EvilNode as u8 + 1
        }
}
109
/// Keys of the numeric settings held in `Store::u64_settings`: read with `Store::get_u64`,
/// changed with `Store::set_u64`, and persisted one value per line in the settings file.
110 #[derive(Hash, PartialEq, Eq)]
111 pub enum U64Setting {
112         RunTimeout, // default 120; NOTE(review): units and consumer are outside this view
113         WasGoodTimeout, // seconds after its last Good result during which a failing Good/WasGood node is stored as WasGood (see set_node_state); default 21600
114         RescanInterval(AddressState), // one value per AddressState; NOTE(review): presumably seconds between rescans — confirm against the scanner
115         MinProtocolVersion, // default 70002; NOTE(review): enforced outside this view
116 }
117
/// Keys of the regex settings accepted by `Store::get_regex` / `Store::set_regex`.
/// There is a single regex today, so those methods ignore which key they are given.
118 #[derive(Hash, PartialEq, Eq)]
119 pub enum RegexSetting {
120         SubverRegex, // stored in Store::subver_regex; defaults to ".*" when no settings file is loaded
121 }
122
/// Per-address bookkeeping record stored in `Nodes::nodes_to_state`.
123 struct Node {
124         last_update: Instant, // set to the current time by every set_node_state call
125         last_good: Instant, // Ignored unless state is Good or WasGood
126         last_services: u64, // service bitmask recorded with the last Good result; reset to 0 when the node is demoted to WasGood
127         state: AddressState,
128         queued: bool, // set to true when the address is pushed onto a state_next_scan queue; NOTE(review): where it is cleared is outside this view
129 }
130
/// Essentially SocketAddr but without a traffic class or scope
///
/// For IPv6 only the address and port are kept, so two `SocketAddr`s that differ only in
/// flow info or scope id map to the same `SockAddr` (and therefore to the same hash-map key).
/// Converting back to a `SocketAddr` fills both dropped fields with 0.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum SockAddr {
        V4(SocketAddrV4),
        V6((Ipv6Addr, u16)),
}
impl From<SocketAddr> for SockAddr {
        fn from(addr: SocketAddr) -> SockAddr {
                match addr {
                        SocketAddr::V4(sa) => SockAddr::V4(sa),
                        SocketAddr::V6(sa) => SockAddr::V6((*sa.ip(), sa.port())),
                }
        }
}
// Implemented as `From` rather than `Into`: the standard blanket impl then provides
// `Into<SocketAddr> for &SockAddr`, so existing `.into()` call sites keep working.
impl From<&SockAddr> for SocketAddr {
        fn from(addr: &SockAddr) -> SocketAddr {
                match *addr {
                        SockAddr::V4(sa) => SocketAddr::V4(sa),
                        SockAddr::V6((ip, port)) => SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)),
                }
        }
}
// Implemented as `Display` rather than `ToString`: `to_string()` is still available through
// the standard blanket impl, and the type additionally works with `format!`/`write!`.
impl std::fmt::Display for SockAddr {
        /// Formats exactly like the equivalent `SocketAddr` (`ip:port` / `[ip]:port`).
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Display::fmt(&SocketAddr::from(self), f)
        }
}
impl SockAddr {
        /// Returns the port number.
        pub fn port(&self) -> u16 {
                match *self {
                        SockAddr::V4(sa) => sa.port(),
                        SockAddr::V6((_, port)) => port,
                }
        }
        /// Returns the IP address without the port.
        pub fn ip(&self) -> IpAddr {
                match *self {
                        SockAddr::V4(sa) => IpAddr::V4(*sa.ip()),
                        SockAddr::V6((ip, _)) => IpAddr::V6(ip),
                }
        }
}
173
174 struct Nodes {
175         good_node_services: [HashSet<SockAddr>; 64],
176         nodes_to_state: HashMap<SockAddr, Node>,
177         state_next_scan: [Vec<SockAddr>; AddressState::get_count() as usize],
178 }
179 struct NodesMutRef<'a> {
180         good_node_services: &'a mut [HashSet<SockAddr>; 64],
181         nodes_to_state: &'a mut HashMap<SockAddr, Node>,
182         state_next_scan: &'a mut [Vec<SockAddr>; AddressState::get_count() as usize],
183 }
184
185 impl Nodes {
186         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
187                 NodesMutRef {
188                         good_node_services: &mut self.good_node_services,
189                         nodes_to_state: &mut self.nodes_to_state,
190                         state_next_scan: &mut self.state_next_scan,
191                 }
192         }
193 }
194
/// Shared settings plus the node database, persisted in files under the directory `store`.
195 pub struct Store {
196         u64_settings: RwLock<HashMap<U64Setting, u64>>, // numeric settings, see get_u64 / set_u64
197         subver_regex: RwLock<Arc<Regex>>, // the single regex setting, see get_regex / set_regex
198         nodes: RwLock<Nodes>, // every known address and its scan bookkeeping
199         store: String, // path prefix; "/settings", "/nodes" and "/nodes.dump" are appended to it
200 }
201
202 impl Store {
203         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
204                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
205                         let mut l = BufReader::new(f).lines();
206                         macro_rules! try_read {
207                                 ($lines: expr, $ty: ty) => { {
208                                         match $lines.next() {
209                                                 Some(line) => match line {
210                                                         Ok(line) => match line.parse::<$ty>() {
211                                                                 Ok(res) => res,
212                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
213                                                         },
214                                                         Err(e) => return future::err(e),
215                                                 },
216                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
217                                         }
218                                 } }
219                         }
220                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
221                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
222                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
223                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
224                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
225                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
226                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
227                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
228                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
229                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
230                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
231                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
232                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
233                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
234                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
235                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
236                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
237                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
238                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
239                         future::ok((u64s, try_read!(l, Regex)))
240                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
241                         let mut u64s = HashMap::with_capacity(15);
242                         u64s.insert(U64Setting::RunTimeout, 120);
243                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
244                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
245                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
246                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
247                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
248                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
249                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
250                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
251                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
252                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
253                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
254                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
255                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
256                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
257                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
258                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
259                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
260                         future::ok((u64s, Regex::new(".*").unwrap()))
261                 });
262
263                 macro_rules! nodes_uninitd {
264                         () => { {
265                                 let state_vecs = [Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()];
266                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
267                                 Nodes {
268                                         good_node_services,
269                                         nodes_to_state: HashMap::new(),
270                                         state_next_scan: state_vecs,
271                                 }
272                         } }
273                 }
274
275                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
276                         let mut res = nodes_uninitd!();
277                         let l = BufReader::new(f).lines();
278                         for line_res in l {
279                                 let line = match line_res {
280                                         Ok(l) => l,
281                                         Err(_) => return future::ok(res),
282                                 };
283                                 let mut line_iter = line.split(',');
284                                 macro_rules! try_read {
285                                         ($lines: expr, $ty: ty) => { {
286                                                 match $lines.next() {
287                                                         Some(line) => match line.parse::<$ty>() {
288                                                                 Ok(res) => res,
289                                                                 Err(_) => return future::ok(res),
290                                                         },
291                                                         None => return future::ok(res),
292                                                 }
293                                         } }
294                                 }
295                                 let sockaddr = try_read!(line_iter, SocketAddr);
296                                 let state = try_read!(line_iter, u8);
297                                 let last_services = try_read!(line_iter, u64);
298                                 let node = Node {
299                                         state: match AddressState::from_num(state) {
300                                                 Some(v) => v,
301                                                 None => return future::ok(res),
302                                         },
303                                         last_services,
304                                         last_update: Instant::now(),
305                                         last_good: Instant::now(),
306                                         queued: true,
307                                 };
308                                 if node.state == AddressState::Good {
309                                         for i in 0..64 {
310                                                 if node.last_services & (1 << i) != 0 {
311                                                         res.good_node_services[i].insert(sockaddr.into());
312                                                 }
313                                         }
314                                 }
315                                 res.state_next_scan[node.state.to_num() as usize].push(sockaddr.into());
316                                 res.nodes_to_state.insert(sockaddr.into(), node);
317                         }
318                         future::ok(res)
319                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
320                         future::ok(nodes_uninitd!())
321                 });
322                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
323                         future::ok(Store {
324                                 u64_settings: RwLock::new(u64_settings),
325                                 subver_regex: RwLock::new(Arc::new(regex)),
326                                 nodes: RwLock::new(nodes),
327                                 store,
328                         })
329                 })
330         }
331
332         pub fn get_u64(&self, setting: U64Setting) -> u64 {
333                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
334         }
335
336         pub fn set_u64(&self, setting: U64Setting, value: u64) {
337                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
338         }
339
340         pub fn get_node_count(&self, state: AddressState) -> usize {
341                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
342         }
343
344         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
345                 Arc::clone(&*self.subver_regex.read().unwrap())
346         }
347
348         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
349                 *self.subver_regex.write().unwrap() = Arc::new(value);
350         }
351
352         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
353                 let mut res = 0;
354                 let mut nodes = self.nodes.write().unwrap();
355                 let cur_time = Instant::now();
356                 for addr in addresses {
357                         match nodes.nodes_to_state.entry(addr.into()) {
358                                 hash_map::Entry::Vacant(e) => {
359                                         e.insert(Node {
360                                                 state: AddressState::Untested,
361                                                 last_services: 0,
362                                                 last_update: cur_time,
363                                                 last_good: cur_time,
364                                                 queued: true,
365                                         });
366                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push(addr.into());
367                                         res += 1;
368                                 },
369                                 hash_map::Entry::Occupied(_) => {},
370                         }
371                 }
372                 res
373         }
374
375         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
376                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
377                         match addr.socket_addr() {
378                                 Ok(socketaddr) => Some(socketaddr),
379                                 Err(_) => None, // TODO: Handle onions
380                         }
381                 }));
382         }
383         pub fn add_fresh_nodes_v2(&self, addresses: &Vec<AddrV2Message>) {
384                 self.add_fresh_addrs(addresses.iter().filter_map(|addr| {
385                         match addr.socket_addr() {
386                                 Ok(socketaddr) => Some(socketaddr),
387                                 Err(_) => None, // TODO: Handle onions
388                         }
389                 }));
390         }
391
392         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
393                 let addr: SockAddr = sockaddr.into();
394                 let now = Instant::now();
395
396                 let mut nodes_lock = self.nodes.write().unwrap();
397                 let nodes = nodes_lock.borrow_mut();
398
399                 let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
400                         state: AddressState::Untested,
401                         last_services: 0,
402                         last_update: now,
403                         last_good: now,
404                         queued: false,
405                 });
406                 let ret = state_ref.state;
407                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
408                                 && state != AddressState::Good
409                                 && state_ref.last_good >= now - Duration::from_secs(self.get_u64(U64Setting::WasGoodTimeout)) {
410                         state_ref.state = AddressState::WasGood;
411                         for i in 0..64 {
412                                 if state_ref.last_services & (1 << i) != 0 {
413                                         nodes.good_node_services[i].remove(&addr);
414                                 }
415                         }
416                         state_ref.last_services = 0;
417                         if !state_ref.queued {
418                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
419                                 state_ref.queued = true;
420                         }
421                 } else {
422                         state_ref.state = state;
423                         if state == AddressState::Good {
424                                 for i in 0..64 {
425                                         if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
426                                                 nodes.good_node_services[i].insert(addr.clone());
427                                         } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
428                                                 nodes.good_node_services[i].remove(&addr);
429                                         }
430                                 }
431                                 state_ref.last_services = services;
432                                 state_ref.last_good = now;
433                         }
434                         if !state_ref.queued {
435                                 nodes.state_next_scan[state.to_num() as usize].push(addr);
436                                 state_ref.queued = true;
437                         }
438                 }
439                 state_ref.last_update = now;
440                 ret
441         }
442
443         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
444                 let settings_file = self.store.clone() + "/settings";
445                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
446                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
447                                 self.get_u64(U64Setting::RunTimeout),
448                                 self.get_u64(U64Setting::WasGoodTimeout),
449                                 self.get_u64(U64Setting::MinProtocolVersion),
450                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
451                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
452                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
453                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
454                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
455                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
456                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
457                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
458                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
459                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
460                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
461                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
462                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
463                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
464                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
465                                 self.get_regex(RegexSetting::SubverRegex).as_str());
466                         write_all(f, settings_string).and_then(|(mut f, _)| {
467                                 f.poll_sync_all()
468                         }).and_then(|_| {
469                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
470                         })
471                 });
472
473                 let nodes_file = self.store.clone() + "/nodes";
474                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
475                         let mut nodes_buff = String::new();
476                         {
477                                 let nodes = self.nodes.read().unwrap();
478                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 32);
479                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
480                                         nodes_buff += &sockaddr.to_string();
481                                         nodes_buff += ",";
482                                         nodes_buff += &node.state.to_num().to_string();
483                                         nodes_buff += ",";
484                                         nodes_buff += &node.last_services.to_string();
485                                         nodes_buff += "\n";
486                                 }
487                         }
488                         write_all(f, nodes_buff)
489                 }).and_then(|(mut f, _)| {
490                         f.poll_sync_all()
491                 }).and_then(|_| {
492                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
493                 });
494
495                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
496         }
497
498         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
499                 let dns_file = self.store.clone() + "/nodes.dump";
500                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
501                         let mut dns_buff = String::new();
502                         {
503                                 let mut rng = thread_rng();
504                                 for i in &[ 0b00000000001u64,
505                                             0b00000000100,
506                                             0b00000000101,
507                                             0b00000001000,
508                                             0b00000001001,
509                                             0b00000001100,
510                                             0b00000001101,
511                                             0b00001001001,
512                                             0b10000000000,
513                                             0b10000000001,
514                                             0b10000000100,
515                                             0b10000000101,
516                                             0b10000001000,
517                                             0b10000001001,
518                                             0b10000001100,
519                                             0b10000001101,
520                                             0b10001001000] {
521                                 //            ^ NODE_NETWORK_LIIMTED
522                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
523                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
524                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
525                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
526                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
527                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
528                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
529                                         macro_rules! add_addr { ($addr: expr) => {
530                                                 match $addr.ip() {
531                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
532                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
533                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
534                                                 }
535                                         } }
536                                         {
537                                                 let nodes = self.nodes.read().unwrap();
538                                                 if i.count_ones() == 1 {
539                                                         for j in 0..64 {
540                                                                 if i & (1 << j) != 0 {
541                                                                         let set_ref = &nodes.good_node_services[j];
542                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
543                                                                                 add_addr!(a);
544                                                                         }
545                                                                         break;
546                                                                 }
547                                                         }
548                                                 } else if i.count_ones() == 2 {
549                                                         let mut first_set = None;
550                                                         let mut second_set = None;
551                                                         for j in 0..64 {
552                                                                 if i & (1 << j) != 0 {
553                                                                         if first_set == None {
554                                                                                 first_set = Some(&nodes.good_node_services[j]);
555                                                                         } else {
556                                                                                 second_set = Some(&nodes.good_node_services[j]);
557                                                                                 break;
558                                                                         }
559                                                                 }
560                                                         }
561                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
562                                                                 add_addr!(a);
563                                                         }
564                                                 } else {
565                                                         //TODO: Could optimize this one a bit
566                                                         let mut intersection;
567                                                         let mut intersection_set_ref = None;
568                                                         for j in 0..64 {
569                                                                 if i & (1 << j) != 0 {
570                                                                         if intersection_set_ref == None {
571                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
572                                                                         } else {
573                                                                                 let new_intersection = intersection_set_ref.unwrap()
574                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
575                                                                                 intersection = Some(new_intersection);
576                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
577                                                                         }
578                                                                 }
579                                                         }
580                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
581                                                                 add_addr!(a);
582                                                         }
583                                                 }
584                                         }
585                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
586                                         asn_set.insert(0);
587                                         for (a, asn) in v4_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V4(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 21) {
588                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{} ; AS{}\n", i, a, asn);
589                                         }
590                                         asn_set.clear();
591                                         asn_set.insert(0);
592                                         for (a, asn) in v6_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V6(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 10) {
593                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; AS{}\n", i, a, asn);
594                                         }
595                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
596                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; Tor Onionv2\n", i, a);
597                                         }
598                                 }
599                         }
600                         write_all(f, dns_buff)
601                 }).and_then(|(mut f, _)| {
602                         f.poll_sync_all()
603                 }).and_then(|_| {
604                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
605                 }).then(|_| { future::ok(()) })
606         }
607
608         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
609                 let mut res = Vec::with_capacity(128);
610
611                 {
612                         let mut nodes_lock = self.nodes.write().unwrap();
613                         let nodes = nodes_lock.borrow_mut();
614                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
615                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
616                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
617                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
618                                                 state_nodes.len() as u64);
619                                 for node in state_nodes.drain(..split_point as usize) {
620                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
621                                         res.push((&node).into());
622                                 }
623                         }
624                 }
625                 res.shuffle(&mut thread_rng());
626                 res
627         }
628 }