-/// Tracks a view of the network, receiving updates from peers and generating Routes to
-/// payment destinations.
-pub struct Router {
- secp_ctx: Secp256k1<secp256k1::VerifyOnly>,
- network_map: RwLock<NetworkMap>,
- full_syncs_requested: AtomicUsize,
- chain_monitor: Arc<ChainWatchInterface>,
- logger: Arc<Logger>,
-}
-
-const SERIALIZATION_VERSION: u8 = 1;
-const MIN_SERIALIZATION_VERSION: u8 = 1;
-
-impl Writeable for Router {
- fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
- writer.write_all(&[SERIALIZATION_VERSION; 1])?;
- writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
-
- let network = self.network_map.read().unwrap();
- network.write(writer)?;
- Ok(())
- }
-}
-
-/// Arguments for the creation of a Router that are not deserialized.
-/// At a high-level, the process for deserializing a Router and resuming normal operation is:
-/// 1) Deserialize the Router by filling in this struct and calling <Router>::read(reaser, args).
-/// 2) Register the new Router with your ChainWatchInterface
-pub struct RouterReadArgs {
- /// The ChainWatchInterface for use in the Router in the future.
- ///
- /// No calls to the ChainWatchInterface will be made during deserialization.
- pub chain_monitor: Arc<ChainWatchInterface>,
- /// The Logger for use in the ChannelManager and which may be used to log information during
- /// deserialization.
- pub logger: Arc<Logger>,
-}
-
-impl ReadableArgs<RouterReadArgs> for Router {
- fn read<R: ::std::io::Read>(reader: &mut R, args: RouterReadArgs) -> Result<Router, DecodeError> {
- let _ver: u8 = Readable::read(reader)?;
- let min_ver: u8 = Readable::read(reader)?;
- if min_ver > SERIALIZATION_VERSION {
- return Err(DecodeError::UnknownVersion);
- }
- let network_map = Readable::read(reader)?;
- Ok(Router {
- secp_ctx: Secp256k1::verification_only(),
- network_map: RwLock::new(network_map),
- full_syncs_requested: AtomicUsize::new(0),
- chain_monitor: args.chain_monitor,
- logger: args.logger,
- })
- }
-}
-
-macro_rules! secp_verify_sig {
- ( $secp_ctx: expr, $msg: expr, $sig: expr, $pubkey: expr ) => {
- match $secp_ctx.verify($msg, $sig, $pubkey) {
- Ok(_) => {},
- Err(_) => return Err(LightningError{err: "Invalid signature from remote node", action: ErrorAction::IgnoreError}),
- }
- };
-}
-
-impl RoutingMessageHandler for Router {
-
- fn handle_node_announcement(&self, msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> {
- let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]);
- secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.signature, &msg.contents.node_id);
-
- let mut network = self.network_map.write().unwrap();
- match network.nodes.get_mut(&msg.contents.node_id) {
- None => Err(LightningError{err: "No existing channels for node_announcement", action: ErrorAction::IgnoreError}),
- Some(node) => {
- match node.last_update {
- Some(last_update) => if last_update >= msg.contents.timestamp {
- return Err(LightningError{err: "Update older than last processed update", action: ErrorAction::IgnoreError});
- },
- None => {},
- }
-
- node.features = msg.contents.features.clone();
- node.last_update = Some(msg.contents.timestamp);
- node.rgb = msg.contents.rgb;
- node.alias = msg.contents.alias;
- node.addresses = msg.contents.addresses.clone();
-
- let should_relay = msg.contents.excess_data.is_empty() && msg.contents.excess_address_data.is_empty();
- node.announcement_message = if should_relay { Some(msg.clone()) } else { None };
- Ok(should_relay)
- }
- }
- }
-
- fn handle_channel_announcement(&self, msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> {
- if msg.contents.node_id_1 == msg.contents.node_id_2 || msg.contents.bitcoin_key_1 == msg.contents.bitcoin_key_2 {
- return Err(LightningError{err: "Channel announcement node had a channel with itself", action: ErrorAction::IgnoreError});
- }
-
- let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]);
- secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_1, &msg.contents.node_id_1);
- secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_2, &msg.contents.node_id_2);
- secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_1, &msg.contents.bitcoin_key_1);
- secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_2, &msg.contents.bitcoin_key_2);
-
- let checked_utxo = match self.chain_monitor.get_chain_utxo(msg.contents.chain_hash, msg.contents.short_channel_id) {
- Ok((script_pubkey, _value)) => {
- let expected_script = Builder::new().push_opcode(opcodes::all::OP_PUSHNUM_2)
- .push_slice(&msg.contents.bitcoin_key_1.serialize())
- .push_slice(&msg.contents.bitcoin_key_2.serialize())
- .push_opcode(opcodes::all::OP_PUSHNUM_2)
- .push_opcode(opcodes::all::OP_CHECKMULTISIG).into_script().to_v0_p2wsh();
- if script_pubkey != expected_script {
- return Err(LightningError{err: "Channel announcement keys didn't match on-chain script", action: ErrorAction::IgnoreError});
- }
- //TODO: Check if value is worth storing, use it to inform routing, and compare it
- //to the new HTLC max field in channel_update
- true
- },
- Err(ChainError::NotSupported) => {
- // Tentatively accept, potentially exposing us to DoS attacks
- false
- },
- Err(ChainError::NotWatched) => {
- return Err(LightningError{err: "Channel announced on an unknown chain", action: ErrorAction::IgnoreError});
- },
- Err(ChainError::UnknownTx) => {
- return Err(LightningError{err: "Channel announced without corresponding UTXO entry", action: ErrorAction::IgnoreError});
- },
- };
-
- let mut network_lock = self.network_map.write().unwrap();
- let network = &mut *network_lock;
-
- let should_relay = msg.contents.excess_data.is_empty();
-
- let chan_info = ChannelInfo {
- features: msg.contents.features.clone(),
- one_to_two: DirectionalChannelInfo {
- src_node_id: msg.contents.node_id_1.clone(),
- last_update: 0,
- enabled: false,
- cltv_expiry_delta: u16::max_value(),
- htlc_minimum_msat: u64::max_value(),
- fee_base_msat: u32::max_value(),
- fee_proportional_millionths: u32::max_value(),
- last_update_message: None,
- },
- two_to_one: DirectionalChannelInfo {
- src_node_id: msg.contents.node_id_2.clone(),
- last_update: 0,
- enabled: false,
- cltv_expiry_delta: u16::max_value(),
- htlc_minimum_msat: u64::max_value(),
- fee_base_msat: u32::max_value(),
- fee_proportional_millionths: u32::max_value(),
- last_update_message: None,
- },
- announcement_message: if should_relay { Some(msg.clone()) } else { None },
- };
-
- match network.channels.entry(msg.contents.short_channel_id) {
- BtreeEntry::Occupied(mut entry) => {
- //TODO: because asking the blockchain if short_channel_id is valid is only optional
- //in the blockchain API, we need to handle it smartly here, though it's unclear
- //exactly how...
- if checked_utxo {
- // Either our UTXO provider is busted, there was a reorg, or the UTXO provider
- // only sometimes returns results. In any case remove the previous entry. Note
- // that the spec expects us to "blacklist" the node_ids involved, but we can't
- // do that because
- // a) we don't *require* a UTXO provider that always returns results.
- // b) we don't track UTXOs of channels we know about and remove them if they
- // get reorg'd out.
- // c) it's unclear how to do so without exposing ourselves to massive DoS risk.
- Self::remove_channel_in_nodes(&mut network.nodes, &entry.get(), msg.contents.short_channel_id);
- *entry.get_mut() = chan_info;
- } else {
- return Err(LightningError{err: "Already have knowledge of channel", action: ErrorAction::IgnoreError})
- }
- },
- BtreeEntry::Vacant(entry) => {
- entry.insert(chan_info);
- }
- };
-
- macro_rules! add_channel_to_node {
- ( $node_id: expr ) => {
- match network.nodes.entry($node_id) {
- BtreeEntry::Occupied(node_entry) => {
- node_entry.into_mut().channels.push(msg.contents.short_channel_id);
- },
- BtreeEntry::Vacant(node_entry) => {
- node_entry.insert(NodeInfo {
- channels: vec!(msg.contents.short_channel_id),
- lowest_inbound_channel_fee_base_msat: u32::max_value(),
- lowest_inbound_channel_fee_proportional_millionths: u32::max_value(),
- features: NodeFeatures::empty(),
- last_update: None,
- rgb: [0; 3],
- alias: [0; 32],
- addresses: Vec::new(),
- announcement_message: None,
- });
- }
- }
- };
- }
-
- add_channel_to_node!(msg.contents.node_id_1);
- add_channel_to_node!(msg.contents.node_id_2);
-
- log_trace!(self, "Added channel_announcement for {}{}", msg.contents.short_channel_id, if !should_relay { " with excess uninterpreted data!" } else { "" });
- Ok(should_relay)
- }
-
- fn handle_htlc_fail_channel_update(&self, update: &msgs::HTLCFailChannelUpdate) {
- match update {
- &msgs::HTLCFailChannelUpdate::ChannelUpdateMessage { ref msg } => {
- let _ = self.handle_channel_update(msg);
- },
- &msgs::HTLCFailChannelUpdate::ChannelClosed { ref short_channel_id, ref is_permanent } => {
- let mut network = self.network_map.write().unwrap();
- if *is_permanent {
- if let Some(chan) = network.channels.remove(short_channel_id) {
- Self::remove_channel_in_nodes(&mut network.nodes, &chan, *short_channel_id);
- }
- } else {
- if let Some(chan) = network.channels.get_mut(short_channel_id) {
- chan.one_to_two.enabled = false;
- chan.two_to_one.enabled = false;
- }
- }
- },
- &msgs::HTLCFailChannelUpdate::NodeFailure { ref node_id, ref is_permanent } => {
- if *is_permanent {
- //TODO: Wholly remove the node
- } else {
- self.mark_node_bad(node_id, false);
- }
- },
- }
- }
-
- fn handle_channel_update(&self, msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> {
- let mut network = self.network_map.write().unwrap();
- let dest_node_id;
- let chan_enabled = msg.contents.flags & (1 << 1) != (1 << 1);
- let chan_was_enabled;
-
- match network.channels.get_mut(&msg.contents.short_channel_id) {
- None => return Err(LightningError{err: "Couldn't find channel for update", action: ErrorAction::IgnoreError}),
- Some(channel) => {
- macro_rules! maybe_update_channel_info {
- ( $target: expr) => {
- if $target.last_update >= msg.contents.timestamp {
- return Err(LightningError{err: "Update older than last processed update", action: ErrorAction::IgnoreError});
- }
- chan_was_enabled = $target.enabled;
- $target.last_update = msg.contents.timestamp;
- $target.enabled = chan_enabled;
- $target.cltv_expiry_delta = msg.contents.cltv_expiry_delta;
- $target.htlc_minimum_msat = msg.contents.htlc_minimum_msat;
- $target.fee_base_msat = msg.contents.fee_base_msat;
- $target.fee_proportional_millionths = msg.contents.fee_proportional_millionths;
- $target.last_update_message = if msg.contents.excess_data.is_empty() {
- Some(msg.clone())
- } else {
- None
- };
- }
- }
- let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]);
- if msg.contents.flags & 1 == 1 {
- dest_node_id = channel.one_to_two.src_node_id.clone();
- secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.signature, &channel.two_to_one.src_node_id);
- maybe_update_channel_info!(channel.two_to_one);
- } else {
- dest_node_id = channel.two_to_one.src_node_id.clone();
- secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.signature, &channel.one_to_two.src_node_id);
- maybe_update_channel_info!(channel.one_to_two);
- }
- }
- }
-
- if chan_enabled {
- let node = network.nodes.get_mut(&dest_node_id).unwrap();
- node.lowest_inbound_channel_fee_base_msat = cmp::min(node.lowest_inbound_channel_fee_base_msat, msg.contents.fee_base_msat);
- node.lowest_inbound_channel_fee_proportional_millionths = cmp::min(node.lowest_inbound_channel_fee_proportional_millionths, msg.contents.fee_proportional_millionths);
- } else if chan_was_enabled {
- let mut lowest_inbound_channel_fee_base_msat = u32::max_value();
- let mut lowest_inbound_channel_fee_proportional_millionths = u32::max_value();
-
- {
- let node = network.nodes.get(&dest_node_id).unwrap();
-
- for chan_id in node.channels.iter() {
- let chan = network.channels.get(chan_id).unwrap();
- if chan.one_to_two.src_node_id == dest_node_id {
- lowest_inbound_channel_fee_base_msat = cmp::min(lowest_inbound_channel_fee_base_msat, chan.two_to_one.fee_base_msat);
- lowest_inbound_channel_fee_proportional_millionths = cmp::min(lowest_inbound_channel_fee_proportional_millionths, chan.two_to_one.fee_proportional_millionths);
- } else {
- lowest_inbound_channel_fee_base_msat = cmp::min(lowest_inbound_channel_fee_base_msat, chan.one_to_two.fee_base_msat);
- lowest_inbound_channel_fee_proportional_millionths = cmp::min(lowest_inbound_channel_fee_proportional_millionths, chan.one_to_two.fee_proportional_millionths);
- }
- }
- }
-
- //TODO: satisfy the borrow-checker without a double-map-lookup :(
- let mut_node = network.nodes.get_mut(&dest_node_id).unwrap();
- mut_node.lowest_inbound_channel_fee_base_msat = lowest_inbound_channel_fee_base_msat;
- mut_node.lowest_inbound_channel_fee_proportional_millionths = lowest_inbound_channel_fee_proportional_millionths;
- }
-
- Ok(msg.contents.excess_data.is_empty())
- }
-
- fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
- let mut result = Vec::with_capacity(batch_amount as usize);
- let network = self.network_map.read().unwrap();
- let mut iter = network.channels.range(starting_point..);
- while result.len() < batch_amount as usize {
- if let Some((_, ref chan)) = iter.next() {
- if chan.announcement_message.is_some() {
- result.push((chan.announcement_message.clone().unwrap(),
- chan.one_to_two.last_update_message.clone(),
- chan.two_to_one.last_update_message.clone()));
- } else {
- // TODO: We may end up sending un-announced channel_updates if we are sending
- // initial sync data while receiving announce/updates for this channel.
- }
- } else {
- return result;
- }
- }
- result
- }
-
- fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
- let mut result = Vec::with_capacity(batch_amount as usize);
- let network = self.network_map.read().unwrap();
- let mut iter = if let Some(pubkey) = starting_point {
- let mut iter = network.nodes.range((*pubkey)..);
- iter.next();
- iter
- } else {
- network.nodes.range(..)
- };
- while result.len() < batch_amount as usize {
- if let Some((_, ref node)) = iter.next() {
- if node.announcement_message.is_some() {
- result.push(node.announcement_message.clone().unwrap());
- }
- } else {
- return result;
- }
- }
- result
- }
-
- fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
- //TODO: Determine whether to request a full sync based on the network map.
- const FULL_SYNCS_TO_REQUEST: usize = 5;
- if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST {
- self.full_syncs_requested.fetch_add(1, Ordering::AcqRel);
- true
- } else {
- false
- }
- }
-}
-