static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, sha256d::Hash>>>> = None;
static mut DATA_STORE: Option<Box<Store>> = None;
static mut PRINTER: Option<Box<Printer>> = None;
+static mut TOR_PROXY: Option<SocketAddr> = None;
pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false);
static SCANNING: AtomicBool = AtomicBool::new(false);
let peer = Delay::new(scan_time).then(move |_| {
printer.set_stat(Stat::NewConnection);
let timeout = store.get_u64(U64Setting::RunTimeout);
- Peer::new(node.clone(), Duration::from_secs(timeout), printer)
+ Peer::new(node.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(timeout), printer)
});
tokio::spawn(peer.and_then(move |(mut write, read)| {
TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))).map_err(move |err| {
scan_node(iter_time, node, false);
iter_time += per_iter_time;
}
- Delay::new(cmp::max(iter_time, start_time + Duration::from_secs(1))).then(|_| {
+ Delay::new(cmp::max(iter_time, start_time + Duration::from_secs(1))).then(move |_| {
if !START_SHUTDOWN.load(Ordering::Relaxed) {
scan_net();
}
fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc<BGPClient>) {
let printer = unsafe { PRINTER.as_ref().unwrap() };
- let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
+ let trusted_peer = Peer::new(trusted_sockaddr.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(600), printer);
let bgp_reload = Arc::clone(&bgp_client);
tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
printer.add_line("Connected to local peer".to_string(), false);
}
fn main() {
- if env::args().len() != 4 {
- println!("USAGE: dnsseed-rust datastore localPeerAddress bgp_peer");
+ if env::args().len() != 5 {
+ println!("USAGE: dnsseed-rust datastore localPeerAddress tor_proxy_addr bgp_peer");
return;
}
args.next();
let path = args.next().unwrap();
let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
+
+ let tor_socks5_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
+ unsafe { TOR_PROXY = Some(tor_socks5_sockaddr); }
+
let bgp_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
Store::new(path).and_then(move |store| {
use std::cmp;
-use std::net::SocketAddr;
+use std::net::{SocketAddr, IpAddr};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use bitcoin::consensus::encode;
use tokio::codec;
use tokio::codec::Framed;
use tokio::net::TcpStream;
+use tokio::io::read_exact;
use tokio::timer::Delay;
use futures::sync::mpsc;
}
}
+// base32 encoder and tests stolen (transliterated) from Bitcoin Core
+// Copyright (c) 2012-2019 The Bitcoin Core developers
+// Distributed under the MIT software license, see
+// http://www.opensource.org/licenses/mit-license.php.
+fn encode_base32(inp: &[u8]) -> String {
+ let mut ret = String::with_capacity(((inp.len() + 4) / 5) * 8);
+
+ let alphabet = "abcdefghijklmnopqrstuvwxyz234567";
+ let mut acc: u16 = 0;
+ let mut bits: u8 = 0;
+ for i in inp {
+ acc = ((acc << 8) | *i as u16) & ((1 << (8 + 5 - 1)) - 1);
+ bits += 8;
+ while bits >= 5 {
+ bits -= 5;
+ let idx = ((acc >> bits) & ((1 << 5) - 1)) as usize;
+ ret += &alphabet[idx..idx + 1];
+ }
+ }
+ if bits != 0 {
+ let idx = ((acc << (5 - bits)) & ((1 << 5) - 1)) as usize;
+ ret += &alphabet[idx..idx + 1];
+ }
+ while ret.len() % 8 != 0 { ret += "=" };
+ return ret;
+}
+
+#[test]
+fn test_encode_base32() {
+ let tests_in = ["","f","fo","foo","foob","fooba","foobar"];
+ let tests_out = ["","my======","mzxq====","mzxw6===","mzxw6yq=","mzxw6ytb","mzxw6ytboi======"];
+ for (inp, out) in tests_in.iter().zip(tests_out.iter()) {
+ assert_eq!(&encode_base32(inp.as_bytes()), out);
+ }
+ // My seednode's onion addr:
+ assert_eq!(&encode_base32(&[0x6a, 0x8b, 0xd2, 0x78, 0x3f, 0x7a, 0xf8, 0x92, 0x8f, 0x80]), "nkf5e6b7pl4jfd4a");
+}
+
+/// Note that this should only be used for really small chunks, ie small enough to *definitely* fit
+/// in the outbound TCP buffer, and shouldn't (practically) block.
+macro_rules! try_write_small {
+ ($sock: expr, $obj: expr) => { {
+ match $sock.write_all($obj) {
+ Ok(()) => {},
+ Err(e) => return future::Either::A(future::err(e)),
+ }
+ } }
+}
+
pub struct Peer {}
impl Peer {
- pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=NetworkMessage, Error=encode::Error>)> {
+ pub fn new(addr: SocketAddr, tor_proxy: &SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=NetworkMessage, Error=encode::Error>)> {
let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
});
- TcpStream::connect(&addr).select(connect_timeout)
- .or_else(move |_| {
- Delay::new(Instant::now() + timeout / 10).then(|_| {
- future::err(())
- })
- }).and_then(move |stream| {
- let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split();
+ match addr.ip() {
+ IpAddr::V6(v6addr) if v6addr.octets()[..6] == [0xFD,0x87,0xD8,0x7E,0xEB,0x43][..] => {
+ future::Either::A(connect_timeout.select(TcpStream::connect(&tor_proxy)
+ .and_then(move |mut stream: TcpStream| {
+ try_write_small!(stream, &[5u8, 1u8, 0u8]); // SOCKS5 with 1 method and no auth
+ future::Either::B(read_exact(stream, [0u8; 2]).and_then(move |(mut stream, response)| {
+ if response != [5, 0] { // SOCKS5 with no auth successful
+ future::Either::B(future::Either::A(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to authenticate"))))
+ } else {
+ let hostname = encode_base32(&v6addr.octets()[6..]) + ".onion";
+ let mut connect_msg = Vec::with_capacity(7 + hostname.len());
+ // SOCKS5 command CONNECT (+ reserved byte) to hostname with given len
+ connect_msg.extend_from_slice(&[5u8, 1u8, 0u8, 3u8, hostname.len() as u8]);
+ connect_msg.extend_from_slice(hostname.as_bytes());
+ connect_msg.push((addr.port() >> 8) as u8);
+ connect_msg.push((addr.port() >> 0) as u8);
+ try_write_small!(stream, &connect_msg);
+ future::Either::B(future::Either::B(read_exact(stream, [0u8; 4]).and_then(move |(stream, response)| {
+ if response[..3] != [5, 0, 0] {
+ future::Either::B(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to authenticate")))
+ } else {
+ if response[3] == 1 {
+ future::Either::A(future::Either::A(read_exact(stream, [0; 6]).and_then(|(stream, _)| future::ok(stream))))
+ } else if response[3] == 4 {
+ future::Either::A(future::Either::B(read_exact(stream, [0; 18]).and_then(|(stream, _)| future::ok(stream))))
+ } else {
+ future::Either::B(future::err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Bogus proxy address value")))
+ }
+ }
+ })))
+ }
+ }))
+ })
+ ).and_then(|(stream, _)| future::ok(stream)).or_else(|(e, _)| future::err(e)))
+ },
+ _ => future::Either::B(connect_timeout.select(TcpStream::connect(&addr))
+ .and_then(|(stream, _)| future::ok(stream)).or_else(|(e, _)| future::err(e))),
+ }.and_then(move |stream| {
+ let (write, read) = Framed::new(stream, MsgCoder(printer)).split();
let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
.then(|_| {
}));
future::ok((sender, read))
})
+ .or_else(move |_| {
+ Delay::new(Instant::now() + timeout / 10).then(|_| future::err(()))
+ })
}
}