Print ASN lookups in output zonefile comments
[dnsseed-rust] / src / datastore.rs
1 use std::{cmp, mem};
2 use std::collections::{HashSet, HashMap, hash_map};
3 use std::sync::{Arc, RwLock};
4 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
5 use std::time::{Duration, Instant};
6 use std::io::{BufRead, BufReader};
7
8 use bitcoin::network::address::Address;
9
10 use rand::thread_rng;
11 use rand::seq::{SliceRandom, IteratorRandom};
12
13 use tokio::prelude::*;
14 use tokio::fs::File;
15 use tokio::io::write_all;
16
17 use regex::Regex;
18
19 use crate::bgp_client::BGPClient;
20
// NOTE(review): not referenced in the visible part of this file. Judging by the name this is
// presumably the number of seconds between scan-result reports -- confirm against its users.
21 pub const SECS_PER_SCAN_RESULTS: u64 = 15;
// NOTE(review): not referenced in the visible part of this file. Presumably a cap on new
// connections per second for each `AddressState` -- confirm where it is consumed.
22 const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 30;
23
/// Classification recorded for a node address.
///
/// Variants are declared in the order of their numeric codes (`Untested` = 0 through
/// `EvilNode` = 14); `to_num` relies on that ordering, so new variants must be appended and
/// mirrored in `BY_CODE`, `to_str` and `get_count`.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub enum AddressState {
        Untested,
        LowBlockCount,
        HighBlockCount,
        LowVersion,
        BadVersion,
        NotFullNode,
        ProtocolViolation,
        Timeout,
        TimeoutDuringRequest,
        TimeoutAwaitingPong,
        TimeoutAwaitingAddr,
        TimeoutAwaitingBlock,
        Good,
        WasGood,
        EvilNode,
}

impl AddressState {
        /// Every state, stored at the index equal to its numeric code.
        const BY_CODE: [AddressState; 15] = [
                AddressState::Untested,
                AddressState::LowBlockCount,
                AddressState::HighBlockCount,
                AddressState::LowVersion,
                AddressState::BadVersion,
                AddressState::NotFullNode,
                AddressState::ProtocolViolation,
                AddressState::Timeout,
                AddressState::TimeoutDuringRequest,
                AddressState::TimeoutAwaitingPong,
                AddressState::TimeoutAwaitingAddr,
                AddressState::TimeoutAwaitingBlock,
                AddressState::Good,
                AddressState::WasGood,
                AddressState::EvilNode,
        ];

        /// Maps a numeric code back to its state, or `None` when `num` is not in `0..=14`.
        pub fn from_num(num: u8) -> Option<AddressState> {
                Self::BY_CODE.get(usize::from(num)).cloned()
        }

        /// Numeric code of this state (the inverse of `from_num`).
        pub fn to_num(&self) -> u8 {
                // The enum has no explicit discriminants, so the cast yields the declaration index.
                *self as u8
        }

