X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fdatastore.rs;h=bfc15c90bc7f0978b2c03cc3274d72263ce9e6d0;hb=HEAD;hp=c55e47d93cfb1b59d253a583ebdd611f5b4780d3;hpb=85bcd9e15da4aca021ff67356c3725d3ffe95e13;p=dnsseed-rust diff --git a/src/datastore.rs b/src/datastore.rs index c55e47d..bfc15c9 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use std::collections::{HashSet, HashMap, hash_map}; use std::sync::{Arc, RwLock}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::io::{BufRead, BufReader}; use bitcoin::network::address::{Address, AddrV2Message}; @@ -17,10 +17,11 @@ use tokio::io::write_all; use regex::Regex; +use crate::bloom::RollingBloomFilter; use crate::bgp_client::BGPClient; pub const SECS_PER_SCAN_RESULTS: u64 = 15; -const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 30; +const MAX_CONNS_PER_SEC_PER_STATUS: u64 = 2500; #[derive(Clone, Copy, Hash, PartialEq, Eq)] pub enum AddressState { @@ -225,6 +226,7 @@ pub struct Store { u64_settings: RwLock>, subver_regex: RwLock>, nodes: RwLock, + timeout_nodes: RollingBloomFilter, start_time: Instant, store: String, } @@ -271,14 +273,14 @@ impl Store { let mut u64s = HashMap::with_capacity(15); u64s.insert(U64Setting::RunTimeout, 120); u64s.insert(U64Setting::WasGoodTimeout, 21600); - u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1); + u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 3600); u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600); u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200); u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600); u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600); u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400); u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400); - u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400); + u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 604800); u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600); u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingPong), 3600); u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingAddr), 1800); @@ -353,6 +355,7 @@ impl Store { u64_settings: RwLock::new(u64_settings), subver_regex: RwLock::new(Arc::new(regex)), nodes: RwLock::new(nodes), + timeout_nodes: RollingBloomFilter::new(), store, start_time: Instant::now(), }) @@ -370,6 +373,9 @@ impl Store { pub fn get_node_count(&self, state: AddressState) -> usize { self.nodes.read().unwrap().state_next_scan[state.to_num() as usize].len() } + pub fn get_bloom_node_count(&self) -> [usize; crate::bloom::GENERATION_COUNT] { + self.timeout_nodes.get_element_count() + } pub fn get_regex(&self, _setting: RegexSetting) -> Arc { Arc::clone(&*self.subver_regex.read().unwrap()) @@ -421,12 +427,33 @@ impl Store { pub fn set_node_state(&self, sockaddr: SocketAddr, state: AddressState, services: u64) -> AddressState { let addr: SockAddr = sockaddr.into(); + if state == AddressState::Untested && self.timeout_nodes.contains(&addr) { + return AddressState::Timeout; + } + let now = (Instant::now() - self.start_time).as_secs().try_into().unwrap(); let mut nodes_lock = self.nodes.write().unwrap(); let nodes = nodes_lock.borrow_mut(); - let state_ref = nodes.nodes_to_state.entry(addr.clone()).or_insert(Node { + let node_entry = nodes.nodes_to_state.entry(addr.clone()); + match node_entry { + hash_map::Entry::Occupied(entry) + if entry.get().state == AddressState::Untested && + entry.get().last_services() == 0 && + state == AddressState::Timeout => { + entry.remove_entry(); + self.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)))); + return AddressState::Untested; + }, + hash_map::Entry::Vacant(_) if state == AddressState::Timeout => { + self.timeout_nodes.insert(&addr, Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::Timeout)))); + return AddressState::Untested; + }, + _ => {}, + } + + let state_ref = node_entry.or_insert(Node { state: AddressState::Untested, last_services: (0, 0), last_good: now, @@ -444,7 +471,6 @@ impl Store { nodes.good_node_services[i].remove(&addr); } } - state_ref.last_services = (0, 0); if !state_ref.queued { nodes.state_next_scan[AddressState::WasGood.to_num() as usize].push(addr); state_ref.queued = true;