X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fmain.rs;h=ba206d60de29e42484cb84c0e0ddf5be6d6346dd;hb=ecd6d703b476c659c343d0e2c034c5b094645ce5;hp=a1d03d2b978533266f94c80f03d25e4196387c5b;hpb=591344ed66e57115f5847b995f7ec42dc9eaa871;p=dnsseed-rust diff --git a/src/main.rs b/src/main.rs index a1d03d2..ba206d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod printer; mod reader; mod peer; +mod bgp_client; mod timeout_stream; mod datastore; @@ -25,6 +26,7 @@ use peer::Peer; use datastore::{AddressState, Store, U64Setting, RegexSetting}; use timeout_stream::TimeoutStream; use rand::Rng; +use bgp_client::BGPClient; use tokio::prelude::*; use tokio::timer::Delay; @@ -69,6 +71,7 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { msg: (String::new(), false), request: Arc::clone(&unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap()), })); + let err_peer_state = Arc::clone(&peer_state); let final_peer_state = Arc::clone(&peer_state); let peer = Delay::new(scan_time).then(move |_| { @@ -77,7 +80,19 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { Peer::new(node.clone(), Duration::from_secs(timeout), printer) }); tokio::spawn(peer.and_then(move |(mut write, read)| { - TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| { + TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(move |err| { + match err { + bitcoin::consensus::encode::Error::UnrecognizedNetworkCommand(ref msg) => { + // If we got here, we hit one of the explicitly disallowed messages indicating + // a bogus "node". + let mut state_lock = err_peer_state.lock().unwrap(); + state_lock.msg = (format!("(bad msg type {})", msg), true); + state_lock.fail_reason = AddressState::EvilNode; + }, + _ => {}, + } + () + }).for_each(move |msg| { let mut state_lock = peer_state.lock().unwrap(); macro_rules! check_set_flag { ($recvd_flag: ident, $msg: expr) => { { @@ -177,8 +192,17 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { check_set_flag!(recvd_block, "block"); return future::err(()); }, + NetworkMessage::Inv(invs) => { + for inv in invs { + if inv.inv_type == InvType::Transaction { + state_lock.fail_reason = AddressState::EvilNode; + state_lock.msg = ("due to unrequested inv tx".to_string(), true); + return future::err(()); + } + } + }, NetworkMessage::Tx(_) => { - state_lock.fail_reason = AddressState::ProtocolViolation; + state_lock.fail_reason = AddressState::EvilNode; state_lock.msg = ("due to unrequested transaction".to_string(), true); return future::err(()); }, @@ -202,8 +226,10 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { } } else { assert!(state_lock.fail_reason != AddressState::Good); - if state_lock.fail_reason == AddressState::TimeoutDuringRequest && state_lock.recvd_version && state_lock.recvd_verack && state_lock.recvd_pong { - if !state_lock.recvd_addrs { + if state_lock.fail_reason == AddressState::TimeoutDuringRequest && state_lock.recvd_version && state_lock.recvd_verack { + if !state_lock.recvd_pong { + state_lock.fail_reason = AddressState::TimeoutAwaitingPong; + } else if !state_lock.recvd_addrs { state_lock.fail_reason = AddressState::TimeoutAwaitingAddr; } else if !state_lock.recvd_block { state_lock.fail_reason = AddressState::TimeoutAwaitingBlock; @@ -211,8 +237,8 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { } let old_state = store.set_node_state(node, state_lock.fail_reason, 0); if (manual || old_state != state_lock.fail_reason) && state_lock.fail_reason == AddressState::TimeoutDuringRequest { - printer.add_line(format!("Updating {} from {} to Timeout During Request (ver: {}, vack: {}, pong: {})", - node, old_state.to_str(), state_lock.recvd_version, state_lock.recvd_verack, state_lock.recvd_pong), true); + printer.add_line(format!("Updating {} from {} to Timeout During Request (ver: {}, vack: {})", + node, old_state.to_str(), state_lock.recvd_version, state_lock.recvd_verack), true); } else if manual || (old_state != state_lock.fail_reason && state_lock.msg.0 != "" && state_lock.msg.1) { printer.add_line(format!("Updating {} from {} to {} {}", node, old_state.to_str(), state_lock.fail_reason.to_str(), &state_lock.msg.0), state_lock.msg.1); } @@ -221,7 +247,7 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { })); } -fn poll_dnsseeds() { +fn poll_dnsseeds(bgp_client: Arc) { tokio::spawn(future::lazy(|| { let printer = unsafe { PRINTER.as_ref().unwrap() }; let store = unsafe { DATA_STORE.as_ref().unwrap() }; @@ -234,9 +260,12 @@ fn poll_dnsseeds() { printer.add_line(format!("Added {} new addresses from other DNS seeds", new_addrs), false); Delay::new(Instant::now() + Duration::from_secs(60)).then(|_| { let store = unsafe { DATA_STORE.as_ref().unwrap() }; - store.save_data().then(|_| { + let dns_future = store.write_dns(Arc::clone(&bgp_client)); + store.save_data().join(dns_future).then(|_| { if !START_SHUTDOWN.load(Ordering::Relaxed) { - poll_dnsseeds(); + poll_dnsseeds(bgp_client); + } else { + bgp_client.disconnect(); } future::ok(()) }) @@ -268,9 +297,10 @@ fn scan_net() { })); } -fn make_trusted_conn(trusted_sockaddr: SocketAddr) { +fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc) { let printer = unsafe { PRINTER.as_ref().unwrap() }; let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer); + let bgp_reload = Arc::clone(&bgp_client); tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| { printer.add_line("Connected to local peer".to_string(), false); let mut starting_height = 0; @@ -353,7 +383,7 @@ fn make_trusted_conn(trusted_sockaddr: SocketAddr) { *unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap() = Arc::new((height, hash, block)); if !SCANNING.swap(true, Ordering::SeqCst) { scan_net(); - poll_dnsseeds(); + poll_dnsseeds(Arc::clone(&bgp_client)); } } }, @@ -371,15 +401,15 @@ fn make_trusted_conn(trusted_sockaddr: SocketAddr) { }).then(move |_: Result<(), ()>| { if !START_SHUTDOWN.load(Ordering::Relaxed) { printer.add_line("Lost connection from trusted peer".to_string(), true); - make_trusted_conn(trusted_sockaddr); + make_trusted_conn(trusted_sockaddr, bgp_reload); } future::ok(()) })); } fn main() { - if env::args().len() != 3 { - println!("USAGE: dnsseed-rust datastore localPeerAddress"); + if env::args().len() != 4 { + println!("USAGE: dnsseed-rust datastore localPeerAddress bgp_peer"); return; } @@ -398,15 +428,16 @@ fn main() { let mut args = env::args(); args.next(); let path = args.next().unwrap(); - let addr = args.next().unwrap(); + let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); + let bgp_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); Store::new(path).and_then(move |store| { unsafe { DATA_STORE = Some(Box::new(store)) }; let store = unsafe { DATA_STORE.as_ref().unwrap() }; unsafe { PRINTER = Some(Box::new(Printer::new(store))) }; - let trusted_sockaddr: SocketAddr = addr.parse().unwrap(); - make_trusted_conn(trusted_sockaddr); + let bgp_client = BGPClient::new(bgp_sockaddr, Duration::from_secs(600), unsafe { PRINTER.as_ref().unwrap() }); + make_trusted_conn(trusted_sockaddr, bgp_client); reader::read(store, unsafe { PRINTER.as_ref().unwrap() });