From 820109e4f9b0b820a4c7c0f75670000548e6e760 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 26 Aug 2019 01:02:22 -0400 Subject: [PATCH] Implement connecting to Tor peers over SOCKS5 in Peer --- src/main.rs | 15 +++++--- src/peer.rs | 103 +++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 104 insertions(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index b868570..c86529e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,6 +37,7 @@ static mut HEADER_MAP: Option>>> = None; static mut HEIGHT_MAP: Option>>> = None; static mut DATA_STORE: Option> = None; static mut PRINTER: Option> = None; +static mut TOR_PROXY: Option = None; pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false); static SCANNING: AtomicBool = AtomicBool::new(false); @@ -77,7 +78,7 @@ pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { let peer = Delay::new(scan_time).then(move |_| { printer.set_stat(Stat::NewConnection); let timeout = store.get_u64(U64Setting::RunTimeout); - Peer::new(node.clone(), Duration::from_secs(timeout), printer) + Peer::new(node.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(timeout), printer) }); tokio::spawn(peer.and_then(move |(mut write, read)| { TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(move |err| { @@ -288,7 +289,7 @@ fn scan_net() { scan_node(iter_time, node, false); iter_time += per_iter_time; } - Delay::new(cmp::max(iter_time, start_time + Duration::from_secs(1))).then(|_| { + Delay::new(cmp::max(iter_time, start_time + Duration::from_secs(1))).then(move |_| { if !START_SHUTDOWN.load(Ordering::Relaxed) { scan_net(); } @@ -299,7 +300,7 @@ fn scan_net() { fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc) { let printer = unsafe { PRINTER.as_ref().unwrap() }; - let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer); + let trusted_peer = Peer::new(trusted_sockaddr.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(600), printer); let bgp_reload = Arc::clone(&bgp_client); tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| { printer.add_line("Connected to local peer".to_string(), false); @@ -408,8 +409,8 @@ fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc) { } fn main() { - if env::args().len() != 4 { - println!("USAGE: dnsseed-rust datastore localPeerAddress bgp_peer"); + if env::args().len() != 5 { + println!("USAGE: dnsseed-rust datastore localPeerAddress tor_proxy_addr bgp_peer"); return; } @@ -429,6 +430,10 @@ fn main() { args.next(); let path = args.next().unwrap(); let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); + + let tor_socks5_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); + unsafe { TOR_PROXY = Some(tor_socks5_sockaddr); } + let bgp_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); Store::new(path).and_then(move |store| { diff --git a/src/peer.rs b/src/peer.rs index 8e1c5e9..137d92d 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,5 +1,5 @@ use std::cmp; -use std::net::SocketAddr; +use std::net::{SocketAddr, IpAddr}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use bitcoin::consensus::encode; @@ -13,6 +13,7 @@ use tokio::prelude::*; use tokio::codec; use tokio::codec::Framed; use tokio::net::TcpStream; +use tokio::io::read_exact; use tokio::timer::Delay; use futures::sync::mpsc; @@ -96,19 +97,100 @@ impl<'a> codec::Encoder for MsgCoder<'a> { } } +// base32 encoder and tests stolen (transliterated) from Bitcoin Core +// Copyright (c) 2012-2019 The Bitcoin Core developers +// Distributed under the MIT software license, see +// http://www.opensource.org/licenses/mit-license.php. +fn encode_base32(inp: &[u8]) -> String { + let mut ret = String::with_capacity(((inp.len() + 4) / 5) * 8); + + let alphabet = "abcdefghijklmnopqrstuvwxyz234567"; + let mut acc: u16 = 0; + let mut bits: u8 = 0; + for i in inp { + acc = ((acc << 8) | *i as u16) & ((1 << (8 + 5 - 1)) - 1); + bits += 8; + while bits >= 5 { + bits -= 5; + let idx = ((acc >> bits) & ((1 << 5) - 1)) as usize; + ret += &alphabet[idx..idx + 1]; + } + } + if bits != 0 { + let idx = ((acc << (5 - bits)) & ((1 << 5) - 1)) as usize; + ret += &alphabet[idx..idx + 1]; + } + while ret.len() % 8 != 0 { ret += "=" }; + return ret; +} + +#[test] +fn test_encode_base32() { + let tests_in = ["","f","fo","foo","foob","fooba","foobar"]; + let tests_out = ["","my======","mzxq====","mzxw6===","mzxw6yq=","mzxw6ytb","mzxw6ytboi======"]; + for (inp, out) in tests_in.iter().zip(tests_out.iter()) { + assert_eq!(&encode_base32(inp.as_bytes()), out); + } + // My seednode's onion addr: + assert_eq!(&encode_base32(&[0x6a, 0x8b, 0xd2, 0x78, 0x3f, 0x7a, 0xf8, 0x92, 0x8f, 0x80]), "nkf5e6b7pl4jfd4a"); +} + +/// Note that this should only be used for really small chunks, ie small enough to *definitely* fit +/// in the outbound TCP buffer, and shouldn't (practically) block. +macro_rules! try_write_small { + ($sock: expr, $obj: expr) => { { + match $sock.write_all($obj) { + Ok(()) => {}, + Err(e) => return future::Either::A(future::err(e)), + } + } } +} + pub struct Peer {} impl Peer { - pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future, impl Stream)> { + pub fn new(addr: SocketAddr, tor_proxy: &SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future, impl Stream)> { let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| { future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached")) }); - TcpStream::connect(&addr).select(connect_timeout) - .or_else(move |_| { - Delay::new(Instant::now() + timeout / 10).then(|_| { - future::err(()) - }) - }).and_then(move |stream| { - let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split(); + match addr.ip() { + IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => { + future::Either::A(connect_timeout.select(TcpStream::connect(&tor_proxy) + .and_then(move |mut stream: TcpStream| { + try_write_small!(stream, &[5u8, 1u8, 0u8]); // SOCKS5 with 1 method and no auth + future::Either::B(read_exact(stream, [0u8; 2]).and_then(move |(mut stream, response)| { + if response != [5, 0] { // SOCKS5 with no auth successful + future::Either::B(future::Either::A(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to authenticate")))) + } else { + let hostname = encode_base32(&v6addr.octets()[6..]) + ".onion"; + let mut connect_msg = Vec::with_capacity(7 + hostname.len()); + // SOCKS5 command CONNECT (+ reserved byte) to hostname with given len + connect_msg.extend_from_slice(&[5u8, 1u8, 0u8, 3u8, hostname.len() as u8]); + connect_msg.extend_from_slice(hostname.as_bytes()); + connect_msg.push((addr.port() >> 8) as u8); + connect_msg.push((addr.port() >> 0) as u8); + try_write_small!(stream, &connect_msg); + future::Either::B(future::Either::B(read_exact(stream, [0u8; 4]).and_then(move |(stream, response)| { + if response[..3] != [5, 0, 0] { + future::Either::B(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to authenticate"))) + } else { + if response[3] == 1 { + future::Either::A(future::Either::A(read_exact(stream, [0; 6]).and_then(|(stream, _)| future::ok(stream)))) + } else if response[3] == 4 { + future::Either::A(future::Either::B(read_exact(stream, [0; 18]).and_then(|(stream, _)| future::ok(stream)))) + } else { + future::Either::B(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Bogus proxy address value"))) + } + } + }))) + } + })) + }) + ).and_then(|(stream, _)| future::ok(stream)).or_else(|(e, _)| future::err(e))) + }, + _ => future::Either::B(connect_timeout.select(TcpStream::connect(&addr)) + .and_then(|(stream, _)| future::ok(stream)).or_else(|(e, _)| future::err(e))), + }.and_then(move |stream| { + let (write, read) = Framed::new(stream, MsgCoder(printer)).split(); let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver) .then(|_| { @@ -127,5 +209,8 @@ impl Peer { })); future::ok((sender, read)) }) + .or_else(move |_| { + Delay::new(Instant::now() + timeout / 10).then(|_| future::err(())) + }) } } -- 2.39.5