Skip to content

Commit

Permalink
feat: Add multicast tx submission for Ethereum fallback provider (#5446)
Browse files Browse the repository at this point in the history
### Description

Add multicast tx submission for Ethereum fallback provider.

`EthereumFallbackProvider` will use different strategies to query chain
RPCs:
1. If the method is `eth_sendRawTransaction`, it will make a parallel
query to all RPC endpoints and return the first successful result. If no
success, it will retry 4 times.
2. For other methods, the current functionality does not change: it will
sequentially query RPC endpoints according to their priorities and
adjust priorities depending on success/failure. It also retries 4 times.

### Related issues

- Contributes into
hyperlane-xyz/hyperlane-monorepo#5452

### Backward compatibility

Yes

### Testing

Added unit tests

---------

Co-authored-by: Danil Nemirovsky <[email protected]>
  • Loading branch information
ameten and ameten authored Feb 20, 2025
1 parent 5f02f58 commit 7a52195
Show file tree
Hide file tree
Showing 2 changed files with 417 additions and 123 deletions.
230 changes: 107 additions & 123 deletions main/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
use derive_new::new;
use hyperlane_core::rpc_clients::{BlockNumberGetter, FallbackProvider};
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::ops::Deref;
use std::time::Duration;
use thiserror::Error;

use async_trait::async_trait;
use derive_new::new;
use ethers::providers::{HttpClientError, JsonRpcClient, ProviderError};
use futures_util::{stream::FuturesUnordered, StreamExt};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use thiserror::Error;
use tokio::time::sleep;
use tracing::{instrument, warn_span};
use tracing::{instrument, warn, warn_span};

use ethers_prometheus::json_rpc_client::JsonRpcBlockGetter;
use hyperlane_core::rpc_clients::{BlockNumberGetter, FallbackProvider};
use hyperlane_metric::prometheus_metric::PrometheusConfigExt;

use crate::rpc_clients::{categorize_client_response, CategorizedResponse};

const METHOD_SEND_RAW_TRANSACTION: &str = "eth_sendRawTransaction";

/// Wrapper of `FallbackProvider` for use in `hyperlane-ethereum`
/// The wrapper uses two distinct strategies to place requests to chains:
/// 1. multicast - the request will be sent to all the providers simultaneously and the first
/// successful response will be used.
/// 2. fallback - the request will be sent to each provider one by one according to their
/// priority and the priority will be updated depending on success/failure.
///
/// Multicast strategy is used to submit transactions into the chain, namely with RPC method
/// `eth_sendRawTransaction` while fallback strategy is used for all the other RPC methods.
#[derive(new)]
pub struct EthereumFallbackProvider<C, B>(FallbackProvider<C, B>);

Expand Down Expand Up @@ -91,8 +103,75 @@ where
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned,
{
if method == METHOD_SEND_RAW_TRANSACTION {
self.multicast(method, params).await
} else {
self.fallback(method, params).await
}
}
}

