1f2221029d6d2d76f79626f574c6fc64b066f86b
[dnsseed-rust] / src / datastore.rs
1 use std::cmp;
2 use std::collections::{HashSet, HashMap, hash_map};
3 use std::sync::{Arc, RwLock};
4 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
5 use std::time::{Duration, Instant};
6 use std::io::{BufRead, BufReader};
7
8 use bitcoin::network::address::{Address, AddrV2Message};
9
10 use rand::thread_rng;
11 use rand::seq::{SliceRandom, IteratorRandom};
12
13 use tokio::prelude::*;
14 use tokio::fs::File;
15 use tokio::io::write_all;
16
17 use regex::Regex;
18
19 use crate::bgp_client::BGPClient;
20
/// Number of seconds per scan-results period, going by the name.
/// NOTE(review): no user of this constant is visible in this part of the file —
/// confirm the exact meaning against its callers.
pub const SECS_PER_SCAN_RESULTS: u64 = 15;
/// Per-status cap on connections per second, going by the name.
/// NOTE(review): not referenced in the visible part of the file — verify against its users.
const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 30;
23
/// Classification assigned to a node address.
///
/// Every variant has a fixed numeric code (its discriminant). `to_num` and
/// `from_num` convert to and from that code, and the code is also used as an
/// index into per-state arrays, so the numbering must stay dense and start at 0.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub enum AddressState {
        Untested = 0x0,
        LowBlockCount = 0x1,
        HighBlockCount = 0x2,
        LowVersion = 0x3,
        BadVersion = 0x4,
        NotFullNode = 0x5,
        ProtocolViolation = 0x6,
        Timeout = 0x7,
        TimeoutDuringRequest = 0x8,
        TimeoutAwaitingPong = 0x9,
        TimeoutAwaitingAddr = 0xa,
        TimeoutAwaitingBlock = 0xb,
        Good = 0xc,
        WasGood = 0xd,
        EvilNode = 0xe,
}

impl AddressState {
        /// Returns the state whose numeric code is `num`, or `None` when `num`
        /// does not correspond to any state.
        pub fn from_num(num: u8) -> Option<AddressState> {
                // Indexed by numeric code, so the order must follow the discriminants.
                const BY_CODE: [AddressState; 15] = [
                        AddressState::Untested,
                        AddressState::LowBlockCount,
                        AddressState::HighBlockCount,
                        AddressState::LowVersion,
                        AddressState::BadVersion,
                        AddressState::NotFullNode,
                        AddressState::ProtocolViolation,
                        AddressState::Timeout,
                        AddressState::TimeoutDuringRequest,
                        AddressState::TimeoutAwaitingPong,
                        AddressState::TimeoutAwaitingAddr,
                        AddressState::TimeoutAwaitingBlock,
                        AddressState::Good,
                        AddressState::WasGood,
                        AddressState::EvilNode,
                ];
                BY_CODE.get(usize::from(num)).cloned()
        }

        /// Returns the numeric code of this state (the inverse of `from_num`).
        pub fn to_num(&self) -> u8 {
                *self as u8
        }

