Initial checkin
[dnsseed-rust] / src / datastore.rs
1 use std::{cmp, mem};
2 use std::collections::{HashMap, hash_map};
3 use std::sync::RwLock;
4 use std::net::SocketAddr;
5 use std::time::{Duration, Instant};
6
7 use bitcoin::network::address::Address;
8
9 use rand::thread_rng;
10 use rand::seq::SliceRandom;
11
12 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
13 pub enum AddressState {
14         Untested,
15         LowBlockCount,
16         HighBlockCount,
17         LowVersion,
18         BadVersion,
19         NotFullNode,
20         ProtocolViolation,
21         Timeout,
22         TimeoutDuringRequest,
23         Good,
24         WasGood,
25 }
26
27 #[derive(Hash, PartialEq, Eq)]
28 pub enum U64Setting {
29         ConnsPerSec,
30         RunTimeout,
31         WasGoodTimeout,
32         RescanInterval(AddressState),
33         MinProtocolVersion,
34 }
35
36 #[derive(Hash, PartialEq, Eq)]
37 pub enum StringSetting {
38         SubverRegex,
39 }
40
41 struct Nodes {
42         good_node_services: HashMap<u8, Vec<SocketAddr>>,
43         nodes_to_state: HashMap<SocketAddr, AddressState>,
44         state_next_scan: HashMap<AddressState, Vec<(Instant, SocketAddr)>>,
45 }
46
47 pub struct Store {
48         u64_settings: RwLock<HashMap<U64Setting, u64>>,
49         subver_regex: RwLock<String>,
50         nodes: RwLock<Nodes>,
51 }
52
53 impl Store {
54         pub fn new() -> Store {
55                 let mut u64s = HashMap::with_capacity(15);
56                 u64s.insert(U64Setting::ConnsPerSec, 50);
57                 u64s.insert(U64Setting::RunTimeout, 120);
58                 u64s.insert(U64Setting::WasGoodTimeout, 21600);
59                 u64s.insert(U64Setting::RescanInterval(AddressState::Untested), 0);
60                 u64s.insert(U64Setting::RescanInterval(AddressState::LowBlockCount), 3600);
61                 u64s.insert(U64Setting::RescanInterval(AddressState::HighBlockCount), 7200);
62                 u64s.insert(U64Setting::RescanInterval(AddressState::LowVersion), 21600);
63                 u64s.insert(U64Setting::RescanInterval(AddressState::BadVersion), 21600);
64                 u64s.insert(U64Setting::RescanInterval(AddressState::NotFullNode), 86400);
65                 u64s.insert(U64Setting::RescanInterval(AddressState::ProtocolViolation), 86400);
66                 u64s.insert(U64Setting::RescanInterval(AddressState::Timeout), 86400);
67                 u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutDuringRequest), 21600);
68                 u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
69                 u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
70                 u64s.insert(U64Setting::MinProtocolVersion, 10000); //XXX
71                 let mut state_vecs = HashMap::with_capacity(11);
72                 state_vecs.insert(AddressState::Untested, Vec::new());
73                 state_vecs.insert(AddressState::LowBlockCount, Vec::new());
74                 state_vecs.insert(AddressState::HighBlockCount, Vec::new());
75                 state_vecs.insert(AddressState::LowVersion, Vec::new());
76                 state_vecs.insert(AddressState::BadVersion, Vec::new());
77                 state_vecs.insert(AddressState::NotFullNode, Vec::new());
78                 state_vecs.insert(AddressState::ProtocolViolation, Vec::new());
79                 state_vecs.insert(AddressState::Timeout, Vec::new());
80                 state_vecs.insert(AddressState::TimeoutDuringRequest, Vec::new());
81                 state_vecs.insert(AddressState::Good, Vec::new());
82                 state_vecs.insert(AddressState::WasGood, Vec::new());
83                 let mut good_node_services = HashMap::with_capacity(64);
84                 for i in 0..64 {
85                         good_node_services.insert(i, Vec::new());
86                 }
87                 Store {
88                         u64_settings: RwLock::new(u64s),
89                         subver_regex: RwLock::new(".*".to_string()),
90                         nodes: RwLock::new(Nodes {
91                                 good_node_services,
92                                 nodes_to_state: HashMap::new(),
93                                 state_next_scan: state_vecs,
94                         }),
95                 }
96         }
97
98         pub fn get_u64(&self, setting: U64Setting) -> u64 {
99                 *self.u64_settings.read().unwrap().get(&setting).unwrap()
100         }
101
102         pub fn get_node_count(&self, state: AddressState) -> usize {
103                 self.nodes.read().unwrap().state_next_scan.get(&state).unwrap().len()
104         }
105
106         pub fn get_string(&self, _setting: StringSetting) -> String {
107                 self.subver_regex.read().unwrap().clone()
108         }
109
110         pub fn add_fresh_nodes(&self, addresses: &Vec<(u32, Address)>) {
111                 let mut nodes = self.nodes.write().unwrap();
112                 let cur_time = Instant::now();
113                 for &(_, ref addr) in addresses {
114                         if let Ok(socketaddr) = addr.socket_addr() {
115                                 match nodes.nodes_to_state.entry(socketaddr.clone()) {
116                                         hash_map::Entry::Vacant(e) => {
117                                                 e.insert(AddressState::Untested);
118                                                 nodes.state_next_scan.get_mut(&AddressState::Untested).unwrap().push((cur_time, socketaddr));
119                                         },
120                                         hash_map::Entry::Occupied(_) => {},
121                                 }
122                         } else {
123                                 //TODO: Handle onions
124                         }
125                 }
126         }
127
128         pub fn set_node_state(&self, addr: SocketAddr, state: AddressState, services: u64) {
129                 let mut nodes = self.nodes.write().unwrap();
130                 let state_ref = nodes.nodes_to_state.get_mut(&addr).unwrap();
131                 if *state_ref == AddressState::Good && state != AddressState::Good {
132                         *state_ref = AddressState::WasGood;
133                         nodes.state_next_scan.get_mut(&AddressState::WasGood).unwrap().push((Instant::now(), addr));
134                 } else {
135                         *state_ref = state;
136                         nodes.state_next_scan.get_mut(&state).unwrap().push((Instant::now(), addr));
137                 }
138                 if state == AddressState::Good {
139
140                 }
141         }
142
143         pub fn get_next_scan_nodes(&self) -> Vec<SocketAddr> {
144                 let mut res = Vec::with_capacity(600);
145                 let cur_time = Instant::now();
146                 let mut nodes = self.nodes.write().unwrap();
147                 for (state, state_nodes) in nodes.state_next_scan.iter_mut() {
148                         let cmp_time = cur_time - Duration::from_secs(self.get_u64(U64Setting::RescanInterval(*state)));
149                         let split_point = cmp::min(cmp::min(600 - res.len(), 60),
150                                         state_nodes.binary_search_by(|a| a.0.cmp(&cmp_time)).unwrap_or_else(|idx| idx));
151                         let mut new_nodes = state_nodes.split_off(split_point);
152                         mem::swap(&mut new_nodes, state_nodes);
153                         for (_, node) in new_nodes.drain(..) {
154                                 res.push(node);
155                         }
156                 }
157                 res.shuffle(&mut thread_rng());
158                 res
159         }
160 }