Use last_services to store if we ever connected
[dnsseed-rust] / src / datastore.rs
1 use std::cmp;
2 use std::convert::TryInto;
3 use std::collections::{HashSet, HashMap, hash_map};
4 use std::sync::{Arc, RwLock};
5 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
6 use std::time::Instant;
7 use std::io::{BufRead, BufReader};
8
9 use bitcoin::network::address::{Address, AddrV2Message};
10
11 use rand::thread_rng;
12 use rand::seq::{SliceRandom, IteratorRandom};
13
14 use tokio::prelude::*;
15 use tokio::fs::File;
16 use tokio::io::write_all;
17
18 use regex::Regex;
19
20 use crate::bgp_client::BGPClient;
21
/// NOTE(review): presumably the period, in seconds, between scan-result
/// reports; no use is visible in this part of the file, so confirm at the call sites.
pub const SECS_PER_SCAN_RESULTS: u64 = 15;
/// NOTE(review): presumably caps how many connections are started per second for
/// each `AddressState`; no use is visible in this part of the file, so confirm.
const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 500;
24
/// Status recorded for a node address.
///
/// Every variant has a stable numeric code (`to_num` / `from_num`) and a
/// human-readable label (`to_str`). The numeric code is what the node store
/// writes to and reads from its `nodes` file, so the discriminants below must
/// never be renumbered.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub enum AddressState {
        Untested = 0,
        LowBlockCount = 1,
        HighBlockCount = 2,
        LowVersion = 3,
        BadVersion = 4,
        NotFullNode = 5,
        ProtocolViolation = 6,
        Timeout = 7,
        TimeoutDuringRequest = 8,
        TimeoutAwaitingPong = 9,
        TimeoutAwaitingAddr = 10,
        TimeoutAwaitingBlock = 11,
        Good = 12,
        WasGood = 13,
        EvilNode = 14,
}

impl AddressState {
        /// Every state, positioned at the index equal to its numeric code.
        const BY_NUM: [AddressState; 15] = [
                AddressState::Untested,
                AddressState::LowBlockCount,
                AddressState::HighBlockCount,
                AddressState::LowVersion,
                AddressState::BadVersion,
                AddressState::NotFullNode,
                AddressState::ProtocolViolation,
                AddressState::Timeout,
                AddressState::TimeoutDuringRequest,
                AddressState::TimeoutAwaitingPong,
                AddressState::TimeoutAwaitingAddr,
                AddressState::TimeoutAwaitingBlock,
                AddressState::Good,
                AddressState::WasGood,
                AddressState::EvilNode,
        ];

        /// Maps a numeric code back to its state; `None` for any code that is
        /// not assigned (15 and above).
        pub fn from_num(num: u8) -> Option<AddressState> {
                Self::BY_NUM.get(usize::from(num)).copied()
        }

        /// Returns the numeric code of this state (0 through 14).
        pub fn to_num(&self) -> u8 {
                // The enum is fieldless with explicit discriminants, so the cast
                // yields exactly the code written next to each variant.
                *self as u8
        }

