Impose a max conns/sec
[dnsseed-rust] / src / datastore.rs
1 use std::{cmp, mem};
2 use std::collections::{HashSet, HashMap, hash_map};
3 use std::sync::{Arc, RwLock};
4 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
5 use std::time::{Duration, Instant};
6 use std::io::{BufRead, BufReader};
7
8 use bitcoin::network::address::Address;
9
10 use rand::thread_rng;
11 use rand::seq::{SliceRandom, IteratorRandom};
12
13 use tokio::prelude::*;
14 use tokio::fs::File;
15 use tokio::io::write_all;
16
17 use regex::Regex;
18
19 use crate::bgp_client::BGPClient;
20
21 pub const SECS_PER_SCAN_RESULTS: u64 = 15;
22 const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 30;
23
24 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
25 pub enum AddressState {
26         Untested,
27         LowBlockCount,
28         HighBlockCount,
29         LowVersion,
30         BadVersion,
31         NotFullNode,
32         ProtocolViolation,
33         Timeout,
34         TimeoutDuringRequest,
35         TimeoutAwaitingPong,
36         TimeoutAwaitingAddr,
37         TimeoutAwaitingBlock,
38         Good,
39         WasGood,
40         EvilNode,
41 }
42
43 impl AddressState {
44         pub fn from_num(num: u8) -> Option<AddressState> {
45                 match num {
46                         0x0 => Some(AddressState::Untested),
47                         0x1 => Some(AddressState::LowBlockCount),
48                         0x2 => Some(AddressState::HighBlockCount),
49                         0x3 => Some(AddressState::LowVersion),
50                         0x4 => Some(AddressState::BadVersion),
51                         0x5 => Some(AddressState::NotFullNode),
52                         0x6 => Some(AddressState::ProtocolViolation),
53                         0x7 => Some(AddressState::Timeout),
54                         0x8 => Some(AddressState::TimeoutDuringRequest),
55                         0x9 => Some(AddressState::TimeoutAwaitingPong),
56                         0xa => Some(AddressState::TimeoutAwaitingAddr),
57                         0xb => Some(AddressState::TimeoutAwaitingBlock),
58                         0xc => Some(AddressState::Good),
59                         0xd => Some(AddressState::WasGood),
60                         0xe => Some(AddressState::EvilNode),
61                         _   => None,
62                 }
63         }
64
65         pub fn to_num(&self) -> u8 {
66                 match *self {
67                         AddressState::Untested => 0,
68                         AddressState::LowBlockCount => 1,
69                         AddressState::HighBlockCount => 2,
70                         AddressState::LowVersion => 3,
71                         AddressState::BadVersion => 4,
72                         AddressState::NotFullNode => 5,
73                         AddressState::ProtocolViolation => 6,
74                         AddressState::Timeout => 7,
75                         AddressState::TimeoutDuringRequest => 8,
76                         AddressState::TimeoutAwaitingPong => 9,
77                         AddressState::TimeoutAwaitingAddr => 10,
78                         AddressState::TimeoutAwaitingBlock => 11,
79                         AddressState::Good => 12,
80                         AddressState::WasGood => 13,
81                         AddressState::EvilNode => 14,
82                 }
83         }
84
85         pub fn to_str(&self) -> &'static str {
86                 match *self {
87                         AddressState::Untested => "Untested",
88                         AddressState::LowBlockCount => "Low Block Count",
89                         AddressState::HighBlockCount => "High Block Count",
90                         AddressState::LowVersion => "Low Version",
91                         AddressState::BadVersion => "Bad Version",
92                         AddressState::NotFullNode => "Not Full Node",
93                         AddressState::ProtocolViolation => "Protocol Violation",
94                         AddressState::Timeout => "Timeout",
95                         AddressState::TimeoutDuringRequest => "Timeout During Request",
96                         AddressState::TimeoutAwaitingPong => "Timeout Awaiting Pong",
97                         AddressState::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
98                         AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
99                         AddressState::Good => "Good",
100                         AddressState::WasGood => "Was Good",
101                         AddressState::EvilNode => "Evil Node",
102                 }
103         }
104
105         pub const fn get_count() -> u8 {
106                 15
107         }
108 }
109
110 #[derive(Hash, PartialEq, Eq)]
111 pub enum U64Setting {
112         RunTimeout,
113         WasGoodTimeout,
114         RescanInterval(AddressState),
115         MinProtocolVersion,
116 }
117
118 #[derive(Hash, PartialEq, Eq)]
119 pub enum RegexSetting {
120         SubverRegex,
121 }
122
123 struct Node {
124         last_update: Instant,
125         last_good: Instant, // Ignored unless state is Good or WasGood
126         last_services: u64,
127         state: AddressState,
128         queued: bool,
129 }
130
131 /// Essentially SocketAddr but without a traffic class or scope
132 #[derive(Clone, PartialEq, Eq, Hash)]
133 enum SockAddr {
134         V4(SocketAddrV4),
135         V6((Ipv6Addr, u16)),
136 }
137 impl From<SocketAddr> for SockAddr {
138         fn from(addr: SocketAddr) -> SockAddr {
139                 match addr {
140                         SocketAddr::V4(sa) => SockAddr::V4(sa),
141                         SocketAddr::V6(sa) => SockAddr::V6((sa.ip().clone(), sa.port())),
142                 }
143         }
144 }
145 impl Into<SocketAddr> for &SockAddr {
146         fn into(self) -> SocketAddr {
147                 match self {
148                         &SockAddr::V4(sa) => SocketAddr::V4(sa),
149                         &SockAddr::V6(sa) => SocketAddr::V6(SocketAddrV6::new(sa.0, sa.1, 0, 0))
150                 }
151         }
152 }
153 impl ToString for SockAddr {
154         fn to_string(&self) -> String {
155                 let sa: SocketAddr = self.into();
156                 sa.to_string()
157         }
158 }
159 impl SockAddr {
160         pub fn port(&self) -> u16 {
161                 match *self {
162                         SockAddr::V4(sa) => sa.port(),
163                         SockAddr::V6((_, port)) => port,
164                 }
165         }
166         pub fn ip(&self) -> IpAddr {
167                 match *self {
168                         SockAddr::V4(sa) => IpAddr::V4(sa.ip().clone()),
169                         SockAddr::V6((ip, _)) => IpAddr::V6(ip),
170                 }
171         }
172 }
173
174 struct Nodes {
175         good_node_services: [HashSet<SockAddr>; 64],
176         nodes_to_state: HashMap<SockAddr, Node>,
177         state_next_scan: Vec<Vec<(Instant, SockAddr)>>,
178 }
179 struct NodesMutRef<'a> {
180         good_node_services: &'a mut [HashSet<SockAddr>; 64],
181         nodes_to_state: &'a mut HashMap<SockAddr, Node>,
182         state_next_scan: &'a mut Vec<Vec<(Instant, SockAddr)>>,
183 }
184
185 impl Nodes {
186         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
187                 NodesMutRef {
188                         good_node_services: &mut self.good_node_services,
189                         nodes_to_state: &mut self.nodes_to_state,
190                         state_next_scan: &mut self.state_next_scan,
191                 }
192         }
193 }
194
195 pub struct Store {
196         u64_settings: RwLock<HashMap<U64Setting, u64>>,
197         subver_regex: RwLock<Arc<Regex>>,
198         nodes: RwLock<Nodes>,
199         store: String,
200 }
201
202 impl Store {
203         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
204                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
205                         let mut l = BufReader::new(f).lines();
206                         macro_rules! try_read {
207                                 ($lines: expr, $ty: ty) => { {
208                                         match $lines.next() {
209                                                 Some(line) => match line {
210                                                         Ok(line) => match line.parse::<$ty>() {
211                                                                 Ok(res) => res,
212                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
213                                                         },
214                                                         Err(e) => return future::err(e),
215                                                 },
216                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
217                                         }
218                                 } }
219                         }
220                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
221                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
222                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
223                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
224                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
225                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
226                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
227                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
228                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
229                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
230                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
231                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
232                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
233                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
234                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
235                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
236                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
237                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
238                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
239                         future::ok((u64s, try_read!(l, Regex)))
240                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
241                         let mut u64s = HashMap::with_capacity(15);
242                         u64s.insert(U64Setting::RunTimeout, 120);
243                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
244                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
245                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
246                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
247                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
248                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
249                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
250                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
251                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
252                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
253                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
254                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
255                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
256                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
257                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
258                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
259                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
260                         future::ok((u64s, Regex::new(".*").unwrap()))
261                 });
262
263                 macro_rules! nodes_uninitd {
264                         () => { {
265                                 let mut state_vecs = Vec::with_capacity(AddressState::get_count() as usize);
266                                 for _ in 0..AddressState::get_count() {
267                                         state_vecs.push(Vec::new());
268                                 }
269                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
270                                 Nodes {
271                                         good_node_services,
272                                         nodes_to_state: HashMap::new(),
273                                         state_next_scan: state_vecs,
274                                 }
275                         } }
276                 }
277
278                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
279                         let start_time = Instant::now() - Duration::from_secs(60 * 60 * 24);
280                         let mut res = nodes_uninitd!();
281                         let l = BufReader::new(f).lines();
282                         for line_res in l {
283                                 let line = match line_res {
284                                         Ok(l) => l,
285                                         Err(_) => return future::ok(res),
286                                 };
287                                 let mut line_iter = line.split(',');
288                                 macro_rules! try_read {
289                                         ($lines: expr, $ty: ty) => { {
290                                                 match $lines.next() {
291                                                         Some(line) => match line.parse::<$ty>() {
292                                                                 Ok(res) => res,
293                                                                 Err(_) => return future::ok(res),
294                                                         },
295                                                         None => return future::ok(res),
296                                                 }
297                                         } }
298                                 }
299                                 let sockaddr = try_read!(line_iter, SocketAddr);
300                                 let state = try_read!(line_iter, u8);
301                                 let last_services = try_read!(line_iter, u64);
302                                 let node = Node {
303                                         state: match AddressState::from_num(state) {
304                                                 Some(v) => v,
305                                                 None => return future::ok(res),
306                                         },
307                                         last_services,
308                                         last_update: Instant::now(),
309                                         last_good: Instant::now(),
310                                         queued: true,
311                                 };
312                                 if node.state == AddressState::Good {
313                                         for i in 0..64 {
314                                                 if node.last_services & (1 << i) != 0 {
315                                                         res.good_node_services[i].insert(sockaddr.into());
316                                                 }
317                                         }
318                                 }
319                                 res.state_next_scan[node.state.to_num() as usize].push((start_time, sockaddr.into()));
320                                 res.nodes_to_state.insert(sockaddr.into(), node);
321                         }
322                         future::ok(res)
323                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
324                         future::ok(nodes_uninitd!())
325                 });
326                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
327                         future::ok(Store {
328                                 u64_settings: RwLock::new(u64_settings),
329                                 subver_regex: RwLock::new(Arc::new(regex)),
330                                 nodes: RwLock::new(nodes),
331                                 store,
332                         })
333                 })
334         }
335
336         pub fn get_u64(&self, setting: U64Setting) -> u64 {
337                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
338         }
339
340         pub fn set_u64(&self, setting: U64Setting, value: u64) {
341                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
342         }
343
344         pub fn get_node_count(&self, state: AddressState) -> usize {
345                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
346         }
347
348         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
349                 Arc::clone(&*self.subver_regex.read().unwrap())
350         }
351
352         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
353                 *self.subver_regex.write().unwrap() = Arc::new(value);
354         }
355
356         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
357                 let mut res = 0;
358                 let mut nodes = self.nodes.write().unwrap();
359                 let cur_time = Instant::now();
360                 for addr in addresses {
361                         match nodes.nodes_to_state.entry(addr.into()) {
362                                 hash_map::Entry::Vacant(e) => {
363                                         e.insert(Node {
364                                                 state: AddressState::Untested,
365                                                 last_services: 0,
366                                                 last_update: cur_time,
367                                                 last_good: cur_time,
368                                                 queued: true,
369                                         });
370                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push((cur_time, addr.into()));
371                                         res += 1;
372                                 },
373                                 hash_map::Entry::Occupied(_) => {},
374                         }
375                 }
376                 res
377         }
378
379         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
380                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
381                         match addr.socket_addr() {
382                                 Ok(socketaddr) => Some(socketaddr),
383                                 Err(_) => None, // TODO: Handle onions
384                         }
385                 }));
386         }
387
388         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
389                 let addr: SockAddr = sockaddr.into();
390                 let now = Instant::now();
391
392                 let mut nodes_lock = self.nodes.write().unwrap();
393                 let nodes = nodes_lock.borrow_mut();
394
395                 let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
396                         state: AddressState::Untested,
397                         last_services: 0,
398                         last_update: now,
399                         last_good: now,
400                         queued: false,
401                 });
402                 let ret = state_ref.state;
403                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
404                                 && state != AddressState::Good
405                                 && state_ref.last_good >= now - Duration::from_secs(self.get_u64(U64Setting::WasGoodTimeout)) {
406                         state_ref.state = AddressState::WasGood;
407                         for i in 0..64 {
408                                 if state_ref.last_services & (1 << i) != 0 {
409                                         nodes.good_node_services[i].remove(&addr);
410                                 }
411                         }
412                         state_ref.last_services = 0;
413                         if !state_ref.queued {
414                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push((now, addr));
415                                 state_ref.queued = true;
416                         }
417                 } else {
418                         state_ref.state = state;
419                         if state == AddressState::Good {
420                                 for i in 0..64 {
421                                         if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
422                                                 nodes.good_node_services[i].insert(addr.clone());
423                                         } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
424                                                 nodes.good_node_services[i].remove(&addr);
425                                         }
426                                 }
427                                 state_ref.last_services = services;
428                                 state_ref.last_good = now;
429                         }
430                         if !state_ref.queued {
431                                 nodes.state_next_scan[state.to_num() as usize].push((now, addr));
432                                 state_ref.queued = true;
433                         }
434                 }
435                 state_ref.last_update = now;
436                 ret
437         }
438
439         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
440                 let settings_file = self.store.clone() + "/settings";
441                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
442                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
443                                 self.get_u64(U64Setting::RunTimeout),
444                                 self.get_u64(U64Setting::WasGoodTimeout),
445                                 self.get_u64(U64Setting::MinProtocolVersion),
446                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
447                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
448                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
449                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
450                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
451                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
452                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
453                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
454                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
455                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
456                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
457                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
458                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
459                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
460                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
461                                 self.get_regex(RegexSetting::SubverRegex).as_str());
462                         write_all(f, settings_string).and_then(|(mut f, _)| {
463                                 f.poll_sync_all()
464                         }).and_then(|_| {
465                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
466                         })
467                 });
468
469                 let nodes_file = self.store.clone() + "/nodes";
470                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
471                         let mut nodes_buff = String::new();
472                         {
473                                 let nodes = self.nodes.read().unwrap();
474                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 20);
475                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
476                                         nodes_buff += &sockaddr.to_string();
477                                         nodes_buff += ",";
478                                         nodes_buff += &node.state.to_num().to_string();
479                                         nodes_buff += ",";
480                                         nodes_buff += &node.last_services.to_string();
481                                         nodes_buff += "\n";
482                                 }
483                         }
484                         write_all(f, nodes_buff)
485                 }).and_then(|(mut f, _)| {
486                         f.poll_sync_all()
487                 }).and_then(|_| {
488                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
489                 });
490
491                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
492         }
493
494         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
495                 let dns_file = self.store.clone() + "/nodes.dump";
496                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
497                         let mut dns_buff = String::new();
498                         {
499                                 let mut rng = thread_rng();
500                                 for i in &[ 0b00000000001u64,
501                                             0b00000000100,
502                                             0b00000000101,
503                                             0b00000001000,
504                                             0b00000001001,
505                                             0b00000001100,
506                                             0b00000001101,
507                                             0b00001001001,
508                                             0b10000000000,
509                                             0b10000000001,
510                                             0b10000000100,
511                                             0b10000000101,
512                                             0b10000001000,
513                                             0b10000001001,
514                                             0b10000001100,
515                                             0b10000001101,
516                                             0b10001001000] {
517                                 //            ^ NODE_NETWORK_LIIMTED
518                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
519                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
520                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
521                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
522                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
523                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
524                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
525                                         macro_rules! add_addr { ($addr: expr) => {
526                                                 match $addr.ip() {
527                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
528                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
529                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
530                                                 }
531                                         } }
532                                         {
533                                                 let nodes = self.nodes.read().unwrap();
534                                                 if i.count_ones() == 1 {
535                                                         for j in 0..64 {
536                                                                 if i & (1 << j) != 0 {
537                                                                         let set_ref = &nodes.good_node_services[j];
538                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
539                                                                                 add_addr!(a);
540                                                                         }
541                                                                         break;
542                                                                 }
543                                                         }
544                                                 } else if i.count_ones() == 2 {
545                                                         let mut first_set = None;
546                                                         let mut second_set = None;
547                                                         for j in 0..64 {
548                                                                 if i & (1 << j) != 0 {
549                                                                         if first_set == None {
550                                                                                 first_set = Some(&nodes.good_node_services[j]);
551                                                                         } else {
552                                                                                 second_set = Some(&nodes.good_node_services[j]);
553                                                                                 break;
554                                                                         }
555                                                                 }
556                                                         }
557                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
558                                                                 add_addr!(a);
559                                                         }
560                                                 } else {
561                                                         //TODO: Could optimize this one a bit
562                                                         let mut intersection;
563                                                         let mut intersection_set_ref = None;
564                                                         for j in 0..64 {
565                                                                 if i & (1 << j) != 0 {
566                                                                         if intersection_set_ref == None {
567                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
568                                                                         } else {
569                                                                                 let new_intersection = intersection_set_ref.unwrap()
570                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
571                                                                                 intersection = Some(new_intersection);
572                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
573                                                                         }
574                                                                 }
575                                                         }
576                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
577                                                                 add_addr!(a);
578                                                         }
579                                                 }
580                                         }
581                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
582                                         asn_set.insert(0);
583                                         for a in v4_set.iter().filter(|a| asn_set.insert(bgp_client.get_asn(IpAddr::V4(**a)))).choose_multiple(&mut rng, 21) {
584                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{}\n", i, a);
585                                         }
586                                         asn_set.clear();
587                                         asn_set.insert(0);
588                                         for a in v6_set.iter().filter(|a| asn_set.insert(bgp_client.get_asn(IpAddr::V6(**a)))).choose_multiple(&mut rng, 10) {
589                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{}\n", i, a);
590                                         }
591                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
592                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{}\n", i, a);
593                                         }
594                                 }
595                         }
596                         write_all(f, dns_buff)
597                 }).and_then(|(mut f, _)| {
598                         f.poll_sync_all()
599                 }).and_then(|_| {
600                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
601                 }).then(|_| { future::ok(()) })
602         }
603
604         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
605                 let mut res = Vec::with_capacity(128);
606                 let cur_time = Instant::now();
607
608                 {
609                         let mut nodes_lock = self.nodes.write().unwrap();
610                         let nodes = nodes_lock.borrow_mut();
611                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
612                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
613                                 let cmp_time = cur_time - Duration::from_secs(rescan_interval);
614                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
615                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
616                                                 state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx) as u64);
617                                 let mut new_nodes = state_nodes.split_off(split_point as usize);
618                                 mem::swap(&mut new_nodes, state_nodes);
619                                 for (_, node) in new_nodes.drain(..) {
620                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
621                                         res.push((&node).into());
622                                 }
623                         }
624                 }
625                 res.shuffle(&mut thread_rng());
626                 res
627         }
628 }