Detect some simple classes of evil spy nodes
authorMatt Corallo <git@bluematt.me>
Wed, 21 Aug 2019 22:29:38 +0000 (18:29 -0400)
committerMatt Corallo <git@bluematt.me>
Wed, 21 Aug 2019 22:31:52 +0000 (18:31 -0400)
src/datastore.rs
src/main.rs
src/peer.rs

index 4f59305b30f5cf17e634b493f8742700b75a411f..9032b09c056b86d127ac29e7e31c543ca9099981 100644 (file)
@@ -32,6 +32,7 @@ pub enum AddressState {
        TimeoutAwaitingBlock,
        Good,
        WasGood,
+       EvilNode,
 }
 
 impl AddressState {
@@ -51,6 +52,7 @@ impl AddressState {
                        0xb => Some(AddressState::TimeoutAwaitingBlock),
                        0xc => Some(AddressState::Good),
                        0xd => Some(AddressState::WasGood),
+                       0xe => Some(AddressState::EvilNode),
                        _   => None,
                }
        }
@@ -71,6 +73,7 @@ impl AddressState {
                        AddressState::TimeoutAwaitingBlock => 11,
                        AddressState::Good => 12,
                        AddressState::WasGood => 13,
+                       AddressState::EvilNode => 14,
                }
        }
 
@@ -90,11 +93,12 @@ impl AddressState {
                        AddressState::TimeoutAwaitingBlock => "Timeout Awaiting Block",
                        AddressState::Good => "Good",
                        AddressState::WasGood => "Was Good",
+                       AddressState::EvilNode => "Evil Node",
                }
        }
 
        pub const fn get_count() -> u8 {
-               14
+               15
        }
 }
 
@@ -184,6 +188,7 @@ impl Store {
                        u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), try_read!(l, u64));
                        u64s.insert(U64Setting::RescanInterval(AddressState::Good), try_read!(l, u64));
                        u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), try_read!(l, u64));
+                       u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), try_read!(l, u64));
                        future::ok((u64s, try_read!(l, Regex)))
                }).or_else(|_| -> future::FutureResult<(HashMap<U64Setting, u64>, Regex), ()> {
                        let mut u64s = HashMap::with_capacity(15);
@@ -204,6 +209,7 @@ impl Store {
                        u64s.insert(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock), 3600);
                        u64s.insert(U64Setting::RescanInterval(AddressState::Good), 1800);
                        u64s.insert(U64Setting::RescanInterval(AddressState::WasGood), 1800);
+                       u64s.insert(U64Setting::RescanInterval(AddressState::EvilNode), 315360000);
                        u64s.insert(U64Setting::MinProtocolVersion, 70002);
                        future::ok((u64s, Regex::new(".*").unwrap()))
                });