        /// Human-readable label for this state.
        pub fn to_str(&self) -> &'static str {
                match self {
                        AddressState::Untested => "Untested",
                        AddressState::LowBlockCount => "Low Block Count",
                        AddressState::HighBlockCount => "High Block Count",
                        AddressState::LowVersion => "Low Version",
                        AddressState::BadVersion => "Bad Version",
                        AddressState::NotFullNode => "Not Full Node",
                        AddressState::ProtocolViolation => "Protocol Violation",
                        AddressState::Timeout => "Timeout",
                        AddressState::TimeoutDuringRequest => "Timeout During Request",
                        AddressState::TimeoutAwaitingPong => "Timeout Awaiting Pong",
                        AddressState::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
                        AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
                        AddressState::Good => "Good",
                        AddressState::WasGood => "Was Good",
                        AddressState::EvilNode => "Evil Node",
                }
        }

        /// Number of distinct states, i.e. one more than the largest numeric code.
        pub const fn get_count() -> u8 {
                15
        }
}
109
/// Keys of the `u64` settings kept in `Store` (read with `Store::get_u64`, changed with
/// `Store::set_u64`).
110 #[derive(Hash, PartialEq, Eq)]
111 pub enum U64Setting {
        /// NOTE(review): built-in default is 120; the consumer is not visible here -- presumably a
        /// per-scan timeout in seconds.
112         RunTimeout,
        /// Number of seconds after a node was last seen `Good` during which a failed scan only
        /// demotes it to `WasGood` (see `Store::set_node_state`).
113         WasGoodTimeout,
        /// One interval per `AddressState`. NOTE(review): presumably the number of seconds between
        /// rescans of nodes in that state -- confirm against the scanner.
114         RescanInterval(AddressState),
        /// NOTE(review): built-in default is 70002; presumably the lowest peer protocol version
        /// accepted -- confirm against the peer-handling code.
115         MinProtocolVersion,
116 }
117
/// Keys of the regex settings kept in `Store`. There is a single variant, and the visible
/// `Store::get_regex` / `Store::set_regex` ignore the key they are given.
118 #[derive(Hash, PartialEq, Eq)]
119 pub enum RegexSetting {
        /// The regex stored in `Store::subver_regex`. NOTE(review): presumably matched against a
        /// peer's advertised sub-version string -- confirm at the call site.
120         SubverRegex,
121 }
122
/// Per-address bookkeeping stored in `Nodes::nodes_to_state`.
123 struct Node {
        // Time of the most recent `Store::set_node_state` call for this address (or the time the
        // entry was created / loaded from disk).
124         last_update: Instant,
125         last_good: Instant, // Ignored unless state is Good or WasGood
        // Service bits recorded the last time the node was set to `Good`; reset to 0 when the node
        // is demoted to `WasGood`.
126         last_services: u64,
        // Current classification of the address.
127         state: AddressState,
        // Set to true whenever the address is pushed onto one of the `Nodes::state_next_scan`
        // lists. NOTE(review): the code that clears it is not in the visible part of the file.
128         queued: bool,
129 }
130
/// Essentially SocketAddr but without a traffic class or scope
///
/// IPv6 socket addresses that differ only in flow info or scope id convert to the same
/// `SockAddr`, so they compare equal and hash identically when used as a map/set key.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SockAddr {
        V4(SocketAddrV4),
        V6((Ipv6Addr, u16)),
}

impl From<SocketAddr> for SockAddr {
        /// Keeps the IP and port; any IPv6 flow info / scope id is dropped.
        fn from(addr: SocketAddr) -> SockAddr {
                match addr {
                        SocketAddr::V4(sa) => SockAddr::V4(sa),
                        SocketAddr::V6(sa) => SockAddr::V6((*sa.ip(), sa.port())),
                }
        }
}

// Implemented as `From` rather than a hand-written `Into`: the standard blanket impl still
// provides `Into<SocketAddr> for &SockAddr`, so existing `.into()` call sites are unaffected.
impl<'a> From<&'a SockAddr> for SocketAddr {
        /// Rebuilds a `SocketAddr`; IPv6 addresses get flow info and scope id 0.
        fn from(addr: &'a SockAddr) -> SocketAddr {
                match *addr {
                        SockAddr::V4(sa) => SocketAddr::V4(sa),
                        SockAddr::V6((ip, port)) => SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)),
                }
        }
}

// Implemented as `Display` rather than a hand-written `ToString`: `to_string()` keeps working
// through the standard blanket impl, and the type becomes usable with `{}` as well.
impl std::fmt::Display for SockAddr {
        /// Formats exactly like the equivalent `SocketAddr`.
        fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                std::fmt::Display::fmt(&SocketAddr::from(self), f)
        }
}

impl SockAddr {
        /// Port number of the address.
        pub fn port(&self) -> u16 {
                match *self {
                        SockAddr::V4(sa) => sa.port(),
                        SockAddr::V6((_, port)) => port,
                }
        }

