Merge pull request #793 from galderz/t_double_validation_792
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Fri, 5 Feb 2021 14:17:46 +0000 (06:17 -0800)
committerGitHub <noreply@github.com>
Fri, 5 Feb 2021 14:17:46 +0000 (06:17 -0800)
.github/workflows/build.yml
Cargo.toml
lightning-block-sync/Cargo.toml [new file with mode: 0644]
lightning-block-sync/src/convert.rs [new file with mode: 0644]
lightning-block-sync/src/http.rs [new file with mode: 0644]
lightning-block-sync/src/lib.rs [new file with mode: 0644]
lightning-block-sync/src/rest.rs [new file with mode: 0644]
lightning-block-sync/src/rpc.rs [new file with mode: 0644]
lightning-block-sync/src/utils.rs [new file with mode: 0644]
lightning/src/ln/msgs.rs
lightning/src/routing/network_graph.rs

index 84fc509462f74dff8e341d4327db4f3f6a6039f5..fd219f101edb608f4d43d4fcc6d2148c5a341b1d 100644 (file)
@@ -13,7 +13,7 @@ jobs:
                      1.30.0,
                      # 1.34.2 is Debian stable
                      1.34.2,
-                     # 1.45.2 is MSRV for lightning-net-tokio and generates coverage
+                     # 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, and coverage generation
                      1.45.2]
         include:
           - toolchain: stable
@@ -48,6 +48,24 @@ jobs:
       - name: Build on Rust ${{ matrix.toolchain }}
         if: "! matrix.build-net-tokio"
         run: cargo build --verbose  --color always -p lightning
+      - name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features
+        if: "matrix.build-net-tokio && !matrix.coverage"
+        run: |
+          cd lightning-block-sync
+          cargo build --verbose --color always --features rest-client
+          cargo build --verbose --color always --features rpc-client
+          cargo build --verbose --color always --features rpc-client,rest-client
+          cargo build --verbose --color always --features rpc-client,rest-client,tokio
+          cd ..
+      - name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation
+        if: matrix.coverage
+        run: |
+          cd lightning-block-sync
+          RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rest-client
+          RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client
+          RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client
+          RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client,tokio
+          cd ..
       - name: Test on Rust ${{ matrix.toolchain }} with net-tokio
         if: "matrix.build-net-tokio && !matrix.coverage"
         run: cargo test --verbose --color always
@@ -57,6 +75,24 @@ jobs:
       - name: Test on Rust ${{ matrix.toolchain }}
         if: "! matrix.build-net-tokio"
         run: cargo test --verbose --color always  -p lightning
+      - name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features
+        if: "matrix.build-net-tokio && !matrix.coverage"
+        run: |
+          cd lightning-block-sync
+          cargo test --verbose --color always --features rest-client
+          cargo test --verbose --color always --features rpc-client
+          cargo test --verbose --color always --features rpc-client,rest-client
+          cargo test --verbose --color always --features rpc-client,rest-client,tokio
+          cd ..
+      - name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation
+        if: matrix.coverage
+        run: |
+          cd lightning-block-sync
+          RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rest-client
+          RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client
+          RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client
+          RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client,tokio
+          cd ..
       - name: Install deps for kcov
         if: matrix.coverage
         run: |
index c43e7927581432e9834aed21a16329aba92f80b3..96f4b1d17700ec1cfee63c2a282b768b080ba053 100644 (file)
@@ -2,6 +2,7 @@
 
 members = [
     "lightning",
+    "lightning-block-sync",
     "lightning-net-tokio",
     "lightning-persister",
 ]
