Add some basic COMPACT_FILTERS support
[dnsseed-rust] / src / datastore.rs
1 use std::{cmp, mem};
2 use std::collections::{HashSet, HashMap, hash_map};
3 use std::sync::{Arc, RwLock};
4 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
5 use std::time::{Duration, Instant};
6 use std::io::{BufRead, BufReader};
7
8 use bitcoin::network::address::Address;
9
10 use rand::thread_rng;
11 use rand::seq::{SliceRandom, IteratorRandom};
12
13 use tokio::prelude::*;
14 use tokio::fs::File;
15 use tokio::io::write_all;
16
17 use regex::Regex;
18
19 use crate::bgp_client::BGPClient;
20
/// A fixed interval of 15, presumably in seconds (going by the name).
/// NOTE(review): the code that consumes this constant is not in this part of
/// the file — confirm what exactly happens once per interval.
pub const SECS_PER_SCAN_RESULTS: u64 = 15;
22
/// Classification assigned to a node address.
///
/// Every variant has a numeric id (`to_num` / `from_num`) and a human-readable
/// label (`to_str`). The numeric ids are what gets written to and read from the
/// nodes file, so existing ids must never be renumbered.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum AddressState {
        Untested,
        LowBlockCount,
        HighBlockCount,
        LowVersion,
        BadVersion,
        NotFullNode,
        ProtocolViolation,
        Timeout,
        TimeoutDuringRequest,
        TimeoutAwaitingPong,
        TimeoutAwaitingAddr,
        TimeoutAwaitingBlock,
        Good,
        WasGood,
        EvilNode,
}

impl AddressState {
        /// Every variant exactly once. `from_num` and `get_count` are derived from
        /// this list, so a new variant only has to be added here and in the
        /// (compiler-checked, exhaustive) matches below.
        const ALL: [AddressState; 15] = [
                AddressState::Untested,
                AddressState::LowBlockCount,
                AddressState::HighBlockCount,
                AddressState::LowVersion,
                AddressState::BadVersion,
                AddressState::NotFullNode,
                AddressState::ProtocolViolation,
                AddressState::Timeout,
                AddressState::TimeoutDuringRequest,
                AddressState::TimeoutAwaitingPong,
                AddressState::TimeoutAwaitingAddr,
                AddressState::TimeoutAwaitingBlock,
                AddressState::Good,
                AddressState::WasGood,
                AddressState::EvilNode,
        ];

        /// Returns the state whose numeric id is `num`, or `None` if no state uses that id.
        ///
        /// Defined as the inverse of `to_num` so the two can never disagree.
        pub fn from_num(num: u8) -> Option<AddressState> {
                Self::ALL.iter().copied().find(|state| state.to_num() == num)
        }

        /// Returns the numeric id of this state.
        pub fn to_num(&self) -> u8 {
                match *self {
                        AddressState::Untested => 0,
                        AddressState::LowBlockCount => 1,
                        AddressState::HighBlockCount => 2,
                        AddressState::LowVersion => 3,
                        AddressState::BadVersion => 4,
                        AddressState::NotFullNode => 5,
                        AddressState::ProtocolViolation => 6,
                        AddressState::Timeout => 7,
                        AddressState::TimeoutDuringRequest => 8,
                        AddressState::TimeoutAwaitingPong => 9,
                        AddressState::TimeoutAwaitingAddr => 10,
                        AddressState::TimeoutAwaitingBlock => 11,
                        AddressState::Good => 12,
                        AddressState::WasGood => 13,
                        AddressState::EvilNode => 14,
                }
        }