        /// IP address without the port.
        pub fn ip(&self) -> IpAddr {
                match *self {
                        SockAddr::V4(sa) => IpAddr::V4(*sa.ip()),
                        SockAddr::V6((ip, _)) => IpAddr::V6(ip),
                }
        }
}
173
/// All node bookkeeping guarded by `Store::nodes`.
174 struct Nodes {
        // Indexed by service-bit number (0..64): set `i` holds the addresses recorded as `Good`
        // whose service bits include bit `i`.
175         good_node_services: [HashSet<SockAddr>; 64],
        // Every known address with its current `Node` record.
176         nodes_to_state: HashMap<SockAddr, Node>,
        // One list per `AddressState`, indexed by `AddressState::to_num()`; addresses are pushed
        // here when they are marked `queued`. NOTE(review): presumably drained by the scanner.
177         state_next_scan: Vec<Vec<SockAddr>>,
178 }
/// View of a `Nodes` that holds a separate mutable reference to each field; produced by
/// `Nodes::borrow_mut` so the fields can be mutated independently of each other.
179 struct NodesMutRef<'a> {
180         good_node_services: &'a mut [HashSet<SockAddr>; 64],
181         nodes_to_state: &'a mut HashMap<SockAddr, Node>,
182         state_next_scan: &'a mut Vec<Vec<SockAddr>>,
183 }
184
185 impl Nodes {
186         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
187                 NodesMutRef {
188                         good_node_services: &mut self.good_node_services,
189                         nodes_to_state: &mut self.nodes_to_state,
190                         state_next_scan: &mut self.state_next_scan,
191                 }
192         }
193 }
194
/// Settings plus node bookkeeping, loaded from and saved to files under one directory.
195 pub struct Store {
        // Numeric settings keyed by `U64Setting`.
196         u64_settings: RwLock<HashMap<U64Setting, u64>>,
        // The single regex setting; handed out as a shared `Arc` by `get_regex`.
197         subver_regex: RwLock<Arc<Regex>>,
        // All per-address state.
198         nodes: RwLock<Nodes>,
        // Directory prefix; files are addressed as `store + "/settings"`, `store + "/nodes"`, etc.
199         store: String,
200 }
201
202 impl Store {
203         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
204                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
205                         let mut l = BufReader::new(f).lines();
206                         macro_rules! try_read {
207                                 ($lines: expr, $ty: ty) => { {
208                                         match $lines.next() {
209                                                 Some(line) => match line {
210                                                         Ok(line) => match line.parse::<$ty>() {
211                                                                 Ok(res) => res,
212                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
213                                                         },
214                                                         Err(e) => return future::err(e),
215                                                 },
216                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
217                                         }
218                                 } }
219                         }
220                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
221                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
222                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
223                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
224                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
225                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
226                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
227                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
228                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
229                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
230                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
231                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
232                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
233                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
234                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
235                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
236                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
237                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
238                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
239                         future::ok((u64s, try_read!(l, Regex)))
240                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
241                         let mut u64s = HashMap::with_capacity(15);
242                         u64s.insert(U64Setting::RunTimeout, 120);
243                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
244                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
245                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
246                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
247                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
248                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
249                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
250                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
251                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
252                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
253                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
254                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
255                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
256                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
257                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
258                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
259                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
260                         future::ok((u64s, Regex::new(".*").unwrap()))
261                 });
262
263                 macro_rules! nodes_uninitd {
264                         () => { {
265                                 let mut state_vecs = Vec::with_capacity(AddressState::get_count() as usize);
266                                 for _ in 0..AddressState::get_count() {
267                                         state_vecs.push(Vec::new());
268                                 }
269                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
270                                 Nodes {
271                                         good_node_services,
272                                         nodes_to_state: HashMap::new(),
273                                         state_next_scan: state_vecs,
274                                 }
275                         } }
276                 }
277
278                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
279                         let mut res = nodes_uninitd!();
280                         let l = BufReader::new(f).lines();
281                         for line_res in l {
282                                 let line = match line_res {
283                                         Ok(l) => l,
284                                         Err(_) => return future::ok(res),
285                                 };
286                                 let mut line_iter = line.split(',');
287                                 macro_rules! try_read {
288                                         ($lines: expr, $ty: ty) => { {
289                                                 match $lines.next() {
290                                                         Some(line) => match line.parse::<$ty>() {
291                                                                 Ok(res) => res,
292                                                                 Err(_) => return future::ok(res),
293                                                         },
294                                                         None => return future::ok(res),
295                                                 }
296                                         } }
297                                 }
298                                 let sockaddr = try_read!(line_iter, SocketAddr);
299                                 let state = try_read!(line_iter, u8);
300                                 let last_services = try_read!(line_iter, u64);
301                                 let node = Node {
302                                         state: match AddressState::from_num(state) {
303                                                 Some(v) => v,
304                                                 None => return future::ok(res),
305                                         },
306                                         last_services,
307                                         last_update: Instant::now(),
308                                         last_good: Instant::now(),
309                                         queued: true,
310                                 };
311                                 if node.state == AddressState::Good {
312                                         for i in 0..64 {
313                                                 if node.last_services & (1 << i) != 0 {
314                                                         res.good_node_services[i].insert(sockaddr.into());
315                                                 }
316                                         }
317                                 }
318                                 res.state_next_scan[node.state.to_num() as usize].push(sockaddr.into());
319                                 res.nodes_to_state.insert(sockaddr.into(), node);
320                         }
321                         future::ok(res)
322                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
323                         future::ok(nodes_uninitd!())
324                 });
325                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
326                         future::ok(Store {
327                                 u64_settings: RwLock::new(u64_settings),
328                                 subver_regex: RwLock::new(Arc::new(regex)),
329                                 nodes: RwLock::new(nodes),
330                                 store,
331                         })
332                 })
333         }
334
335         pub fn get_u64(&self, setting: U64Setting) -> u64 {
336                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
337         }
338
339         pub fn set_u64(&self, setting: U64Setting, value: u64) {
340                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
341         }
342
343         pub fn get_node_count(&self, state: AddressState) -> usize {
344                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
345         }
346
347         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
348                 Arc::clone(&*self.subver_regex.read().unwrap())
349         }
350
351         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
352                 *self.subver_regex.write().unwrap() = Arc::new(value);
353         }
354
355         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
356                 let mut res = 0;
357                 let mut nodes = self.nodes.write().unwrap();
358                 let cur_time = Instant::now();
359                 for addr in addresses {
360                         match nodes.nodes_to_state.entry(addr.into()) {
361                                 hash_map::Entry::Vacant(e) => {
362                                         e.insert(Node {
363                                                 state: AddressState::Untested,
364                                                 last_services: 0,
365                                                 last_update: cur_time,
366                                                 last_good: cur_time,
367                                                 queued: true,
368                                         });
369                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push(addr.into());
370                                         res += 1;
371                                 },
372                                 hash_map::Entry::Occupied(_) => {},
373                         }
374                 }
375                 res
376         }
377
378         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
379                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
380                         match addr.socket_addr() {
381                                 Ok(socketaddr) => Some(socketaddr),
382                                 Err(_) => None, // TODO: Handle onions
383                         }
384                 }));
385         }
386
387         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
388                 let addr: SockAddr = sockaddr.into();
389                 let now = Instant::now();
390
391                 let mut nodes_lock = self.nodes.write().unwrap();
392                 let nodes = nodes_lock.borrow_mut();
393
394                 let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
395                         state: AddressState::Untested,
396                         last_services: 0,
397                         last_update: now,
398                         last_good: now,
399                         queued: false,
400                 });
401                 let ret = state_ref.state;
402                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
403                                 && state != AddressState::Good
404                                 && state_ref.last_good >= now - Duration::from_secs(self.get_u64(U64Setting::WasGoodTimeout)) {
405                         state_ref.state = AddressState::WasGood;
406                         for i in 0..64 {
407                                 if state_ref.last_services & (1 << i) != 0 {
408                                         nodes.good_node_services[i].remove(&addr);
409                                 }
410                         }
411                         state_ref.last_services = 0;
412                         if !state_ref.queued {
413                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
414                                 state_ref.queued = true;
415                         }
416                 } else {
417                         state_ref.state = state;
418                         if state == AddressState::Good {
419                                 for i in 0..64 {
420                                         if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
421                                                 nodes.good_node_services[i].insert(addr.clone());
422                                         } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
423                                                 nodes.good_node_services[i].remove(&addr);
424                                         }
425                                 }
426                                 state_ref.last_services = services;
427                                 state_ref.last_good = now;
428                         }
429                         if !state_ref.queued {
430                                 nodes.state_next_scan[state.to_num() as usize].push(addr);
431                                 state_ref.queued = true;
432                         }
433                 }
434                 state_ref.last_update = now;
435                 ret
436         }
437
        /// Returns a future that writes the settings and the node list to disk.
        ///
        /// Each file is first written to `<name>.tmp` and then renamed over `<store>/settings`
        /// or `<store>/nodes`. The settings are written one per line in the same order in which
        /// `Store::new` reads them; each node is written as `address,state,services`.
        ///
        /// Errors from either write are discarded by the final `then`, so the returned future
        /// always resolves to `Ok(())`.
