2 use std::net::SocketAddr;
3 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5 use bitcoin::consensus::encode;
6 use bitcoin::consensus::encode::{Decodable, Encodable};
7 use bitcoin::network::address::Address;
8 use bitcoin::network::constants::Network;
9 use bitcoin::network::message::{RawNetworkMessage, NetworkMessage};
10 use bitcoin::network::message_network::VersionMessage;
12 use tokio::prelude::*;
14 use tokio::codec::Framed;
15 use tokio::net::TcpStream;
16 use tokio::timer::Delay;
18 use futures::sync::mpsc;
20 use crate::printer::Printer;
21 use crate::timeout_stream::TimeoutStream;
23 struct BytesCoder<'a>(&'a mut bytes::BytesMut);
24 impl<'a> std::io::Write for BytesCoder<'a> {
25 fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
26 self.0.extend_from_slice(&b);
29 fn flush(&mut self) -> Result<(), std::io::Error> {
33 struct BytesDecoder<'a> {
34 buf: &'a mut bytes::BytesMut,
37 impl<'a> std::io::Read for BytesDecoder<'a> {
38 fn read(&mut self, b: &mut [u8]) -> Result<usize, std::io::Error> {
39 let copy_len = cmp::min(b.len(), self.buf.len() - self.pos);
40 b[..copy_len].copy_from_slice(&self.buf[self.pos..self.pos + copy_len]);
46 struct MsgCoder<'a>(&'a Printer);
47 impl<'a> codec::Decoder for MsgCoder<'a> {
48 type Item = NetworkMessage;
49 type Error = std::io::Error;
51 fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result<Option<NetworkMessage>, std::io::Error> {
52 let mut decoder = BytesDecoder {
56 match RawNetworkMessage::consensus_decode(&mut decoder) {
58 decoder.buf.advance(decoder.pos);
59 if res.magic == Network::Bitcoin.magic() {
62 Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "bad net magic"))
66 encode::Error::Io(_) => Ok(None),
67 encode::Error::UnrecognizedNetworkCommand(_msg) => {
68 decoder.buf.advance(decoder.pos);
69 //XXX(fixthese): self.0.add_line(format!("rust-bitcoin doesn't support {}!", msg), true);
73 self.0.add_line(format!("Error decoding message: {:?}", e), true);
74 Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
80 impl<'a> codec::Encoder for MsgCoder<'a> {
81 type Item = NetworkMessage;
82 type Error = std::io::Error;
84 fn encode(&mut self, msg: NetworkMessage, res: &mut bytes::BytesMut) -> Result<(), std::io::Error> {
85 if let Err(_) = (RawNetworkMessage {
86 magic: Network::Bitcoin.magic(),
88 }.consensus_encode(&mut BytesCoder(res))) {
100 pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> impl Future<Error=(), Item=(mpsc::Sender<NetworkMessage>, impl Stream<Item=NetworkMessage, Error=std::io::Error>)> {
101 let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
102 future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
104 TcpStream::connect(&addr).select(connect_timeout)
106 Delay::new(Instant::now() + timeout / 10).then(|_| {
109 }).and_then(move |stream| {
110 let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split();
111 let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
112 tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
116 let _ = sender.try_send(NetworkMessage::Version(VersionMessage {
118 services: (1 << 3), // NODE_WITNESS
119 timestamp: SystemTime::now().duration_since(UNIX_EPOCH).expect("time > 1970").as_secs() as i64,
120 receiver: Address::new(&addr, 0),
121 sender: Address::new(&"0.0.0.0:0".parse().unwrap(), 0),
123 user_agent: "/rust-bitcoin:0.18/bluematt-tokio-client:0.1/".to_string(),
127 future::ok((sender, TimeoutStream::new(read, timeout)))