        /// Returns the human-readable label of this state.
        pub fn to_str(&self) -> &'static str {
                match *self {
                        AddressState::Untested => "Untested",
                        AddressState::LowBlockCount => "Low Block Count",
                        AddressState::HighBlockCount => "High Block Count",
                        AddressState::LowVersion => "Low Version",
                        AddressState::BadVersion => "Bad Version",
                        AddressState::NotFullNode => "Not Full Node",
                        AddressState::ProtocolViolation => "Protocol Violation",
                        AddressState::Timeout => "Timeout",
                        AddressState::TimeoutDuringRequest => "Timeout During Request",
                        AddressState::TimeoutAwaitingPong => "Timeout Awaiting Pong",
                        AddressState::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
                        AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
                        AddressState::Good => "Good",
                        AddressState::WasGood => "Was Good",
                        AddressState::EvilNode => "Evil Node",
                }
        }

        /// Number of distinct states; numeric ids are `0..get_count()`.
        pub const fn get_count() -> u8 {
                Self::ALL.len() as u8
        }
}
108
109 #[derive(Hash, PartialEq, Eq)]
110 pub enum U64Setting {
111         RunTimeout,
112         WasGoodTimeout,
113         RescanInterval(AddressState),
114         MinProtocolVersion,
115 }
116
/// Keys of the regex-valued settings held by the store.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum RegexSetting {
        SubverRegex,
}
121
122 struct Node {
123         last_update: Instant,
124         last_good: Instant, // Ignored unless state is Good or WasGood
125         last_services: u64,
126         state: AddressState,
127 }
128
/// Essentially SocketAddr but without a traffic class or scope
///
/// Dropping the IPv6 flow-info and scope id means two IPv6 socket addresses
/// that differ only in those fields compare (and hash) as equal.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum SockAddr {
        V4(SocketAddrV4),
        V6((Ipv6Addr, u16)),
}

impl From<SocketAddr> for SockAddr {
        fn from(addr: SocketAddr) -> SockAddr {
                match addr {
                        SocketAddr::V4(sa) => SockAddr::V4(sa),
                        // Only ip and port are kept; flow-info and scope id are discarded.
                        SocketAddr::V6(sa) => SockAddr::V6((*sa.ip(), sa.port())),
                }
        }
}

// Implemented as `From` rather than `Into`: the standard blanket impl then
// provides `Into<SocketAddr> for &SockAddr`, so existing `.into()` calls keep working.
impl From<&SockAddr> for SocketAddr {
        fn from(addr: &SockAddr) -> SocketAddr {
                match *addr {
                        SockAddr::V4(sa) => SocketAddr::V4(sa),
                        // Flow-info and scope id are always rebuilt as 0.
                        SockAddr::V6((ip, port)) => SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)),
                }
        }
}

// Implemented as `Display` rather than `ToString`: the standard blanket impl
// then provides `to_string()` with the same text as before.
impl std::fmt::Display for SockAddr {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                let sa: SocketAddr = self.into();
                write!(f, "{}", sa)
        }
}

impl SockAddr {
        /// Port number of the address.
        pub fn port(&self) -> u16 {
                match *self {
                        SockAddr::V4(sa) => sa.port(),
                        SockAddr::V6((_, port)) => port,
                }
        }