438         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
439                 let settings_file = self.store.clone() + "/settings";
440                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
                        // One value per line, regex last; keep in sync with the read order in `new`.
441                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
442                                 self.get_u64(U64Setting::RunTimeout),
443                                 self.get_u64(U64Setting::WasGoodTimeout),
444                                 self.get_u64(U64Setting::MinProtocolVersion),
445                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
446                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
447                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
448                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
449                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
450                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
451                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
452                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
453                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
454                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
455                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
456                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
457                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
458                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
459                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
460                                 self.get_regex(RegexSetting::SubverRegex).as_str());
                        // NOTE(review): `poll_sync_all` is invoked once from inside `and_then`;
                        // confirm this waits for the sync to finish before the rename runs.
461                         write_all(f, settings_string).and_then(|(mut f, _)| {
462                                 f.poll_sync_all()
463                         }).and_then(|_| {
464                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
465                         })
466                 });
467
468                 let nodes_file = self.store.clone() + "/nodes";
469                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
470                         let mut nodes_buff = String::new();
                        // The read lock is held only while the text is built, not during the write.
471                         {
472                                 let nodes = self.nodes.read().unwrap();
473                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 20);
474                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
475                                         nodes_buff += &sockaddr.to_string();
476                                         nodes_buff += ",";
477                                         nodes_buff += &node.state.to_num().to_string();
478                                         nodes_buff += ",";
479                                         nodes_buff += &node.last_services.to_string();
480                                         nodes_buff += "\n";
481                                 }
482                         }
483                         write_all(f, nodes_buff)
484                 }).and_then(|(mut f, _)| {
485                         f.poll_sync_all()
486                 }).and_then(|_| {
487                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
488                 });
489
                // Best effort: whatever happened above, report success to the caller.
