From 2cf23b8a5638990cbf244e7aaf0b9b8b47f24f2e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 21 Aug 2019 18:29:38 -0400 Subject: [PATCH] Detect some simple classes of evil spy nodes --- src/datastore.rs | 11 +++++++++-- src/main.rs | 26 ++++++++++++++++++++++++-- src/peer.rs | 21 +++++++++++++-------- 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/src/datastore.rs b/src/datastore.rs index 4f59305..9032b09 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -32,6 +32,7 @@ pub enum AddressState { TimeoutAwaitingBlock, Good, WasGood, + EvilNode, } impl AddressState { @@ -51,6 +52,7 @@ impl AddressState { 0xb => Some(AddressState::TimeoutAwaitingBlock), 0xc => Some(AddressState::Good), 0xd => Some(AddressState::WasGood), + 0xe => Some(AddressState::EvilNode), _ => None, } } @@ -71,6 +73,7 @@ impl AddressState { AddressState::TimeoutAwaitingBlock => 11, AddressState::Good => 12, AddressState::WasGood => 13, + AddressState::EvilNode => 14, } } @@ -90,11 +93,12 @@ impl AddressState { AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block", AddressState::Good => "Good", AddressState::WasGood => "Was Good", + AddressState::EvilNode => "Evil Node", } } pub const fn get_count() -> u8 { - 14 + 15 } } @@ -184,6 +188,7 @@ impl Store { u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64)); u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64)); u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64)); + u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64)); future::ok((u64s, try_read!(l, Regex))) }).or_else(|_| -> future::FutureResult<(HashMap, Regex), ()> { let mut u64s = HashMap::with_capacity(15); @@ -204,6 +209,7 @@ impl Store { u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600); u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800); u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800); + u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000); u64s.insert(U64Setting::MinProtocolVersion, 70002); future::ok((u64s, Regex::new(".*").unwrap())) }); @@ -378,7 +384,7 @@ impl Store { pub fn save_data(&'static self) -> impl Future { let settings_file = self.store.clone() + "/settings"; let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| { - let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", + let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", self.get_u64(U64Setting::ConnsPerSec), self.get_u64(U64Setting::RunTimeout), self.get_u64(U64Setting::WasGoodTimeout), @@ -397,6 +403,7 @@ impl Store { self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)), self.get_u64(U64Setting::RescanInterval(AddressState::Good)), self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)), + self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)), self.get_regex(RegexSetting::SubverRegex).as_str()); write_all(f, settings_string).and_then(|(mut f, _)| { f.poll_sync_all() diff --git a/src/main.rs b/src/main.rs index 0f2838f..4bf9351 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,6 +69,7 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { msg: (String::new(), false), request: Arc::clone(&unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap()), })); + let err_peer_state = Arc::clone(&peer_state); let final_peer_state = Arc::clone(&peer_state); let peer = Delay::new(scan_time).then(move |_| { @@ -77,7 +78,19 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { Peer::new(node.clone(), Duration::from_secs(timeout), printer) }); tokio::spawn(peer.and_then(move |(mut write, read)| { - TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| { + TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(move |err| { + match err { + bitcoin::consensus::encode::Error::UnrecognizedNetworkCommand(ref msg) => { + // If we got here, we hit one of the explicitly disallowed messages indicating + // a bogus "node". + let mut state_lock = err_peer_state.lock().unwrap(); + state_lock.msg = (format!("(bad msg type {})", msg), true); + state_lock.fail_reason = AddressState::EvilNode; + }, + _ => {}, + } + () + }).for_each(move |msg| { let mut state_lock = peer_state.lock().unwrap(); macro_rules! check_set_flag { ($recvd_flag: ident, $msg: expr) => { { @@ -177,8 +190,17 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { check_set_flag!(recvd_block, "block"); return future::err(()); }, + NetworkMessage::Inv(invs) => { + for inv in invs { + if inv.inv_type == InvType::Transaction { + state_lock.fail_reason = AddressState::EvilNode; + state_lock.msg = ("due to unrequested inv tx".to_string(), true); + return future::err(()); + } + } + }, NetworkMessage::Tx(_) => { - state_lock.fail_reason = AddressState::ProtocolViolation; + state_lock.fail_reason = AddressState::EvilNode; state_lock.msg = ("due to unrequested transaction".to_string(), true); return future::err(()); }, diff --git a/src/peer.rs b/src/peer.rs index 9276d08..8e1c5e9 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -45,9 +45,9 @@ impl<'a> std::io::Read for BytesDecoder<'a> { struct MsgCoder<'a>(&'a Printer); impl<'a> codec::Decoder for MsgCoder<'a> { type Item = NetworkMessage; - type Error = std::io::Error; + type Error = encode::Error; - fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result, std::io::Error> { + fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result, encode::Error> { let mut decoder = BytesDecoder { buf: bytes, pos: 0 @@ -58,19 +58,24 @@ impl<'a> codec::Decoder for MsgCoder<'a> { if res.magic == Network::Bitcoin.magic() { Ok(Some(res.payload)) } else { - Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bad net magic")) + Err(encode::Error::UnexpectedNetworkMagic { + expected: Network::Bitcoin.magic(), + actual: res.magic + }) } }, Err(e) => match e { encode::Error::Io(_) => Ok(None), - encode::Error::UnrecognizedNetworkCommand(_msg) => { + encode::Error::UnrecognizedNetworkCommand(ref msg) => { decoder.buf.advance(decoder.pos); //XXX(fixthese): self.0.add_line(format!("rust-bitcoin doesn't support {}!", msg), true); - Ok(None) + if msg == "gnop" { + Err(e) + } else { Ok(None) } }, _ => { self.0.add_line(format!("Error decoding message: {:?}", e), true); - Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)) + Err(e) }, } } @@ -93,7 +98,7 @@ impl<'a> codec::Encoder for MsgCoder<'a> { pub struct Peer {} impl Peer { - pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future, impl Stream)> { + pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future, impl Stream)> { let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| { future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached")) }); @@ -118,7 +123,7 @@ impl Peer { nonce: 0xdeadbeef, user_agent: "/rust-bitcoin:0.18/bluematt-tokio-client:0.1/".to_string(), start_height: 0, - relay: true, + relay: false, })); future::ok((sender, read)) }) -- 2.39.5