Detect some simple classes of evil spy nodes
[dnsseed-rust] / src / peer.rs
1 use std::cmp;
2 use std::net::SocketAddr;
3 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5 use bitcoin::consensus::encode;
6 use bitcoin::consensus::encode::{Decodable, Encodable};
7 use bitcoin::network::address::Address;
8 use bitcoin::network::constants::Network;
9 use bitcoin::network::message::{RawNetworkMessage, NetworkMessage};
10 use bitcoin::network::message_network::VersionMessage;
11
12 use tokio::prelude::*;
13 use tokio::codec;
14 use tokio::codec::Framed;
15 use tokio::net::TcpStream;
16 use tokio::timer::Delay;
17
18 use futures::sync::mpsc;
19
20 use crate::printer::Printer;
21
22 struct BytesCoder<'a>(&'a mut bytes::BytesMut);
23 impl<'a> std::io::Write for BytesCoder<'a> {
24         fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
25                 self.0.extend_from_slice(&b);
26                 Ok(b.len())
27         }
28         fn flush(&mut self) -> Result<(), std::io::Error> {
29                 Ok(())
30         }
31 }
32 struct BytesDecoder<'a> {
33         buf: &'a mut bytes::BytesMut,
34         pos: usize,
35 }
36 impl<'a> std::io::Read for BytesDecoder<'a> {
37         fn read(&mut self, b: &mut [u8]) -> Result<usize, std::io::Error> {
38                 let copy_len = cmp::min(b.len(), self.buf.len() - self.pos);
39                 b[..copy_len].copy_from_slice(&self.buf[self.pos..self.pos + copy_len]);
40                 self.pos += copy_len;
41                 Ok(copy_len)
42         }
43 }
44
45 struct MsgCoder<'a>(&'a Printer);
46 impl<'a> codec::Decoder for MsgCoder<'a> {
47         type Item = NetworkMessage;
48         type Error = encode::Error;
49
50         fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result<Option<NetworkMessage>, encode::Error> {
51                 let mut decoder = BytesDecoder {
52                         buf: bytes,
53                         pos: 0
54                 };
55                 match RawNetworkMessage::consensus_decode(&mut decoder) {
56                         Ok(res) => {
57                                 decoder.buf.advance(decoder.pos);
58                                 if res.magic == Network::Bitcoin.magic() {
59                                         Ok(Some(res.payload))
60                                 } else {
61                                         Err(encode::Error::UnexpectedNetworkMagic {
62                                                 expected: Network::Bitcoin.magic(),
63                                                 actual: res.magic
64                                         })
65                                 }
66                         },
67                         Err(e) => match e {
68                                 encode::Error::Io(_) => Ok(None),
69                                 encode::Error::UnrecognizedNetworkCommand(ref msg) => {
70                                         decoder.buf.advance(decoder.pos);
71                                         //XXX(fixthese): self.0.add_line(format!("rust-bitcoin doesn't support {}!", msg), true);
72                                         if msg == "gnop" {
73                                                 Err(e)
74                                         } else { Ok(None) }
75                                 },
76                                 _ => {
77                                         self.0.add_line(format!("Error decoding message: {:?}", e), true);
78                                         Err(e)
79                                 },
80                         }
81                 }
82         }
83 }
84 impl<'a> codec::Encoder for MsgCoder<'a> {
85         type Item = NetworkMessage;
86         type Error = std::io::Error;
87
88         fn encode(&mut self, msg: NetworkMessage, res: &mut bytes::BytesMut) -> Result<(), std::io::Error> {
89                 if let Err(_) = (RawNetworkMessage {
90                         magic: Network::Bitcoin.magic(),
91                         payload: msg,
92                 }.consensus_encode(&mut BytesCoder(res))) {
93                         //XXX
94                 }
95                 Ok(())
96         }
97 }
98
99 pub struct Peer {}
100 impl Peer {
101         pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=NetworkMessage, Error=encode::Error>)> {
102                 let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
103                         future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
104                 });
105                 TcpStream::connect(&addr).select(connect_timeout)
106                         .or_else(move |_| {
107                                 Delay::new(Instant::now() + timeout / 10).then(|_| {
108                                         future::err(())
109                                 })
110                         }).and_then(move |stream| {
111                                 let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split();
112                                 let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
113                                 tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
114                                         .then(|_| {
115                                                 future::err(())
116                                         }));
117                                 let _ = sender.try_send(NetworkMessage::Version(VersionMessage {
118                                         version: 70015,
119                                         services: (1 << 3), // NODE_WITNESS
120                                         timestamp: SystemTime::now().duration_since(UNIX_EPOCH).expect("time > 1970").as_secs() as i64,
121                                         receiver: Address::new(&addr, 0),
122                                         sender: Address::new(&"0.0.0.0:0".parse().unwrap(), 0),
123                                         nonce: 0xdeadbeef,
124                                         user_agent: "/rust-bitcoin:0.18/bluematt-tokio-client:0.1/".to_string(),
125                                         start_height: 0,
126                                         relay: false,
127                                 }));
128                                 future::ok((sender, read))
129                         })
130         }
131 }