490                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
491         }
492
493         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
494                 let dns_file = self.store.clone() + "/nodes.dump";
495                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
496                         let mut dns_buff = String::new();
497                         {
498                                 let mut rng = thread_rng();
499                                 for i in &[ 0b00000000001u64,
500                                             0b00000000100,
501                                             0b00000000101,
502                                             0b00000001000,
503                                             0b00000001001,
504                                             0b00000001100,
505                                             0b00000001101,
506                                             0b00001001001,
507                                             0b10000000000,
508                                             0b10000000001,
509                                             0b10000000100,
510                                             0b10000000101,
511                                             0b10000001000,
512                                             0b10000001001,
513                                             0b10000001100,
514                                             0b10000001101,
515                                             0b10001001000] {
516                                 //            ^ NODE_NETWORK_LIIMTED
517                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
518                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
519                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
520                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
521                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
522                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
523                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
524                                         macro_rules! add_addr { ($addr: expr) => {
525                                                 match $addr.ip() {
526                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
527                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
528                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
529                                                 }
530                                         } }
531                                         {
532                                                 let nodes = self.nodes.read().unwrap();
533                                                 if i.count_ones() == 1 {
534                                                         for j in 0..64 {
535                                                                 if i & (1 << j) != 0 {
536                                                                         let set_ref = &nodes.good_node_services[j];
537                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
538                                                                                 add_addr!(a);
539                                                                         }
540                                                                         break;
541                                                                 }
542                                                         }
543                                                 } else if i.count_ones() == 2 {
544                                                         let mut first_set = None;
545                                                         let mut second_set = None;
546                                                         for j in 0..64 {
547                                                                 if i & (1 << j) != 0 {
548                                                                         if first_set == None {
549                                                                                 first_set = Some(&nodes.good_node_services[j]);
550                                                                         } else {
551                                                                                 second_set = Some(&nodes.good_node_services[j]);
552                                                                                 break;
553                                                                         }
554                                                                 }
555                                                         }
556                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
557                                                                 add_addr!(a);
558                                                         }
559                                                 } else {
560                                                         //TODO: Could optimize this one a bit
561                                                         let mut intersection;
562                                                         let mut intersection_set_ref = None;
563                                                         for j in 0..64 {
564                                                                 if i & (1 << j) != 0 {
565                                                                         if intersection_set_ref == None {
566                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
567                                                                         } else {
568                                                                                 let new_intersection = intersection_set_ref.unwrap()
569                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
570                                                                                 intersection = Some(new_intersection);
571                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
572                                                                         }
573                                                                 }
574                                                         }
575                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
576                                                                 add_addr!(a);
577                                                         }
578                                                 }
579                                         }
580                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
581                                         asn_set.insert(0);
582                                         for (a, asn) in v4_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V4(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 21) {
583                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{} ; AS{}\n", i, a, asn);
584                                         }
585                                         asn_set.clear();
586                                         asn_set.insert(0);
587                                         for (a, asn) in v6_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V6(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 10) {
588                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; AS{}\n", i, a, asn);
589                                         }
590                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
591                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; Tor Onionv2\n", i, a);
592                                         }
593                                 }
594                         }
595                         write_all(f, dns_buff)
596                 }).and_then(|(mut f, _)| {
597                         f.poll_sync_all()
598                 }).and_then(|_| {
599                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
600                 }).then(|_| { future::ok(()) })
601         }
602
603         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
                    // Pulls the next batch of addresses to scan off the front of every per-state
                    // queue in `state_next_scan`, clears the `queued` flag of each node taken, and
                    // returns the whole batch in random order.
                    //
                    // Panics (via `unwrap`) if the write lock on `self.nodes` cannot be acquired,
                    // if a queue index has no matching `AddressState` (`from_num` returns `None`),
                    // or if a queued node has no entry in `nodes_to_state`.

                    // 128 is only the initial capacity; the result may grow beyond it.
