//! endpoint.
use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
-use crate::http::{HttpClient, HttpEndpoint, JsonResponse};
+use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse};
use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
+use futures::lock::Mutex;
+
use serde_json;
use std::convert::TryFrom;
pub struct RpcClient {
basic_auth: String,
endpoint: HttpEndpoint,
- client: HttpClient,
+ client: Mutex<HttpClient>,
id: AtomicUsize,
}
/// credentials should be a base64 encoding of a user name and password joined by a colon, as is
/// required for HTTP basic access authentication.
pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result<Self> {
- let client = HttpClient::connect(&endpoint)?;
+ let client = Mutex::new(HttpClient::connect(&endpoint)?);
Ok(Self {
basic_auth: "Basic ".to_string() + credentials,
endpoint,
}
/// Calls a method with the response encoded in JSON format and interpreted as type `T`.
- pub async fn call_method<T>(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
+ pub async fn call_method<T>(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
where JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = self.endpoint.path();
"id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
});
- let mut response = self.client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content)
- .await?.0;
+ let mut response = match self.client.lock().await.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await {
+ Ok(JsonResponse(response)) => response,
+ Err(e) if e.kind() == std::io::ErrorKind::Other => {
+ match e.get_ref().unwrap().downcast_ref::<HttpError>() {
+ Some(http_error) => match JsonResponse::try_from(http_error.contents.clone()) {
+ Ok(JsonResponse(response)) => response,
+ Err(_) => Err(e)?,
+ },
+ None => Err(e)?,
+ }
+ },
+ Err(e) => Err(e)?,
+ };
+
if !response.is_object() {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
}
}
impl BlockSource for RpcClient {
- fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
+ fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
Ok(self.call_method("getblockheader", &[header_hash]).await?)
})
}
- fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
+ fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
let verbosity = serde_json::json!(0);
})
}
- fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
+ fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.call_method("getblockchaininfo", &[]).await?)
})
#[tokio::test]
async fn call_method_returning_unknown_response() {
let server = HttpServer::responding_with_not_found();
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
async fn call_method_returning_malfomred_response() {
let response = serde_json::json!("foo");
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
let response = serde_json::json!({
"error": { "code": -8, "message": "invalid parameter" },
});
- let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let server = HttpServer::responding_with_server_error(response);
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let invalid_block_hash = serde_json::json!("foo");
match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
async fn call_method_returning_missing_result() {
let response = serde_json::json!({ "result": null });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
async fn call_method_returning_malformed_result() {
let response = serde_json::json!({ "result": "foo" });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
async fn call_method_returning_valid_result() {
let response = serde_json::json!({ "result": 654470 });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
- let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+ let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => panic!("Unexpected error: {:?}", e),