        /// IP part of the address.
        pub fn ip(&self) -> IpAddr {
                match *self {
                        SockAddr::V4(sa) => IpAddr::V4(*sa.ip()),
                        SockAddr::V6((ip, _)) => IpAddr::V6(ip),
                }
        }
}
171
172 struct Nodes {
173         good_node_services: [HashSet<SockAddr>; 64],
174         nodes_to_state: HashMap<SockAddr, Node>,
175         state_next_scan: Vec<Vec<(Instant, SockAddr)>>,
176 }
177 struct NodesMutRef<'a> {
178         good_node_services: &'a mut [HashSet<SockAddr>; 64],
179         nodes_to_state: &'a mut HashMap<SockAddr, Node>,
180         state_next_scan: &'a mut Vec<Vec<(Instant, SockAddr)>>,
181 }
182
183 impl Nodes {
184         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
185                 NodesMutRef {
186                         good_node_services: &mut self.good_node_services,
187                         nodes_to_state: &mut self.nodes_to_state,
188                         state_next_scan: &mut self.state_next_scan,
189                 }
190         }
191 }
192
193 pub struct Store {
194         u64_settings: RwLock<HashMap<U64Setting, u64>>,
195         subver_regex: RwLock<Arc<Regex>>,
196         nodes: RwLock<Nodes>,
197         store: String,
198 }
199
200 impl Store {
201         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
202                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
203                         let mut l = BufReader::new(f).lines();
204                         macro_rules! try_read {
205                                 ($lines: expr, $ty: ty) => { {
206                                         match $lines.next() {
207                                                 Some(line) => match line {
208                                                         Ok(line) => match line.parse::<$ty>() {
209                                                                 Ok(res) => res,
210                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
211                                                         },
212                                                         Err(e) => return future::err(e),
213                                                 },
214                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
215                                         }
216                                 } }
217                         }
218                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
219                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
220                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
221                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
222                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
223                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
224                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
225                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
226                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
227                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
228                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
229                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
230                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
231                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
232                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
233                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
234                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
235                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
236                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
237                         future::ok((u64s, try_read!(l, Regex)))
238                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
239                         let mut u64s = HashMap::with_capacity(15);
240                         u64s.insert(U64Setting::RunTimeout, 120);
241                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
242                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
243                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
244                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
245                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
246                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
247                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
248                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
249                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
250                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
251                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
252                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
253                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
254                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
255                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
256                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
257                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
258                         future::ok((u64s, Regex::new(".*").unwrap()))
259                 });
260
261                 macro_rules! nodes_uninitd {
262                         () => { {
263                                 let mut state_vecs = Vec::with_capacity(AddressState::get_count() as usize);
264                                 for _ in 0..AddressState::get_count() {
265                                         state_vecs.push(Vec::new());
266                                 }
267                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
268                                 Nodes {
269                                         good_node_services,
270                                         nodes_to_state: HashMap::new(),
271                                         state_next_scan: state_vecs,
272                                 }
273                         } }
274                 }
275
276                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
277                         let start_time = Instant::now() - Duration::from_secs(60 * 60 * 24);
278                         let mut res = nodes_uninitd!();
279                         let l = BufReader::new(f).lines();
280                         for line_res in l {
281                                 let line = match line_res {
282                                         Ok(l) => l,
283                                         Err(_) => return future::ok(res),
284                                 };
285                                 let mut line_iter = line.split(',');
286                                 macro_rules! try_read {
287                                         ($lines: expr, $ty: ty) => { {
288                                                 match $lines.next() {
289                                                         Some(line) => match line.parse::<$ty>() {
290                                                                 Ok(res) => res,
291                                                                 Err(_) => return future::ok(res),
292                                                         },
293                                                         None => return future::ok(res),
294                                                 }
295                                         } }
296                                 }
297                                 let sockaddr = try_read!(line_iter, SocketAddr);
298                                 let state = try_read!(line_iter, u8);
299                                 let last_services = try_read!(line_iter, u64);
300                                 let node = Node {
301                                         state: match AddressState::from_num(state) {
302                                                 Some(v) => v,
303                                                 None => return future::ok(res),
304                                         },
305                                         last_services,
306                                         last_update: Instant::now(),
307                                         last_good: Instant::now(),
308                                 };
309                                 if node.state == AddressState::Good {
310                                         for i in 0..64 {
311                                                 if node.last_services & (1 << i) != 0 {
312                                                         res.good_node_services[i].insert(sockaddr.into());
313                                                 }
314                                         }
315                                 }
316                                 res.state_next_scan[node.state.to_num() as usize].push((start_time, sockaddr.into()));
317                                 res.nodes_to_state.insert(sockaddr.into(), node);
318                         }
319                         future::ok(res)
320                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
321                         future::ok(nodes_uninitd!())
322                 });
323                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
324                         future::ok(Store {
325                                 u64_settings: RwLock::new(u64_settings),
326                                 subver_regex: RwLock::new(Arc::new(regex)),
327                                 nodes: RwLock::new(nodes),
328                                 store,
329                         })
330                 })
331         }
332
333         pub fn get_u64(&self, setting: U64Setting) -> u64 {
334                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
335         }
336
337         pub fn set_u64(&self, setting: U64Setting, value: u64) {
338                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
339         }
340
341         pub fn get_node_count(&self, state: AddressState) -> usize {
342                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
343         }
344
345         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
346                 Arc::clone(&*self.subver_regex.read().unwrap())
347         }
348
349         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
350                 *self.subver_regex.write().unwrap() = Arc::new(value);
351         }
352
353         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
354                 let mut res = 0;
355                 let mut nodes = self.nodes.write().unwrap();
356                 let cur_time = Instant::now();
357                 for addr in addresses {
358                         match nodes.nodes_to_state.entry(addr.into()) {
359                                 hash_map::Entry::Vacant(e) => {
360                                         e.insert(Node {
361                                                 state: AddressState::Untested,
362                                                 last_services: 0,
363                                                 last_update: cur_time,
364                                                 last_good: cur_time,
365                                         });
366                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push((cur_time, addr.into()));
367                                         res += 1;
368                                 },
369                                 hash_map::Entry::Occupied(_) => {},
370                         }
371                 }
372                 res
373         }
374
375         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
376                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
377                         match addr.socket_addr() {
378                                 Ok(socketaddr) => Some(socketaddr),
379                                 Err(_) => None, // TODO: Handle onions
380                         }
381                 }));
382         }
383
384         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
385                 let addr: SockAddr = sockaddr.into();
386                 let now = Instant::now();
387
388                 let mut nodes_lock = self.nodes.write().unwrap();
389                 let nodes = nodes_lock.borrow_mut();
390
391                 let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
392                         state: AddressState::Untested,
393                         last_services: 0,
394                         last_update: now,
395                         last_good: now,
396                 });
397                 let ret = state_ref.state;
398                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
399                                 && state != AddressState::Good
400                                 && state_ref.last_good >= now - Duration::from_secs(self.get_u64(U64Setting::WasGoodTimeout)) {
401                         state_ref.state = AddressState::WasGood;
402                         for i in 0..64 {
403                                 if state_ref.last_services & (1 << i) != 0 {
404                                         nodes.good_node_services[i].remove(&addr);
405                                 }
406                         }
407                         state_ref.last_services = 0;
408                         nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push((now, addr));
409                 } else {
410                         state_ref.state = state;
411                         if state == AddressState::Good {
412                                 for i in 0..64 {
413                                         if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
414                                                 nodes.good_node_services[i].insert(addr.clone());
415                                         } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
416                                                 nodes.good_node_services[i].remove(&addr);
417                                         }
418                                 }
419                                 state_ref.last_services = services;
420                                 state_ref.last_good = now;
421                         }
422                         nodes.state_next_scan[state.to_num() as usize].push((now, addr));
423                 }
424                 state_ref.last_update = now;
425                 ret
426         }
427
428         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
429                 let settings_file = self.store.clone() + "/settings";
430                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
431                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
432                                 self.get_u64(U64Setting::RunTimeout),
433                                 self.get_u64(U64Setting::WasGoodTimeout),
434                                 self.get_u64(U64Setting::MinProtocolVersion),
435                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
436                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
437                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
438                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
439                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
440                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
441                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
442                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
443                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
444                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
445                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
446                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
447                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
448                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
449                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
450                                 self.get_regex(RegexSetting::SubverRegex).as_str());
451                         write_all(f, settings_string).and_then(|(mut f, _)| {
452                                 f.poll_sync_all()
453                         }).and_then(|_| {
454                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
455                         })
456                 });
457
458                 let nodes_file = self.store.clone() + "/nodes";
459                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
460                         let mut nodes_buff = String::new();
461                         {
462                                 let nodes = self.nodes.read().unwrap();
463                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 20);
464                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
465                                         nodes_buff += &sockaddr.to_string();
466                                         nodes_buff += ",";
467                                         nodes_buff += &node.state.to_num().to_string();
468                                         nodes_buff += ",";
469                                         nodes_buff += &node.last_services.to_string();
470                                         nodes_buff += "\n";
471                                 }
472                         }
473                         write_all(f, nodes_buff)
474                 }).and_then(|(mut f, _)| {
475                         f.poll_sync_all()
476                 }).and_then(|_| {
477                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
478                 });
479
480                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
481         }
482
483         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
484                 let dns_file = self.store.clone() + "/nodes.dump";
485                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
486                         let mut dns_buff = String::new();
487                         {
488                                 let mut rng = thread_rng();
489                                 for i in &[ 0b00000000001u64,
490                                             0b00000000100,
491                                             0b00000000101,
492                                             0b00000001000,
493                                             0b00000001001,
494                                             0b00000001100,
495                                             0b00000001101,
496                                             0b00001001001,
497                                             0b10000000000,
498                                             0b10000000001,
499                                             0b10000000100,
500                                             0b10000000101,
501                                             0b10000001000,
502                                             0b10000001001,
503                                             0b10000001100,
504                                             0b10000001101,
505                                             0b10001001000] {
506                                 //            ^ NODE_NETWORK_LIIMTED
507                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
508                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
509                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
510                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
511                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
512                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
513                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
514                                         macro_rules! add_addr { ($addr: expr) => {
515                                                 match $addr.ip() {
516                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
517                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
518                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
519                                                 }
520                                         } }
521                                         {
522                                                 let nodes = self.nodes.read().unwrap();
523                                                 if i.count_ones() == 1 {
524                                                         for j in 0..64 {
525                                                                 if i & (1 << j) != 0 {
526                                                                         let set_ref = &nodes.good_node_services[j];
527                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
528                                                                                 add_addr!(a);
529                                                                         }
530                                                                         break;
531                                                                 }
532                                                         }
533                                                 } else if i.count_ones() == 2 {
534                                                         let mut first_set = None;
535                                                         let mut second_set = None;
536                                                         for j in 0..64 {
537                                                                 if i & (1 << j) != 0 {
538                                                                         if first_set == None {
539                                                                                 first_set = Some(&nodes.good_node_services[j]);
540                                                                         } else {
541                                                                                 second_set = Some(&nodes.good_node_services[j]);
542                                                                                 break;
543                                                                         }
544                                                                 }
545                                                         }
546                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
547                                                                 add_addr!(a);
548                                                         }
549                                                 } else {
550                                                         //TODO: Could optimize this one a bit
551                                                         let mut intersection;
552                                                         let mut intersection_set_ref = None;
553                                                         for j in 0..64 {
554                                                                 if i & (1 << j) != 0 {
555                                                                         if intersection_set_ref == None {
556                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
557                                                                         } else {
558                                                                                 let new_intersection = intersection_set_ref.unwrap()
559                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
560                                                                                 intersection = Some(new_intersection);
561                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
562                                                                         }
563                                                                 }
564                                                         }
565                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
566                                                                 add_addr!(a);
567                                                         }
568                                                 }
569                                         }
570                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
571                                         asn_set.insert(0);
572                                         for a in v4_set.iter().filter(|a| asn_set.insert(bgp_client.get_asn(IpAddr::V4(**a)))).choose_multiple(&mut rng, 21) {
573                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{}\n", i, a);
574                                         }
575                                         asn_set.clear();
576                                         asn_set.insert(0);
577                                         for a in v6_set.iter().filter(|a| asn_set.insert(bgp_client.get_asn(IpAddr::V6(**a)))).choose_multiple(&mut rng, 10) {
578                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{}\n", i, a);
579                                         }
580                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
581                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{}\n", i, a);
582                                         }
583                                 }
584                         }
585                         write_all(f, dns_buff)
586                 }).and_then(|(mut f, _)| {
587                         f.poll_sync_all()
588                 }).and_then(|_| {
589                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
590                 }).then(|_| { future::ok(()) })
591         }
592
593         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
594                 let mut res = Vec::with_capacity(128);
595                 let cur_time = Instant::now();
596
597                 {
598                         let mut nodes = self.nodes.write().unwrap();
599                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
600                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
601                                 let cmp_time = cur_time - Duration::from_secs(rescan_interval);
602                                 let split_point = cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
603                                                 state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx) as u64);
604                                 let mut new_nodes = state_nodes.split_off(split_point as usize);
605                                 mem::swap(&mut new_nodes, state_nodes);
606                                 for (_, node) in new_nodes.drain(..) {
607                                         res.push((&node).into());
608                                 }
609                         }
610                 }
611                 res.shuffle(&mut thread_rng());
612                 res
613         }
614 }