impl<C> EthereumFallbackProvider<C, JsonRpcBlockGetter<C>>
where
C: JsonRpcClient<Error = HttpClientError>
+ Into<JsonRpcBlockGetter<C>>
+ PrometheusConfigExt
+ Clone,
JsonRpcBlockGetter<C>: BlockNumberGetter,
{
async fn multicast<T, R>(&self, method: &str, params: T) -> Result<R, ProviderError>
where
T: Serialize,
R: DeserializeOwned,
{
use CategorizedResponse::*;

let params = serde_json::to_value(params).expect("valid");

// errors reported by providers
let mut errors = vec![];

// retry 4 times if all providers returned a retryable error
for i in 0..=3 {
if i > 0 {
// sleep starting from the second attempt
sleep(Duration::from_millis(100)).await;
}

// future which visits all providers as they fulfill their requests
let mut unordered = self.populate_unordered_future(method, &params);

while let Some(resp) = unordered.next().await {
let value = match categorize_client_response(method, resp) {
IsOk(v) => serde_json::from_value(v)?,
NonRetryableErr(e) | RetryableErr(e) | RateLimitErr(e) => {
errors.push(e.into());
continue;
}
};

// if we are here, it means one of the providers returned a successful result
if !errors.is_empty() {
// we log a warning if we got errors from some providers
warn!(errors_count=?errors.len(), errors=?errors, providers=?self.inner.providers, "multicast_request");
}

return Ok(value);
}
}

// we don't add a warning with all errors since an error will be logged later on
Err(FallbackError::AllProvidersFailed(errors).into())
}

async fn fallback<T, R>(&self, method: &str, params: T) -> Result<R, ProviderError>
where
T: Serialize,
R: DeserializeOwned,
{
use CategorizedResponse::*;

let params = serde_json::to_value(params).expect("valid");

let mut errors = vec![];
Expand All @@ -104,14 +183,11 @@ where
let priorities_snapshot = self.take_priorities_snapshot().await;
for (idx, priority) in priorities_snapshot.iter().enumerate() {
let provider = &self.inner.providers[priority.index];
let fut = match params {
Value::Null => provider.request(method, ()),
_ => provider.request(method, &params),
};
let fut = Self::provider_request(provider, method, &params);
let resp = fut.await;
self.handle_stalled_provider(priority, provider).await;
let _span =
warn_span!("request", fallback_count=%idx, provider_index=%priority.index, ?provider).entered();
warn_span!("fallback_request", fallback_count=%idx, provider_index=%priority.index, ?provider).entered();

match categorize_client_response(method, resp) {
IsOk(v) => return Ok(serde_json::from_value(v)?),
Expand All @@ -123,123 +199,31 @@ where

Err(FallbackError::AllProvidersFailed(errors).into())
}
}

#[cfg(test)]
mod tests {
use ethers_prometheus::json_rpc_client::{JsonRpcBlockGetter, BLOCK_NUMBER_RPC};
use hyperlane_core::rpc_clients::test::ProviderMock;
use hyperlane_core::rpc_clients::FallbackProviderBuilder;

use super::*;

#[derive(Debug, Clone, Default)]
struct EthereumProviderMock(ProviderMock);

impl Deref for EthereumProviderMock {
type Target = ProviderMock;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl EthereumProviderMock {
fn new(request_sleep: Option<Duration>) -> Self {
Self(ProviderMock::new(request_sleep))
async fn provider_request<'a>(
provider: &'a C,
method: &'a str,
params: &'a Value,
) -> Result<Value, HttpClientError> {
match params {
Value::Null => provider.request(method, ()).await,
_ => provider.request(method, params).await,
}
}

impl From<EthereumProviderMock> for JsonRpcBlockGetter<EthereumProviderMock> {
fn from(val: EthereumProviderMock) -> Self {
JsonRpcBlockGetter::new(val)
}
}

fn dummy_return_value<R: DeserializeOwned>() -> Result<R, HttpClientError> {
serde_json::from_str("0").map_err(|e| HttpClientError::SerdeJson {
err: e,
text: "".to_owned(),
})
fn populate_unordered_future<'a>(
&'a self,
method: &'a str,
params: &'a Value,
) -> FuturesUnordered<impl Future<Output = Result<Value, HttpClientError>> + Sized + '_> {
let unordered = FuturesUnordered::new();
self.inner
.providers
.iter()
.for_each(|p| unordered.push(Self::provider_request(p, method, params)));
unordered
}

#[async_trait]
impl JsonRpcClient for EthereumProviderMock {
type Error = HttpClientError;

/// Pushes the `(method, params)` to the back of the `requests` queue,
/// pops the responses from the back of the `responses` queue
async fn request<T: Debug + Serialize + Send + Sync, R: DeserializeOwned>(
&self,
method: &str,
params: T,
) -> Result<R, Self::Error> {
self.push(method, params);
if let Some(sleep_duration) = self.request_sleep() {
sleep(sleep_duration).await;
}
dummy_return_value()
}
}

impl PrometheusConfigExt for EthereumProviderMock {
fn node_host(&self) -> &str {
todo!()
}

fn chain_name(&self) -> &str {
todo!()
}
}

impl<C> EthereumFallbackProvider<C, JsonRpcBlockGetter<C>>
where
C: JsonRpcClient<Error = HttpClientError>
+ PrometheusConfigExt
+ Into<JsonRpcBlockGetter<C>>
+ Clone,
JsonRpcBlockGetter<C>: BlockNumberGetter,
{
async fn low_level_test_call(&self) {
self.request::<_, u64>(BLOCK_NUMBER_RPC, ()).await.unwrap();
}
}

#[tokio::test]
async fn test_first_provider_is_attempted() {
let fallback_provider_builder = FallbackProviderBuilder::default();
let providers = vec![
EthereumProviderMock::default(),
EthereumProviderMock::default(),
EthereumProviderMock::default(),
];
let fallback_provider = fallback_provider_builder.add_providers(providers).build();
let ethereum_fallback_provider = EthereumFallbackProvider::new(fallback_provider);
ethereum_fallback_provider.low_level_test_call().await;
let provider_call_count: Vec<_> =
ProviderMock::get_call_counts(&ethereum_fallback_provider).await;
assert_eq!(provider_call_count, vec![1, 0, 0]);
}

#[tokio::test]
async fn test_one_stalled_provider() {
let fallback_provider_builder = FallbackProviderBuilder::default();
let providers = vec![
EthereumProviderMock::new(Some(Duration::from_millis(10))),
EthereumProviderMock::default(),
EthereumProviderMock::default(),
];
let fallback_provider = fallback_provider_builder
.add_providers(providers)
.with_max_block_time(Duration::from_secs(0))
.build();
let ethereum_fallback_provider = EthereumFallbackProvider::new(fallback_provider);
ethereum_fallback_provider.low_level_test_call().await;
let provider_call_count: Vec<_> =
ProviderMock::get_call_counts(&ethereum_fallback_provider).await;
assert_eq!(provider_call_count, vec![0, 0, 2]);
}

// TODO: make `categorize_client_response` generic over `ProviderError` to allow testing
// two stalled providers (so that the for loop in `request` doesn't stop after the first provider)
}

#[cfg(test)]
mod tests;
Loading

0 comments on commit 7a52195

Please sign in to comment.