2 use std::collections::{HashSet, HashMap, hash_map};
4 use std::net::SocketAddr;
5 use std::time::{Duration, Instant};
7 use bitcoin::network::address::Address;
10 use rand::seq::SliceRandom;
/// State assigned to a node address.
///
/// Used as a hash-map key for the per-state scan queues and, wrapped in
/// `U64Setting::RescanInterval`, for the per-state rescan intervals. It is
/// `Copy` and `Eq`, so it is passed by value and compared with `==`.
12 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
13 pub enum AddressState {
// Key type for the integer-valued settings (referenced as `U64Setting`
// throughout this file); hashable so it can key the settings map.
27 #[derive(Hash, PartialEq, Eq)]
// Rescan interval for addresses in the given state. The stored value is fed
// to `Duration::from_secs` by `get_next_scan_nodes`, i.e. it is in seconds.
32 RescanInterval(AddressState),
/// Key type for string-valued settings.
///
/// NOTE(review): `get_string` currently ignores which variant is passed and
/// always returns the subver regex.
36 #[derive(Hash, PartialEq, Eq)]
37 pub enum StringSetting {
// For each service-bit index (the `i` in `services & (1 << i)`), the set of
// addresses added by `set_node_state` when a node is reported `Good` with
// that bit set.
48 good_node_services: HashMap<u8, HashSet<SocketAddr>>,
// Per-address record (state, last seen services, time of last update).
49 nodes_to_state: HashMap<SocketAddr, Node>,
// Per-state scan queue of (time the entry was queued, address). Entries are
// appended with the current time, and `get_next_scan_nodes` binary-searches
// each queue by that timestamp.
50 state_next_scan: HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
/// Bundle of mutable references to the three node tables, produced by
/// `borrow_mut`.
///
/// NOTE(review): presumably exists so the tables can be mutably borrowed
/// independently of one another when accessed through a lock guard (see
/// `set_node_state`) — confirm.
52 struct NodesMutRef<'a> {
53 good_node_services: &'a mut HashMap<u8, HashSet<SocketAddr>>,
54 nodes_to_state: &'a mut HashMap<SocketAddr, Node>,
55 state_next_scan: &'a mut HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
/// Splits `self` into a `NodesMutRef` holding a separate mutable reference
/// to each of the three tables.
59 fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
61 good_node_services: &mut self.good_node_services,
62 nodes_to_state: &mut self.nodes_to_state,
63 state_next_scan: &mut self.state_next_scan,
// Integer settings, keyed by `U64Setting`; populated with defaults in `new`
// and read through `get_u64`.
69 u64_settings: RwLock<HashMap<U64Setting, u64>>,
// String returned by `get_string`; initialised to ".*" in `new`.
70 subver_regex: RwLock<String>,
/// Creates a `Store` with default settings, an empty address table and an
/// empty scan queue for every address state.
75 pub fn new() -> Store {
// Default integer settings: 15 entries, matching the reserved capacity.
76 let mut u64s = HashMap::with_capacity(15);
77 u64s.insert(U64Setting::ConnsPerSec, 50);
78 u64s.insert(U64Setting::RunTimeout, 120);
79 u64s.insert(U64Setting::WasGoodTimeout, 21600);
// Per-state rescan intervals; `get_next_scan_nodes` interprets these as
// seconds. Every state needs an entry here because `get_u64` unwraps the
// lookup.
80 u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
81 u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
82 u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
83 u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
84 u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
85 u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
86 u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
87 u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
88 u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
89 u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
90 u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
91 u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX
// One empty scan queue per state (11 entries), created up front so the
// `get_mut(..).unwrap()` / `get(..).unwrap()` lookups elsewhere in this file
// find a queue for each of these states.
92 let mut state_vecs = HashMap::with_capacity(11);
93 state_vecs.insert(AddressState::Untested, Vec::new());
94 state_vecs.insert(AddressState::LowBlockCount, Vec::new());
95 state_vecs.insert(AddressState::HighBlockCount, Vec::new());
96 state_vecs.insert(AddressState::LowVersion, Vec::new());
97 state_vecs.insert(AddressState::BadVersion, Vec::new());
98 state_vecs.insert(AddressState::NotFullNode, Vec::new());
99 state_vecs.insert(AddressState::ProtocolViolation, Vec::new());
100 state_vecs.insert(AddressState::Timeout, Vec::new());
101 state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new());
102 state_vecs.insert(AddressState::Good, Vec::new());
103 state_vecs.insert(AddressState::WasGood, Vec::new());
// Pre-create an empty address set for each service-bit key `i`, so
// `set_node_state` can unwrap its `get_mut(&i)` lookups.
// NOTE(review): presumably one set per bit of a 64-bit services mask, given
// the capacity of 64 — confirm against the loop bounds.
104 let mut good_node_services = HashMap::with_capacity(64);
106 good_node_services.insert(i, HashSet::new());
109 u64_settings: RwLock::new(u64s),
// ".*" is the initial value that `get_string` hands back.
110 subver_regex: RwLock::new(".*".to_string()),
111 nodes: RwLock::new(Nodes {
113 nodes_to_state: HashMap::new(),
114 state_next_scan: state_vecs,
/// Returns the current value of an integer setting.
///
/// # Panics
/// Panics if the settings lock is poisoned or if `setting` has no entry in
/// the map (both lookups are unwrapped).
119 pub fn get_u64(&self, setting: U64Setting) -> u64 {
120 *self.u64_settings.read().unwrap().get(&setting).unwrap()
/// Returns the number of entries currently in the scan queue for `state`.
///
/// # Panics
/// Panics if the nodes lock is poisoned or if `state` has no queue.
123 pub fn get_node_count(&self, state: AddressState) -> usize {
124 self.nodes.read().unwrap().state_next_scan.get(&state).unwrap().len()
/// Returns a copy of the subver regex string.
///
/// The `_setting` argument is ignored; the same string is returned for
/// every `StringSetting`.
///
/// # Panics
/// Panics if the `subver_regex` lock is poisoned.
127 pub fn get_string(&self, _setting: StringSetting) -> String {
128 self.subver_regex.read().unwrap().clone()
/// Registers newly learned addresses as `Untested` nodes.
///
/// For each entry whose `Address` converts to a `SocketAddr`, a record is
/// created only if the address is not already known; already-known addresses
/// are left untouched. The `u32` in each tuple is ignored.
///
/// NOTE(review): `&Vec<(u32, Address)>` could be `&[(u32, Address)]` without
/// affecting callers that pass `&vec`.
///
/// # Panics
/// Panics if the nodes lock is poisoned.
131 pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
// The write lock is held for the whole batch.
132 let mut nodes = self.nodes.write().unwrap();
// One timestamp is shared by every node added in this call.
133 let cur_time = Instant::now();
134 for &(_, ref addr) in addresses {
135 if let Ok(socketaddr) = addr.socket_addr() {
// NOTE(review): `SocketAddr` is `Copy`, so `.clone()` here is redundant.
136 match nodes.nodes_to_state.entry(socketaddr.clone()) {
137 hash_map::Entry::Vacant(e) => {
139 state: AddressState::Untested,
141 last_update: cur_time,
// Queue the new address for scanning, stamped with `cur_time`.
143 nodes.state_next_scan.get_mut(&AddressState::Untested).unwrap().push((cur_time, socketaddr));
// Already known: keep the existing record and queue position.
145 hash_map::Entry::Occupied(_) => {},
148 //TODO: Handle onions
/// Records the outcome of scanning `addr` and re-queues it for a later scan.
///
/// `services` is a bitmask; bit `i` corresponds to key `i` of
/// `good_node_services`.
///
/// # Panics
/// Panics if the nodes lock is poisoned, if `addr` has no record in
/// `nodes_to_state`, or if a queue or service set looked up below is missing.
153 pub fn set_node_state(&self, addr: SocketAddr, state: AddressState, services: u64) {
154 let mut nodes_lock = self.nodes.write().unwrap();
// Split the guard into per-table references so `state_ref` (borrowed from
// `nodes_to_state`) can stay live while the other tables are modified.
155 let nodes = nodes_lock.borrow_mut();
156 let state_ref = nodes.nodes_to_state.get_mut(&addr).unwrap();
157 state_ref.last_update = Instant::now();
// A node that was Good and is now reported as anything else is recorded as
// WasGood instead of the reported state.
158 if state_ref.state == AddressState::Good && state != AddressState::Good {
159 state_ref.state = AddressState::WasGood;
// Remove the address from every service set it was listed under.
161 if state_ref.last_services & (1 << i) != 0 {
162 nodes.good_node_services.get_mut(&i).unwrap().remove(&addr);
165 state_ref.last_services = 0;
166 nodes.state_next_scan.get_mut(&AddressState::WasGood).unwrap().push((state_ref.last_update, addr));
// NOTE(review): presumably the `else` arm of the check above (otherwise the
// WasGood assignment would be overwritten) — confirm.
168 state_ref.state = state;
169 if state == AddressState::Good {
// Update the service sets by comparing the new mask with the previous one:
// add for newly set bits, remove for bits that are no longer set.
171 if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
172 nodes.good_node_services.get_mut(&i).unwrap().insert(addr);
173 } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
174 nodes.good_node_services.get_mut(&i).unwrap().remove(&addr);
// Queue under the new state, stamped with the update time set above.
178 nodes.state_next_scan.get_mut(&state).unwrap().push((state_ref.last_update, addr));
/// Removes and returns the addresses that are due for a scan.
///
/// Each state's queue gives up at most 60 of its leading entries per call.
/// `res` is pre-sized for 600 addresses and the `600 - res.len()` term
/// bounds the running total. The collected addresses are shuffled before
/// being returned.
///
/// # Panics
/// Panics if the nodes or settings lock is poisoned, or if a state has no
/// `RescanInterval` setting.
182 pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
183 let mut res = Vec::with_capacity(600);
184 let cur_time = Instant::now();
185 let mut nodes = self.nodes.write().unwrap();
186 for (state, state_nodes) in nodes.state_next_scan.iter_mut() {
// Entries queued before `cmp_time` have waited at least the rescan interval.
// NOTE(review): `Instant - Duration` panics if the result is not
// representable on the platform; `checked_sub` would avoid that — confirm
// whether this matters for the supported targets.
187 let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(*state)));
// The binary search relies on the queue being ordered by timestamp, which
// holds as long as entries are only appended with the then-current time.
// Both an exact hit and an insertion point are used as the cut index.
188 let split_point = cmp::min(cmp::min(600 - res.len(), 60),
189 state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx));
// `split_off` returns the tail; swapping leaves the tail in the queue and
// puts the first `split_point` entries in `new_nodes`.
190 let mut new_nodes = state_nodes.split_off(split_point);
191 mem::swap(&mut new_nodes, state_nodes);
// NOTE(review): presumably each drained address is pushed onto `res` — confirm.
192 for (_, node) in new_nodes.drain(..) {
// Randomise the order of the returned addresses.
196 res.shuffle(&mut thread_rng());