+
+}
+
+impl events::EventsProvider for ChannelManager {
+ fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ let mut ret = Vec::new();
+ mem::swap(&mut ret, &mut *pending_events);
+ ret
+ }
+}
+
+impl ChainListener for ChannelManager {
+ fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
+ let mut new_events = Vec::new();
+ let mut failed_channels = Vec::new();
+ {
+ let mut channel_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_lock.borrow_parts();
+ let short_to_id = channel_state.short_to_id;
+ channel_state.by_id.retain(|_, channel| {
+ let chan_res = channel.block_connected(header, height, txn_matched, indexes_of_txn_matched);
+ if let Ok(Some(funding_locked)) = chan_res {
+ let announcement_sigs = self.get_announcement_sigs(channel);
+ new_events.push(events::Event::SendFundingLocked {
+ node_id: channel.get_their_node_id(),
+ msg: funding_locked,
+ announcement_sigs: announcement_sigs
+ });
+ short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
+ } else if let Err(e) = chan_res {
+ new_events.push(events::Event::HandleError {
+ node_id: channel.get_their_node_id(),
+ action: e.action,
+ });
+ if channel.is_shutdown() {
+ return false;
+ }
+ }
+ if let Some(funding_txo) = channel.get_funding_txo() {
+ for tx in txn_matched {
+ for inp in tx.input.iter() {
+ if inp.previous_output == funding_txo.into_bitcoin_outpoint() {
+ if let Some(short_id) = channel.get_short_channel_id() {
+ short_to_id.remove(&short_id);
+ }
+ // It looks like our counterparty went on-chain. We go ahead and
+ // broadcast our latest local state as well here, just in case its
+ // some kind of SPV attack, though we expect these to be dropped.
+ failed_channels.push(channel.force_shutdown());
+ if let Ok(update) = self.get_channel_update(&channel) {
+ new_events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ return false;
+ }
+ }
+ }
+ }
+ if channel.is_funding_initiated() && channel.channel_monitor().would_broadcast_at_height(height) {
+ if let Some(short_id) = channel.get_short_channel_id() {
+ short_to_id.remove(&short_id);
+ }
+ failed_channels.push(channel.force_shutdown());
+ // If would_broadcast_at_height() is true, the channel_monitor will broadcast
+ // the latest local tx for us, so we should skip that here (it doesn't really
+ // hurt anything, but does make tests a bit simpler).
+ failed_channels.last_mut().unwrap().0 = Vec::new();
+ if let Ok(update) = self.get_channel_update(&channel) {
+ new_events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ return false;
+ }
+ true
+ });
+ }
+ for failure in failed_channels.drain(..) {
+ self.finish_force_close_channel(failure);
+ }
+ let mut pending_events = self.pending_events.lock().unwrap();
+ for funding_locked in new_events.drain(..) {
+ pending_events.push(funding_locked);
+ }
+ self.latest_block_height.store(height as usize, Ordering::Release);
+ }
+
+ /// We force-close the channel without letting our counterparty participate in the shutdown
+ fn block_disconnected(&self, header: &BlockHeader) {
+ let mut new_events = Vec::new();
+ let mut failed_channels = Vec::new();
+ {
+ let mut channel_lock = self.channel_state.lock().unwrap();
+ let channel_state = channel_lock.borrow_parts();
+ let short_to_id = channel_state.short_to_id;
+ channel_state.by_id.retain(|_, v| {
+ if v.block_disconnected(header) {
+ if let Some(short_id) = v.get_short_channel_id() {
+ short_to_id.remove(&short_id);
+ }
+ failed_channels.push(v.force_shutdown());
+ if let Ok(update) = self.get_channel_update(&v) {
+ new_events.push(events::Event::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ false
+ } else {
+ true
+ }
+ });
+ }
+ for failure in failed_channels.drain(..) {
+ self.finish_force_close_channel(failure);
+ }
+ if !new_events.is_empty() {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ for funding_locked in new_events.drain(..) {
+ pending_events.push(funding_locked);
+ }
+ }
+ self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
+ }
+}
+
+macro_rules! handle_error {
+ ($self: ident, $internal: expr, $their_node_id: expr) => {
+ match $internal {
+ Ok(msg) => Ok(msg),
+ Err(MsgHandleErrInternal { err, needs_channel_force_close }) => {
+ if needs_channel_force_close {
+ match &err.action {
+ &Some(msgs::ErrorAction::DisconnectPeer { msg: Some(ref msg) }) => {
+ if msg.channel_id == [0; 32] {
+ $self.peer_disconnected(&$their_node_id, true);
+ } else {
+ $self.force_close_channel(&msg.channel_id);
+ }
+ },
+ &Some(msgs::ErrorAction::DisconnectPeer { msg: None }) => {},
+ &Some(msgs::ErrorAction::IgnoreError) => {},
+ &Some(msgs::ErrorAction::SendErrorMessage { ref msg }) => {
+ if msg.channel_id == [0; 32] {
+ $self.peer_disconnected(&$their_node_id, true);
+ } else {
+ $self.force_close_channel(&msg.channel_id);
+ }
+ },
+ &None => {},
+ }
+ }
+ Err(err)
+ },
+ }
+ }
+}
+
+impl ChannelMessageHandler for ChannelManager {
+ //TODO: Handle errors and close channel (or so)
+ fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<msgs::AcceptChannel, HandleError> {
+ handle_error!(self, self.internal_open_channel(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), HandleError> {
+ handle_error!(self, self.internal_accept_channel(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<msgs::FundingSigned, HandleError> {
+ handle_error!(self, self.internal_funding_created(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), HandleError> {
+ handle_error!(self, self.internal_funding_signed(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) -> Result<Option<msgs::AnnouncementSignatures>, HandleError> {
+ handle_error!(self, self.internal_funding_locked(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>), HandleError> {
+ handle_error!(self, self.internal_shutdown(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<Option<msgs::ClosingSigned>, HandleError> {
+ handle_error!(self, self.internal_closing_signed(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) -> Result<(), msgs::HandleError> {
+ handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), HandleError> {
+ handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<Option<msgs::HTLCFailChannelUpdate>, HandleError> {
+ handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), HandleError> {
+ handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>), HandleError> {
+ handle_error!(self, self.internal_commitment_signed(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<Option<msgs::CommitmentUpdate>, HandleError> {
+ handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), HandleError> {
+ handle_error!(self, self.internal_update_fee(their_node_id, msg), their_node_id)
+ }
+
+ fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), HandleError> {
+ handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), their_node_id)
+ }
+