[dependencies]
bitcoin = "0.27"
lightning = { version = "0.0.106", path = "../lightning" }
+futures = { version = "0.3" }
tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
/// start when there are no chain listeners to sync yet.
///
/// [`SpvClient`]: crate::SpvClient
-pub async fn validate_best_block_header<B: BlockSource>(block_source: &mut B) ->
+pub async fn validate_best_block_header<B: BlockSource>(block_source: &B) ->
BlockSourceResult<ValidatedBlockHeader> {
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
block_source
/// C: chain::Filter,
/// P: chainmonitor::Persist<S>,
/// >(
-/// block_source: &mut B,
+/// block_source: &B,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
/// config: UserConfig,
/// keys_manager: &K,
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
pub async fn synchronize_listeners<'a, B: BlockSource, C: Cache, L: chain::Listen + ?Sized>(
- block_source: &mut B,
+ block_source: &B,
network: Network,
header_cache: &mut C,
mut chain_listeners: Vec<(BlockHash, &'a L)>,
///
/// Implementations that cannot find headers based on the hash should return a `Transient` error
/// when `height_hint` is `None`.
- fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
+ fn get_header<'a>(&'a self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
/// Returns the block for a given hash. A headers-only block source should return a `Transient`
/// error.
- fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
+ fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
/// Returns the hash of the best block and, optionally, its height.
///
/// to allow for a more efficient lookup.
///
/// [`get_header`]: Self::get_header
- fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
+ fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
}
/// Result type for `BlockSource` requests.
use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;
-use std::ops::DerefMut;
+use std::ops::Deref;
/// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving
/// related chain data. It serves as an adapter for `BlockSource`.
/// [`ChainPoller`]: ../struct.ChainPoller.html
pub trait Poll {
/// Returns a chain tip in terms of its relationship to the provided chain tip.
- fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) ->
+ fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ChainTip>;
/// Returns the header that preceded the given header in the chain.
- fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
+ fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlockHeader>;
/// Returns the block associated with the given header.
- fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
+ fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlock>;
}
///
/// Other `Poll` implementations should be built using `ChainPoller` as it provides the simplest way
/// of validating chain data and checking consistency.
-pub struct ChainPoller<B: DerefMut<Target=T> + Sized, T: BlockSource> {
+pub struct ChainPoller<B: Deref<Target=T> + Sized, T: BlockSource> {
block_source: B,
network: Network,
}
-impl<B: DerefMut<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
+impl<B: Deref<Target=T> + Sized, T: BlockSource> ChainPoller<B, T> {
/// Creates a new poller for the given block source.
///
/// If the `network` parameter is mainnet, then the difficulty between blocks is checked for
}
}
-impl<B: DerefMut<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller<B, T> {
- fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) ->
+impl<B: Deref<Target=T> + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller<B, T> {
+ fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ChainTip>
{
Box::pin(async move {
})
}
- fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
+ fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlockHeader>
{
Box::pin(async move {
})
}
- fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) ->
+ fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) ->
AsyncBlockSourceResult<'a, ValidatedBlock>
{
Box::pin(async move {
let best_known_chain_tip = chain.tip();
chain.disconnect_tip();
- let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
+ let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Transient);
#[tokio::test]
async fn poll_chain_without_headers() {
- let mut chain = Blockchain::default().with_height(1).without_headers();
+ let chain = Blockchain::default().with_height(1).without_headers();
let best_known_chain_tip = chain.at_height(0);
- let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
+ let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
chain.blocks.last_mut().unwrap().header.bits =
BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0; 32]));
- let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
+ let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
#[tokio::test]
async fn poll_chain_with_malformed_headers() {
- let mut chain = Blockchain::default().with_height(1).malformed_headers();
+ let chain = Blockchain::default().with_height(1).malformed_headers();
let best_known_chain_tip = chain.at_height(0);
- let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
+ let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
#[tokio::test]
async fn poll_chain_with_common_tip() {
- let mut chain = Blockchain::default().with_height(0);
+ let chain = Blockchain::default().with_height(0);
let best_known_chain_tip = chain.tip();
- let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
+ let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Common),
let worse_chain_tip = chain.tip();
assert_eq!(best_known_chain_tip.chainwork, worse_chain_tip.chainwork);
- let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
+ let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)),
chain.disconnect_tip();
let worse_chain_tip = chain.tip();
- let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
+ let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)),
#[tokio::test]
async fn poll_chain_with_better_tip() {
- let mut chain = Blockchain::default().with_height(1);
+ let chain = Blockchain::default().with_height(1);
let best_known_chain_tip = chain.at_height(0);
let better_chain_tip = chain.tip();
- let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin);
+ let poller = ChainPoller::new(&chain, Network::Bitcoin);
match poller.poll_chain_tip(best_known_chain_tip).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(tip) => assert_eq!(tip, ChainTip::Better(better_chain_tip)),
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
+use futures::lock::Mutex;
+
use std::convert::TryFrom;
use std::convert::TryInto;
/// A simple REST client for requesting resources using HTTP `GET`.
pub struct RestClient {
endpoint: HttpEndpoint,
- client: HttpClient,
+ client: Mutex<HttpClient>,
}
impl RestClient {
///
/// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest).
pub fn new(endpoint: HttpEndpoint) -> std::io::Result<Self> {
- let client = HttpClient::connect(&endpoint)?;
+ let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self { endpoint, client })
}
/// Requests a resource encoded in `F` format and interpreted as type `T`.
- pub async fn request_resource<F, T>(&mut self, resource_path: &str) -> std::io::Result<T>
+ pub async fn request_resource<F, T>(&self, resource_path: &str) -> std::io::Result<T>
where F: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path);
- self.client.get::<F>(&uri, &host).await?.try_into()
+ self.client.lock().await.get::<F>(&uri, &host).await?.try_into()
}
}
impl BlockSource for RestClient {
- fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
+ fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let resource_path = format!("headers/1/{}.json", header_hash.to_hex());
Ok(self.request_resource::<JsonResponse, _>(&resource_path).await?)
})
}
- fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
+ fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let resource_path = format!("block/{}.bin", header_hash.to_hex());
Ok(self.request_resource::<BinaryResponse, _>(&resource_path).await?)
})
}
- fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
+ fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.request_resource::<JsonResponse, _>("chaininfo.json").await?)
})
#[tokio::test]
async fn request_unknown_resource() {
let server = HttpServer::responding_with_not_found();
- let mut client = RestClient::new(server.endpoint()).unwrap();
+ let client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
#[tokio::test]
async fn request_malformed_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content("foo"));
- let mut client = RestClient::new(server.endpoint()).unwrap();
+ let client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData),
#[tokio::test]
async fn request_valid_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content(42));
- let mut client = RestClient::new(server.endpoint()).unwrap();
+ let client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => panic!("Unexpected error: {:?}", e),
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
+use futures::lock::Mutex;
+
use serde_json;
use std::convert::TryFrom;
pub struct RpcClient {
basic_auth: String,
endpoint: HttpEndpoint,
- client: HttpClient,
+ client: Mutex<HttpClient>,
id: AtomicUsize,
}
/// credentials should be a base64 encoding of a user name and password joined by a colon, as is
/// required for HTTP basic access authentication.
pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result<Self> {
- let client = HttpClient::connect(&endpoint)?;
+ let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self {
basic_auth: "Basic ".to_string() + credentials,
endpoint,
}
/// Calls a method with the response encoded in JSON format and interpreted as type `T`.
- pub async fn call_method<T>(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
+ pub async fn call_method<T>(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
where JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = self.endpoint.path();
"id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
});
- let mut response = match self.client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await {
+ let mut response = match self.client.lock().await.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await {
Ok(JsonResponse(response)) => response,
Err(e) if e.kind() == std::io::ErrorKind::Other => {
match e.get_ref().unwrap().downcast_ref::<HttpError>() {
}
impl BlockSource for RpcClient {
- fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
+ fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
Ok(self.call_method("getblockheader", &[header_hash]).await?)
})
}
- fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
+ fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
let verbosity = serde_json::json!(0);
})
}
- fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
+ fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.call_method("getblockchaininfo", &[]).await?)
})
#[tokio::test]
async fn call_method_returning_unknown_response() {
let server = HttpServer::responding_with_not_found();
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
async fn call_method_returning_malfomred_response() {
let response = serde_json::json!("foo");
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
"error": { "code": -8, "message": "invalid parameter" },
});
let server = HttpServer::responding_with_server_error(response);
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let invalid_block_hash = serde_json::json!("foo");
match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
async fn call_method_returning_missing_result() {
let response = serde_json::json!({ "result": null });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
async fn call_method_returning_malformed_result() {
let response = serde_json::json!({ "result": "foo" });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
async fn call_method_returning_valid_result() {
let response = serde_json::json!({ "result": 654470 });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => panic!("Unexpected error: {:?}", e),
}
impl BlockSource for Blockchain {
- fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
+ fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
if self.without_headers {
return Err(BlockSourceError::persistent("header not found"));
})
}
- fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
+ fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
for (height, block) in self.blocks.iter().enumerate() {
if block.header.block_hash() == *header_hash {
})
}
- fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
+ fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
match self.blocks.last() {
None => Err(BlockSourceError::transient("empty chain")),