let cur_time = Instant::now();
let mut nodes = self.nodes.write().unwrap();
- for (state, state_nodes) in nodes.state_next_scan.iter_mut() {
+ for (idx, (state, state_nodes)) in nodes.state_next_scan.iter_mut().enumerate() {
let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(*state)));
- let split_point = cmp::min(cmp::min(results - res.len(), per_bucket_results),
+ let split_point = cmp::min(cmp::min(results - res.len(), results - (per_bucket_results * (AddressState::get_count() as usize - idx))),
state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx));
let mut new_nodes = state_nodes.split_off(split_point);
mem::swap(&mut new_nodes, state_nodes);
mod timeout_stream;
mod datastore;
-use std::env;
+use std::{cmp, env};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{Ordering, AtomicBool};
let store = unsafe { DATA_STORE.as_ref().unwrap() };
let mut scan_nodes = store.get_next_scan_nodes();
let per_iter_time = Duration::from_millis(1000 / store.get_u64(U64Setting::ConnsPerSec));
- let mut iter_time = Instant::now();
+ let start_time = Instant::now();
+ let mut iter_time = start_time;
for node in scan_nodes.drain(..) {
scan_node(iter_time, node);
iter_time += per_iter_time;
}
- Delay::new(iter_time).then(|_| {
+ Delay::new(cmp::max(iter_time, start_time + Duration::from_secs(15))).then(|_| {
let store = unsafe { DATA_STORE.as_ref().unwrap() };
store.save_data().then(|_| {
if !START_SHUTDOWN.load(Ordering::Relaxed) {