Store nodes that timed out in a rolling bloom filter
[dnsseed-rust] / src / datastore.rs
index 04e377be126d61e8be93f3dc558dd6cf7affe0c7..bf0470a10f6c34393915da67bd9edbcb1684b111 100644 (file)
@@ -3,7 +3,7 @@ use std::convert::TryInto;
 use std::collections::{HashSet, HashMap, hash_map};
 use std::sync::{Arc, RwLock};
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
-use std::time::Instant;
+use std::time::{Duration, Instant};
 use std::io::{BufRead, BufReader};
 
 use bitcoin::network::address::{Address, AddrV2Message};
@@ -17,6 +17,7 @@ use tokio::io::write_all;
 
 use regex::Regex;
 
+use crate::bloom::RollingBloomFilter;
 use crate::bgp_client::BGPClient;
 
 pub const SECS_PER_SCAN_RESULTS: u64 = 15;
@@ -203,11 +204,13 @@ impl SockAddr {
 struct Nodes {
        good_node_services: [HashSet<SockAddr>; 64],
        nodes_to_state: HashMap<SockAddr, Node>,
+       timeout_nodes: RollingBloomFilter<SockAddr>,
        state_next_scan: [Vec<SockAddr>; AddressState::get_count() as usize],
 }
 struct NodesMutRef<'a> {
        good_node_services: &'a mut [HashSet<SockAddr>; 64],
        nodes_to_state: &'a mut HashMap<SockAddr, Node>,
+       timeout_nodes: &'a mut RollingBloomFilter<SockAddr>,
        state_next_scan: &'a mut [Vec<SockAddr>; AddressState::get_count() as usize],
 }
 
@@ -216,6 +219,7 @@ impl Nodes {
                NodesMutRef {
                        good_node_services: &mut self.good_node_services,
                        nodes_to_state: &mut self.nodes_to_state,
+                       timeout_nodes: &mut self.timeout_nodes,
                        state_next_scan: &mut self.state_next_scan,
                }
        }
@@ -297,6 +301,7 @@ impl Store {
                                Nodes {
                                        good_node_services,
                                        nodes_to_state: HashMap::new(),
+                                       timeout_nodes: RollingBloomFilter::new(),
                                        state_next_scan: state_vecs,
                                }
                        } }
@@ -370,6 +375,9 @@ impl Store {
        pub fn get_node_count(&self, state: AddressState) -> usize {
                self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
        }
+       pub fn get_bloom_node_count(&self) -> [usize; crate::bloom::GENERATION_COUNT] {
+               self.nodes.read().unwrap().timeout_nodes.get_element_count()
+       }
 
        pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
                Arc::clone(&*self.subver_regex.read().unwrap())
@@ -426,7 +434,27 @@ impl Store {
                let mut nodes_lock = self.nodes.write().unwrap();
                let nodes = nodes_lock.borrow_mut();
 
-               let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
+               let node_entry = nodes.nodes_to_state.entry(addr.clone());
+               match node_entry {
+                       hash_map::Entry::Occupied(entry)
+                                       if entry.get().state == AddressState::Untested &&
+                                          entry.get().last_services() == 0 &&
+                                          state == AddressState::Timeout => {
+                               entry.remove_entry();
+                               nodes.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout))));
+                               return AddressState::Untested;
+                       },
+                       hash_map::Entry::Vacant(_) if state == AddressState::Timeout => {
+                               nodes.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout))));
+                               return AddressState::Untested;
+                       },
+                       hash_map::Entry::Vacant(_) if nodes.timeout_nodes.contains(&addr) => {
+                               return AddressState::Timeout;
+                       },
+                       _ => {},
+               }
+
+               let state_ref = node_entry.or_insert(Node {
                        state: AddressState::Untested,
                        last_services: (0, 0),
                        last_good: now,