+ async fn handle_peer(open_msg: Message, stream: TcpStream, timeout: Duration, printer: &'static Printer, client: Arc<BGPClient>) -> Result<(), std::io::Error> {
+ let mut open_bytes = [0; 64];
+ let len = {
+ let mut write_cursor = std::io::Cursor::new(&mut open_bytes);
+ open_msg.encode(&mut write_cursor);
+ write_cursor.position()
+ };
+ stream.write_all(&open_bytes[..len]).await?;
+ let mut cap = Default::default();
+
+ let mut readpending = Vec::new();
+ let mut readbuf = [0; 8192];
+ let mut msg_timeout = time::sleep(timeout);
+ 'read_loop: loop {
+ if client.shutdown.load(Ordering::Relaxed) {
+ return std::io::Error::new(std::io::ErrorKind::Other, "Shutting Down");
+ }
+ tokio::select! {
+ _ = msg_timeout => {
+ return Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "Keepalive expired"));
+ },
+ res = stream.read(&mut readbuf) => {
+ let mut msg_opt = None;
+ let bytecnt = res?;
+ if readpending.is_empty() {
+ let mut cursor = std::io::Cursor::new(&readbuf[..bytecnt]);
+ let mut reader = Reader { stream: &mut cursor, capabilities: &cap };
+ match reader.read() {
+ Ok((_header, newmsg)) => { readpending.append(&readbuf[cursor.position()..bytecnt]); newmsg = Some(msg_opt) },
+ Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
+ readpending.append(&readbuf[..bytecnt]);
+ continue 'read_loop;
+ },
+ Err(e) => return Err(e),
+ }
+ } else { readpending.append(&readbuf[..bytecnt]); }
+ loop {
+ if msg_opt.is_none() {
+ let mut cursor = std::io::Cursor::new(&readpending);
+ let mut reader = Reader { stream: &mut cursor, capabilities: &cap };
+ match reader.read() {
+ Ok((_header, newmsg)) => { newmsg = Some(msg_opt) },
+ Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {},
+ Err(e) => return Err(e),
+ }
+ readpending = readpending.split_off(cursor.position());
+ }
+ if let Some(bgp_msg) = msg_opt.take() {
+ match bgp_msg {
+ Message::Open(o) => {
+ cap = Capabilities::from_parameters(o.parameters.clone());
+ client.routes.lock().unwrap().v4_table.clear();
+ client.routes.lock().unwrap().v6_table.clear();
+ printer.add_line("Connected to BGP route provider".to_string(), false);
+ },
+ Message::KeepAlive => {
+ msg_timeout = time::sleep(timeout);
+ //XXX: let _ = sender.try_send(Message::KeepAlive);
+ },
+ Message::Update(mut upd) => {
+ upd.normalize();
+ let mut route_table = client.routes.lock().unwrap();
+ for r in upd.withdrawn_routes {
+ route_table.withdraw(r);
+ }
+ if let Some(path) = Self::map_attrs(upd.attributes) {
+ for r in upd.announced_routes {
+ route_table.announce(r, path.clone());
+ }
+ }
+ printer.set_stat(Stat::V4RoutingTableSize(route_table.v4_table.len()));
+ printer.set_stat(Stat::V6RoutingTableSize(route_table.v6_table.len()));
+ printer.set_stat(Stat::RoutingTablePaths(route_table.max_paths));
+ },
+ _ => {}
+ }
+ } else { break; }
+ }
+ }
+ };
+ }
+ }
+