diff --git a/lightning-block-sync/Cargo.toml b/lightning-block-sync/Cargo.toml
new file mode 100644 (file)
index 0000000..aec6d14
--- /dev/null
@@ -0,0 +1,25 @@
+[package]
+name = "lightning-block-sync"
+version = "0.0.1"
+authors = ["Jeffrey Czyz", "Matt Corallo"]
+license = "Apache-2.0"
+edition = "2018"
+description = """
+Utilities to fetch the chain data from a block source and feed them into Rust Lightning.
+"""
+
+[features]
+rest-client = [ "serde", "serde_json", "chunked_transfer" ]
+rpc-client = [ "serde", "serde_json", "chunked_transfer" ]
+
+[dependencies]
+bitcoin = "0.24"
+lightning = { version = "0.0.12", path = "../lightning" }
+tokio = { version = "1.0", features = [ "io-util", "net" ], optional = true }
+serde = { version = "1.0", features = ["derive"], optional = true }
+serde_json = { version = "1.0", optional = true }
+chunked_transfer = { version = "1.4", optional = true }
+futures = { version = "0.3" }
+
+[dev-dependencies]
+tokio = { version = "1.0", features = [ "macros", "rt" ] }
diff --git a/lightning-block-sync/src/convert.rs b/lightning-block-sync/src/convert.rs
new file mode 100644 (file)
index 0000000..37b2c43
--- /dev/null
@@ -0,0 +1,472 @@
+use crate::{BlockHeaderData, BlockSourceError};
+use crate::http::{BinaryResponse, JsonResponse};
+use crate::utils::hex_to_uint256;
+
+use bitcoin::blockdata::block::{Block, BlockHeader};
+use bitcoin::consensus::encode;
+use bitcoin::hash_types::{BlockHash, TxMerkleNode};
+use bitcoin::hashes::hex::{ToHex, FromHex};
+
+use serde::Deserialize;
+
+use serde_json;
+
+use std::convert::From;
+use std::convert::TryFrom;
+use std::convert::TryInto;
+
+/// Conversion from `std::io::Error` into `BlockSourceError`.
+impl From<std::io::Error> for BlockSourceError {
+       fn from(e: std::io::Error) -> BlockSourceError {
+               match e.kind() {
+                       std::io::ErrorKind::InvalidData => BlockSourceError::persistent(e),
+                       std::io::ErrorKind::InvalidInput => BlockSourceError::persistent(e),
+                       _ => BlockSourceError::transient(e),
+               }
+       }
+}
+
+/// Parses binary data as a block.
+impl TryInto<Block> for BinaryResponse {
+       type Error = std::io::Error;
+
+       fn try_into(self) -> std::io::Result<Block> {
+               match encode::deserialize(&self.0) {
+                       Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid block data")),
+                       Ok(block) => Ok(block),
+               }
+       }
+}
+
+/// Converts a JSON value into block header data. The JSON value may be an object representing a
+/// block header or an array of such objects. In the latter case, the first object is converted.
+impl TryInto<BlockHeaderData> for JsonResponse {
+       type Error = std::io::Error;
+
+       fn try_into(self) -> std::io::Result<BlockHeaderData> {
+               let mut header = match self.0 {
+                       serde_json::Value::Array(mut array) if !array.is_empty() => array.drain(..).next().unwrap(),
+                       serde_json::Value::Object(_) => self.0,
+                       _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "unexpected JSON type")),
+               };
+
+               if !header.is_object() {
+                       return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
+               }
+
+               // Add an empty previousblockhash for the genesis block.
+               if let None = header.get("previousblockhash") {
+                       let hash: BlockHash = Default::default();
+                       header.as_object_mut().unwrap().insert("previousblockhash".to_string(), serde_json::json!(hash.to_hex()));
+               }
+
+               match serde_json::from_value::<GetHeaderResponse>(header) {
+                       Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid header response")),
+                       Ok(response) => match response.try_into() {
+                               Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid header data")),
+                               Ok(header) => Ok(header),
+                       },
+               }
+       }
+}
+
+/// Response data from `getblockheader` RPC and `headers` REST requests.
+#[derive(Deserialize)]
+struct GetHeaderResponse {
+       pub version: i32,
+       pub merkleroot: String,
+       pub time: u32,
+       pub nonce: u32,
+       pub bits: String,
+       pub previousblockhash: String,
+
+       pub chainwork: String,
+       pub height: u32,
+}
+
+/// Converts from `GetHeaderResponse` to `BlockHeaderData`.
+impl TryFrom<GetHeaderResponse> for BlockHeaderData {
+       type Error = bitcoin::hashes::hex::Error;
+
+       fn try_from(response: GetHeaderResponse) -> Result<Self, bitcoin::hashes::hex::Error> {
+               Ok(BlockHeaderData {
+                       header: BlockHeader {
+                               version: response.version,
+                               prev_blockhash: BlockHash::from_hex(&response.previousblockhash)?,
+                               merkle_root: TxMerkleNode::from_hex(&response.merkleroot)?,
+                               time: response.time,
+                               bits: u32::from_be_bytes(<[u8; 4]>::from_hex(&response.bits)?),
+                               nonce: response.nonce,
+                       },
+                       chainwork: hex_to_uint256(&response.chainwork)?,
+                       height: response.height,
+               })
+       }
+}
+
+
+/// Converts a JSON value into a block. Assumes the block is hex-encoded in a JSON string.
+impl TryInto<Block> for JsonResponse {
+       type Error = std::io::Error;
+
+       fn try_into(self) -> std::io::Result<Block> {
+               match self.0.as_str() {
+                       None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")),
+                       Some(hex_data) => match Vec::<u8>::from_hex(hex_data) {
+                               Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")),
+                               Ok(block_data) => match encode::deserialize(&block_data) {
+                                       Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid block data")),
+                                       Ok(block) => Ok(block),
+                               },
+                       },
+               }
+       }
+}
+
+/// Converts a JSON value into the best block hash and optional height.
+impl TryInto<(BlockHash, Option<u32>)> for JsonResponse {
+       type Error = std::io::Error;
+
+       fn try_into(self) -> std::io::Result<(BlockHash, Option<u32>)> {
+               if !self.0.is_object() {
+                       return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
+               }
+
+               let hash = match &self.0["bestblockhash"] {
+                       serde_json::Value::String(hex_data) => match BlockHash::from_hex(&hex_data) {
+                               Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")),
+                               Ok(block_hash) => block_hash,
+                       },
+                       _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")),
+               };
+
+               let height = match &self.0["blocks"] {
+                       serde_json::Value::Null => None,
+                       serde_json::Value::Number(height) => match height.as_u64() {
+                               None => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid height")),
+                               Some(height) => match height.try_into() {
+                                       Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid height")),
+                                       Ok(height) => Some(height),
+                               }
+                       },
+                       _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON number")),
+               };
+
+               Ok((hash, height))
+       }
+}
+
+#[cfg(test)]
+pub(crate) mod tests {
+       use super::*;
+       use bitcoin::blockdata::constants::genesis_block;
+       use bitcoin::consensus::encode;
+       use bitcoin::network::constants::Network;
+
+       /// Converts from `BlockHeaderData` into a `GetHeaderResponse` JSON value.
+       impl From<BlockHeaderData> for serde_json::Value {
+               fn from(data: BlockHeaderData) -> Self {
+                       let BlockHeaderData { chainwork, height, header } = data;
+                       serde_json::json!({
+                               "chainwork": chainwork.to_string()["0x".len()..],
+                               "height": height,
+                               "version": header.version,
+                               "merkleroot": header.merkle_root.to_hex(),
+                               "time": header.time,
+                               "nonce": header.nonce,
+                               "bits": header.bits.to_hex(),
+                               "previousblockhash": header.prev_blockhash.to_hex(),
+                       })
+               }
+       }
+
+       #[test]
+       fn into_block_header_from_json_response_with_unexpected_type() {
+               let response = JsonResponse(serde_json::json!(42));
+               match TryInto::<BlockHeaderData>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "unexpected JSON type");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_header_from_json_response_with_unexpected_header_type() {
+               let response = JsonResponse(serde_json::json!([42]));
+               match TryInto::<BlockHeaderData>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_header_from_json_response_with_invalid_header_response() {
+               let block = genesis_block(Network::Bitcoin);
+               let mut response = JsonResponse(BlockHeaderData {
+                       chainwork: block.header.work(),
+                       height: 0,
+                       header: block.header
+               }.into());
+               response.0["chainwork"].take();
+
+               match TryInto::<BlockHeaderData>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid header response");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_header_from_json_response_with_invalid_header_data() {
+               let block = genesis_block(Network::Bitcoin);
+               let mut response = JsonResponse(BlockHeaderData {
+                       chainwork: block.header.work(),
+                       height: 0,
+                       header: block.header
+               }.into());
+               response.0["chainwork"] = serde_json::json!("foobar");
+
+               match TryInto::<BlockHeaderData>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid header data");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_header_from_json_response_with_valid_header() {
+               let block = genesis_block(Network::Bitcoin);
+               let response = JsonResponse(BlockHeaderData {
+                       chainwork: block.header.work(),
+                       height: 0,
+                       header: block.header
+               }.into());
+
+               match TryInto::<BlockHeaderData>::try_into(response) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(data) => {
+                               assert_eq!(data.chainwork, block.header.work());
+                               assert_eq!(data.height, 0);
+                               assert_eq!(data.header, block.header);
+                       },
+               }
+       }
+
+       #[test]
+       fn into_block_header_from_json_response_with_valid_header_array() {
+               let genesis_block = genesis_block(Network::Bitcoin);
+               let best_block_header = BlockHeader {
+                       prev_blockhash: genesis_block.block_hash(),
+                       ..genesis_block.header
+               };
+               let chainwork = genesis_block.header.work() + best_block_header.work();
+               let response = JsonResponse(serde_json::json!([
+                               serde_json::Value::from(BlockHeaderData {
+                                       chainwork, height: 1, header: best_block_header,
+                               }),
+                               serde_json::Value::from(BlockHeaderData {
+                                       chainwork: genesis_block.header.work(), height: 0, header: genesis_block.header,
+                               }),
+               ]));
+
+               match TryInto::<BlockHeaderData>::try_into(response) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(data) => {
+                               assert_eq!(data.chainwork, chainwork);
+                               assert_eq!(data.height, 1);
+                               assert_eq!(data.header, best_block_header);
+                       },
+               }
+       }
+
+       #[test]
+       fn into_block_header_from_json_response_without_previous_block_hash() {
+               let block = genesis_block(Network::Bitcoin);
+               let mut response = JsonResponse(BlockHeaderData {
+                       chainwork: block.header.work(),
+                       height: 0,
+                       header: block.header
+               }.into());
+               response.0.as_object_mut().unwrap().remove("previousblockhash");
+
+               match TryInto::<BlockHeaderData>::try_into(response) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(BlockHeaderData { chainwork: _, height: _, header }) => {
+                               assert_eq!(header, block.header);
+                       },
+               }
+       }
+
+       #[test]
+       fn into_block_from_invalid_binary_response() {
+               let response = BinaryResponse(b"foo".to_vec());
+               match TryInto::<Block>::try_into(response) {
+                       Err(_) => {},
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_from_valid_binary_response() {
+               let genesis_block = genesis_block(Network::Bitcoin);
+               let response = BinaryResponse(encode::serialize(&genesis_block));
+               match TryInto::<Block>::try_into(response) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(block) => assert_eq!(block, genesis_block),
+               }
+       }
+
+       #[test]
+       fn into_block_from_json_response_with_unexpected_type() {
+               let response = JsonResponse(serde_json::json!({ "result": "foo" }));
+               match TryInto::<Block>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_from_json_response_with_invalid_hex_data() {
+               let response = JsonResponse(serde_json::json!("foobar"));
+               match TryInto::<Block>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_from_json_response_with_invalid_block_data() {
+               let response = JsonResponse(serde_json::json!("abcd"));
+               match TryInto::<Block>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid block data");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_from_json_response_with_valid_block_data() {
+               let genesis_block = genesis_block(Network::Bitcoin);
+               let response = JsonResponse(serde_json::json!(encode::serialize_hex(&genesis_block)));
+               match TryInto::<Block>::try_into(response) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(block) => assert_eq!(block, genesis_block),
+               }
+       }
+
+       #[test]
+       fn into_block_hash_from_json_response_with_unexpected_type() {
+               let response = JsonResponse(serde_json::json!("foo"));
+               match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_hash_from_json_response_with_unexpected_bestblockhash_type() {
+               let response = JsonResponse(serde_json::json!({ "bestblockhash": 42 }));
+               match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_hash_from_json_response_with_invalid_hex_data() {
+               let response = JsonResponse(serde_json::json!({ "bestblockhash": "foobar"} ));
+               match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_hash_from_json_response_without_height() {
+               let block = genesis_block(Network::Bitcoin);
+               let response = JsonResponse(serde_json::json!({
+                       "bestblockhash": block.block_hash().to_hex(),
+               }));
+               match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok((hash, height)) => {
+                               assert_eq!(hash, block.block_hash());
+                               assert!(height.is_none());
+                       },
+               }
+       }
+
+       #[test]
+       fn into_block_hash_from_json_response_with_unexpected_blocks_type() {
+               let block = genesis_block(Network::Bitcoin);
+               let response = JsonResponse(serde_json::json!({
+                       "bestblockhash": block.block_hash().to_hex(),
+                       "blocks": "foo",
+               }));
+               match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON number");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_hash_from_json_response_with_invalid_height() {
+               let block = genesis_block(Network::Bitcoin);
+               let response = JsonResponse(serde_json::json!({
+                       "bestblockhash": block.block_hash().to_hex(),
+                       "blocks": std::u64::MAX,
+               }));
+               match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid height");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn into_block_hash_from_json_response_with_height() {
+               let block = genesis_block(Network::Bitcoin);
+               let response = JsonResponse(serde_json::json!({
+                       "bestblockhash": block.block_hash().to_hex(),
+                       "blocks": 1,
+               }));
+               match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok((hash, height)) => {
+                               assert_eq!(hash, block.block_hash());
+                               assert_eq!(height.unwrap(), 1);
+                       },
+               }
+       }
+}
diff --git a/lightning-block-sync/src/http.rs b/lightning-block-sync/src/http.rs
new file mode 100644 (file)
index 0000000..0788bc9
--- /dev/null
@@ -0,0 +1,767 @@
+use chunked_transfer;
+use serde_json;
+
+use std::convert::TryFrom;
+#[cfg(not(feature = "tokio"))]
+use std::io::Write;
+use std::net::ToSocketAddrs;
+use std::time::Duration;
+
+#[cfg(feature = "tokio")]
+use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
+#[cfg(feature = "tokio")]
+use tokio::net::TcpStream;
+
+#[cfg(not(feature = "tokio"))]
+use std::io::BufRead;
+use std::io::Read;
+#[cfg(not(feature = "tokio"))]
+use std::net::TcpStream;
+
+/// Timeout for operations on TCP streams.
+const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
+
+/// Maximum HTTP message header size in bytes.
+const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
+
+/// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any
+/// overhead for HTTP chunked transfer encoding.
+const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
+
+/// Endpoint for interacting with an HTTP-based API.
+#[derive(Debug)]
+pub struct HttpEndpoint {
+       host: String,
+       port: Option<u16>,
+       path: String,
+}
+
+impl HttpEndpoint {
+       /// Creates an endpoint for the given host and default HTTP port.
+       pub fn for_host(host: String) -> Self {
+               Self {
+                       host,
+                       port: None,
+                       path: String::from("/"),
+               }
+       }
+
+       /// Specifies a port to use with the endpoint.
+       pub fn with_port(mut self, port: u16) -> Self {
+               self.port = Some(port);
+               self
+       }
+
+       /// Specifies a path to use with the endpoint.
+       pub fn with_path(mut self, path: String) -> Self {
+               self.path = path;
+               self
+       }
+
+       /// Returns the endpoint host.
+       pub fn host(&self) -> &str {
+               &self.host
+       }
+
+       /// Returns the endpoint port.
+       pub fn port(&self) -> u16 {
+               match self.port {
+                       None => 80,
+                       Some(port) => port,
+               }
+       }
+
+       /// Returns the endpoint path.
+       pub fn path(&self) -> &str {
+               &self.path
+       }
+}
+
+impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
+       type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;
+
+       fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
+               (self.host(), self.port()).to_socket_addrs()
+       }
+}
+
+/// Client for making HTTP requests.
+pub(crate) struct HttpClient {
+       stream: TcpStream,
+}
+
+impl HttpClient {
+       /// Opens a connection to an HTTP endpoint.
+       pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
+               let address = match endpoint.to_socket_addrs()?.next() {
+                       None => {
+                               return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "could not resolve to any addresses"));
+                       },
+                       Some(address) => address,
+               };
+               let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
+               stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
+               stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
+
+               #[cfg(feature = "tokio")]
+               let stream = {
+                       stream.set_nonblocking(true)?;
+                       TcpStream::from_std(stream)?
+               };
+
+               Ok(Self { stream })
+       }
+
+       /// Sends a `GET` request for a resource identified by `uri` at the `host`.
+       ///
+       /// Returns the response body in `F` format.
+       #[allow(dead_code)]
+       pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
+       where F: TryFrom<Vec<u8>, Error = std::io::Error> {
+               let request = format!(
+                       "GET {} HTTP/1.1\r\n\
+                        Host: {}\r\n\
+                        Connection: keep-alive\r\n\
+                        \r\n", uri, host);
+               let response_body = self.send_request_with_retry(&request).await?;
+               F::try_from(response_body)
+       }
+
+       /// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP
+       /// authentication credentials.
+       ///
+       /// The request body consists of the provided JSON `content`. Returns the response body in `F`
+       /// format.
+       #[allow(dead_code)]
+       pub async fn post<F>(&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value) -> std::io::Result<F>
+       where F: TryFrom<Vec<u8>, Error = std::io::Error> {
+               let content = content.to_string();
+               let request = format!(
+                       "POST {} HTTP/1.1\r\n\
+                        Host: {}\r\n\
+                        Authorization: {}\r\n\
+                        Connection: keep-alive\r\n\
+                        Content-Type: application/json\r\n\
+                        Content-Length: {}\r\n\
+                        \r\n\
+                        {}", uri, host, auth, content.len(), content);
+               let response_body = self.send_request_with_retry(&request).await?;
+               F::try_from(response_body)
+       }
+
+       /// Sends an HTTP request message and reads the response, returning its body. Attempts to
+       /// reconnect and retry if the connection has been closed.
+       async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
+               let endpoint = self.stream.peer_addr().unwrap();
+               match self.send_request(request).await {
+                       Ok(bytes) => Ok(bytes),
+                       Err(e) => match e.kind() {
+                               std::io::ErrorKind::ConnectionReset |
+                               std::io::ErrorKind::ConnectionAborted |
+                               std::io::ErrorKind::UnexpectedEof => {
+                                       // Reconnect if the connection was closed. This may happen if the server's
+                                       // keep-alive limits are reached.
+                                       *self = Self::connect(endpoint)?;
+                                       self.send_request(request).await
+                               },
+                               _ => Err(e),
+                       },
+               }
+       }
+
+       /// Sends an HTTP request message and reads the response, returning its body.
+       async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
+               self.write_request(request).await?;
+               self.read_response().await
+       }
+
+       /// Writes an HTTP request message.
+       async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
+               #[cfg(feature = "tokio")]
+               {
+                       self.stream.write_all(request.as_bytes()).await?;
+                       self.stream.flush().await
+               }
+               #[cfg(not(feature = "tokio"))]
+               {
+                       self.stream.write_all(request.as_bytes())?;
+                       self.stream.flush()
+               }
+       }
+
+       /// Reads an HTTP response message.
+       async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
+               #[cfg(feature = "tokio")]
+               let stream = self.stream.split().0;
+               #[cfg(not(feature = "tokio"))]
+               let stream = std::io::Read::by_ref(&mut self.stream);
+
+               let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
+
+               #[cfg(feature = "tokio")]
+               let mut reader = tokio::io::BufReader::new(limited_stream);
+               #[cfg(not(feature = "tokio"))]
+               let mut reader = std::io::BufReader::new(limited_stream);
+
+               macro_rules! read_line { () => { {
+                       let mut line = String::new();
+                       #[cfg(feature = "tokio")]
+                       let bytes_read = reader.read_line(&mut line).await?;
+                       #[cfg(not(feature = "tokio"))]
+                       let bytes_read = reader.read_line(&mut line)?;
+
+                       match bytes_read {
+                               0 => None,
+                               _ => {
+                                       // Remove trailing CRLF
+                                       if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
+                                       Some(line)
+                               },
+                       }
+               } } }
+
+               // Read and parse status line
+               let status_line = read_line!()
+                       .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
+               let status = HttpStatus::parse(&status_line)?;
+
+               // Read and parse relevant headers
+               let mut message_length = HttpMessageLength::Empty;
+               loop {
+                       let line = read_line!()
+                               .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
+                       if line.is_empty() { break; }
+
+                       let header = HttpHeader::parse(&line)?;
+                       if header.has_name("Content-Length") {
+                               let length = header.value.parse()
+                                       .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
+                               if let HttpMessageLength::Empty = message_length {
+                                       message_length = HttpMessageLength::ContentLength(length);
+                               }
+                               continue;
+                       }
+
+                       if header.has_name("Transfer-Encoding") {
+                               message_length = HttpMessageLength::TransferEncoding(header.value.into());
+                               continue;
+                       }
+               }
+
+               if !status.is_ok() {
+                       // TODO: Handle 3xx redirection responses.
+                       return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "not found"));
+               }
+
+               // Read message body
+               let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
+               reader.get_mut().set_limit(read_limit as u64);
+               match message_length {
+                       HttpMessageLength::Empty => { Ok(Vec::new()) },
+                       HttpMessageLength::ContentLength(length) => {
+                               if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
+                                       Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
+                               } else {
+                                       let mut content = vec![0; length];
+                                       #[cfg(feature = "tokio")]
+                                       reader.read_exact(&mut content[..]).await?;
+                                       #[cfg(not(feature = "tokio"))]
+                                       reader.read_exact(&mut content[..])?;
+                                       Ok(content)
+                               }
+                       },
+                       HttpMessageLength::TransferEncoding(coding) => {
+                               if !coding.eq_ignore_ascii_case("chunked") {
+                                       Err(std::io::Error::new(
+                                                       std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
+                               } else {
+                                       let mut content = Vec::new();
+                                       #[cfg(feature = "tokio")]
+                                       {
+                                               // Since chunked_transfer doesn't have an async interface, only use it to
+                                               // determine the size of each chunk to read.
+                                               //
+                                               // TODO: Replace with an async interface when available.
+                                               // https://github.com/frewsxcv/rust-chunked-transfer/issues/7
+                                               loop {
+                                                       // Read the chunk header which contains the chunk size.
+                                                       let mut chunk_header = String::new();
+                                                       reader.read_line(&mut chunk_header).await?;
+                                                       if chunk_header == "0\r\n" {
+                                                               // Read the terminator chunk since the decoder consumes the CRLF
+                                                               // immediately when this chunk is encountered.
+                                                               reader.read_line(&mut chunk_header).await?;
+                                                       }
+
+                                                       // Decode the chunk header to obtain the chunk size.
+                                                       let mut buffer = Vec::new();
+                                                       let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes());
+                                                       decoder.read_to_end(&mut buffer)?;
+
+                                                       // Read the chunk body.
+                                                       let chunk_size = match decoder.remaining_chunks_size() {
+                                                               None => break,
+                                                               Some(chunk_size) => chunk_size,
+                                                       };
+                                                       let chunk_offset = content.len();
+                                                       content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
+                                                       reader.read_exact(&mut content[chunk_offset..]).await?;
+                                                       content.resize(chunk_offset + chunk_size, 0);
+                                               }
+                                               Ok(content)
+                                       }
+                                       #[cfg(not(feature = "tokio"))]
+                                       {
+                                               let mut decoder = chunked_transfer::Decoder::new(reader);
+                                               decoder.read_to_end(&mut content)?;
+                                               Ok(content)
+                                       }
+                               }
+                       },
+               }
+       }
+}
+
+/// HTTP response status code as defined by [RFC 7231].
+///
+/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6
+struct HttpStatus<'a> {
+       code: &'a str,
+}
+
+impl<'a> HttpStatus<'a> {
+       /// Parses an HTTP status line as defined by [RFC 7230].
+       ///
+       /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2
+       fn parse(line: &'a String) -> std::io::Result<HttpStatus<'a>> {
+               let mut tokens = line.splitn(3, ' ');
+
+               let http_version = tokens.next()
+                       .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?;
+               if !http_version.eq_ignore_ascii_case("HTTP/1.1") &&
+                       !http_version.eq_ignore_ascii_case("HTTP/1.0") {
+                       return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid HTTP-Version"));
+               }
+
+               let code = tokens.next()
+                       .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?;
+               if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
+                       return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid Status-Code"));
+               }
+
+               let _reason = tokens.next()
+                       .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?;
+
+               Ok(Self { code })
+       }
+
+       /// Returns whether the status is successful (i.e., 2xx status class).
+       fn is_ok(&self) -> bool {
+               self.code.starts_with('2')
+       }
+}
+
+/// HTTP response header as defined by [RFC 7231].
+///
+/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7
+struct HttpHeader<'a> {
+       name: &'a str,
+       value: &'a str,
+}
+
+impl<'a> HttpHeader<'a> {
+       /// Parses an HTTP header field as defined by [RFC 7230].
+       ///
+       /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2
+       fn parse(line: &'a String) -> std::io::Result<HttpHeader<'a>> {
+               let mut tokens = line.splitn(2, ':');
+               let name = tokens.next()
+                       .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?;
+               let value = tokens.next()
+                       .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))?
+                       .trim_start();
+               Ok(Self { name, value })
+       }
+
+       /// Returns whether the header field has the given name.
+       fn has_name(&self, name: &str) -> bool {
+               self.name.eq_ignore_ascii_case(name)
+       }
+}
+
+/// HTTP message body length as defined by [RFC 7230].
+///
+/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3
+enum HttpMessageLength {
+       Empty,
+       ContentLength(usize),
+       TransferEncoding(String),
+}
+
+/// An HTTP response body in binary format.
+pub(crate) struct BinaryResponse(pub(crate) Vec<u8>);
+
+/// An HTTP response body in JSON format.
+pub(crate) struct JsonResponse(pub(crate) serde_json::Value);
+
+/// Interprets bytes from an HTTP response body as binary data.
+impl TryFrom<Vec<u8>> for BinaryResponse {
+       type Error = std::io::Error;
+
+       fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
+               Ok(BinaryResponse(bytes))
+       }
+}
+
+/// Interprets bytes from an HTTP response body as a JSON value.
+impl TryFrom<Vec<u8>> for JsonResponse {
+       type Error = std::io::Error;
+
+       fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
+               Ok(JsonResponse(serde_json::from_slice(&bytes)?))
+       }
+}
+
+#[cfg(test)]
+mod endpoint_tests {
+       use super::HttpEndpoint;
+
+       #[test]
+       fn with_default_port() {
+               let endpoint = HttpEndpoint::for_host("foo.com".into());
+               assert_eq!(endpoint.host(), "foo.com");
+               assert_eq!(endpoint.port(), 80);
+       }
+
+       #[test]
+       fn with_custom_port() {
+               let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080);
+               assert_eq!(endpoint.host(), "foo.com");
+               assert_eq!(endpoint.port(), 8080);
+       }
+
+       #[test]
+       fn with_uri_path() {
+               let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into());
+               assert_eq!(endpoint.host(), "foo.com");
+               assert_eq!(endpoint.path(), "/path");
+       }
+
+       #[test]
+       fn without_uri_path() {
+               let endpoint = HttpEndpoint::for_host("foo.com".into());
+               assert_eq!(endpoint.host(), "foo.com");
+               assert_eq!(endpoint.path(), "/");
+       }
+
+       #[test]
+       fn convert_to_socket_addrs() {
+               let endpoint = HttpEndpoint::for_host("foo.com".into());
+               let host = endpoint.host();
+               let port = endpoint.port();
+
+               use std::net::ToSocketAddrs;
+               match (&endpoint).to_socket_addrs() {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(mut socket_addrs) => {
+                               match socket_addrs.next() {
+                                       None => panic!("Expected socket address"),
+                                       Some(addr) => {
+                                               assert_eq!(addr, (host, port).to_socket_addrs().unwrap().next().unwrap());
+                                               assert!(socket_addrs.next().is_none());
+                                       }
+                               }
+                       }
+               }
+       }
+}
+
+#[cfg(test)]
+pub(crate) mod client_tests {
+       use super::*;
+       use std::io::BufRead;
+       use std::io::Write;
+
+       /// Server for handling HTTP client requests with a stock response.
+       pub struct HttpServer {
+               address: std::net::SocketAddr,
+               handler: std::thread::JoinHandle<()>,
+               shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
+       }
+
+       /// Body of HTTP response messages.
+       pub enum MessageBody<T: ToString> {
+               Empty,
+               Content(T),
+               ChunkedContent(T),
+       }
+
+       impl HttpServer {
+               pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
+                       let response = match body {
+                               MessageBody::Empty => "HTTP/1.1 200 OK\r\n\r\n".to_string(),
+                               MessageBody::Content(body) => {
+                                       let body = body.to_string();
+                                       format!(
+                                               "HTTP/1.1 200 OK\r\n\
+                                                Content-Length: {}\r\n\
+                                                \r\n\
+                                                {}", body.len(), body)
+                               },
+                               MessageBody::ChunkedContent(body) => {
+                                       let mut chuncked_body = Vec::new();
+                                       {
+                                               use chunked_transfer::Encoder;
+                                               let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
+                                               encoder.write_all(body.to_string().as_bytes()).unwrap();
+                                       }
+                                       format!(
+                                               "HTTP/1.1 200 OK\r\n\
+                                                Transfer-Encoding: chunked\r\n\
+                                                \r\n\
+                                                {}", String::from_utf8(chuncked_body).unwrap())
+                               },
+                       };
+                       HttpServer::responding_with(response)
+               }
+
+               pub fn responding_with_not_found() -> Self {
+                       let response = "HTTP/1.1 404 Not Found\r\n\r\n".to_string();
+                       HttpServer::responding_with(response)
+               }
+
+               fn responding_with(response: String) -> Self {
+                       let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
+                       let address = listener.local_addr().unwrap();
+
+                       let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
+                       let shutdown_signaled = std::sync::Arc::clone(&shutdown);
+                       let handler = std::thread::spawn(move || {
+                               for stream in listener.incoming() {
+                                       let mut stream = stream.unwrap();
+                                       stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
+
+                                       let lines_read = std::io::BufReader::new(&stream)
+                                               .lines()
+                                               .take_while(|line| !line.as_ref().unwrap().is_empty())
+                                               .count();
+                                       if lines_read == 0 { continue; }
+
+                                       for chunk in response.as_bytes().chunks(16) {
+                                               if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
+                                                       return;
+                                               } else {
+                                                       if let Err(_) = stream.write(chunk) { break; }
+                                                       if let Err(_) = stream.flush() { break; }
+                                               }
+                                       }
+                               }
+                       });
+
+                       Self { address, handler, shutdown }
+               }
+
+               fn shutdown(self) {
+                       self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
+                       self.handler.join().unwrap();
+               }
+
+               pub fn endpoint(&self) -> HttpEndpoint {
+                       HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
+               }
+       }
+
+       #[test]
+       fn connect_to_unresolvable_host() {
+               match HttpClient::connect(("example.invalid", 80)) {
+                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn connect_with_no_socket_address() {
+               match HttpClient::connect(&vec![][..]) {
+                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn connect_with_unknown_server() {
+               match HttpClient::connect(("::", 80)) {
+                       #[cfg(target_os = "windows")]
+                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable),
+                       #[cfg(not(target_os = "windows"))]
+                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused),
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn connect_with_valid_endpoint() {
+               let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
+
+               match HttpClient::connect(&server.endpoint()) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(_) => {},
+               }
+       }
+
+       #[tokio::test]
+       async fn read_empty_message() {
+               let server = HttpServer::responding_with("".to_string());
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn read_incomplete_message() {
+               let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn read_too_large_message_headers() {
+               let response = format!(
+                       "HTTP/1.1 302 Found\r\n\
+                        Location: {}\r\n\
+                        \r\n", "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE));
+               let server = HttpServer::responding_with(response);
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn read_too_large_message_body() {
+               let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
+               let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "out of range");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+               server.shutdown();
+       }
+
+       #[tokio::test]
+       async fn read_message_with_unsupported_transfer_coding() {
+               let response = String::from(
+                       "HTTP/1.1 200 OK\r\n\
+                        Transfer-Encoding: gzip\r\n\
+                        \r\n\
+                        foobar");
+               let server = HttpServer::responding_with(response);
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn read_empty_message_body() {
+               let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
+               }
+       }
+
+       #[tokio::test]
+       async fn read_message_body_with_length() {
+               let body = "foo bar baz qux".repeat(32);
+               let content = MessageBody::Content(body.clone());
+               let server = HttpServer::responding_with_ok::<String>(content);
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
+               }
+       }
+
+       #[tokio::test]
+       async fn read_chunked_message_body() {
+               let body = "foo bar baz qux".repeat(32);
+               let chunked_content = MessageBody::ChunkedContent(body.clone());
+               let server = HttpServer::responding_with_ok::<String>(chunked_content);
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
+               }
+       }
+
+       #[tokio::test]
+       async fn reconnect_closed_connection() {
+               let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
+               match client.get::<BinaryResponse>("/foo", "foo.com").await {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
+               }
+       }
+
+       #[test]
+       fn from_bytes_into_binary_response() {
+               let bytes = b"foo";
+               match BinaryResponse::try_from(bytes.to_vec()) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(response) => assert_eq!(&response.0, bytes),
+               }
+       }
+
+       #[test]
+       fn from_invalid_bytes_into_json_response() {
+               let json = serde_json::json!({ "result": 42 });
+               match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) {
+                       Err(_) => {},
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[test]
+       fn from_valid_bytes_into_json_response() {
+               let json = serde_json::json!({ "result": 42 });
+               match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(response) => assert_eq!(response.0, json),
+               }
+       }
+}
diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs
new file mode 100644 (file)
index 0000000..58f77bd
--- /dev/null
@@ -0,0 +1,126 @@
+//! A lightweight client for keeping in sync with chain activity.
+//!
+//! Defines a [`BlockSource`] trait, which is an asynchronous interface for retrieving block headers
+//! and data.
+//!
+//! Enabling feature `rest-client` or `rpc-client` allows configuring the client to fetch blocks
+//! using Bitcoin Core's REST or RPC interface, respectively.
+//!
+//! Both features support either blocking I/O using `std::net::TcpStream` or, with feature `tokio`,
+//! non-blocking I/O using `tokio::net::TcpStream` from inside a Tokio runtime.
+//!
+//! [`BlockSource`]: trait.BlockSource.html
+
+#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
+pub mod http;
+
+#[cfg(feature = "rest-client")]
+pub mod rest;
+
+#[cfg(feature = "rpc-client")]
+pub mod rpc;
+
+#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
+mod convert;
+
+#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
+mod utils;
+
+use bitcoin::blockdata::block::{Block, BlockHeader};
+use bitcoin::hash_types::BlockHash;
+use bitcoin::util::uint::Uint256;
+
+use std::future::Future;
+use std::pin::Pin;
+
+/// Abstract type for retrieving block headers and data.
+pub trait BlockSource : Sync + Send {
+       /// Returns the header for a given hash. A height hint may be provided in case a block source
+       /// cannot easily find headers based on a hash. This is merely a hint and thus the returned
+       /// header must have the same hash as was requested. Otherwise, an error must be returned.
+       ///
+       /// Implementations that cannot find headers based on the hash should return a `Transient` error
+       /// when `height_hint` is `None`.
+       fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
+
+       /// Returns the block for a given hash. A headers-only block source should return a `Transient`
+       /// error.
+       fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
+
+       // TODO: Phrase in terms of `Poll` once added.
+       /// Returns the hash of the best block and, optionally, its height. When polling a block source,
+       /// the height is passed to `get_header` to allow for a more efficient lookup.
+       fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
+}
+
+/// Result type for `BlockSource` requests.
+type BlockSourceResult<T> = Result<T, BlockSourceError>;
+
+// TODO: Replace with BlockSourceResult once `async` trait functions are supported. For details,
+// see: https://areweasyncyet.rs.
+/// Result type for asynchronous `BlockSource` requests.
+type AsyncBlockSourceResult<'a, T> = Pin<Box<dyn Future<Output = BlockSourceResult<T>> + 'a + Send>>;
+
+/// Error type for `BlockSource` requests.
+///
+/// Transient errors may be resolved when re-polling, but no attempt will be made to re-poll on
+/// persistent errors.
+pub struct BlockSourceError {
+       kind: BlockSourceErrorKind,
+       error: Box<dyn std::error::Error + Send + Sync>,
+}
+
+/// The kind of `BlockSourceError`, either persistent or transient.
+#[derive(Clone, Copy)]
+pub enum BlockSourceErrorKind {
+       /// Indicates an error that won't resolve when retrying a request (e.g., invalid data).
+       Persistent,
+
+       /// Indicates an error that may resolve when retrying a request (e.g., unresponsive).
+       Transient,
+}
+
+impl BlockSourceError {
+       /// Creates a new persistent error originated from the given error.
+       pub fn persistent<E>(error: E) -> Self
+       where E: Into<Box<dyn std::error::Error + Send + Sync>> {
+               Self {
+                       kind: BlockSourceErrorKind::Persistent,
+                       error: error.into(),
+               }
+       }
+
+       /// Creates a new transient error originated from the given error.
+       pub fn transient<E>(error: E) -> Self
+       where E: Into<Box<dyn std::error::Error + Send + Sync>> {
+               Self {
+                       kind: BlockSourceErrorKind::Transient,
+                       error: error.into(),
+               }
+       }
+
+       /// Returns the kind of error.
+       pub fn kind(&self) -> BlockSourceErrorKind {
+               self.kind
+       }
+
+       /// Converts the error into the underlying error.
+       pub fn into_inner(self) -> Box<dyn std::error::Error + Send + Sync> {
+               self.error
+       }
+}
+
+/// A block header and some associated data. This information should be available from most block
+/// sources (and, notably, is available in Bitcoin Core's RPC and REST interfaces).
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub struct BlockHeaderData {
+       /// The block header itself.
+       pub header: BlockHeader,
+
+       /// The block height where the genesis block has height 0.
+       pub height: u32,
+
+       /// The total chain work in expected number of double-SHA256 hashes required to build a chain
+       /// of equivalent weight.
+       pub chainwork: Uint256,
+}
diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs
new file mode 100644 (file)
index 0000000..3c2e76e
--- /dev/null
@@ -0,0 +1,110 @@
+use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
+use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse};
+
+use bitcoin::blockdata::block::Block;
+use bitcoin::hash_types::BlockHash;
+use bitcoin::hashes::hex::ToHex;
+
+use std::convert::TryFrom;
+use std::convert::TryInto;
+
+/// A simple REST client for requesting resources using HTTP `GET`.
+pub struct RestClient {
+       endpoint: HttpEndpoint,
+       client: HttpClient,
+}
+
+impl RestClient {
+       /// Creates a new REST client connected to the given endpoint.
+       ///
+       /// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest).
+       pub fn new(endpoint: HttpEndpoint) -> std::io::Result<Self> {
+               let client = HttpClient::connect(&endpoint)?;
+               Ok(Self { endpoint, client })
+       }
+
+       /// Requests a resource encoded in `F` format and interpreted as type `T`.
+       async fn request_resource<F, T>(&mut self, resource_path: &str) -> std::io::Result<T>
+       where F: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
+               let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
+               let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path);
+               self.client.get::<F>(&uri, &host).await?.try_into()
+       }
+}
+
+impl BlockSource for RestClient {
+       fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
+               Box::pin(async move {
+                       let resource_path = format!("headers/1/{}.json", header_hash.to_hex());
+                       Ok(self.request_resource::<JsonResponse, _>(&resource_path).await?)
+               })
+       }
+
+       fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
+               Box::pin(async move {
+                       let resource_path = format!("block/{}.bin", header_hash.to_hex());
+                       Ok(self.request_resource::<BinaryResponse, _>(&resource_path).await?)
+               })
+       }
+
+       fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
+               Box::pin(async move {
+                       Ok(self.request_resource::<JsonResponse, _>("chaininfo.json").await?)
+               })
+       }
+}
+
+#[cfg(test)]
+mod tests {
+       use super::*;
+       use crate::http::BinaryResponse;
+       use crate::http::client_tests::{HttpServer, MessageBody};
+
+       /// Parses binary data as a string-encoded `u32`.
+       impl TryInto<u32> for BinaryResponse {
+               type Error = std::io::Error;
+
+               fn try_into(self) -> std::io::Result<u32> {
+                       match std::str::from_utf8(&self.0) {
+                               Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
+                               Ok(s) => match u32::from_str_radix(s, 10) {
+                                       Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
+                                       Ok(n) => Ok(n),
+                               }
+                       }
+               }
+       }
+
+       #[tokio::test]
+       async fn request_unknown_resource() {
+               let server = HttpServer::responding_with_not_found();
+               let mut client = RestClient::new(server.endpoint()).unwrap();
+
+               match client.request_resource::<BinaryResponse, u32>("/").await {
+                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::NotFound),
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn request_malformed_resource() {
+               let server = HttpServer::responding_with_ok(MessageBody::Content("foo"));
+               let mut client = RestClient::new(server.endpoint()).unwrap();
+
+               match client.request_resource::<BinaryResponse, u32>("/").await {
+                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData),
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn request_valid_resource() {
+               let server = HttpServer::responding_with_ok(MessageBody::Content(42));
+               let mut client = RestClient::new(server.endpoint()).unwrap();
+
+               match client.request_resource::<BinaryResponse, u32>("/").await {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(n) => assert_eq!(n, 42),
+               }
+       }
+}
diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs
new file mode 100644 (file)
index 0000000..34cbd2e
--- /dev/null
@@ -0,0 +1,197 @@
+use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
+use crate::http::{HttpClient, HttpEndpoint, JsonResponse};
+
+use bitcoin::blockdata::block::Block;
+use bitcoin::hash_types::BlockHash;
+use bitcoin::hashes::hex::ToHex;
+
+use serde_json;
+
+use std::convert::TryFrom;
+use std::convert::TryInto;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+/// A simple RPC client for calling methods using HTTP `POST`.
+pub struct RpcClient {
+       basic_auth: String,
+       endpoint: HttpEndpoint,
+       client: HttpClient,
+       id: AtomicUsize,
+}
+
+impl RpcClient {
+       /// Creates a new RPC client connected to the given endpoint with the provided credentials. The
+       /// credentials should be a base64 encoding of a user name and password joined by a colon, as is
+       /// required for HTTP basic access authentication.
+       pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result<Self> {
+               let client = HttpClient::connect(&endpoint)?;
+               Ok(Self {
+                       basic_auth: "Basic ".to_string() + credentials,
+                       endpoint,
+                       client,
+                       id: AtomicUsize::new(0),
+               })
+       }
+
+       /// Calls a method with the response encoded in JSON format and interpreted as type `T`.
+       async fn call_method<T>(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
+       where JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
+               let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
+               let uri = self.endpoint.path();
+               let content = serde_json::json!({
+                       "method": method,
+                       "params": params,
+                       "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
+               });
+
+               let mut response = self.client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content)
+                       .await?.0;
+               if !response.is_object() {
+                       return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
+               }
+
+               let error = &response["error"];
+               if !error.is_null() {
+                       // TODO: Examine error code for a more precise std::io::ErrorKind.
+                       let message = error["message"].as_str().unwrap_or("unknown error");
+                       return Err(std::io::Error::new(std::io::ErrorKind::Other, message));
+               }
+
+               let result = &mut response["result"];
+               if result.is_null() {
+                       return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result"));
+               }
+
+               JsonResponse(result.take()).try_into()
+       }
+}
+
+impl BlockSource for RpcClient {
+       fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
+               Box::pin(async move {
+                       let header_hash = serde_json::json!(header_hash.to_hex());
+                       Ok(self.call_method("getblockheader", &[header_hash]).await?)
+               })
+       }
+
+       fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
+               Box::pin(async move {
+                       let header_hash = serde_json::json!(header_hash.to_hex());
+                       let verbosity = serde_json::json!(0);
+                       Ok(self.call_method("getblock", &[header_hash, verbosity]).await?)
+               })
+       }
+
+       fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
+               Box::pin(async move {
+                       Ok(self.call_method("getblockchaininfo", &[]).await?)
+               })
+       }
+}
+
+#[cfg(test)]
+mod tests {
+       use super::*;
+       use crate::http::client_tests::{HttpServer, MessageBody};
+
+       /// Credentials encoded in base64.
+       const CREDENTIALS: &'static str = "dXNlcjpwYXNzd29yZA==";
+
+       /// Converts a JSON value into `u64`.
+       impl TryInto<u64> for JsonResponse {
+               type Error = std::io::Error;
+
+               fn try_into(self) -> std::io::Result<u64> {
+                       match self.0.as_u64() {
+                               None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "not a number")),
+                               Some(n) => Ok(n),
+                       }
+               }
+       }
+
+       #[tokio::test]
+       async fn call_method_returning_unknown_response() {
+               let server = HttpServer::responding_with_not_found();
+               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+
+               match client.call_method::<u64>("getblockcount", &[]).await {
+                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::NotFound),
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn call_method_returning_malfomred_response() {
+               let response = serde_json::json!("foo");
+               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
+               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+
+               match client.call_method::<u64>("getblockcount", &[]).await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn call_method_returning_error() {
+               let response = serde_json::json!({
+                       "error": { "code": -8, "message": "invalid parameter" },
+               });
+               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
+               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+
+               let invalid_block_hash = serde_json::json!("foo");
+               match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::Other);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid parameter");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn call_method_returning_missing_result() {
+               let response = serde_json::json!({ "result": null });
+               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
+               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+
+               match client.call_method::<u64>("getblockcount", &[]).await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON result");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn call_method_returning_malformed_result() {
+               let response = serde_json::json!({ "result": "foo" });
+               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
+               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+
+               match client.call_method::<u64>("getblockcount", &[]).await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
+                               assert_eq!(e.get_ref().unwrap().to_string(), "not a number");
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
+       #[tokio::test]
+       async fn call_method_returning_valid_result() {
+               let response = serde_json::json!({ "result": 654470 });
+               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
+               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+
+               match client.call_method::<u64>("getblockcount", &[]).await {
+                       Err(e) => panic!("Unexpected error: {:?}", e),
+                       Ok(count) => assert_eq!(count, 654470),
+               }
+       }
+}
diff --git a/lightning-block-sync/src/utils.rs b/lightning-block-sync/src/utils.rs
new file mode 100644 (file)
index 0000000..96a2e57
--- /dev/null
@@ -0,0 +1,54 @@
+use bitcoin::hashes::hex::FromHex;
+use bitcoin::util::uint::Uint256;
+
+pub fn hex_to_uint256(hex: &str) -> Result<Uint256, bitcoin::hashes::hex::Error> {
+       let bytes = <[u8; 32]>::from_hex(hex)?;
+       Ok(Uint256::from_be_bytes(bytes))
+}
+
+#[cfg(test)]
+mod tests {
+       use super::*;
+       use bitcoin::util::uint::Uint256;
+
+       #[test]
+       fn hex_to_uint256_empty_str() {
+               assert!(hex_to_uint256("").is_err());
+       }
+
+       #[test]
+       fn hex_to_uint256_too_short_str() {
+               let hex = String::from_utf8(vec![b'0'; 32]).unwrap();
+               assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidLength(64, 32)));
+       }
+
+       #[test]
+       fn hex_to_uint256_too_long_str() {
+               let hex = String::from_utf8(vec![b'0'; 128]).unwrap();
+               assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidLength(64, 128)));
+       }
+
+       #[test]
+       fn hex_to_uint256_odd_length_str() {
+               let hex = String::from_utf8(vec![b'0'; 65]).unwrap();
+               assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::OddLengthString(65)));
+       }
+
+       #[test]
+       fn hex_to_uint256_invalid_char() {
+               let hex = String::from_utf8(vec![b'G'; 64]).unwrap();
+               assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidChar(b'G')));
+       }
+
+       #[test]
+       fn hex_to_uint256_lowercase_str() {
+               let hex: String = std::iter::repeat("0123456789abcdef").take(4).collect();
+               assert_eq!(hex_to_uint256(&hex).unwrap(), Uint256([0x0123456789abcdefu64; 4]));
+       }
+
+       #[test]
+       fn hex_to_uint256_uppercase_str() {
+               let hex: String = std::iter::repeat("0123456789ABCDEF").take(4).collect();
+               assert_eq!(hex_to_uint256(&hex).unwrap(), Uint256([0x0123456789abcdefu64; 4]));
+       }
+}
index 8517e85d8c299436ad42d6bf2b8f27e399a55faa..289cdae4ed760eaeb7206e7d645c7a90bd55f07f 100644 (file)
@@ -602,9 +602,8 @@ pub struct ReplyChannelRange {
        pub first_blocknum: u32,
        /// The number of blocks included in the range of the reply
        pub number_of_blocks: u32,
-       /// Indicates if the query recipient maintains up-to-date channel
-       /// information for the chain_hash
-       pub full_information: bool,
+       /// True when this is the final reply for a query
+       pub sync_complete: bool,
        /// The short_channel_ids in the channel range
        pub short_channel_ids: Vec<u64>,
 }