604                 let mut res = Vec::with_capacity(128);
605
606                 {
                            // The write guard lives only inside this block, so it is released before
                            // the shuffle below runs.
607                         let mut nodes_lock = self.nodes.write().unwrap();
                            // NOTE(review): `borrow_mut` is defined outside this view; it presumably
                            // yields a view whose fields can be mutably borrowed independently, which
                            // the nested loop below relies on — confirm against its definition.
608                         let nodes = nodes_lock.borrow_mut();
                            // The position of a queue in `state_next_scan` is also the numeric value
                            // of the `AddressState` it belongs to (see `from_num(idx as u8)` below).
609                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
                                    // Clamp the configured interval to at least 1 so the division
                                    // below can never divide by zero.
610                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
                                    // How many nodes to take from this queue: the smallest of
                                    //  * SECS_PER_SCAN_RESULTS * len / rescan_interval (integer
                                    //    division) — presumably the share of the queue that is due
                                    //    in one scan window if the whole queue is to be revisited
                                    //    once per rescan interval; verify the interval's unit,
                                    //  * SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS, the
                                    //    per-state cap (15 * 30 = 450 with the current constants),
                                    //  * the queue length itself, so the split index is in range.
611                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
612                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
613                                                 state_nodes.len() as u64);
                                    // `split_off` returns the tail starting at `split_point` and
                                    // leaves the head in place; the swap then puts the tail back
                                    // into the queue so that `new_nodes` holds the first
                                    // `split_point` entries, i.e. the ones to scan now.
                                    // NOTE(review): assumes the queue type has the std
                                    // `Vec`/`VecDeque` `split_off` semantics — its declaration is
                                    // not visible here.
614                                 let mut new_nodes = state_nodes.split_off(split_point as usize);
615                                 mem::swap(&mut new_nodes, state_nodes);
616                                 for node in new_nodes.drain(..) {
                                            // The node has left its queue, so clear its `queued`
                                            // flag; a node missing from `nodes_to_state` makes this
                                            // `unwrap` panic.
617                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
                                            // Convert the stored node reference into the
                                            // `SocketAddr` handed back to the caller.
618                                         res.push((&node).into());
619                                 }
620                         }
621                 }
                    // Randomise the order so the returned batch is not grouped by state.
622                 res.shuffle(&mut thread_rng());
623                 res
624         }
625 }