        /// Returns the human-readable label of this state.
        pub fn to_str(&self) -> &'static str {
                // Indexed by numeric code, so the order must follow the discriminants.
                const LABELS: [&str; 15] = [
                        "Untested",
                        "Low Block Count",
                        "High Block Count",
                        "Low Version",
                        "Bad Version",
                        "Not Full Node",
                        "Protocol Violation",
                        "Timeout",
                        "Timeout During Request",
                        "Timeout Awaiting Pong",
                        "Timeout Awaiting Addr",
                        "Timeout Awaiting Block",
                        "Good",
                        "Was Good",
                        "Evil Node",
                ];
                LABELS[*self as usize]
        }

        /// Number of distinct states; valid numeric codes are `0..get_count()`.
        pub const fn get_count() -> u8 {
                15
        }
}
109
/// Keys of the `u64`-valued settings kept in `Store::u64_settings`.
#[derive(Hash, PartialEq, Eq)]
pub enum U64Setting {
        /// NOTE(review): presumably a per-run/per-connection timeout in seconds
        /// (default 120) — not used in the visible part of the file, confirm.
        RunTimeout,
        /// Window in seconds (it is fed to `Duration::from_secs`) after the last
        /// `Good` result during which a failing node is recorded as `WasGood`
        /// rather than with its failure state.
        WasGoodTimeout,
        /// Rescan interval for addresses in the given state.
        /// NOTE(review): unit is presumably seconds — confirm against the scanner.
        RescanInterval(AddressState),
        /// NOTE(review): presumably the lowest acceptable peer protocol version
        /// (default 70002) — confirm against the code that checks it.
        MinProtocolVersion,
}
117
/// Keys of the regex-valued settings; only one exists, stored in
/// `Store::subver_regex`.
#[derive(Hash, PartialEq, Eq)]
pub enum RegexSetting {
        /// NOTE(review): presumably matched against a peer's subversion
        /// (user-agent) string — the matching code is not visible here, confirm.
        SubverRegex,
}
122
/// Per-address bookkeeping stored in `Nodes::nodes_to_state`.
struct Node {
        /// When the state of this node was last set (or when it was loaded/added).
        last_update: Instant,
        last_good: Instant, // Ignored unless state is Good or WasGood
        /// Service bits recorded with the last `Good` result; zeroed when the
        /// node is demoted to `WasGood`.
        last_services: u64,
        /// Current classification of the address.
        state: AddressState,
        /// Set to true whenever the address is pushed onto a `state_next_scan` list.
        /// NOTE(review): presumably cleared by the code that pops scan targets,
        /// which is not visible here.
        queued: bool,
}
130
/// Essentially SocketAddr but without a traffic class or scope
///
/// Converting from a `SocketAddr` keeps only the IP and port of a V6 address
/// (flow info and scope id are dropped); converting back fills both with 0.
/// This makes two V6 addresses that differ only in those fields compare and
/// hash as equal.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SockAddr {
        V4(SocketAddrV4),
        /// IPv6 address as its eight 16-bit segments, plus the port.
        V6(([u16; 8], u16)),
}

/// Builds an `Ipv6Addr` from its eight 16-bit segments.
#[inline]
fn segs_to_ip6(segs: &[u16; 8]) -> Ipv6Addr {
        Ipv6Addr::from(*segs)
}

impl From<SocketAddr> for SockAddr {
        fn from(addr: SocketAddr) -> SockAddr {
                match addr {
                        SocketAddr::V4(sa) => SockAddr::V4(sa),
                        SocketAddr::V6(sa) => SockAddr::V6((sa.ip().segments(), sa.port())),
                }
        }
}

// Implemented as `From` rather than `Into`: the blanket impl still provides
// `(&sock_addr).into()` for existing callers.
impl From<&SockAddr> for SocketAddr {
        fn from(addr: &SockAddr) -> SocketAddr {
                match *addr {
                        SockAddr::V4(sa) => SocketAddr::V4(sa),
                        SockAddr::V6((ref segs, port)) => SocketAddr::V6(SocketAddrV6::new(segs_to_ip6(segs), port, 0, 0)),
                }
        }
}

// Implemented as `Display` rather than `ToString`: `to_string()` keeps working
// through the blanket impl and the type becomes usable in `format!`.
impl std::fmt::Display for SockAddr {
        fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
                std::fmt::Display::fmt(&SocketAddr::from(self), f)
        }
}

impl SockAddr {
        /// Returns the port number.
        pub fn port(&self) -> u16 {
                match *self {
                        SockAddr::V4(sa) => sa.port(),
                        SockAddr::V6((_, port)) => port,
                }
        }