@@ -1727,7 +1726,7 @@ impl Readable for ReplyChannelRange {
                let chain_hash: BlockHash = Readable::read(r)?;
                let first_blocknum: u32 = Readable::read(r)?;
                let number_of_blocks: u32 = Readable::read(r)?;
-               let full_information: bool = Readable::read(r)?;
+               let sync_complete: bool = Readable::read(r)?;
 
                // We expect the encoding_len to always includes the 1-byte
                // encoding_type and that short_channel_ids are 8-bytes each
@@ -1755,7 +1754,7 @@ impl Readable for ReplyChannelRange {
                        chain_hash,
                        first_blocknum,
                        number_of_blocks,
-                       full_information,
+                       sync_complete,
                        short_channel_ids
                })
        }
@@ -1768,7 +1767,7 @@ impl Writeable for ReplyChannelRange {
                self.chain_hash.write(w)?;
                self.first_blocknum.write(w)?;
                self.number_of_blocks.write(w)?;
-               self.full_information.write(w)?;
+               self.sync_complete.write(w)?;
 
                encoding_len.write(w)?;
                (EncodingType::Uncompressed as u8).write(w)?;
@@ -2569,7 +2568,7 @@ mod tests {
                        chain_hash: expected_chain_hash,
                        first_blocknum: 756230,
                        number_of_blocks: 1500,
-                       full_information: true,
+                       sync_complete: true,
                        short_channel_ids: vec![0x000000000000008e, 0x0000000000003c69, 0x000000000045a6c4],
                };
 
@@ -2582,7 +2581,7 @@ mod tests {
                        assert_eq!(reply_channel_range.chain_hash, expected_chain_hash);
                        assert_eq!(reply_channel_range.first_blocknum, 756230);
                        assert_eq!(reply_channel_range.number_of_blocks, 1500);
-                       assert_eq!(reply_channel_range.full_information, true);
+                       assert_eq!(reply_channel_range.sync_complete, true);
                        assert_eq!(reply_channel_range.short_channel_ids[0], 0x000000000000008e);
                        assert_eq!(reply_channel_range.short_channel_ids[1], 0x0000000000003c69);
                        assert_eq!(reply_channel_range.short_channel_ids[2], 0x000000000045a6c4);
index 4b53a21ba6bb71d6af51ebc6b892fa54c66075dc..bba99244b4a98624a1725c2d57a7eb2be15b6712 100644 (file)
@@ -264,18 +264,7 @@ impl<C: Deref + Sync + Send, L: Deref + Sync + Send> RoutingMessageHandler for N
        /// does not match our chain_hash will be rejected when the announcement is
        /// processed.
        fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
-               log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, full_information={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.full_information, msg.short_channel_ids.len(),);
-
-               // Validate that the remote node maintains up-to-date channel
-               // information for chain_hash. Some nodes use the full_information
-               // flag to indicate multi-part messages so we must check whether
-               // we received SCIDs as well.
-               if !msg.full_information && msg.short_channel_ids.len() == 0 {
-                       return Err(LightningError {
-                               err: String::from("Received reply_channel_range with no information available"),
-                               action: ErrorAction::IgnoreError,
-                       });
-               }
+               log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, sync_complete={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.sync_complete, msg.short_channel_ids.len(),);
 
                log_debug!(self.logger, "Sending query_short_channel_ids peer={}, batch_size={}", log_pubkey!(their_node_id), msg.short_channel_ids.len());
                let mut pending_events = self.pending_events.lock().unwrap();
@@ -2015,7 +2004,7 @@ mod tests {
                {
                        let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
                                chain_hash,
-                               full_information: true,
+                               sync_complete: true,
                                first_blocknum: 0,
                                number_of_blocks: 2000,
                                short_channel_ids: vec![
@@ -2048,22 +2037,6 @@ mod tests {
                                _ => panic!("expected MessageSendEvent::SendShortIdsQuery"),
                        }
                }
-
-               // Test receipt of a reply that indicates the remote node does not maintain up-to-date
-               // information for the chain_hash. Because of discrepancies in implementation we use
-               // full_information=false and short_channel_ids=[] as the signal.
-               {
-                       // Handle the reply indicating the peer was unable to fulfill our request.
-                       let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
-                               chain_hash,
-                               full_information: false,
-                               first_blocknum: 1000,
-                               number_of_blocks: 100,
-                               short_channel_ids: vec![],
-                       });
-                       assert!(result.is_err());
-                       assert_eq!(result.err().unwrap().err, "Received reply_channel_range with no information available");
-               }
        }
 
        #[test]