Update to latest rust-bitcoin
[dnsseed-rust] / src / peer.rs
1 use std::cmp;
2 use std::net::{SocketAddr, IpAddr};
3 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5 use bitcoin::consensus::encode;
6 use bitcoin::consensus::encode::{Decodable, Encodable};
7 use bitcoin::network::address::Address;
8 use bitcoin::network::constants::{Network, ServiceFlags};
9 use bitcoin::network::message::{RawNetworkMessage, NetworkMessage};
10 use bitcoin::network::message_network::VersionMessage;
11
12 use tokio::prelude::*;
13 use tokio::codec;
14 use tokio::codec::Framed;
15 use tokio::net::TcpStream;
16 use tokio::io::read_exact;
17 use tokio::timer::Delay;
18
19 use futures::sync::mpsc;
20
21 use crate::printer::Printer;
22
23 struct BytesCoder<'a>(&'a mut bytes::BytesMut);
24 impl<'a> std::io::Write for BytesCoder<'a> {
25         fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
26                 self.0.extend_from_slice(&b);
27                 Ok(b.len())
28         }
29         fn flush(&mut self) -> Result<(), std::io::Error> {
30                 Ok(())
31         }
32 }
33 struct BytesDecoder<'a> {
34         buf: &'a mut bytes::BytesMut,
35         pos: usize,
36 }
37 impl<'a> std::io::Read for BytesDecoder<'a> {
38         fn read(&mut self, b: &mut [u8]) -> Result<usize, std::io::Error> {
39                 let copy_len = cmp::min(b.len(), self.buf.len() - self.pos);
40                 b[..copy_len].copy_from_slice(&self.buf[self.pos..self.pos + copy_len]);
41                 self.pos += copy_len;
42                 Ok(copy_len)
43         }
44 }
45
46 struct MsgCoder<'a>(&'a Printer);
47 impl<'a> codec::Decoder for MsgCoder<'a> {
48         type Item = Option<NetworkMessage>;
49         type Error = encode::Error;
50
51         fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result<Option<Option<NetworkMessage>>, encode::Error> {
52                 let mut decoder = BytesDecoder {
53                         buf: bytes,
54                         pos: 0
55                 };
56                 match RawNetworkMessage::consensus_decode(&mut decoder) {
57                         Ok(res) => {
58                                 decoder.buf.advance(decoder.pos);
59                                 if res.magic == Network::Bitcoin.magic() {
60                                         Ok(Some(Some(res.payload)))
61                                 } else {
62                                         Err(encode::Error::UnexpectedNetworkMagic {
63                                                 expected: Network::Bitcoin.magic(),
64                                                 actual: res.magic
65                                         })
66                                 }
67                         },
68                         Err(e) => match e {
69                                 encode::Error::Io(_) => Ok(None),
70                                 encode::Error::UnrecognizedNetworkCommand(ref msg) => {
71                                         decoder.buf.advance(decoder.pos);
72                                         //XXX(fixthese): self.0.add_line(format!("rust-bitcoin doesn't support {}!", msg), true);
73                                         if msg == "gnop" {
74                                                 Err(e)
75                                         } else { Ok(Some(None)) }
76                                 },
77                                 _ => {
78                                         self.0.add_line(format!("Error decoding message: {:?}", e), true);
79                                         Err(e)
80                                 },
81                         }
82                 }
83         }
84 }
85 impl<'a> codec::Encoder for MsgCoder<'a> {
86         type Item = NetworkMessage;
87         type Error = std::io::Error;
88
89         fn encode(&mut self, msg: NetworkMessage, res: &mut bytes::BytesMut) -> Result<(), std::io::Error> {
90                 if let Err(_) = (RawNetworkMessage {
91                         magic: Network::Bitcoin.magic(),
92                         payload: msg,
93                 }.consensus_encode(&mut BytesCoder(res))) {
94                         //XXX
95                 }
96                 Ok(())
97         }
98 }
99
100 // base32 encoder and tests stolen (transliterated) from Bitcoin Core
101 // Copyright (c) 2012-2019 The Bitcoin Core developers
102 // Distributed under the MIT software license, see
103 // http://www.opensource.org/licenses/mit-license.php.
/// Encodes `inp` as base32 using the lowercase alphabet `a-z2-7`, padding the result with `=`
/// until its length is a multiple of 8 characters.
fn encode_base32(inp: &[u8]) -> String {
        const ALPHABET: &[u8; 32] = b"abcdefghijklmnopqrstuvwxyz234567";
        // Selects one 5-bit output symbol.
        const SYMBOL_MASK: u16 = (1 << 5) - 1;
        // At most 4 leftover bits plus 8 freshly shifted-in bits are ever pending.
        const ACC_MASK: u16 = (1 << (8 + 5 - 1)) - 1;

        let mut out = String::with_capacity(((inp.len() + 4) / 5) * 8);
        let mut acc: u16 = 0;
        let mut pending: u8 = 0;
        for &byte in inp {
                acc = ((acc << 8) | u16::from(byte)) & ACC_MASK;
                pending += 8;
                while pending >= 5 {
                        pending -= 5;
                        let symbol = usize::from((acc >> pending) & SYMBOL_MASK);
                        out.push(ALPHABET[symbol] as char);
                }
        }
        if pending != 0 {
                // Left-align the remaining bits inside a final symbol.
                let symbol = usize::from((acc << (5 - pending)) & SYMBOL_MASK);
                out.push(ALPHABET[symbol] as char);
        }
        while out.len() % 8 != 0 {
                out.push('=');
        }
        out
}
126
#[test]
fn test_encode_base32() {
        // (input, expected encoding) pairs covering every padding length.
        let vectors: [(&str, &str); 7] = [
                ("", ""),
                ("f", "my======"),
                ("fo", "mzxq===="),
                ("foo", "mzxw6==="),
                ("foob", "mzxw6yq="),
                ("fooba", "mzxw6ytb"),
                ("foobar", "mzxw6ytboi======"),
        ];
        for (plain, expected) in vectors.iter() {
                assert_eq!(encode_base32(plain.as_bytes()), *expected);
        }
        // My seednode's onion addr:
        let onion_bytes = [0x6a, 0x8b, 0xd2, 0x78, 0x3f, 0x7a, 0xf8, 0x92, 0x8f, 0x80];
        assert_eq!(encode_base32(&onion_bytes), "nkf5e6b7pl4jfd4a");
}
137
/// Writes `$obj` to `$sock` with `write_all`; on failure, returns from the *enclosing function*
/// with the error wrapped as `future::Either::A(future::err(e))`.
///
/// Only use this for really small chunks, i.e. small enough to *definitely* fit in the outbound
/// TCP buffer, so that the write shouldn't (practically) block.
macro_rules! try_write_small {
        ($sock: expr, $obj: expr) => { {
                if let Err(e) = $sock.write_all($obj) {
                        return future::Either::A(future::err(e));
                }
        } }
}
148
149 pub struct Peer {}
150 impl Peer {
151         pub fn new(addr: SocketAddr, tor_proxy: &SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=Option<NetworkMessage>, Error=encode::Error>)> {
152                 let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
153                         future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
154                 });
155                 match addr.ip() {
156                         IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => {
157                                 future::Either::A(connect_timeout.select(TcpStream::connect(&tor_proxy)
158                                         .and_then(move |mut stream: TcpStream| {
159                                                 try_write_small!(stream, &[5u8, 1u8, 0u8]); // SOCKS5 with 1 method and no auth
160                                                 future::Either::B(read_exact(stream, [0u8; 2]).and_then(move |(mut stream, response)| {
161                                                         if response != [5, 0] { // SOCKS5 with no auth successful
162                                                                 future::Either::B(future::Either::A(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to authenticate"))))
163                                                         } else {
164                                                                 let hostname = encode_base32(&v6addr.octets()[6..]) + ".onion";
165                                                                 let mut connect_msg = Vec::with_capacity(7 + hostname.len());
166                                                                 // SOCKS5 command CONNECT (+ reserved byte) to hostname with given len
167                                                                 connect_msg.extend_from_slice(&[5u8, 1u8, 0u8, 3u8, hostname.len() as u8]);
168                                                                 connect_msg.extend_from_slice(hostname.as_bytes());
169                                                                 connect_msg.push((addr.port() >> 8) as u8);
170                                                                 connect_msg.push((addr.port() >> 0) as u8);
171                                                                 try_write_small!(stream, &connect_msg);
172                                                                 future::Either::B(future::Either::B(read_exact(stream, [0u8; 4]).and_then(move |(stream, response)| {
173                                                                         if response[..3] != [5, 0, 0] {
174                                                                                 future::Either::B(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to authenticate")))
175                                                                         } else {
176                                                                                 if response[3] == 1 {
177                                                                                         future::Either::A(future::Either::A(read_exact(stream, [0; 6]).and_then(|(stream, _)| future::ok(stream))))
178                                                                                 } else if response[3] == 4 {
179                                                                                         future::Either::A(future::Either::B(read_exact(stream, [0; 18]).and_then(|(stream, _)| future::ok(stream))))
180                                                                                 } else {
181                                                                                         future::Either::B(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Bogus proxy address value")))
182                                                                                 }
183                                                                         }
184                                                                 })))
185                                                         }
186                                                 }))
187                                         })
188                                 ).and_then(|(stream, _)| future::ok(stream)).or_else(|(e, _)| future::err(e)))
189                         },
190                         _ => future::Either::B(connect_timeout.select(TcpStream::connect(&addr))
191                                 .and_then(|(stream, _)| future::ok(stream)).or_else(|(e, _)| future::err(e))),
192                 }.and_then(move |stream| {
193                                 let (write, read) = Framed::new(stream, MsgCoder(printer)).split();
194                                 let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
195                                 tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
196                                         .then(|_| {
197                                                 future::err(())
198                                         }));
199                                 let _ = sender.try_send(NetworkMessage::Version(VersionMessage {
200                                         version: 70015,
201                                         services: ServiceFlags::WITNESS,
202                                         timestamp: SystemTime::now().duration_since(UNIX_EPOCH).expect("time > 1970").as_secs() as i64,
203                                         receiver: Address::new(&addr, ServiceFlags::NONE),
204                                         sender: Address::new(&"0.0.0.0:0".parse().unwrap(), ServiceFlags::WITNESS),
205                                         nonce: 0xdeadbeef,
206                                         user_agent: "/rust-bitcoin:0.18/bluematt-tokio-client:0.1/".to_string(),
207                                         start_height: 0,
208                                         relay: false,
209                                 }));
210                                 future::ok((sender, read))
211                         })
212                 .or_else(move |_| {
213                         Delay::new(Instant::now() + timeout / 10).then(|_| future::err(()))
214                 })
215         }
216 }