        /// Returns the IP address.
        pub fn ip(&self) -> IpAddr {
                match *self {
                        SockAddr::V4(sa) => IpAddr::V4(*sa.ip()),
                        SockAddr::V6((ref segs, _)) => IpAddr::V6(segs_to_ip6(segs)),
                }
        }
}
177
178 struct Nodes {
179         good_node_services: [HashSet<SockAddr>; 64],
180         nodes_to_state: HashMap<SockAddr, Node>,
181         state_next_scan: [Vec<SockAddr>; AddressState::get_count() as usize],
182 }
183 struct NodesMutRef<'a> {
184         good_node_services: &'a mut [HashSet<SockAddr>; 64],
185         nodes_to_state: &'a mut HashMap<SockAddr, Node>,
186         state_next_scan: &'a mut [Vec<SockAddr>; AddressState::get_count() as usize],
187 }
188
189 impl Nodes {
190         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
191                 NodesMutRef {
192                         good_node_services: &mut self.good_node_services,
193                         nodes_to_state: &mut self.nodes_to_state,
194                         state_next_scan: &mut self.state_next_scan,
195                 }
196         }
197 }
198
/// Settings and node state, each behind its own `RwLock`, plus the location
/// they are persisted to.
pub struct Store {
        /// Current value of every `U64Setting`.
        u64_settings: RwLock<HashMap<U64Setting, u64>>,
        /// Current value of `RegexSetting::SubverRegex`; replaced wholesale on update.
        subver_regex: RwLock<Arc<Regex>>,
        /// Per-address state and the lists/sets derived from it.
        nodes: RwLock<Nodes>,
        /// Path prefix of the data directory; file names such as "/settings" and
        /// "/nodes" are appended to it.
        store: String,
}
205
206 impl Store {
207         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
208                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
209                         let mut l = BufReader::new(f).lines();
210                         macro_rules! try_read {
211                                 ($lines: expr, $ty: ty) => { {
212                                         match $lines.next() {
213                                                 Some(line) => match line {
214                                                         Ok(line) => match line.parse::<$ty>() {
215                                                                 Ok(res) => res,
216                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
217                                                         },
218                                                         Err(e) => return future::err(e),
219                                                 },
220                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
221                                         }
222                                 } }
223                         }
224                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
225                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
226                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
227                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
228                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
229                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
230                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
231                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
232                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
233                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
234                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
235                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
236                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
237                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
238                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
239                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
240                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
241                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
242                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
243                         future::ok((u64s, try_read!(l, Regex)))
244                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
245                         let mut u64s = HashMap::with_capacity(15);
246                         u64s.insert(U64Setting::RunTimeout, 120);
247                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
248                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
249                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
250                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
251                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
252                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
253                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
254                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
255                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
256                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
257                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
258                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
259                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
260                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
261                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
262                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
263                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
264                         future::ok((u64s, Regex::new(".*").unwrap()))
265                 });
266
267                 macro_rules! nodes_uninitd {
268                         () => { {
269                                 let state_vecs = [Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()];
270                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
271                                 Nodes {
272                                         good_node_services,
273                                         nodes_to_state: HashMap::new(),
274                                         state_next_scan: state_vecs,
275                                 }
276                         } }
277                 }
278
279                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
280                         let mut res = nodes_uninitd!();
281                         let l = BufReader::new(f).lines();
282                         for line_res in l {
283                                 let line = match line_res {
284                                         Ok(l) => l,
285                                         Err(_) => return future::ok(res),
286                                 };
287                                 let mut line_iter = line.split(',');
288                                 macro_rules! try_read {
289                                         ($lines: expr, $ty: ty) => { {
290                                                 match $lines.next() {
291                                                         Some(line) => match line.parse::<$ty>() {
292                                                                 Ok(res) => res,
293                                                                 Err(_) => return future::ok(res),
294                                                         },
295                                                         None => return future::ok(res),
296                                                 }
297                                         } }
298                                 }
299                                 let sockaddr = try_read!(line_iter, SocketAddr);
300                                 let state = try_read!(line_iter, u8);
301                                 let last_services = try_read!(line_iter, u64);
302                                 let node = Node {
303                                         state: match AddressState::from_num(state) {
304                                                 Some(v) => v,
305                                                 None => return future::ok(res),
306                                         },
307                                         last_services,
308                                         last_update: Instant::now(),
309                                         last_good: Instant::now(),
310                                         queued: true,
311                                 };
312                                 if node.state == AddressState::Good {
313                                         for i in 0..64 {
314                                                 if node.last_services & (1 << i) != 0 {
315                                                         res.good_node_services[i].insert(sockaddr.into());
316                                                 }
317                                         }
318                                 }
319                                 res.state_next_scan[node.state.to_num() as usize].push(sockaddr.into());
320                                 res.nodes_to_state.insert(sockaddr.into(), node);
321                         }
322                         future::ok(res)
323                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
324                         future::ok(nodes_uninitd!())
325                 });
326                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
327                         future::ok(Store {
328                                 u64_settings: RwLock::new(u64_settings),
329                                 subver_regex: RwLock::new(Arc::new(regex)),
330                                 nodes: RwLock::new(nodes),
331                                 store,
332                         })
333                 })
334         }
335
336         pub fn get_u64(&self, setting: U64Setting) -> u64 {
337                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
338         }
339
340         pub fn set_u64(&self, setting: U64Setting, value: u64) {
341                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
342         }
343
344         pub fn get_node_count(&self, state: AddressState) -> usize {
345                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
346         }
347
348         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
349                 Arc::clone(&*self.subver_regex.read().unwrap())
350         }
351
352         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
353                 *self.subver_regex.write().unwrap() = Arc::new(value);
354         }
355
356         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
357                 let mut res = 0;
358                 let mut nodes = self.nodes.write().unwrap();
359                 let cur_time = Instant::now();
360                 for addr in addresses {
361                         match nodes.nodes_to_state.entry(addr.into()) {
362                                 hash_map::Entry::Vacant(e) => {
363                                         e.insert(Node {
364                                                 state: AddressState::Untested,
365                                                 last_services: 0,
366                                                 last_update: cur_time,
367                                                 last_good: cur_time,
368                                                 queued: true,
369                                         });
370                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push(addr.into());
371                                         res += 1;
372                                 },
373                                 hash_map::Entry::Occupied(_) => {},
374                         }
375                 }
376                 res
377         }
378
379         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
380                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
381                         match addr.socket_addr() {
382                                 Ok(socketaddr) => Some(socketaddr),
383                                 Err(_) => None, // TODO: Handle onions
384                         }
385                 }));
386         }
387         pub fn add_fresh_nodes_v2(&self, addresses: &Vec<AddrV2Message>) {
388                 self.add_fresh_addrs(addresses.iter().filter_map(|addr| {
389                         match addr.socket_addr() {
390                                 Ok(socketaddr) => Some(socketaddr),
391                                 Err(_) => None, // TODO: Handle onions
392                         }
393                 }));
394         }
395
396         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
397                 let addr: SockAddr = sockaddr.into();
398                 let now = Instant::now();
399
400                 let mut nodes_lock = self.nodes.write().unwrap();
401                 let nodes = nodes_lock.borrow_mut();
402
403                 let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
404                         state: AddressState::Untested,
405                         last_services: 0,
406                         last_update: now,
407                         last_good: now,
408                         queued: false,
409                 });
410                 let ret = state_ref.state;
411                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
412                                 && state != AddressState::Good
413                                 && state_ref.last_good >= now - Duration::from_secs(self.get_u64(U64Setting::WasGoodTimeout)) {
414                         state_ref.state = AddressState::WasGood;
415                         for i in 0..64 {
416                                 if state_ref.last_services & (1 << i) != 0 {
417                                         nodes.good_node_services[i].remove(&addr);
418                                 }
419                         }
420                         state_ref.last_services = 0;
421                         if !state_ref.queued {
422                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
423                                 state_ref.queued = true;
424                         }
425                 } else {
426                         state_ref.state = state;
427                         if state == AddressState::Good {
428                                 for i in 0..64 {
429                                         if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
430                                                 nodes.good_node_services[i].insert(addr.clone());
431                                         } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
432                                                 nodes.good_node_services[i].remove(&addr);
433                                         }
434                                 }
435                                 state_ref.last_services = services;
436                                 state_ref.last_good = now;
437                         }
438                         if !state_ref.queued {
439                                 nodes.state_next_scan[state.to_num() as usize].push(addr);
440                                 state_ref.queued = true;
441                         }
442                 }
443                 state_ref.last_update = now;
444                 ret
445         }
446
447         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
448                 let settings_file = self.store.clone() + "/settings";
449                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
450                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
451                                 self.get_u64(U64Setting::RunTimeout),
452                                 self.get_u64(U64Setting::WasGoodTimeout),
453                                 self.get_u64(U64Setting::MinProtocolVersion),
454                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
455                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
456                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
457                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
458                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
459                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
460                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
461                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
462                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
463                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
464                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
465                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
466                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
467                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
468                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
469                                 self.get_regex(RegexSetting::SubverRegex).as_str());
470                         write_all(f, settings_string).and_then(|(mut f, _)| {
471                                 f.poll_sync_all()
472                         }).and_then(|_| {
473                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
474                         })
475                 });
476
477                 let nodes_file = self.store.clone() + "/nodes";
478                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
479                         let mut nodes_buff = String::new();
480                         {
481                                 let nodes = self.nodes.read().unwrap();
482                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 32);
483                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
484                                         nodes_buff += &sockaddr.to_string();
485                                         nodes_buff += ",";
486                                         nodes_buff += &node.state.to_num().to_string();
487                                         nodes_buff += ",";
488                                         nodes_buff += &node.last_services.to_string();
489                                         nodes_buff += "\n";
490                                 }
491                         }
492                         write_all(f, nodes_buff)
493                 }).and_then(|(mut f, _)| {
494                         f.poll_sync_all()
495                 }).and_then(|_| {
496                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
497                 });
498
499                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
500         }
501
502         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
503                 let dns_file = self.store.clone() + "/nodes.dump";
504                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
505                         let mut dns_buff = String::new();
506                         {
507                                 let mut rng = thread_rng();
508                                 for i in &[ 0b00000000001u64,
509                                             0b00000000100,
510                                             0b00000000101,
511                                             0b00000001000,
512                                             0b00000001001,
513                                             0b00000001100,
514                                             0b00000001101,
515                                             0b00001001001,
516                                             0b10000000000,
517                                             0b10000000001,
518                                             0b10000000100,
519                                             0b10000000101,
520                                             0b10000001000,
521                                             0b10000001001,
522                                             0b10000001100,
523                                             0b10000001101,
524                                             0b10001001000] {
525                                 //            ^ NODE_NETWORK_LIIMTED
526                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
527                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
528                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
529                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
530                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
531                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
532                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
533                                         macro_rules! add_addr { ($addr: expr) => {
534                                                 match $addr.ip() {
535                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
536                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
537                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
538                                                 }
539                                         } }
540                                         {
541                                                 let nodes = self.nodes.read().unwrap();
542                                                 if i.count_ones() == 1 {
543                                                         for j in 0..64 {
544                                                                 if i & (1 << j) != 0 {
545                                                                         let set_ref = &nodes.good_node_services[j];
546                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
547                                                                                 add_addr!(a);
548                                                                         }
549                                                                         break;
550                                                                 }
551                                                         }
552                                                 } else if i.count_ones() == 2 {
553                                                         let mut first_set = None;
554                                                         let mut second_set = None;
555                                                         for j in 0..64 {
556                                                                 if i & (1 << j) != 0 {
557                                                                         if first_set == None {
558                                                                                 first_set = Some(&nodes.good_node_services[j]);
559                                                                         } else {
560                                                                                 second_set = Some(&nodes.good_node_services[j]);
561                                                                                 break;
562                                                                         }
563                                                                 }
564                                                         }
565                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
566                                                                 add_addr!(a);
567                                                         }
568                                                 } else {
569                                                         //TODO: Could optimize this one a bit
570                                                         let mut intersection;
571                                                         let mut intersection_set_ref = None;
572                                                         for j in 0..64 {
573                                                                 if i & (1 << j) != 0 {
574                                                                         if intersection_set_ref == None {
575                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
576                                                                         } else {
577                                                                                 let new_intersection = intersection_set_ref.unwrap()
578                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
579                                                                                 intersection = Some(new_intersection);
580                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
581                                                                         }
582                                                                 }
583                                                         }
584                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
585                                                                 add_addr!(a);
586                                                         }
587                                                 }
588                                         }
589                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
590                                         asn_set.insert(0);
591                                         for (a, asn) in v4_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V4(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 21) {
592                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{} ; AS{}\n", i, a, asn);
593                                         }
594                                         asn_set.clear();
595                                         asn_set.insert(0);
596                                         for (a, asn) in v6_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V6(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 10) {
597                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; AS{}\n", i, a, asn);
598                                         }
599                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
600                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; Tor Onionv2\n", i, a);
601                                         }
602                                 }
603                         }
604                         write_all(f, dns_buff)
605                 }).and_then(|(mut f, _)| {
606                         f.poll_sync_all()
607                 }).and_then(|_| {
608                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
609                 }).then(|_| { future::ok(()) })
610         }
611
612         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
613                 let mut res = Vec::with_capacity(128);
614
615                 {
616                         let mut nodes_lock = self.nodes.write().unwrap();
617                         let nodes = nodes_lock.borrow_mut();
618                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
619                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
620                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
621                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
622                                                 state_nodes.len() as u64);
623                                 for node in state_nodes.drain(..split_point as usize) {
624                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
625                                         res.push((&node).into());
626                                 }
627                         }
628                 }
629                 res.shuffle(&mut thread_rng());
630                 res
631         }
632 }