        /// Returns the human-readable label of this state.
        pub fn to_str(&self) -> &'static str {
                match self {
                        AddressState::Untested => "Untested",
                        AddressState::LowBlockCount => "Low Block Count",
                        AddressState::HighBlockCount => "High Block Count",
                        AddressState::LowVersion => "Low Version",
                        AddressState::BadVersion => "Bad Version",
                        AddressState::NotFullNode => "Not Full Node",
                        AddressState::ProtocolViolation => "Protocol Violation",
                        AddressState::Timeout => "Timeout",
                        AddressState::TimeoutDuringRequest => "Timeout During Request",
                        AddressState::TimeoutAwaitingPong => "Timeout Awaiting Pong",
                        AddressState::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
                        AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
                        AddressState::Good => "Good",
                        AddressState::WasGood => "Was Good",
                        AddressState::EvilNode => "Evil Node",
                }
        }

        /// Number of distinct states. This is a `const fn` because it is used
        /// as an array length.
        pub const fn get_count() -> u8 {
                15
        }
}
110
111 #[derive(Hash, PartialEq, Eq)]
112 pub enum U64Setting {
113         RunTimeout,
114         WasGoodTimeout,
115         RescanInterval(AddressState),
116         MinProtocolVersion,
117 }
118
/// Keys of the regular-expression settings held by the store.
#[derive(PartialEq, Eq, Hash)]
pub enum RegexSetting {
        /// The single regex the store keeps; it is persisted as the last line
        /// of the settings file. NOTE(review): presumably matched against peer
        /// user-agent ("subver") strings; confirm at the use site.
        SubverRegex,
}
123
124 struct Node {
125         // Times in seconds-since-startup
126         last_good: u32, // Ignored unless state is Good or WasGood
127         // Since everything is is 4-byte aligned, using a u64 for services blows up our size
128         // substantially. Instead, use a u32 pair and bit shift as needed.
129         last_services: (u32, u32),
130         state: AddressState,
131         queued: bool,
132 }
133 impl Node {
134         #[inline]
135         fn last_services(&self) -> u64 {
136                 ((self.last_services.0 as u64) << 32) |
137                 ((self.last_services.1 as u64)      )
138         }
139         #[inline]
140         fn services(inp: u64) -> (u32, u32) {
141                 (
142                         ((inp & 0xffffffff00000000) >> 32) as u32,
143                         ((inp & 0x00000000ffffffff)      ) as u32
144                 )
145         }
146 }
147
/// Splitting a services bitfield and reassembling it must give back the
/// original value.
#[test]
fn services_test() {
        let services: u64 = 0x1badcafedeadbeef;
        let node = Node {
                last_good: 0,
                state: AddressState::Good,
                queued: false,
                last_services: Node::services(services),
        };
        assert_eq!(node.last_services(), services);
}
155
/// Essentially SocketAddr but without a traffic class or scope
///
/// IPv6 addresses are stored as their eight 16-bit segments plus the port, so
/// the flow-info and scope-id fields of `SocketAddrV6` are dropped. Two
/// addresses that differ only in those fields therefore compare and hash equal.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SockAddr {
        V4(SocketAddrV4),
        V6(([u16; 8], u16)),
}
/// Builds an `Ipv6Addr` from its eight 16-bit segments.
#[inline]
fn segs_to_ip6(segs: &[u16; 8]) -> Ipv6Addr {
        Ipv6Addr::from(*segs)
}
impl From<SocketAddr> for SockAddr {
        fn from(addr: SocketAddr) -> SockAddr {
                match addr {
                        SocketAddr::V4(sa) => SockAddr::V4(sa),
                        SocketAddr::V6(sa) => SockAddr::V6((sa.ip().segments(), sa.port())),
                }
        }
}
// Implemented as `From` rather than a hand-written `Into`: the blanket impl
// still provides `(&SockAddr).into()` for existing callers.
impl From<&SockAddr> for SocketAddr {
        fn from(addr: &SockAddr) -> SocketAddr {
                match addr {
                        SockAddr::V4(sa) => SocketAddr::V4(*sa),
                        // Flow-info and scope-id are not stored, so they come back as 0.
                        SockAddr::V6((segs, port)) =>
                                SocketAddr::V6(SocketAddrV6::new(segs_to_ip6(segs), *port, 0, 0)),
                }
        }
}
// Implementing `Display` keeps `to_string()` available (through the blanket
// `ToString` impl) and additionally allows use in `format!`.
impl std::fmt::Display for SockAddr {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Same textual form as the equivalent `SocketAddr`.
                std::fmt::Display::fmt(&SocketAddr::from(self), f)
        }
}
impl SockAddr {
        /// Returns the port number.
        pub fn port(&self) -> u16 {
                match self {
                        SockAddr::V4(sa) => sa.port(),
                        SockAddr::V6((_, port)) => *port,
                }
        }
        /// Returns the IP address without the port.
        pub fn ip(&self) -> IpAddr {
                match self {
                        SockAddr::V4(sa) => IpAddr::V4(*sa.ip()),
                        SockAddr::V6((segs, _)) => IpAddr::V6(segs_to_ip6(segs)),
                }
        }
}
202
203 struct Nodes {
204         good_node_services: [HashSet<SockAddr>; 64],
205         nodes_to_state: HashMap<SockAddr, Node>,
206         state_next_scan: [Vec<SockAddr>; AddressState::get_count() as usize],
207 }
208 struct NodesMutRef<'a> {
209         good_node_services: &'a mut [HashSet<SockAddr>; 64],
210         nodes_to_state: &'a mut HashMap<SockAddr, Node>,
211         state_next_scan: &'a mut [Vec<SockAddr>; AddressState::get_count() as usize],
212 }
213
214 impl Nodes {
215         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
216                 NodesMutRef {
217                         good_node_services: &mut self.good_node_services,
218                         nodes_to_state: &mut self.nodes_to_state,
219                         state_next_scan: &mut self.state_next_scan,
220                 }
221         }
222 }
223
224 pub struct Store {
225         u64_settings: RwLock<HashMap<U64Setting, u64>>,
226         subver_regex: RwLock<Arc<Regex>>,
227         nodes: RwLock<Nodes>,
228         start_time: Instant,
229         store: String,
230 }
231
232 impl Store {
233         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
234                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
235                         let mut l = BufReader::new(f).lines();
236                         macro_rules! try_read {
237                                 ($lines: expr, $ty: ty) => { {
238                                         match $lines.next() {
239                                                 Some(line) => match line {
240                                                         Ok(line) => match line.parse::<$ty>() {
241                                                                 Ok(res) => res,
242                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
243                                                         },
244                                                         Err(e) => return future::err(e),
245                                                 },
246                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
247                                         }
248                                 } }
249                         }
250                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
251                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
252                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
253                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
254                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
255                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
256                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
257                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
258                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
259                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
260                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
261                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
262                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
263                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
264                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
265                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
266                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
267                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
268                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
269                         future::ok((u64s, try_read!(l, Regex)))
270                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
271                         let mut u64s = HashMap::with_capacity(15);
272                         u64s.insert(U64Setting::RunTimeout, 120);
273                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
274                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
275                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
276                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
277                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
278                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
279                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
280                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
281                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
282                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
283                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
284                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
285                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
286                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
287                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
288                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
289                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
290                         future::ok((u64s, Regex::new(".*").unwrap()))
291                 });
292
293                 macro_rules! nodes_uninitd {
294                         () => { {
295                                 let state_vecs = [Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()];
296                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
297                                 Nodes {
298                                         good_node_services,
299                                         nodes_to_state: HashMap::new(),
300                                         state_next_scan: state_vecs,
301                                 }
302                         } }
303                 }
304
305                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
306                         let mut res = nodes_uninitd!();
307                         let l = BufReader::new(f).lines();
308                         for line_res in l {
309                                 let line = match line_res {
310                                         Ok(l) => l,
311                                         Err(_) => return future::ok(res),
312                                 };
313                                 let mut line_iter = line.split(',');
314                                 macro_rules! try_read {
315                                         ($lines: expr, $ty: ty) => { {
316                                                 match $lines.next() {
317                                                         Some(line) => match line.parse::<$ty>() {
318                                                                 Ok(res) => res,
319                                                                 Err(_) => return future::ok(res),
320                                                         },
321                                                         None => return future::ok(res),
322                                                 }
323                                         } }
324                                 }
325                                 let sockaddr = try_read!(line_iter, SocketAddr);
326                                 let state = try_read!(line_iter, u8);
327                                 let last_services = try_read!(line_iter, u64);
328                                 let node = Node {
329                                         state: match AddressState::from_num(state) {
330                                                 Some(v) => v,
331                                                 None => return future::ok(res),
332                                         },
333                                         last_services: Node::services(last_services),
334                                         last_good: 0,
335                                         queued: true,
336                                 };
337                                 if node.state == AddressState::Good {
338                                         for i in 0..64 {
339                                                 if node.last_services() & (1 << i) != 0 {
340                                                         res.good_node_services[i].insert(sockaddr.into());
341                                                 }
342                                         }
343                                 }
344                                 res.state_next_scan[node.state.to_num() as usize].push(sockaddr.into());
345                                 res.nodes_to_state.insert(sockaddr.into(), node);
346                         }
347                         future::ok(res)
348                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
349                         future::ok(nodes_uninitd!())
350                 });
351                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
352                         future::ok(Store {
353                                 u64_settings: RwLock::new(u64_settings),
354                                 subver_regex: RwLock::new(Arc::new(regex)),
355                                 nodes: RwLock::new(nodes),
356                                 store,
357                                 start_time: Instant::now(),
358                         })
359                 })
360         }
361
362         pub fn get_u64(&self, setting: U64Setting) -> u64 {
363                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
364         }
365
366         pub fn set_u64(&self, setting: U64Setting, value: u64) {
367                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
368         }
369
370         pub fn get_node_count(&self, state: AddressState) -> usize {
371                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
372         }
373
374         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
375                 Arc::clone(&*self.subver_regex.read().unwrap())
376         }
377
378         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
379                 *self.subver_regex.write().unwrap() = Arc::new(value);
380         }
381
382         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
383                 let mut res = 0;
384                 let cur_time = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
385                 let mut nodes = self.nodes.write().unwrap();
386                 for addr in addresses {
387                         match nodes.nodes_to_state.entry(addr.into()) {
388                                 hash_map::Entry::Vacant(e) => {
389                                         e.insert(Node {
390                                                 state: AddressState::Untested,
391                                                 last_services: (0, 0),
392                                                 last_good: cur_time,
393                                                 queued: true,
394                                         });
395                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push(addr.into());
396                                         res += 1;
397                                 },
398                                 hash_map::Entry::Occupied(_) => {},
399                         }
400                 }
401                 res
402         }
403
404         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
405                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
406                         match addr.socket_addr() {
407                                 Ok(socketaddr) => Some(socketaddr),
408                                 Err(_) => None, // TODO: Handle onions
409                         }
410                 }));
411         }
412         pub fn add_fresh_nodes_v2(&self, addresses: &Vec<AddrV2Message>) {
413                 self.add_fresh_addrs(addresses.iter().filter_map(|addr| {
414                         match addr.socket_addr() {
415                                 Ok(socketaddr) => Some(socketaddr),
416                                 Err(_) => None, // TODO: Handle onions
417                         }
418                 }));
419         }
420
421         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
422                 let addr: SockAddr = sockaddr.into();
423
424                 let now = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
425
426                 let mut nodes_lock = self.nodes.write().unwrap();
427                 let nodes = nodes_lock.borrow_mut();
428
429                 let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
430                         state: AddressState::Untested,
431                         last_services: (0, 0),
432                         last_good: now,
433                         queued: false,
434                 });
435                 let ret = state_ref.state;
436                 let was_good_timeout: u32 = self.get_u64(U64Setting::WasGoodTimeout)
437                         .try_into().expect("Need WasGood timeout that fits in a u32");
438                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
439                                 && state != AddressState::Good
440                                 && state_ref.last_good >= now - was_good_timeout {
441                         state_ref.state = AddressState::WasGood;
442                         for i in 0..64 {
443                                 if state_ref.last_services() & (1 << i) != 0 {
444                                         nodes.good_node_services[i].remove(&addr);
445                                 }
446                         }
447                         if !state_ref.queued {
448                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
449                                 state_ref.queued = true;
450                         }
451                 } else {
452                         state_ref.state = state;
453                         if state == AddressState::Good {
454                                 for i in 0..64 {
455                                         if services & (1 << i) != 0 && state_ref.last_services() & (1 << i) == 0 {
456                                                 nodes.good_node_services[i].insert(addr.clone());
457                                         } else if services & (1 << i) == 0 && state_ref.last_services() & (1 << i) != 0 {
458                                                 nodes.good_node_services[i].remove(&addr);
459                                         }
460                                 }
461                                 state_ref.last_services = Node::services(services);
462                                 state_ref.last_good = now;
463                         }
464                         if !state_ref.queued {
465                                 nodes.state_next_scan[state.to_num() as usize].push(addr);
466                                 state_ref.queued = true;
467                         }
468                 }
469                 ret
470         }
471
472         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
473                 let settings_file = self.store.clone() + "/settings";
474                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
475                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
476                                 self.get_u64(U64Setting::RunTimeout),
477                                 self.get_u64(U64Setting::WasGoodTimeout),
478                                 self.get_u64(U64Setting::MinProtocolVersion),
479                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
480                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
481                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
482                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
483                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
484                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
485                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
486                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
487                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
488                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
489                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
490                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
491                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
492                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
493                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
494                                 self.get_regex(RegexSetting::SubverRegex).as_str());
495                         write_all(f, settings_string).and_then(|(mut f, _)| {
496                                 f.poll_sync_all()
497                         }).and_then(|_| {
498                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
499                         })
500                 });
501
502                 let nodes_file = self.store.clone() + "/nodes";
503                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
504                         let mut nodes_buff = String::new();
505                         {
506                                 let nodes = self.nodes.read().unwrap();
507                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 32);
508                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
509                                         nodes_buff += &sockaddr.to_string();
510                                         nodes_buff += ",";
511                                         nodes_buff += &node.state.to_num().to_string();
512                                         nodes_buff += ",";
513                                         nodes_buff += &node.last_services().to_string();
514                                         nodes_buff += "\n";
515                                 }
516                         }
517                         write_all(f, nodes_buff)
518                 }).and_then(|(mut f, _)| {
519                         f.poll_sync_all()
520                 }).and_then(|_| {
521                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
522                 });
523
524                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
525         }
526
527         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
528                 let dns_file = self.store.clone() + "/nodes.dump";
529                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
530                         let mut dns_buff = String::new();
531                         {
532                                 let mut rng = thread_rng();
533                                 for i in &[ 0b00000000001u64,
534                                             0b00000000100,
535                                             0b00000000101,
536                                             0b00000001000,
537                                             0b00000001001,
538                                             0b00000001100,
539                                             0b00000001101,
540                                             0b00001001001,
541                                             0b10000000000,
542                                             0b10000000001,
543                                             0b10000000100,
544                                             0b10000000101,
545                                             0b10000001000,
546                                             0b10000001001,
547                                             0b10000001100,
548                                             0b10000001101,
549                                             0b10001001000] {
550                                 //            ^ NODE_NETWORK_LIIMTED
551                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
552                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
553                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
554                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
555                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
556                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
557                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
558                                         macro_rules! add_addr { ($addr: expr) => {
559                                                 match $addr.ip() {
560                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
561                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
562                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
563                                                 }
564                                         } }
565                                         {
566                                                 let nodes = self.nodes.read().unwrap();
567                                                 if i.count_ones() == 1 {
568                                                         for j in 0..64 {
569                                                                 if i & (1 << j) != 0 {
570                                                                         let set_ref = &nodes.good_node_services[j];
571                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
572                                                                                 add_addr!(a);
573                                                                         }
574                                                                         break;
575                                                                 }
576                                                         }
577                                                 } else if i.count_ones() == 2 {
578                                                         let mut first_set = None;
579                                                         let mut second_set = None;
580                                                         for j in 0..64 {
581                                                                 if i & (1 << j) != 0 {
582                                                                         if first_set == None {
583                                                                                 first_set = Some(&nodes.good_node_services[j]);
584                                                                         } else {
585                                                                                 second_set = Some(&nodes.good_node_services[j]);
586                                                                                 break;
587                                                                         }
588                                                                 }
589                                                         }
590                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
591                                                                 add_addr!(a);
592                                                         }
593                                                 } else {
594                                                         //TODO: Could optimize this one a bit
595                                                         let mut intersection;
596                                                         let mut intersection_set_ref = None;
597                                                         for j in 0..64 {
598                                                                 if i & (1 << j) != 0 {
599                                                                         if intersection_set_ref == None {
600                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
601                                                                         } else {
602                                                                                 let new_intersection = intersection_set_ref.unwrap()
603                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
604                                                                                 intersection = Some(new_intersection);
605                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
606                                                                         }
607                                                                 }
608                                                         }
609                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
610                                                                 add_addr!(a);
611                                                         }
612                                                 }
613                                         }
614                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
615                                         asn_set.insert(0);
616                                         for (a, asn) in v4_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V4(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 21) {
617                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{} ; AS{}\n", i, a, asn);
618                                         }
619                                         asn_set.clear();
620                                         asn_set.insert(0);
621                                         for (a, asn) in v6_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V6(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 10) {
622                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; AS{}\n", i, a, asn);
623                                         }
624                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
625                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; Tor Onionv2\n", i, a);
626                                         }
627                                 }
628                         }
629                         write_all(f, dns_buff)
630                 }).and_then(|(mut f, _)| {
631                         f.poll_sync_all()
632                 }).and_then(|_| {
633                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
634                 }).then(|_| { future::ok(()) })
635         }
636
637         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
638                 let mut res = Vec::with_capacity(128);
639
640                 {
641                         let mut nodes_lock = self.nodes.write().unwrap();
642                         let nodes = nodes_lock.borrow_mut();
643                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
644                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
645                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
646                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
647                                                 state_nodes.len() as u64);
648                                 for node in state_nodes.drain(..split_point as usize) {
649                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
650                                         res.push((&node).into());
651                                 }
652                         }
653                 }
654                 res.shuffle(&mut thread_rng());
655                 res
656         }
657 }