Fewer full vec allocations
[dnsseed-rust] / src / datastore.rs
1 use std::cmp;
2 use std::collections::{HashSet, HashMap, hash_map};
3 use std::sync::{Arc, RwLock};
4 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
5 use std::time::{Duration, Instant};
6 use std::io::{BufRead, BufReader};
7
8 use bitcoin::network::address::Address;
9
10 use rand::thread_rng;
11 use rand::seq::{SliceRandom, IteratorRandom};
12
13 use tokio::prelude::*;
14 use tokio::fs::File;
15 use tokio::io::write_all;
16
17 use regex::Regex;
18
19 use crate::bgp_client::BGPClient;
20
/// NOTE(review): not referenced in this part of the file; presumably the
/// interval, in seconds, between scan-result reports. Confirm against callers.
pub const SECS_PER_SCAN_RESULTS: u64 = 15;
/// NOTE(review): not referenced in this part of the file; presumably a cap on
/// new connections per second for each `AddressState`. Confirm against callers.
const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 30;
23
/// Classification recorded for an address.
///
/// Variants are numbered in declaration order starting at 0; that number is
/// what `to_num` returns, what `from_num` accepts, and what is used to index
/// per-state arrays.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub enum AddressState {
        Untested,
        LowBlockCount,
        HighBlockCount,
        LowVersion,
        BadVersion,
        NotFullNode,
        ProtocolViolation,
        Timeout,
        TimeoutDuringRequest,
        TimeoutAwaitingPong,
        TimeoutAwaitingAddr,
        TimeoutAwaitingBlock,
        Good,
        WasGood,
        EvilNode,
}

impl AddressState {
        /// Converts a numeric code back into its state.
        ///
        /// Returns `None` when `num` is not the code of any variant.
        pub fn from_num(num: u8) -> Option<AddressState> {
                // Entry `i` is the variant whose numeric code is `i`.
                const BY_CODE: [AddressState; 15] = [
                        AddressState::Untested,
                        AddressState::LowBlockCount,
                        AddressState::HighBlockCount,
                        AddressState::LowVersion,
                        AddressState::BadVersion,
                        AddressState::NotFullNode,
                        AddressState::ProtocolViolation,
                        AddressState::Timeout,
                        AddressState::TimeoutDuringRequest,
                        AddressState::TimeoutAwaitingPong,
                        AddressState::TimeoutAwaitingAddr,
                        AddressState::TimeoutAwaitingBlock,
                        AddressState::Good,
                        AddressState::WasGood,
                        AddressState::EvilNode,
                ];
                BY_CODE.get(usize::from(num)).cloned()
        }

        /// Returns the numeric code of this state (its position in the enum).
        pub fn to_num(&self) -> u8 {
                // The enum is fieldless with implicit discriminants, so the
                // cast yields 0 for `Untested` up to 14 for `EvilNode`.
                *self as u8
        }

