Drop memory limit. It was useful to debug OOMs but is now unnecessary
[dnsseed-rust] / src / datastore.rs
index c55e47d93cfb1b59d253a583ebdd611f5b4780d3..bfc15c90bc7f0978b2c03cc3274d72263ce9e6d0 100644 (file)
@@ -3,7 +3,7 @@ use std::convert::TryInto;
 use std::collections::{HashSet, HashMap, hash_map};
 use std::sync::{Arc, RwLock};
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
-use std::time::Instant;
+use std::time::{Duration, Instant};
 use std::io::{BufRead, BufReader};
 
 use bitcoin::network::address::{Address, AddrV2Message};
@@ -17,10 +17,11 @@ use tokio::io::write_all;
 
 use regex::Regex;
 
+use crate::bloom::RollingBloomFilter;
 use crate::bgp_client::BGPClient;
 
 pub const SECS_PER_SCAN_RESULTS: u64 = 15;
-const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 30;
+const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 2500;
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 pub enum AddressState {
@@ -225,6 +226,7 @@ pub struct Store {
        u64_settings: RwLock<HashMap<U64Setting, u64>>,
        subver_regex: RwLock<Arc<Regex>>,
        nodes: RwLock<Nodes>,
+       timeout_nodes: RollingBloomFilter<SockAddr>,
        start_time: Instant,
        store: String,
 }
@@ -271,14 +273,14 @@ impl Store {
                        let mut u64s = HashMap::with_capacity(15);
                        u64s.insert(U64Setting::RunTimeout, 120);
                        u64s.insert(U64Setting::WasGoodTimeout, 21600);
-                       u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1);
+                       u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 3600);
                        u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
                        u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
                        u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
                        u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
                        u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
                        u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
-                       u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
+                       u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 604800);
                        u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
                        u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600);
                        u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800);
@@ -353,6 +355,7 @@ impl Store {
                                u64_settings: RwLock::new(u64_settings),
                                subver_regex: RwLock::new(Arc::new(regex)),
                                nodes: RwLock::new(nodes),
+                               timeout_nodes: RollingBloomFilter::new(),
                                store,
                                start_time: Instant::now(),
                        })
@@ -370,6 +373,9 @@ impl Store {
        pub fn get_node_count(&self, state: AddressState) -> usize {
                self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len()
        }
+       pub fn get_bloom_node_count(&self) -> [usize; crate::bloom::GENERATION_COUNT] {
+               self.timeout_nodes.get_element_count()
+       }
 
        pub fn get_regex(&self, _setting: RegexSetting) -> Arc<Regex> {
                Arc::clone(&*self.subver_regex.read().unwrap())
@@ -421,12 +427,33 @@ impl Store {
        pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState {
                let addr: SockAddr = sockaddr.into();
 
+               if state == AddressState::Untested && self.timeout_nodes.contains(&addr) {
+                       return AddressState::Timeout;
+               }
+
                let now = (Instant::now() - self.start_time).as_secs().try_into().unwrap();
 
                let mut nodes_lock = self.nodes.write().unwrap();
                let nodes = nodes_lock.borrow_mut();
 
-               let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node {
+               let node_entry = nodes.nodes_to_state.entry(addr.clone());
+               match node_entry {
+                       hash_map::Entry::Occupied(entry)
+                                       if entry.get().state == AddressState::Untested &&
+                                          entry.get().last_services() == 0 &&
+                                          state == AddressState::Timeout => {
+                               entry.remove_entry();
+                               self.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout))));
+                               return AddressState::Untested;
+                       },
+                       hash_map::Entry::Vacant(_) if state == AddressState::Timeout => {
+                               self.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout))));
+                               return AddressState::Untested;
+                       },
+                       _ => {},
+               }
+
+               let state_ref = node_entry.or_insert(Node {
                        state: AddressState::Untested,
                        last_services: (0, 0),
                        last_good: now,
@@ -444,7 +471,6 @@ impl Store {
                                        nodes.good_node_services[i].remove(&addr);
                                }
                        }
-                       state_ref.last_services = (0, 0);
                        if !state_ref.queued {
                                nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr);
                                state_ref.queued = true;