--- /dev/null
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::cmp;
+use std::ops::Bound::Included;
+use std::collections::BTreeMap;
+use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::time::{Duration, Instant};
+
+use bgp_rs::{AFI, SAFI, AddPathDirection, Open, OpenCapability, OpenParameter, NLRIEncoding, PathAttribute};
+use bgp_rs::Capabilities;
+use bgp_rs::{ASPath, Segment};
+use bgp_rs::Message;
+use bgp_rs::Reader;
+
+use tokio::prelude::*;
+use tokio::codec;
+use tokio::codec::Framed;
+use tokio::net::TcpStream;
+use tokio::timer::Delay;
+
+use futures::sync::mpsc;
+
+use crate::printer::Printer;
+
+pub struct RoutingTable {
+ v4_table: BTreeMap<(Ipv4Addr, u8, u32), Arc<Vec<PathAttribute>>>,
+ v6_table: BTreeMap<(Ipv6Addr, u8, u32), Arc<Vec<PathAttribute>>>,
+}
+
+impl RoutingTable {
+ fn new() -> Self {
+ Self {
+ v4_table: BTreeMap::new(),
+ v6_table: BTreeMap::new(),
+ }
+ }
+
+ fn get_route_attrs(&self, ip: IpAddr) -> Vec<Arc<Vec<PathAttribute>>> {
+ macro_rules! lookup_res {
+ ($addrty: ty, $addr: expr, $table: expr, $addr_bits: expr) => { {
+ let mut res = Vec::new();
+ //TODO: Optimize this!
+ for i in (0..($addr_bits + 1)).rev() {
+ let mut lookup = $addr.octets();
+ for b in 0..(i / 8) {
+ lookup[lookup.len() - b] = 0;
+ }
+ lookup[lookup.len() - (i/8)] &= !(((1u16 << (i % 8)) - 1) as u8);
+ let lookup_addr = <$addrty>::from(lookup);
+ for attrs in $table.range((Included((lookup_addr, $addr_bits, 0)), Included((lookup_addr, $addr_bits, std::u32::MAX)))) {
+ res.push(Arc::clone(&attrs.1));
+ }
+ if !res.is_empty() { break; }
+ }
+ res
+ } }
+ }
+ match ip {
+ IpAddr::V4(v4a) => lookup_res!(Ipv4Addr, v4a, self.v4_table, 32),
+ IpAddr::V6(v6a) => lookup_res!(Ipv6Addr, v6a, self.v6_table, 128)
+ }
+ }
+
+ fn withdraw(&mut self, route: NLRIEncoding) {
+ match route {
+ NLRIEncoding::IP(p) => {
+ let (ip, len) = <(IpAddr, u8)>::from(&p);
+ match ip {
+ IpAddr::V4(v4a) => self.v4_table.remove(&(v4a, len, 0)),
+ IpAddr::V6(v6a) => self.v6_table.remove(&(v6a, len, 0)),
+ }
+ },
+ NLRIEncoding::IP_WITH_PATH_ID((p, id)) => {
+ let (ip, len) = <(IpAddr, u8)>::from(&p);
+ match ip {
+ IpAddr::V4(v4a) => self.v4_table.remove(&(v4a, len, id)),
+ IpAddr::V6(v6a) => self.v6_table.remove(&(v6a, len, id)),
+ }
+ },
+ NLRIEncoding::IP_MPLS(_) => None,
+ };
+ }
+
+ fn announce(&mut self, route: NLRIEncoding, attrs: Arc<Vec<PathAttribute>>) {
+ match route {
+ NLRIEncoding::IP(p) => {
+ let (ip, len) = <(IpAddr, u8)>::from(&p);
+ match ip {
+ IpAddr::V4(v4a) => self.v4_table.insert((v4a, len, 0), attrs),
+ IpAddr::V6(v6a) => self.v6_table.insert((v6a, len, 0), attrs),
+ }
+ },
+ NLRIEncoding::IP_WITH_PATH_ID((p, id)) => {
+ let (ip, len) = <(IpAddr, u8)>::from(&p);
+ match ip {
+ IpAddr::V4(v4a) => self.v4_table.insert((v4a, len, id), attrs),
+ IpAddr::V6(v6a) => self.v6_table.insert((v6a, len, id), attrs),
+ }
+ },
+ NLRIEncoding::IP_MPLS(_) => None,
+ };
+ }
+}
+
+struct BytesCoder<'a>(&'a mut bytes::BytesMut);
+impl<'a> std::io::Write for BytesCoder<'a> {
+ fn write(&mut self, b: &[u8]) -> Result<usize, std::io::Error> {
+ self.0.extend_from_slice(&b);
+ Ok(b.len())
+ }
+ fn flush(&mut self) -> Result<(), std::io::Error> {
+ Ok(())
+ }
+}
+struct BytesDecoder<'a> {
+ buf: &'a mut bytes::BytesMut,
+ pos: usize,
+}
+impl<'a> std::io::Read for BytesDecoder<'a> {
+ fn read(&mut self, b: &mut [u8]) -> Result<usize, std::io::Error> {
+ let copy_len = cmp::min(b.len(), self.buf.len() - self.pos);
+ b[..copy_len].copy_from_slice(&self.buf[self.pos..self.pos + copy_len]);
+ self.pos += copy_len;
+ Ok(copy_len)
+ }
+}
+
+struct MsgCoder<'a>(&'a Printer);
+impl<'a> codec::Decoder for MsgCoder<'a> {
+ type Item = Message;
+ type Error = std::io::Error;
+
+ fn decode(&mut self, bytes: &mut bytes::BytesMut) -> Result<Option<Message>, std::io::Error> {
+ let mut decoder = BytesDecoder {
+ buf: bytes,
+ pos: 0
+ };
+ match (Reader {
+ stream: &mut decoder,
+ capabilities: Capabilities {
+ FOUR_OCTET_ASN_SUPPORT: true,
+ EXTENDED_PATH_NLRI_SUPPORT: true,
+ }
+ }).read() {
+ Ok((_header, msg)) => {
+ decoder.buf.advance(decoder.pos);
+ Ok(Some(msg))
+ },
+ Err(e) => match e.kind() {
+ std::io::ErrorKind::UnexpectedEof => Ok(None),
+ _ => Err(e),
+ },
+ }
+ }
+}
+impl<'a> codec::Encoder for MsgCoder<'a> {
+ type Item = Message;
+ type Error = std::io::Error;
+
+ fn encode(&mut self, msg: Message, res: &mut bytes::BytesMut) -> Result<(), std::io::Error> {
+ msg.write(&mut BytesCoder(res))?;
+ Ok(())
+ }
+}
+
+pub struct BGPClient {
+ routes: Mutex<RoutingTable>,
+ shutdown: AtomicBool,
+}
+impl BGPClient {
+ pub fn get_asn(&self, addr: IpAddr) -> u32 {
+ let attr_set = self.routes.lock().unwrap().get_route_attrs(addr);
+ let mut paths: Vec<(ASPath, u32, u32)> = Vec::new();
+ for attrs in attr_set.iter() {
+ let mut as4_path = None;
+ let mut as_path = None;
+ let mut pref = 100;
+ let mut med = 0;
+ for attr in attrs.iter() {
+ match attr {
+ PathAttribute::AS4_PATH(path) => as4_path = Some(path),
+ PathAttribute::AS_PATH(path) => as_path = Some(path),
+ PathAttribute::LOCAL_PREF(p) => pref = *p,
+ PathAttribute::MULTI_EXIT_DISC(m) => med = *m,
+ _ => {},
+ }
+ }
+ if let Some(path) = as4_path.or(as_path) {
+ paths.push((path.clone(), pref, med));
+ }
+ }
+ if paths.is_empty() { return 0; }
+
+ let mut path_vecs: Vec<_> = paths.iter_mut().map(|(p, pref, med)| {
+ let mut path = Vec::new();
+ for seg in p.segments.drain(..) {
+ match seg {
+ Segment::AS_SEQUENCE(mut asn) => path.append(&mut asn),
+ Segment::AS_SET(_) => {}, // Ignore sets for now, they're not that common anyway
+ }
+ }
+ (path, pref, med)
+ }).collect();
+ path_vecs.sort_unstable_by(|(path_a, pref_a, med_a), (path_b, pref_b, med_b)| {
+ pref_a.cmp(pref_b).then(path_b.len().cmp(&path_a.len())).then(med_b.cmp(med_a))
+ });
+ // TODO: Find last common ASN among all paths
+ *path_vecs[0].0.last().unwrap_or(&0)
+ }
+
+ pub fn disconnect(&self) {
+ self.shutdown.store(true, Ordering::Relaxed);
+ }
+
+ fn connect_given_client(addr: SocketAddr, timeout: Duration, printer: &'static Printer, client: Arc<BGPClient>) {
+ let connect_timeout = Delay::new(Instant::now() + timeout.clone()).then(|_| {
+ future::err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout reached"))
+ });
+ let client_reconn = Arc::clone(&client);
+ tokio::spawn(TcpStream::connect(&addr).select(connect_timeout)
+ .or_else(move |_| {
+ Delay::new(Instant::now() + timeout / 10).then(|_| {
+ future::err(())
+ })
+ }).and_then(move |stream| {
+ let (write, read) = Framed::new(stream.0, MsgCoder(printer)).split();
+ let (mut sender, receiver) = mpsc::channel(10); // We never really should send more than 10 messages unless they're dumb
+ tokio::spawn(write.sink_map_err(|_| { () }).send_all(receiver)
+ .then(|_| {
+ future::err(())
+ }));
+ let _ = sender.try_send(Message::Open(Open {
+ version: 4,
+ peer_asn: 23456,
+ hold_timer: 120,
+ identifier: 0x453b1215, // 69.59.18.21
+ parameters: vec![OpenParameter::Capabilities(vec![
+ OpenCapability::MultiProtocol((AFI::IPV4, SAFI::Unicast)),
+ OpenCapability::MultiProtocol((AFI::IPV6, SAFI::Unicast)),
+ OpenCapability::FourByteASN(397444),
+ OpenCapability::RouteRefresh,
+ OpenCapability::AddPath(vec![
+ (AFI::IPV4, SAFI::Unicast, AddPathDirection::ReceivePaths),
+ (AFI::IPV6, SAFI::Unicast, AddPathDirection::ReceivePaths)]),
+ ])]
+ }));
+ read.for_each(move |bgp_msg| {
+ if client.shutdown.load(Ordering::Relaxed) {
+ return future::err(std::io::Error::new(std::io::ErrorKind::Other, "Shutting Down"));
+ }
+ match bgp_msg {
+ Message::Open(_) => {
+ printer.add_line("Connected to BGP route provider".to_string(), true);
+ },
+ Message::KeepAlive => {
+ let _ = sender.try_send(Message::KeepAlive);
+ },
+ Message::Update(mut upd) => {
+ upd.normalize();
+ let mut route_table = client.routes.lock().unwrap();
+ for r in upd.withdrawn_routes {
+ route_table.withdraw(r);
+ }
+ let attrs = Arc::new(upd.attributes);
+ for r in upd.announced_routes {
+ route_table.announce(r, Arc::clone(&attrs));
+ }
+ },
+ _ => {}
+ }
+ future::ok(())
+ }).or_else(move |e| {
+ printer.add_line(format!("Got error from BGP stream: {:?}", e), true);
+ future::ok(())
+ })
+ }).then(move |_| {
+ if !client_reconn.shutdown.load(Ordering::Relaxed) {
+ BGPClient::connect_given_client(addr, timeout, printer, client_reconn);
+ }
+ future::ok(())
+ })
+ );
+ }
+
+ pub fn new(addr: SocketAddr, timeout: Duration, printer: &'static Printer) -> Arc<BGPClient> {
+ let client = Arc::new(BGPClient {
+ routes: Mutex::new(RoutingTable::new()),
+ shutdown: AtomicBool::new(false),
+ });
+ BGPClient::connect_given_client(addr, timeout, printer, Arc::clone(&client));
+ client
+ }
+}
mod printer;
mod reader;
mod peer;
+mod bgp_client;
mod timeout_stream;
mod datastore;
use datastore::{AddressState, Store, U64Setting, RegexSetting};
use timeout_stream::TimeoutStream;
use rand::Rng;
+use bgp_client::BGPClient;
use tokio::prelude::*;
use tokio::timer::Delay;
}));
}
-fn poll_dnsseeds() {
+fn poll_dnsseeds(bgp_client: Arc<BGPClient>) {
tokio::spawn(future::lazy(|| {
let printer = unsafe { PRINTER.as_ref().unwrap() };
let store = unsafe { DATA_STORE.as_ref().unwrap() };
printer.add_line(format!("Added {} new addresses from other DNS seeds", new_addrs), false);
Delay::new(Instant::now() + Duration::from_secs(60)).then(|_| {
let store = unsafe { DATA_STORE.as_ref().unwrap() };
- let dns_future = store.write_dns();
+ let dns_future = store.write_dns(Arc::clone(&bgp_client));
store.save_data().join(dns_future).then(|_| {
if !START_SHUTDOWN.load(Ordering::Relaxed) {
- poll_dnsseeds();
+ poll_dnsseeds(bgp_client);
+ } else {
+ bgp_client.disconnect();
}
future::ok(())
})
}));
}
-fn make_trusted_conn(trusted_sockaddr: SocketAddr) {
+fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc<BGPClient>) {
let printer = unsafe { PRINTER.as_ref().unwrap() };
let trusted_peer = Peer::new(trusted_sockaddr.clone(), Duration::from_secs(600), printer);
+ let bgp_reload = Arc::clone(&bgp_client);
tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
printer.add_line("Connected to local peer".to_string(), false);
let mut starting_height = 0;
*unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap() = Arc::new((height, hash, block));
if !SCANNING.swap(true, Ordering::SeqCst) {
scan_net();
- poll_dnsseeds();
+ poll_dnsseeds(Arc::clone(&bgp_client));
}
}
},
}).then(move |_: Result<(), ()>| {
if !START_SHUTDOWN.load(Ordering::Relaxed) {
printer.add_line("Lost connection from trusted peer".to_string(), true);
- make_trusted_conn(trusted_sockaddr);
+ make_trusted_conn(trusted_sockaddr, bgp_reload);
}
future::ok(())
}));
}
fn main() {
- if env::args().len() != 3 {
- println!("USAGE: dnsseed-rust datastore localPeerAddress");
+ if env::args().len() != 4 {
+ println!("USAGE: dnsseed-rust datastore localPeerAddress bgp_peer");
return;
}
let mut args = env::args();
args.next();
let path = args.next().unwrap();
- let addr = args.next().unwrap();
+ let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
+ let bgp_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
Store::new(path).and_then(move |store| {
unsafe { DATA_STORE = Some(Box::new(store)) };
let store = unsafe { DATA_STORE.as_ref().unwrap() };
unsafe { PRINTER = Some(Box::new(Printer::new(store))) };
- let trusted_sockaddr: SocketAddr = addr.parse().unwrap();
- make_trusted_conn(trusted_sockaddr);
+ let bgp_client = BGPClient::new(bgp_sockaddr, Duration::from_secs(600), unsafe { PRINTER.as_ref().unwrap() });
+ make_trusted_conn(trusted_sockaddr, bgp_client);
reader::read(store, unsafe { PRINTER.as_ref().unwrap() });