@@ -378,7 +384,7 @@ impl Store {
        pub fn save_data(&'static self) -> impl Future<Item=(), Error=()> {
                let settings_file = self.store.clone() + "/settings";
                let settings_future = File::create(settings_file.clone() + ".tmp").and_then(move |f| {
-                       let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
+                       let settings_string = format!("{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
                                self.get_u64(U64Setting::ConnsPerSec),
                                self.get_u64(U64Setting::RunTimeout),
                                self.get_u64(U64Setting::WasGoodTimeout),
@@ -397,6 +403,7 @@ impl Store {
                                self.get_u64(U64Setting::RescanInterval(AddressState::TimeoutAwaitingBlock)),
                                self.get_u64(U64Setting::RescanInterval(AddressState::Good)),
                                self.get_u64(U64Setting::RescanInterval(AddressState::WasGood)),
+                               self.get_u64(U64Setting::RescanInterval(AddressState::EvilNode)),
                                self.get_regex(RegexSetting::SubverRegex).as_str());
                        write_all(f, settings_string).and_then(|(mut f, _)| {
                                f.poll_sync_all()
index 0f2838f68f48e29842966283ccd11d8cccf54674..4bf93510c5e3aaa41ba8fb38e8c4356965f3af73 100644 (file)
@@ -69,6 +69,7 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) {
                msg: (String::new(), false),
                request: Arc::clone(&unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap()),
        }));
+       let err_peer_state = Arc::clone(&peer_state);
        let final_peer_state = Arc::clone(&peer_state);
 
        let peer = Delay::new(scan_time).then(move |_| {
@@ -77,7 +78,19 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) {
                Peer::new(node.clone(), Duration::from_secs(timeout), printer)
        });
        tokio::spawn(peer.and_then(move |(mut write, read)| {
-               TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(|_| { () }).for_each(move |msg| {
+               TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(move |err| {
+                       match err {
+                               bitcoin::consensus::encode::Error::UnrecognizedNetworkCommand(ref msg) => {
+                                       // If we got here, we hit one of the explicitly disallowed messages indicating
+                                       // a bogus "node".
+                                       let mut state_lock = err_peer_state.lock().unwrap();
+                                       state_lock.msg = (format!("(bad msg type {})", msg), true);
+                                       state_lock.fail_reason = AddressState::EvilNode;
+                               },
+                               _ => {},
+                       }
+                       ()
+               }).for_each(move |msg| {
                        let mut state_lock = peer_state.lock().unwrap();
                        macro_rules! check_set_flag {
                                ($recvd_flag: ident, $msg: expr) => { {
@@ -177,8 +190,17 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) {
                                        check_set_flag!(recvd_block, "block");
                                        return future::err(());
                                },
+                               NetworkMessage::Inv(invs) => {
+                                       for inv in invs {
+                                               if inv.inv_type == InvType::Transaction {
+                                                       state_lock.fail_reason = AddressState::EvilNode;
+                                                       state_lock.msg = ("due to unrequested inv tx".to_string(), true);
+                                                       return future::err(());
+                                               }
+                                       }
+                               },
                                NetworkMessage::Tx(_) => {
-                                       state_lock.fail_reason = AddressState::ProtocolViolation;
+                                       state_lock.fail_reason = AddressState::EvilNode;
                                        state_lock.msg = ("due to unrequested transaction".to_string(), true);
                                        return future::err(());
                                },
index 9276d082d17bd8fe48281a9dc7d464e82dfc75f5..8e1c5e9eddb37ab768cae3f1c14ed4c87530f868 100644 (file)
@@ -45,9 +45,9 @@ impl<'a> std::io::Read for BytesDecoder<'a> {
 struct MsgCoder<'a>(&'a Printer);
 impl<'a> codec::Decoder for MsgCoder<'a> {
        type Item = NetworkMessage;
-       type Error = std::io::Error;
+       type Error = encode::Error;
 
-       fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result<Option<NetworkMessage>, std::io::Error> {
+       fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result<Option<NetworkMessage>, encode::Error> {
                let mut decoder = BytesDecoder {
                        buf: bytes,
                        pos: 0
@@ -58,19 +58,24 @@ impl<'a> codec::Decoder for MsgCoder<'a> {
                                if res.magic == Network::Bitcoin.magic() {
                                        Ok(Some(res.payload))
                                } else {
-                                       Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bad net magic"))
+                                       Err(encode::Error::UnexpectedNetworkMagic {
+                                               expected: Network::Bitcoin.magic(),
+                                               actual: res.magic
+                                       })
                                }
                        },
                        Err(e) => match e {
                                encode::Error::Io(_) => Ok(None),
-                               encode::Error::UnrecognizedNetworkCommand(_msg) => {
+                               encode::Error::UnrecognizedNetworkCommand(ref msg) => {
                                        decoder.buf.advance(decoder.pos);
                                        //XXX(fixthese): self.0.add_line(format!("rust-bitcoin doesn't support {}!", msg), true);
-                                       Ok(None)
+                                       if msg == "gnop" {
+                                               Err(e)
+                                       } else { Ok(None) }
                                },
                                _ => {
                                        self.0.add_line(format!("Error decoding message: {:?}", e), true);
-                                       Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
+                                       Err(e)
                                },
                        }
                }
@@ -93,7 +98,7 @@ impl<'a> codec::Encoder for MsgCoder<'a> {
 
 pub struct Peer {}
 impl Peer {
-       pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=NetworkMessage, Error=std::io::Error>)> {
+       pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=NetworkMessage, Error=encode::Error>)> {
                let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
                        future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
                });
@@ -118,7 +123,7 @@ impl Peer {
                                        nonce: 0xdeadbeef,
                                        user_agent: "/rust-bitcoin:0.18/bluematt-tokio-client:0.1/".to_string(),
                                        start_height: 0,
-                                       relay: true,
+                                       relay: false,
                                }));
                                future::ok((sender, read))
                        })