        /// Returns a human-readable label for this state.
        pub fn to_str(&self) -> &'static str {
                match self {
                        AddressState::Untested => "Untested",
                        AddressState::LowBlockCount => "Low Block Count",
                        AddressState::HighBlockCount => "High Block Count",
                        AddressState::LowVersion => "Low Version",
                        AddressState::BadVersion => "Bad Version",
                        AddressState::NotFullNode => "Not Full Node",
                        AddressState::ProtocolViolation => "Protocol Violation",
                        AddressState::Timeout => "Timeout",
                        AddressState::TimeoutDuringRequest => "Timeout During Request",
                        AddressState::TimeoutAwaitingPong => "Timeout Awaiting Pong",
                        AddressState::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
                        AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
                        AddressState::Good => "Good",
                        AddressState::WasGood => "Was Good",
                        AddressState::EvilNode => "Evil Node",
                }
        }

        /// Number of variants; usable as an array length.
        pub const fn get_count() -> u8 {
                // `EvilNode` is the last variant, so its code + 1 is the count.
                AddressState::EvilNode as u8 + 1
        }
}
109
/// Keys for the numeric (`u64`) settings held by `Store`.
#[derive(Hash, PartialEq, Eq)]
pub enum U64Setting {
        /// NOTE(review): only its default (120) is visible here; presumably a
        /// per-scan timeout in seconds. Confirm against the scanner code.
        RunTimeout,
        /// Window, in seconds, after a node was last `Good` during which a
        /// non-`Good` result records it as `WasGood` instead of the new state
        /// (see `Store::set_node_state`).
        WasGoodTimeout,
        /// One value per `AddressState`. NOTE(review): presumably the number of
        /// seconds between rescans of nodes in that state; the consumer is not
        /// in this part of the file.
        RescanInterval(AddressState),
        /// NOTE(review): only its default (70002) is visible here; presumably
        /// the lowest acceptable peer protocol version. Confirm.
        MinProtocolVersion,
}
117
/// Keys for the regex settings held by `Store`.
///
/// There is only one variant, and `Store::get_regex` / `Store::set_regex`
/// ignore the key they are given.
#[derive(Hash, PartialEq, Eq)]
pub enum RegexSetting {
        /// NOTE(review): presumably matched against a peer's sub-version
        /// string; the matching code is not in this part of the file.
        SubverRegex,
}
122
/// Per-address record kept in `Nodes::nodes_to_state`.
struct Node {
        // Set to the current time whenever the node is created or its state is
        // reported via `Store::set_node_state`.
        last_update: Instant,
        last_good: Instant, // Ignored unless state is Good or WasGood
        // Service bits recorded the last time the node was reported `Good`;
        // reset to 0 when the node is demoted to `WasGood`.
        last_services: u64,
        // Current classification of the node.
        state: AddressState,
        // Set to true when the address is pushed onto one of the
        // `state_next_scan` vectors; `set_node_state` skips the push when it
        // is already true. Nothing in this part of the file clears it.
        queued: bool,
}
130
/// Essentially SocketAddr but without a traffic class or scope
///
/// IPv6 addresses are stored as just `(ip, port)`, so two `SocketAddr`s that
/// differ only in flow info or scope id become the same key (equal and with
/// the same hash) once converted.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SockAddr {
        V4(SocketAddrV4),
        V6((Ipv6Addr, u16)),
}
impl From<SocketAddr> for SockAddr {
        /// Converts a standard socket address, discarding the IPv6 flow info
        /// and scope id.
        fn from(addr: SocketAddr) -> SockAddr {
                match addr {
                        SocketAddr::V4(sa) => SockAddr::V4(sa),
                        SocketAddr::V6(sa) => SockAddr::V6((*sa.ip(), sa.port())),
                }
        }
}
// Implemented as `From` rather than `Into`: the standard blanket impl then
// provides `Into<SocketAddr> for &SockAddr`, so `.into()` callers still work.
impl<'a> From<&'a SockAddr> for SocketAddr {
        /// Rebuilds a standard socket address; IPv6 flow info and scope id are
        /// always 0 because `SockAddr` does not store them.
        fn from(addr: &'a SockAddr) -> SocketAddr {
                match *addr {
                        SockAddr::V4(sa) => SocketAddr::V4(sa),
                        SockAddr::V6((ip, port)) => SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)),
                }
        }
}
// Implemented as `Display` rather than `ToString`: `to_string()` comes for
// free through the blanket impl and the type also works with `format!`.
impl std::fmt::Display for SockAddr {
        /// Formats exactly like the equivalent `SocketAddr`.
        fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                std::fmt::Display::fmt(&SocketAddr::from(self), f)
        }
}
impl SockAddr {
        /// Returns the port number.
        pub fn port(&self) -> u16 {
                match *self {
                        SockAddr::V4(sa) => sa.port(),
                        SockAddr::V6((_, port)) => port,
                }
        }
        /// Returns the IP address.
        pub fn ip(&self) -> IpAddr {
                match *self {
                        SockAddr::V4(sa) => IpAddr::V4(*sa.ip()),
                        SockAddr::V6((ip, _)) => IpAddr::V6(ip),
                }
        }
}
173
/// All node-tracking state of a `Store`, kept together behind one lock.
struct Nodes {
        // Index `i` holds the addresses recorded as `Good` whose service bits
        // had bit `i` set.
        good_node_services: [HashSet<SockAddr>; 64],
        // Per-address record for every known address.
        nodes_to_state: HashMap<SockAddr, Node>,
        // One vector per `AddressState`, indexed by `AddressState::to_num()`.
        // Addresses are pushed here when their `Node::queued` flag is set;
        // `Store::get_node_count` reports these lengths.
        state_next_scan: [Vec<SockAddr>; AddressState::get_count() as usize],
}
/// The fields of `Nodes`, each borrowed mutably on its own.
///
/// Produced by `Nodes::borrow_mut` so that code holding the write guard can
/// keep a reference into one field (e.g. an entry of `nodes_to_state`) while
/// mutating another.
struct NodesMutRef<'a> {
        good_node_services: &'a mut [HashSet<SockAddr>; 64],
        nodes_to_state: &'a mut HashMap<SockAddr, Node>,
        state_next_scan: &'a mut [Vec<SockAddr>; AddressState::get_count() as usize],
}
184
185 impl Nodes {
186         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
187                 NodesMutRef {
188                         good_node_services: &mut self.good_node_services,
189                         nodes_to_state: &mut self.nodes_to_state,
190                         state_next_scan: &mut self.state_next_scan,
191                 }
192         }
193 }
194
/// Settings and node database, with load/save to a directory on disk.
pub struct Store {
        // Numeric settings, keyed by `U64Setting`.
        u64_settings: RwLock<HashMap<U64Setting, u64>>,
        // The single regex setting; the `Arc` lets readers keep using a regex
        // after it has been replaced.
        subver_regex: RwLock<Arc<Regex>>,
        // Every known node and the per-state / per-service indexes over them.
        nodes: RwLock<Nodes>,
        // Directory prefix; files are addressed as `<store>/settings`,
        // `<store>/nodes`, and so on.
        store: String,
}
201
202 impl Store {
203         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
204                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
205                         let mut l = BufReader::new(f).lines();
206                         macro_rules! try_read {
207                                 ($lines: expr, $ty: ty) => { {
208                                         match $lines.next() {
209                                                 Some(line) => match line {
210                                                         Ok(line) => match line.parse::<$ty>() {
211                                                                 Ok(res) => res,
212                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
213                                                         },
214                                                         Err(e) => return future::err(e),
215                                                 },
216                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
217                                         }
218                                 } }
219                         }
220                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
221                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
222                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
223                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
224                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
225                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
226                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
227                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
228                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
229                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
230                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
231                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
232                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
233                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
234                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
235                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
236                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
237                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
238                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
239                         future::ok((u64s, try_read!(l, Regex)))
240                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
241                         let mut u64s = HashMap::with_capacity(15);
242                         u64s.insert(U64Setting::RunTimeout, 120);
243                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
244                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
245                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
246                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
247                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
248                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
249                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
250                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
251                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
252                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
253                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
254                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
255                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
256                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
257                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
258                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
259                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
260                         future::ok((u64s, Regex::new(".*").unwrap()))
261                 });
262
263                 macro_rules! nodes_uninitd {
264                         () => { {
265                                 let state_vecs = [Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()];
266                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
267                                 Nodes {
268                                         good_node_services,
269                                         nodes_to_state: HashMap::new(),
270                                         state_next_scan: state_vecs,
271                                 }
272                         } }
273                 }
274
275                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
276                         let mut res = nodes_uninitd!();
277                         let l = BufReader::new(f).lines();
278                         for line_res in l {
279                                 let line = match line_res {
280                                         Ok(l) => l,
281                                         Err(_) => return future::ok(res),
282                                 };
283                                 let mut line_iter = line.split(',');
284                                 macro_rules! try_read {
285                                         ($lines: expr, $ty: ty) => { {
286                                                 match $lines.next() {
287                                                         Some(line) => match line.parse::<$ty>() {
288                                                                 Ok(res) => res,
289                                                                 Err(_) => return future::ok(res),
290                                                         },
291                                                         None => return future::ok(res),
292                                                 }
293                                         } }
294                                 }
295                                 let sockaddr = try_read!(line_iter, SocketAddr);
296                                 let state = try_read!(line_iter, u8);
297                                 let last_services = try_read!(line_iter, u64);
298                                 let node = Node {
299                                         state: match AddressState::from_num(state) {
300                                                 Some(v) => v,
301                                                 None => return future::ok(res),
302                                         },
303                                         last_services,
304                                         last_update: Instant::now(),
305                                         last_good: Instant::now(),
306                                         queued: true,
307                                 };
308                                 if node.state == AddressState::Good {
309                                         for i in 0..64 {
310                                                 if node.last_services & (1 << i) != 0 {
311                                                         res.good_node_services[i].insert(sockaddr.into());
312                                                 }
313                                         }
314                                 }
315                                 res.state_next_scan[node.state.to_num() as usize].push(sockaddr.into());
316                                 res.nodes_to_state.insert(sockaddr.into(), node);
317                         }
318                         future::ok(res)
319                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
320                         future::ok(nodes_uninitd!())
321                 });
322                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
323                         future::ok(Store {
324                                 u64_settings: RwLock::new(u64_settings),
325                                 subver_regex: RwLock::new(Arc::new(regex)),
326                                 nodes: RwLock::new(nodes),
327                                 store,
328                         })
329                 })
330         }
331
332         pub fn get_u64(&self, setting: U64Setting) -> u64 {
333                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
334         }
335
336         pub fn set_u64(&self, setting: U64Setting, value: u64) {
337                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
338         }
339
340         pub fn get_node_count(&self, state: AddressState) -> usize {
341                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
342         }
343
344         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
345                 Arc::clone(&*self.subver_regex.read().unwrap())
346         }
347
348         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
349                 *self.subver_regex.write().unwrap() = Arc::new(value);
350         }
351
352         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
353                 let mut res = 0;
354                 let mut nodes = self.nodes.write().unwrap();
355                 let cur_time = Instant::now();
356                 for addr in addresses {
357                         match nodes.nodes_to_state.entry(addr.into()) {
358                                 hash_map::Entry::Vacant(e) => {
359                                         e.insert(Node {
360                                                 state: AddressState::Untested,
361                                                 last_services: 0,
362                                                 last_update: cur_time,
363                                                 last_good: cur_time,
364                                                 queued: true,
365                                         });
366                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push(addr.into());
367                                         res += 1;
368                                 },
369                                 hash_map::Entry::Occupied(_) => {},
370                         }
371                 }
372                 res
373         }
374
375         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
376                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
377                         match addr.socket_addr() {
378                                 Ok(socketaddr) => Some(socketaddr),
379                                 Err(_) => None, // TODO: Handle onions
380                         }
381                 }));
382         }
383
384         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
385                 let addr: SockAddr = sockaddr.into();
386                 let now = Instant::now();
387
388                 let mut nodes_lock = self.nodes.write().unwrap();
389                 let nodes = nodes_lock.borrow_mut();
390
391                 let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
392                         state: AddressState::Untested,
393                         last_services: 0,
394                         last_update: now,
395                         last_good: now,
396                         queued: false,
397                 });
398                 let ret = state_ref.state;
399                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
400                                 && state != AddressState::Good
401                                 && state_ref.last_good >= now - Duration::from_secs(self.get_u64(U64Setting::WasGoodTimeout)) {
402                         state_ref.state = AddressState::WasGood;
403                         for i in 0..64 {
404                                 if state_ref.last_services & (1 << i) != 0 {
405                                         nodes.good_node_services[i].remove(&addr);
406                                 }
407                         }
408                         state_ref.last_services = 0;
409                         if !state_ref.queued {
410                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
411                                 state_ref.queued = true;
412                         }
413                 } else {
414                         state_ref.state = state;
415                         if state == AddressState::Good {
416                                 for i in 0..64 {
417                                         if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
418                                                 nodes.good_node_services[i].insert(addr.clone());
419                                         } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
420                                                 nodes.good_node_services[i].remove(&addr);
421                                         }
422                                 }
423                                 state_ref.last_services = services;
424                                 state_ref.last_good = now;
425                         }
426                         if !state_ref.queued {
427                                 nodes.state_next_scan[state.to_num() as usize].push(addr);
428                                 state_ref.queued = true;
429                         }
430                 }
431                 state_ref.last_update = now;
432                 ret
433         }
434
435         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
436                 let settings_file = self.store.clone() + "/settings";
437                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
438                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
439                                 self.get_u64(U64Setting::RunTimeout),
440                                 self.get_u64(U64Setting::WasGoodTimeout),
441                                 self.get_u64(U64Setting::MinProtocolVersion),
442                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
443                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
444                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
445                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
446                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
447                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
448                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
449                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
450                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
451                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
452                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
453                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
454                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
455                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
456                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
457                                 self.get_regex(RegexSetting::SubverRegex).as_str());
458                         write_all(f, settings_string).and_then(|(mut f, _)| {
459                                 f.poll_sync_all()
460                         }).and_then(|_| {
461                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
462                         })
463                 });
464
465                 let nodes_file = self.store.clone() + "/nodes";
466                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
467                         let mut nodes_buff = String::new();
468                         {
469                                 let nodes = self.nodes.read().unwrap();
470                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 32);
471                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
472                                         nodes_buff += &sockaddr.to_string();
473                                         nodes_buff += ",";
474                                         nodes_buff += &node.state.to_num().to_string();
475                                         nodes_buff += ",";
476                                         nodes_buff += &node.last_services.to_string();
477                                         nodes_buff += "\n";
478                                 }
479                         }
480                         write_all(f, nodes_buff)
481                 }).and_then(|(mut f, _)| {
482                         f.poll_sync_all()
483                 }).and_then(|_| {
484                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
485                 });
486
487                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
488         }
489
490         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
491                 let dns_file = self.store.clone() + "/nodes.dump";
492                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
493                         let mut dns_buff = String::new();
494                         {
495                                 let mut rng = thread_rng();
496                                 for i in &[ 0b00000000001u64,
497                                             0b00000000100,
498                                             0b00000000101,
499                                             0b00000001000,
500                                             0b00000001001,
501                                             0b00000001100,
502                                             0b00000001101,
503                                             0b00001001001,
504                                             0b10000000000,
505                                             0b10000000001,
506                                             0b10000000100,
507                                             0b10000000101,
508                                             0b10000001000,
509                                             0b10000001001,
510                                             0b10000001100,
511                                             0b10000001101,
512                                             0b10001001000] {
513                                 //            ^ NODE_NETWORK_LIIMTED
514                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
515                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
516                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
517                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
518                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
519                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
520                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
521                                         macro_rules! add_addr { ($addr: expr) => {
522                                                 match $addr.ip() {
523                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
524                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
525                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
526                                                 }
527                                         } }
528                                         {
529                                                 let nodes = self.nodes.read().unwrap();
530                                                 if i.count_ones() == 1 {
531                                                         for j in 0..64 {
532                                                                 if i & (1 << j) != 0 {
533                                                                         let set_ref = &nodes.good_node_services[j];
534                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
535                                                                                 add_addr!(a);
536                                                                         }
537                                                                         break;
538                                                                 }
539                                                         }
540                                                 } else if i.count_ones() == 2 {
541                                                         let mut first_set = None;
542                                                         let mut second_set = None;
543                                                         for j in 0..64 {
544                                                                 if i & (1 << j) != 0 {
545                                                                         if first_set == None {
546                                                                                 first_set = Some(&nodes.good_node_services[j]);
547                                                                         } else {
548                                                                                 second_set = Some(&nodes.good_node_services[j]);
549                                                                                 break;
550                                                                         }
551                                                                 }
552                                                         }
553                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
554                                                                 add_addr!(a);
555                                                         }
556                                                 } else {
557                                                         //TODO: Could optimize this one a bit
558                                                         let mut intersection;
559                                                         let mut intersection_set_ref = None;
560                                                         for j in 0..64 {
561                                                                 if i & (1 << j) != 0 {
562                                                                         if intersection_set_ref == None {
563                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
564                                                                         } else {
565                                                                                 let new_intersection = intersection_set_ref.unwrap()
566                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
567                                                                                 intersection = Some(new_intersection);
568                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
569                                                                         }
570                                                                 }
571                                                         }
572                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
573                                                                 add_addr!(a);
574                                                         }
575                                                 }
576                                         }
577                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
578                                         asn_set.insert(0);
579                                         for (a, asn) in v4_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V4(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 21) {
580                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{} ; AS{}\n", i, a, asn);
581                                         }
582                                         asn_set.clear();
583                                         asn_set.insert(0);
584                                         for (a, asn) in v6_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V6(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 10) {
585                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; AS{}\n", i, a, asn);
586                                         }
587                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
588                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; Tor Onionv2\n", i, a);
589                                         }
590                                 }
591                         }
592                         write_all(f, dns_buff)
593                 }).and_then(|(mut f, _)| {
594                         f.poll_sync_all()
595                 }).and_then(|_| {
596                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
597                 }).then(|_| { future::ok(()) })
598         }
599
600         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
601                 let mut res = Vec::with_capacity(128);
602
603                 {
604                         let mut nodes_lock = self.nodes.write().unwrap();
605                         let nodes = nodes_lock.borrow_mut();
606                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
607                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
608                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
609                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
610                                                 state_nodes.len() as u64);
611                                 for node in state_nodes.drain(..split_point as usize) {
612                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
613                                         res.push((&node).into());
614                                 }
615                         }
616                 }
617                 res.shuffle(&mut thread_rng());
618                 res
619         }
620 }