From b133f38bd61ba96876d1d8c31d606b6c812dbc61 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 18 Sep 2019 13:54:51 -0400 Subject: [PATCH] Drop the connections/sec arg and just make it fit in the interval --- src/datastore.rs | 23 ++++++++++------------- src/main.rs | 18 ++++++++++-------- src/printer.rs | 3 --- src/reader.rs | 1 - 4 files changed, 20 insertions(+), 25 deletions(-) diff --git a/src/datastore.rs b/src/datastore.rs index 2dbe370..2f0e9a8 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -18,6 +18,8 @@ use regex::Regex; use crate::bgp_client::BGPClient; +pub const SECS_PER_SCAN_RESULTS: u64 = 15; + #[derive(Clone, Copy, Hash, PartialEq, Eq)] pub enum AddressState { Untested, @@ -106,7 +108,6 @@ impl AddressState { #[derive(Hash, PartialEq, Eq)] pub enum U64Setting { - ConnsPerSec, RunTimeout, WasGoodTimeout, RescanInterval(AddressState), @@ -172,7 +173,6 @@ impl Store { } } } let mut u64s = HashMap::with_capacity(AddressState::get_count() as usize + 4); - u64s.insert(U64Setting::ConnsPerSec, try_read!(l, u64)); u64s.insert(U64Setting::RunTimeout, try_read!(l, u64)); u64s.insert(U64Setting::WasGoodTimeout, try_read!(l, u64)); u64s.insert(U64Setting::MinProtocolVersion, try_read!(l, u64)); @@ -194,10 +194,9 @@ impl Store { future::ok((u64s, try_read!(l, Regex))) }).or_else(|_| -> future::FutureResult<(HashMap, Regex), ()> { let mut u64s = HashMap::with_capacity(15); - u64s.insert(U64Setting::ConnsPerSec, 10); u64s.insert(U64Setting::RunTimeout, 120); u64s.insert(U64Setting::WasGoodTimeout, 21600); - u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0); + u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 1); u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600); u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200); u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600); @@ -386,8 +385,7 @@ impl Store { pub fn save_data(&'static self) -> impl Future { let settings_file = self.store.clone() + "/settings"; let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| { - let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", - self.get_u64(U64Setting::ConnsPerSec), + let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", self.get_u64(U64Setting::RunTimeout), self.get_u64(U64Setting::WasGoodTimeout), self.get_u64(U64Setting::MinProtocolVersion), @@ -529,18 +527,17 @@ impl Store { } pub fn get_next_scan_nodes(&self) -> Vec { - let results = 30 * self.get_u64(U64Setting::ConnsPerSec) as usize; - let per_bucket_results = results / (AddressState::get_count() as usize); - let mut res = Vec::with_capacity(results); + let mut res = Vec::with_capacity(128); let cur_time = Instant::now(); { let mut nodes = self.nodes.write().unwrap(); for (idx, state_nodes) in nodes.state_next_scan.iter_mut().enumerate() { - let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap()))); - let split_point = cmp::min(cmp::min(results - res.len(), (per_bucket_results * (idx + 1)) - res.len()), - state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx)); - let mut new_nodes = state_nodes.split_off(split_point); + let rescan_interval = cmp::max(self.get_u64(U64Setting::RescanInterval(AddressState::from_num(idx as u8).unwrap())), 1); + let cmp_time = cur_time - Duration::from_secs(rescan_interval); + let split_point = cmp::min(SECS_PER_SCAN_RESULTS * state_nodes.len() as u64 / rescan_interval, + state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx) as u64); + let mut new_nodes = state_nodes.split_off(split_point as usize); mem::swap(&mut new_nodes, state_nodes); for (_, node) in new_nodes.drain(..) { res.push(node); diff --git a/src/main.rs b/src/main.rs index 19dcb30..d25020e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ mod bgp_client; mod timeout_stream; mod datastore; -use std::{cmp, env}; +use std::env; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::sync::atomic::{Ordering, AtomicBool}; @@ -279,17 +279,19 @@ fn scan_net() { let printer = unsafe { PRINTER.as_ref().unwrap() }; let store = unsafe { DATA_STORE.as_ref().unwrap() }; + let start_time = Instant::now(); let mut scan_nodes = store.get_next_scan_nodes(); printer.add_line(format!("Got {} addresses to scan", scan_nodes.len()), false); - let per_iter_time = Duration::from_millis(1000 / store.get_u64(U64Setting::ConnsPerSec)); - let start_time = Instant::now(); - let mut iter_time = start_time; + if !scan_nodes.is_empty() { + let per_iter_time = Duration::from_millis(datastore::SECS_PER_SCAN_RESULTS / scan_nodes.len() as u64); + let mut iter_time = start_time; - for node in scan_nodes.drain(..) { - scan_node(iter_time, node, false); - iter_time += per_iter_time; + for node in scan_nodes.drain(..) { + scan_node(iter_time, node, false); + iter_time += per_iter_time; + } } - Delay::new(cmp::max(iter_time, start_time + Duration::from_secs(1))).then(move |_| { + Delay::new(start_time + Duration::from_secs(datastore::SECS_PER_SCAN_RESULTS)).then(move |_| { if !START_SHUTDOWN.load(Ordering::Relaxed) { scan_net(); } diff --git a/src/printer.rs b/src/printer.rs index ba7173c..4b65222 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -65,9 +65,6 @@ impl Printer { out.write_all(format!( "\nCurrent connections open/in progress: {}\n", stats.connection_count).as_bytes()).expect("stdout broken?"); - out.write_all(format!( - "Connections opened each second: {} (\"c x\" to change to x seconds)\n", store.get_u64(U64Setting::ConnsPerSec) - ).as_bytes()).expect("stdout broken?"); out.write_all(format!( "Current block count: {}\n", stats.header_count).as_bytes()).expect("stdout broken?"); diff --git a/src/reader.rs b/src/reader.rs index 0098ef4..967d717 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -91,7 +91,6 @@ pub fn read(store: &'static Store, printer: &'static Printer, bgp_client: Arc store.set_u64(U64Setting::ConnsPerSec, try_parse_next_chunk!(u64)), "t" => store.set_u64(U64Setting::RunTimeout, try_parse_next_chunk!(u64)), "v" => store.set_u64(U64Setting::MinProtocolVersion, try_parse_next_chunk!(u64)), "w" => store.set_u64(U64Setting::WasGoodTimeout, try_parse_next_chunk!(u64)), -- 2.39.5