Decrease the size of Node from 48 to 16 (and a map entry from 72 to 36)
[dnsseed-rust] / src / datastore.rs
1 use std::cmp;
2 use std::convert::TryInto;
3 use std::collections::{HashSet, HashMap, hash_map};
4 use std::sync::{Arc, RwLock};
5 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
6 use std::time::Instant;
7 use std::io::{BufRead, BufReader};
8
9 use bitcoin::network::address::{Address, AddrV2Message};
10
11 use rand::thread_rng;
12 use rand::seq::{SliceRandom, IteratorRandom};
13
14 use tokio::prelude::*;
15 use tokio::fs::File;
16 use tokio::io::write_all;
17
18 use regex::Regex;
19
20 use crate::bgp_client::BGPClient;
21
22 pub const SECS_PER_SCAN_RESULTS: u64 = 15;
23 const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 30;
24
25 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
26 pub enum AddressState {
27         Untested,
28         LowBlockCount,
29         HighBlockCount,
30         LowVersion,
31         BadVersion,
32         NotFullNode,
33         ProtocolViolation,
34         Timeout,
35         TimeoutDuringRequest,
36         TimeoutAwaitingPong,
37         TimeoutAwaitingAddr,
38         TimeoutAwaitingBlock,
39         Good,
40         WasGood,
41         EvilNode,
42 }
43
44 impl AddressState {
45         pub fn from_num(num: u8) -> Option<AddressState> {
46                 match num {
47                         0x0 => Some(AddressState::Untested),
48                         0x1 => Some(AddressState::LowBlockCount),
49                         0x2 => Some(AddressState::HighBlockCount),
50                         0x3 => Some(AddressState::LowVersion),
51                         0x4 => Some(AddressState::BadVersion),
52                         0x5 => Some(AddressState::NotFullNode),
53                         0x6 => Some(AddressState::ProtocolViolation),
54                         0x7 => Some(AddressState::Timeout),
55                         0x8 => Some(AddressState::TimeoutDuringRequest),
56                         0x9 => Some(AddressState::TimeoutAwaitingPong),
57                         0xa => Some(AddressState::TimeoutAwaitingAddr),
58                         0xb => Some(AddressState::TimeoutAwaitingBlock),
59                         0xc => Some(AddressState::Good),
60                         0xd => Some(AddressState::WasGood),
61                         0xe => Some(AddressState::EvilNode),
62                         _   => None,
63                 }
64         }
65
66         pub fn to_num(&self) -> u8 {
67                 match *self {
68                         AddressState::Untested => 0,
69                         AddressState::LowBlockCount => 1,
70                         AddressState::HighBlockCount => 2,
71                         AddressState::LowVersion => 3,
72                         AddressState::BadVersion => 4,
73                         AddressState::NotFullNode => 5,
74                         AddressState::ProtocolViolation => 6,
75                         AddressState::Timeout => 7,
76                         AddressState::TimeoutDuringRequest => 8,
77                         AddressState::TimeoutAwaitingPong => 9,
78                         AddressState::TimeoutAwaitingAddr => 10,
79                         AddressState::TimeoutAwaitingBlock => 11,
80                         AddressState::Good => 12,
81                         AddressState::WasGood => 13,
82                         AddressState::EvilNode => 14,
83                 }
84         }
85
86         pub fn to_str(&self) -> &'static str {
87                 match *self {
88                         AddressState::Untested => "Untested",
89                         AddressState::LowBlockCount => "Low Block Count",
90                         AddressState::HighBlockCount => "High Block Count",
91                         AddressState::LowVersion => "Low Version",
92                         AddressState::BadVersion => "Bad Version",
93                         AddressState::NotFullNode => "Not Full Node",
94                         AddressState::ProtocolViolation => "Protocol Violation",
95                         AddressState::Timeout => "Timeout",
96                         AddressState::TimeoutDuringRequest => "Timeout During Request",
97                         AddressState::TimeoutAwaitingPong => "Timeout Awaiting Pong",
98                         AddressState::TimeoutAwaitingAddr => "Timeout Awaiting Addr",
99                         AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
100                         AddressState::Good => "Good",
101                         AddressState::WasGood => "Was Good",
102                         AddressState::EvilNode => "Evil Node",
103                 }
104         }
105
106         pub const fn get_count() -> u8 {
107                 15
108         }
109 }
110
111 #[derive(Hash, PartialEq, Eq)]
112 pub enum U64Setting {
113         RunTimeout,
114         WasGoodTimeout,
115         RescanInterval(AddressState),
116         MinProtocolVersion,
117 }
118
119 #[derive(Hash, PartialEq, Eq)]
120 pub enum RegexSetting {
121         SubverRegex,
122 }
123
124 struct Node {
125         // Times in seconds-since-startup
126         last_good: u32, // Ignored unless state is Good or WasGood
127         // Since everything is is 4-byte aligned, using a u64 for services blows up our size
128         // substantially. Instead, use a u32 pair and bit shift as needed.
129         last_services: (u32, u32),
130         state: AddressState,
131         queued: bool,
132 }
133 impl Node {
134         #[inline]
135         fn last_services(&self) -> u64 {
136                 ((self.last_services.0 as u64) << 32) |
137                 ((self.last_services.1 as u64)      )
138         }
139         #[inline]
140         fn services(inp: u64) -> (u32, u32) {
141                 (
142                         ((inp & 0xffffffff00000000) >> 32) as u32,
143                         ((inp & 0x00000000ffffffff)      ) as u32
144                 )
145         }
146 }
147
148 #[test]
149 fn services_test() {
150         assert_eq!(
151                 Node { last_good: 0, state: AddressState::Good, queued: false, last_services: Node::services(0x1badcafedeadbeef) }
152                         .last_services(),
153                 0x1badcafedeadbeef);
154 }
155
156 /// Essentially SocketAddr but without a traffic class or scope
157 #[derive(Clone, PartialEq, Eq, Hash)]
158 enum SockAddr {
159         V4(SocketAddrV4),
160         V6(([u16; 8], u16)),
161 }
162 #[inline]
163 fn segs_to_ip6(segs: &[u16; 8]) -> Ipv6Addr {
164         Ipv6Addr::new(segs[0], segs[1], segs[2], segs[3], segs[4], segs[5], segs[6], segs[7])
165 }
166 impl From<SocketAddr> for SockAddr {
167         fn from(addr: SocketAddr) -> SockAddr {
168                 match addr {
169                         SocketAddr::V4(sa) => SockAddr::V4(sa),
170                         SocketAddr::V6(sa) => SockAddr::V6((sa.ip().segments(), sa.port())),
171                 }
172         }
173 }
174 impl Into<SocketAddr> for &SockAddr {
175         fn into(self) -> SocketAddr {
176                 match self {
177                         &SockAddr::V4(sa) => SocketAddr::V4(sa),
178                         &SockAddr::V6(sa) => SocketAddr::V6(SocketAddrV6::new(segs_to_ip6(&sa.0), sa.1, 0, 0))
179                 }
180         }
181 }
182 impl ToString for SockAddr {
183         fn to_string(&self) -> String {
184                 let sa: SocketAddr = self.into();
185                 sa.to_string()
186         }
187 }
188 impl SockAddr {
189         pub fn port(&self) -> u16 {
190                 match *self {
191                         SockAddr::V4(sa) => sa.port(),
192                         SockAddr::V6((_, port)) => port,
193                 }
194         }
195         pub fn ip(&self) -> IpAddr {
196                 match *self {
197                         SockAddr::V4(sa) => IpAddr::V4(sa.ip().clone()),
198                         SockAddr::V6((ip, _)) => IpAddr::V6(segs_to_ip6(&ip)),
199                 }
200         }
201 }
202
203 struct Nodes {
204         good_node_services: [HashSet<SockAddr>; 64],
205         nodes_to_state: HashMap<SockAddr, Node>,
206         state_next_scan: [Vec<SockAddr>; AddressState::get_count() as usize],
207 }
208 struct NodesMutRef<'a> {
209         good_node_services: &'a mut [HashSet<SockAddr>; 64],
210         nodes_to_state: &'a mut HashMap<SockAddr, Node>,
211         state_next_scan: &'a mut [Vec<SockAddr>; AddressState::get_count() as usize],
212 }
213
214 impl Nodes {
215         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
216                 NodesMutRef {
217                         good_node_services: &mut self.good_node_services,
218                         nodes_to_state: &mut self.nodes_to_state,
219                         state_next_scan: &mut self.state_next_scan,
220                 }
221         }
222 }
223
224 pub struct Store {
225         u64_settings: RwLock<HashMap<U64Setting, u64>>,
226         subver_regex: RwLock<Arc<Regex>>,
227         nodes: RwLock<Nodes>,
228         start_time: Instant,
229         store: String,
230 }
231
232 impl Store {
233         pub fn new(store: String) -> impl Future<Item=Store, Error=()> {
234                 let settings_future = File::open(store.clone() + "/settings").and_then(|f| {
235                         let mut l = BufReader::new(f).lines();
236                         macro_rules! try_read {
237                                 ($lines: expr, $ty: ty) => { {
238                                         match $lines.next() {
239                                                 Some(line) => match line {
240                                                         Ok(line) => match line.parse::<$ty>() {
241                                                                 Ok(res) => res,
242                                                                 Err(e) => return future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
243                                                         },
244                                                         Err(e) => return future::err(e),
245                                                 },
246                                                 None => return future::err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "")),
247                                         }
248                                 } }
249                         }
250                         let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4);
251                         u64s.insert(U64Setting::RunTimeout, try_read!(l, u64));
252                         u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64));
253                         u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64));
254                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), try_read!(l, u64));
255                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), try_read!(l, u64));
256                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), try_read!(l, u64));
257                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), try_read!(l, u64));
258                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), try_read!(l, u64));
259                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), try_read!(l, u64));
260                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), try_read!(l, u64));
261                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), try_read!(l, u64));
262                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), try_read!(l, u64));
263                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), try_read!(l, u64));
264                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), try_read!(l, u64));
265                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
266                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
267                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
268                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
269                         future::ok((u64s, try_read!(l, Regex)))
270                 }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
271                         let mut u64s = HashMap::with_capacity(15);
272                         u64s.insert(U64Setting::RunTimeout, 120);
273                         u64s.insert(U64Setting::WasGoodTimeout, 21600);
274                         u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
275                         u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
276                         u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
277                         u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
278                         u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
279                         u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
280                         u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
281                         u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
282                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
283                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
284                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
285                         u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
286                         u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
287                         u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
288                         u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
289                         u64s.insert(U64Setting::MinProtocolVersion, 70002);
290                         future::ok((u64s, Regex::new(".*").unwrap()))
291                 });
292
293                 macro_rules! nodes_uninitd {
294                         () => { {
295                                 let state_vecs = [Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()];
296                                 let good_node_services = [HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new(), HashSet::new()];
297                                 Nodes {
298                                         good_node_services,
299                                         nodes_to_state: HashMap::new(),
300                                         state_next_scan: state_vecs,
301                                 }
302                         } }
303                 }
304
305                 let nodes_future = File::open(store.clone() + "/nodes").and_then(|f| {
306                         let mut res = nodes_uninitd!();
307                         let l = BufReader::new(f).lines();
308                         for line_res in l {
309                                 let line = match line_res {
310                                         Ok(l) => l,
311                                         Err(_) => return future::ok(res),
312                                 };
313                                 let mut line_iter = line.split(',');
314                                 macro_rules! try_read {
315                                         ($lines: expr, $ty: ty) => { {
316                                                 match $lines.next() {
317                                                         Some(line) => match line.parse::<$ty>() {
318                                                                 Ok(res) => res,
319                                                                 Err(_) => return future::ok(res),
320                                                         },
321                                                         None => return future::ok(res),
322                                                 }
323                                         } }
324                                 }
325                                 let sockaddr = try_read!(line_iter, SocketAddr);
326                                 let state = try_read!(line_iter, u8);
327                                 let last_services = try_read!(line_iter, u64);
328                                 let node = Node {
329                                         state: match AddressState::from_num(state) {
330                                                 Some(v) => v,
331                                                 None => return future::ok(res),
332                                         },
333                                         last_services: Node::services(last_services),
334                                         last_good: 0,
335                                         queued: true,
336                                 };
337                                 if node.state == AddressState::Good {
338                                         for i in 0..64 {
339                                                 if node.last_services() & (1 << i) != 0 {
340                                                         res.good_node_services[i].insert(sockaddr.into());
341                                                 }
342                                         }
343                                 }
344                                 res.state_next_scan[node.state.to_num() as usize].push(sockaddr.into());
345                                 res.nodes_to_state.insert(sockaddr.into(), node);
346                         }
347                         future::ok(res)
348                 }).or_else(|_| -> future::FutureResult<Nodes, ()> {
349                         future::ok(nodes_uninitd!())
350                 });
351                 settings_future.join(nodes_future).and_then(move |((u64_settings, regex), nodes)| {
352                         future::ok(Store {
353                                 u64_settings: RwLock::new(u64_settings),
354                                 subver_regex: RwLock::new(Arc::new(regex)),
355                                 nodes: RwLock::new(nodes),
356                                 store,
357                                 start_time: Instant::now(),
358                         })
359                 })
360         }
361
362         pub fn get_u64(&self, setting: U64Setting) -> u64 {
363                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
364         }
365
366         pub fn set_u64(&self, setting: U64Setting, value: u64) {
367                 *self.u64_settings.write().unwrap().get_mut(&setting).unwrap() = value;
368         }
369
370         pub fn get_node_count(&self, state: AddressState) -> usize {
371                 self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
372         }
373
374         pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
375                 Arc::clone(&*self.subver_regex.read().unwrap())
376         }
377
378         pub fn set_regex(&self, _setting: RegexSetting, value: Regex) {
379                 *self.subver_regex.write().unwrap() = Arc::new(value);
380         }
381
382         pub fn add_fresh_addrs<I: Iterator<Item=SocketAddr>>(&self, addresses: I) -> u64 {
383                 let mut res = 0;
384                 let cur_time = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
385                 let mut nodes = self.nodes.write().unwrap();
386                 for addr in addresses {
387                         match nodes.nodes_to_state.entry(addr.into()) {
388                                 hash_map::Entry::Vacant(e) => {
389                                         e.insert(Node {
390                                                 state: AddressState::Untested,
391                                                 last_services: (0, 0),
392                                                 last_good: cur_time,
393                                                 queued: true,
394                                         });
395                                         nodes.state_next_scan[AddressState::Untested.to_num() as usize].push(addr.into());
396                                         res += 1;
397                                 },
398                                 hash_map::Entry::Occupied(_) => {},
399                         }
400                 }
401                 res
402         }
403
404         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
405                 self.add_fresh_addrs(addresses.iter().filter_map(|(_, addr)| {
406                         match addr.socket_addr() {
407                                 Ok(socketaddr) => Some(socketaddr),
408                                 Err(_) => None, // TODO: Handle onions
409                         }
410                 }));
411         }
412         pub fn add_fresh_nodes_v2(&self, addresses: &Vec<AddrV2Message>) {
413                 self.add_fresh_addrs(addresses.iter().filter_map(|addr| {
414                         match addr.socket_addr() {
415                                 Ok(socketaddr) => Some(socketaddr),
416                                 Err(_) => None, // TODO: Handle onions
417                         }
418                 }));
419         }
420
421         pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
422                 let addr: SockAddr = sockaddr.into();
423
424                 let now = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
425
426                 let mut nodes_lock = self.nodes.write().unwrap();
427                 let nodes = nodes_lock.borrow_mut();
428
429                 let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
430                         state: AddressState::Untested,
431                         last_services: (0, 0),
432                         last_good: now,
433                         queued: false,
434                 });
435                 let ret = state_ref.state;
436                 let was_good_timeout: u32 = self.get_u64(U64Setting::WasGoodTimeout)
437                         .try_into().expect("Need WasGood timeout that fits in a u32");
438                 if (state_ref.state == AddressState::Good || state_ref.state == AddressState::WasGood)
439                                 && state != AddressState::Good
440                                 && state_ref.last_good >= now - was_good_timeout {
441                         state_ref.state = AddressState::WasGood;
442                         for i in 0..64 {
443                                 if state_ref.last_services() & (1 << i) != 0 {
444                                         nodes.good_node_services[i].remove(&addr);
445                                 }
446                         }
447                         state_ref.last_services = (0, 0);
448                         if !state_ref.queued {
449                                 nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
450                                 state_ref.queued = true;
451                         }
452                 } else {
453                         state_ref.state = state;
454                         if state == AddressState::Good {
455                                 for i in 0..64 {
456                                         if services & (1 << i) != 0 && state_ref.last_services() & (1 << i) == 0 {
457                                                 nodes.good_node_services[i].insert(addr.clone());
458                                         } else if services & (1 << i) == 0 && state_ref.last_services() & (1 << i) != 0 {
459                                                 nodes.good_node_services[i].remove(&addr);
460                                         }
461                                 }
462                                 state_ref.last_services = Node::services(services);
463                                 state_ref.last_good = now;
464                         }
465                         if !state_ref.queued {
466                                 nodes.state_next_scan[state.to_num() as usize].push(addr);
467                                 state_ref.queued = true;
468                         }
469                 }
470                 ret
471         }
472
473         pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
474                 let settings_file = self.store.clone() + "/settings";
475                 let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
476                         let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
477                                 self.get_u64(U64Setting::RunTimeout),
478                                 self.get_u64(U64Setting::WasGoodTimeout),
479                                 self.get_u64(U64Setting::MinProtocolVersion),
480                                 self.get_u64(U64Setting::RescanInterval(AddressState::Untested)),
481                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowBlockCount)),
482                                 self.get_u64(U64Setting::RescanInterval(AddressState::HighBlockCount)),
483                                 self.get_u64(U64Setting::RescanInterval(AddressState::LowVersion)),
484                                 self.get_u64(U64Setting::RescanInterval(AddressState::BadVersion)),
485                                 self.get_u64(U64Setting::RescanInterval(AddressState::NotFullNode)),
486                                 self.get_u64(U64Setting::RescanInterval(AddressState::ProtocolViolation)),
487                                 self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)),
488                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest)),
489                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong)),
490                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr)),
491                                 self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
492                                 self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
493                                 self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
494                                 self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
495                                 self.get_regex(RegexSetting::SubverRegex).as_str());
496                         write_all(f, settings_string).and_then(|(mut f, _)| {
497                                 f.poll_sync_all()
498                         }).and_then(|_| {
499                                 tokio::fs::rename(settings_file.clone() + ".tmp", settings_file)
500                         })
501                 });
502
503                 let nodes_file = self.store.clone() + "/nodes";
504                 let nodes_future = File::create(nodes_file.clone() + ".tmp").and_then(move |f| {
505                         let mut nodes_buff = String::new();
506                         {
507                                 let nodes = self.nodes.read().unwrap();
508                                 nodes_buff.reserve(nodes.nodes_to_state.len() * 32);
509                                 for (ref sockaddr, ref node) in nodes.nodes_to_state.iter() {
510                                         nodes_buff += &sockaddr.to_string();
511                                         nodes_buff += ",";
512                                         nodes_buff += &node.state.to_num().to_string();
513                                         nodes_buff += ",";
514                                         nodes_buff += &node.last_services().to_string();
515                                         nodes_buff += "\n";
516                                 }
517                         }
518                         write_all(f, nodes_buff)
519                 }).and_then(|(mut f, _)| {
520                         f.poll_sync_all()
521                 }).and_then(|_| {
522                         tokio::fs::rename(nodes_file.clone() + ".tmp", nodes_file)
523                 });
524
525                 settings_future.join(nodes_future).then(|_| { future::ok(()) })
526         }
527
528         pub fn write_dns(&'static self, bgp_client: Arc<BGPClient>) -> impl Future<Item=(), Error=()> {
529                 let dns_file = self.store.clone() + "/nodes.dump";
530                 File::create(dns_file.clone() + ".tmp").and_then(move |f| {
531                         let mut dns_buff = String::new();
532                         {
533                                 let mut rng = thread_rng();
534                                 for i in &[ 0b00000000001u64,
535                                             0b00000000100,
536                                             0b00000000101,
537                                             0b00000001000,
538                                             0b00000001001,
539                                             0b00000001100,
540                                             0b00000001101,
541                                             0b00001001001,
542                                             0b10000000000,
543                                             0b10000000001,
544                                             0b10000000100,
545                                             0b10000000101,
546                                             0b10000001000,
547                                             0b10000001001,
548                                             0b10000001100,
549                                             0b10000001101,
550                                             0b10001001000] {
551                                 //            ^ NODE_NETWORK_LIIMTED
552                                 //COMPACT_FILTERS ^   ^ NODE_BLOOM
553                                 //      NODE_WITNESS ^  ^ NODE_NETWORK
554                                 // We support all combos of NETWORK, NETWORK_LIMITED, BLOOM, and WITNESS
555                                 // We support COMPACT_FILTERS with WITNESS and NETWORK or NETWORK_LIIMTED.
556                                         let mut tor_set: Vec<Ipv6Addr> = Vec::new();
557                                         let mut v6_set: Vec<Ipv6Addr> = Vec::new();
558                                         let mut v4_set: Vec<Ipv4Addr> = Vec::new();
559                                         macro_rules! add_addr { ($addr: expr) => {
560                                                 match $addr.ip() {
561                                                         IpAddr::V4(v4addr) => v4_set.push(v4addr),
562                                                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => tor_set.push(v6addr),
563                                                         IpAddr::V6(v6addr) => v6_set.push(v6addr),
564                                                 }
565                                         } }
566                                         {
567                                                 let nodes = self.nodes.read().unwrap();
568                                                 if i.count_ones() == 1 {
569                                                         for j in 0..64 {
570                                                                 if i & (1 << j) != 0 {
571                                                                         let set_ref = &nodes.good_node_services[j];
572                                                                         for a in set_ref.iter().filter(|e| e.port() == 8333) {
573                                                                                 add_addr!(a);
574                                                                         }
575                                                                         break;
576                                                                 }
577                                                         }
578                                                 } else if i.count_ones() == 2 {
579                                                         let mut first_set = None;
580                                                         let mut second_set = None;
581                                                         for j in 0..64 {
582                                                                 if i & (1 << j) != 0 {
583                                                                         if first_set == None {
584                                                                                 first_set = Some(&nodes.good_node_services[j]);
585                                                                         } else {
586                                                                                 second_set = Some(&nodes.good_node_services[j]);
587                                                                                 break;
588                                                                         }
589                                                                 }
590                                                         }
591                                                         for a in first_set.unwrap().intersection(&second_set.unwrap()).filter(|e| e.port() == 8333) {
592                                                                 add_addr!(a);
593                                                         }
594                                                 } else {
595                                                         //TODO: Could optimize this one a bit
596                                                         let mut intersection;
597                                                         let mut intersection_set_ref = None;
598                                                         for j in 0..64 {
599                                                                 if i & (1 << j) != 0 {
600                                                                         if intersection_set_ref == None {
601                                                                                 intersection_set_ref = Some(&nodes.good_node_services[j]);
602                                                                         } else {
603                                                                                 let new_intersection = intersection_set_ref.unwrap()
604                                                                                         .intersection(&nodes.good_node_services[j]).map(|e| (*e).clone()).collect();
605                                                                                 intersection = Some(new_intersection);
606                                                                                 intersection_set_ref = Some(intersection.as_ref().unwrap());
607                                                                         }
608                                                                 }
609                                                         }
610                                                         for a in intersection_set_ref.unwrap().iter().filter(|e| e.port() == 8333) {
611                                                                 add_addr!(a);
612                                                         }
613                                                 }
614                                         }
615                                         let mut asn_set = HashSet::with_capacity(cmp::max(v4_set.len(), v6_set.len()));
616                                         asn_set.insert(0);
617                                         for (a, asn) in v4_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V4(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 21) {
618                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tA\t{} ; AS{}\n", i, a, asn);
619                                         }
620                                         asn_set.clear();
621                                         asn_set.insert(0);
622                                         for (a, asn) in v6_set.iter().map(|a| (a, bgp_client.get_asn(IpAddr::V6(*a)))).filter(|a| asn_set.insert(a.1)).choose_multiple(&mut rng, 10) {
623                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; AS{}\n", i, a, asn);
624                                         }
625                                         for a in tor_set.iter().choose_multiple(&mut rng, 2) {
626                                                 dns_buff += &format!("x{:x}.dnsseed\tIN\tAAAA\t{} ; Tor Onionv2\n", i, a);
627                                         }
628                                 }
629                         }
630                         write_all(f, dns_buff)
631                 }).and_then(|(mut f, _)| {
632                         f.poll_sync_all()
633                 }).and_then(|_| {
634                         tokio::fs::rename(dns_file.clone() + ".tmp", dns_file)
635                 }).then(|_| { future::ok(()) })
636         }
637
638         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
639                 let mut res = Vec::with_capacity(128);
640
641                 {
642                         let mut nodes_lock = self.nodes.write().unwrap();
643                         let nodes = nodes_lock.borrow_mut();
644                         for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() {
645                                 let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1);
646                                 let split_point = cmp::min(cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval,
647                                                         SECS_PER_SCAN_RESULTS * MAX_CONNS_PER_SEC_PER_STATUS),
648                                                 state_nodes.len() as u64);
649                                 for node in state_nodes.drain(..split_point as usize) {
650                                         nodes.nodes_to_state.get_mut(&node).unwrap().queued = false;
651                                         res.push((&node).into());
652                                 }
653                         }
654                 }
655                 res.shuffle(&mut thread_rng());
656                 res
657         }
658 }