Track service bits
[dnsseed-rust] / src / datastore.rs
1 use std::{cmp, mem};
2 use std::collections::{HashSet, HashMap, hash_map};
3 use std::sync::RwLock;
4 use std::net::SocketAddr;
5 use std::time::{Duration, Instant};
6
7 use bitcoin::network::address::Address;
8
9 use rand::thread_rng;
10 use rand::seq::SliceRandom;
11
12 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
13 pub enum AddressState {
14         Untested,
15         LowBlockCount,
16         HighBlockCount,
17         LowVersion,
18         BadVersion,
19         NotFullNode,
20         ProtocolViolation,
21         Timeout,
22         TimeoutDuringRequest,
23         Good,
24         WasGood,
25 }
26
27 #[derive(Hash, PartialEq, Eq)]
28 pub enum U64Setting {
29         ConnsPerSec,
30         RunTimeout,
31         WasGoodTimeout,
32         RescanInterval(AddressState),
33         MinProtocolVersion,
34 }
35
36 #[derive(Hash, PartialEq, Eq)]
37 pub enum StringSetting {
38         SubverRegex,
39 }
40
41 struct Node {
42         state: AddressState,
43         last_services: u64,
44         last_update: Instant,
45 }
46
47 struct Nodes {
48         good_node_services: HashMap<u8, HashSet<SocketAddr>>,
49         nodes_to_state: HashMap<SocketAddr, Node>,
50         state_next_scan: HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
51 }
52 struct NodesMutRef<'a> {
53         good_node_services: &'a mut HashMap<u8, HashSet<SocketAddr>>,
54         nodes_to_state: &'a mut HashMap<SocketAddr, Node>,
55         state_next_scan: &'a mut HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
56
57 }
58 impl Nodes {
59         fn borrow_mut<'a>(&'a mut self) -> NodesMutRef<'a> {
60                 NodesMutRef {
61                         good_node_services: &mut self.good_node_services,
62                         nodes_to_state: &mut self.nodes_to_state,
63                         state_next_scan: &mut self.state_next_scan,
64                 }
65         }
66 }
67
68 pub struct Store {
69         u64_settings: RwLock<HashMap<U64Setting, u64>>,
70         subver_regex: RwLock<String>,
71         nodes: RwLock<Nodes>,
72 }
73
74 impl Store {
75         pub fn new() -> Store {
76                 let mut u64s = HashMap::with_capacity(15);
77                 u64s.insert(U64Setting::ConnsPerSec, 50);
78                 u64s.insert(U64Setting::RunTimeout, 120);
79                 u64s.insert(U64Setting::WasGoodTimeout, 21600);
80                 u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
81                 u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
82                 u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
83                 u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
84                 u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
85                 u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
86                 u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
87                 u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
88                 u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
89                 u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
90                 u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
91                 u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX
92                 let mut state_vecs = HashMap::with_capacity(11);
93                 state_vecs.insert(AddressState::Untested, Vec::new());
94                 state_vecs.insert(AddressState::LowBlockCount, Vec::new());
95                 state_vecs.insert(AddressState::HighBlockCount, Vec::new());
96                 state_vecs.insert(AddressState::LowVersion, Vec::new());
97                 state_vecs.insert(AddressState::BadVersion, Vec::new());
98                 state_vecs.insert(AddressState::NotFullNode, Vec::new());
99                 state_vecs.insert(AddressState::ProtocolViolation, Vec::new());
100                 state_vecs.insert(AddressState::Timeout, Vec::new());
101                 state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new());
102                 state_vecs.insert(AddressState::Good, Vec::new());
103                 state_vecs.insert(AddressState::WasGood, Vec::new());
104                 let mut good_node_services = HashMap::with_capacity(64);
105                 for i in 0..64 {
106                         good_node_services.insert(i, HashSet::new());
107                 }
108                 Store {
109                         u64_settings: RwLock::new(u64s),
110                         subver_regex: RwLock::new(".*".to_string()),
111                         nodes: RwLock::new(Nodes {
112                                 good_node_services,
113                                 nodes_to_state: HashMap::new(),
114                                 state_next_scan: state_vecs,
115                         }),
116                 }
117         }
118
119         pub fn get_u64(&self, setting: U64Setting) -> u64 {
120                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
121         }
122
123         pub fn get_node_count(&self, state: AddressState) -> usize {
124                 self.nodes.read().unwrap().state_next_scan.get(&state).unwrap().len()
125         }
126
127         pub fn get_string(&self, _setting: StringSetting) -> String {
128                 self.subver_regex.read().unwrap().clone()
129         }
130
131         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
132                 let mut nodes = self.nodes.write().unwrap();
133                 let cur_time = Instant::now();
134                 for &(_, ref addr) in addresses {
135                         if let Ok(socketaddr) = addr.socket_addr() {
136                                 match nodes.nodes_to_state.entry(socketaddr.clone()) {
137                                         hash_map::Entry::Vacant(e) => {
138                                                 e.insert(Node {
139                                                         state: AddressState::Untested,
140                                                         last_services: 0,
141                                                         last_update: cur_time,
142                                                 });
143                                                 nodes.state_next_scan.get_mut(&AddressState::Untested).unwrap().push((cur_time, socketaddr));
144                                         },
145                                         hash_map::Entry::Occupied(_) => {},
146                                 }
147                         } else {
148                                 //TODO: Handle onions
149                         }
150                 }
151         }
152
153         pub fn set_node_state(&self, addr: SocketAddr, state: AddressState, services: u64) {
154                 let mut nodes_lock = self.nodes.write().unwrap();
155                 let nodes = nodes_lock.borrow_mut();
156                 let state_ref = nodes.nodes_to_state.get_mut(&addr).unwrap();
157                 state_ref.last_update = Instant::now();
158                 if state_ref.state == AddressState::Good && state != AddressState::Good {
159                         state_ref.state = AddressState::WasGood;
160                         for i in 0..64 {
161                                 if state_ref.last_services & (1 << i) != 0 {
162                                         nodes.good_node_services.get_mut(&i).unwrap().remove(&addr);
163                                 }
164                         }
165                         state_ref.last_services = 0;
166                         nodes.state_next_scan.get_mut(&AddressState::WasGood).unwrap().push((state_ref.last_update, addr));
167                 } else {
168                         state_ref.state = state;
169                         if state == AddressState::Good {
170                                 for i in 0..64 {
171                                         if services & (1 << i) != 0 && state_ref.last_services & (1 << i) == 0 {
172                                                 nodes.good_node_services.get_mut(&i).unwrap().insert(addr);
173                                         } else if services & (1 << i) == 0 && state_ref.last_services & (1 << i) != 0 {
174                                                 nodes.good_node_services.get_mut(&i).unwrap().remove(&addr);
175                                         }
176                                 }
177                         }
178                         nodes.state_next_scan.get_mut(&state).unwrap().push((state_ref.last_update, addr));
179                 }
180         }
181
182         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
183                 let mut res = Vec::with_capacity(600);
184                 let cur_time = Instant::now();
185                 let mut nodes = self.nodes.write().unwrap();
186                 for (state, state_nodes) in nodes.state_next_scan.iter_mut() {
187                         let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(*state)));
188                         let split_point = cmp::min(cmp::min(600 - res.len(), 60),
189                                         state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx));
190                         let mut new_nodes = state_nodes.split_off(split_point);
191                         mem::swap(&mut new_nodes, state_nodes);
192                         for (_, node) in new_nodes.drain(..) {
193                                 res.push(node);
194                         }
195                 }
196                 res.shuffle(&mut thread_rng());
197                 res
198         }
199 }