From e9a0131a142faa860b121c0053dd600cd1f573c6 Mon Sep 17 00:00:00 2001 From: Dinonard Date: Wed, 6 Jul 2022 12:27:21 +0200 Subject: [PATCH] Patch from tgmichel that removes schema cache task --- client/rpc/src/eth/cache/mod.rs | 117 +------------ client/rpc/src/eth/cache/tests.rs | 273 ------------------------------ client/rpc/src/eth/filter.rs | 55 +----- template/node/src/service.rs | 10 +- 4 files changed, 6 insertions(+), 449 deletions(-) delete mode 100644 client/rpc/src/eth/cache/tests.rs diff --git a/client/rpc/src/eth/cache/mod.rs b/client/rpc/src/eth/cache/mod.rs index 40756a8ffd..6aa019e036 100644 --- a/client/rpc/src/eth/cache/mod.rs +++ b/client/rpc/src/eth/cache/mod.rs @@ -17,7 +17,6 @@ // along with this program. If not, see . mod lru_cache; -mod tests; use std::{ collections::{BTreeMap, HashMap}, @@ -40,7 +39,7 @@ use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_runtime::{ generic::BlockId, - traits::{BlakeTwo256, Block as BlockT, Header as HeaderT, UniqueSaturatedInto, Zero}, + traits::{BlakeTwo256, Block as BlockT, Header as HeaderT, UniqueSaturatedInto}, }; use fc_rpc_core::types::*; @@ -288,120 +287,6 @@ where BE: Backend + 'static, BE::State: StateBackend, { - /// Task that caches at which substrate hash a new EthereumStorageSchema was inserted in the Runtime Storage. - pub async fn ethereum_schema_cache_task(client: Arc, backend: Arc>) { - if let Ok(None) = frontier_backend_client::load_cached_schema::(backend.as_ref()) { - // Initialize the schema cache at genesis. - let mut cache: Vec<(EthereumStorageSchema, H256)> = Vec::new(); - let id = BlockId::Number(Zero::zero()); - if let Ok(Some(header)) = client.header(id) { - let genesis_schema_version = frontier_backend_client::onchain_storage_schema::< - B, - C, - BE, - >(client.as_ref(), id); - cache.push((genesis_schema_version, header.hash())); - let _ = frontier_backend_client::write_cached_schema::(backend.as_ref(), cache) - .map_err(|err| { - log::warn!("Error schema cache insert for genesis: {:?}", err); - }); - } else { - log::warn!("Error genesis header unreachable"); - } - } - - // Returns the schema for the given block hash and its parent. - let current_and_parent_schema = - |hash: B::Hash| -> Option<(EthereumStorageSchema, EthereumStorageSchema)> { - let id = BlockId::Hash(hash); - if let Ok(Some(header)) = client.header(id) { - let new_schema = frontier_backend_client::onchain_storage_schema::( - client.as_ref(), - id, - ); - - let parent_hash = header.parent_hash(); - let parent_id: BlockId = BlockId::Hash(*parent_hash); - let parent_schema = frontier_backend_client::onchain_storage_schema::( - client.as_ref(), - parent_id, - ); - return Some((new_schema, parent_schema)); - } - None - }; - - let mut notification_st = client.import_notification_stream(); - while let Some(notification) = notification_st.next().await { - let imported_hash = notification.hash; - if let (Some((new_schema, parent_schema)), Ok(Some(old_cache))) = ( - current_and_parent_schema(imported_hash), - frontier_backend_client::load_cached_schema::(backend.as_ref()), - ) { - let mut new_cache: Vec<(EthereumStorageSchema, H256)> = old_cache.clone(); - - if new_schema != parent_schema && notification.is_new_best { - // Always update cache on best block if there is a schema change. - new_cache.push((new_schema, imported_hash)); - } - - // Re-org handling. - if let Some(tree_route) = notification.tree_route { - // Imported block belongs to a re-org. - // First remove the retracted hashes from cache, if any. - let retracted = tree_route - .retracted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect::>(); - let to_remove = old_cache - .iter() - .enumerate() - .filter_map(|(index, (_, hash))| { - if retracted.contains(hash) { - Some(index) - } else { - None - } - }) - .collect::>(); - for index in to_remove { - new_cache.remove(index); - } - // Next add if there is a schema change in the branch. - let to_add = tree_route - .enacted() - .iter() - .filter_map(|hash_and_number| { - if let Some((new_schema, parent_schema)) = - current_and_parent_schema(hash_and_number.hash) - { - if new_schema != parent_schema { - return Some((new_schema, hash_and_number.hash)); - } - return None; - } - None - }) - .collect::>(); - for item in to_add { - new_cache.push(item); - } - } - // Write cache. - if new_cache != old_cache { - let _ = frontier_backend_client::write_cached_schema::( - backend.as_ref(), - new_cache, - ) - .map_err(|err| { - log::warn!("Error schema cache insert: {:?}", err); - }); - } - } - } - } - pub async fn filter_pool_task( client: Arc, filter_pool: Arc>>, diff --git a/client/rpc/src/eth/cache/tests.rs b/client/rpc/src/eth/cache/tests.rs deleted file mode 100644 index 892ec42925..0000000000 --- a/client/rpc/src/eth/cache/tests.rs +++ /dev/null @@ -1,273 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 -// This file is part of Frontier. -// -// Copyright (c) 2020 Parity Technologies (UK) Ltd. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -#[cfg(test)] -mod tests { - use crate::{frontier_backend_client, EthTask}; - - use codec::Encode; - use std::{path::PathBuf, sync::Arc, thread, time}; - - use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA}; - use frontier_template_runtime::RuntimeApi; - use futures::executor; - use sc_block_builder::BlockBuilderProvider; - use sp_consensus::BlockOrigin; - use sp_core::traits::SpawnEssentialNamed; - use sp_runtime::{ - generic::{Block, BlockId, Header}, - traits::BlakeTwo256, - }; - use substrate_test_runtime_client::{ - prelude::*, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, - }; - use tempfile::tempdir; - - type OpaqueBlock = - Block, substrate_test_runtime_client::runtime::Extrinsic>; - - pub fn open_frontier_backend( - path: PathBuf, - ) -> Result>, String> { - Ok(Arc::new(fc_db::Backend::::new( - &fc_db::DatabaseSettings { - source: sc_client_db::DatabaseSource::RocksDb { - path, - cache_size: 0, - }, - }, - )?)) - } - - #[test] - fn should_cache_pallet_ethereum_schema() { - let tmp = tempdir().expect("create a temporary directory"); - // Initialize storage with schema V1. - let builder = TestClientBuilder::new().add_extra_storage( - PALLET_ETHEREUM_SCHEMA.to_vec(), - Encode::encode(&EthereumStorageSchema::V1), - ); - let (client, _) = builder.build_with_native_executor::(None); - let mut client = Arc::new(client); - - // Create a temporary frontier secondary DB. - let frontier_backend = open_frontier_backend(tmp.into_path()).unwrap(); - - // Spawn `frontier-schema-cache-task` background task. - let spawner = sp_core::testing::TaskExecutor::new(); - spawner.spawn_essential_blocking( - "frontier-schema-cache-task", - None, - Box::pin(EthTask::ethereum_schema_cache_task( - Arc::clone(&client), - Arc::clone(&frontier_backend), - )), - ); - - // Create some blocks. - for nonce in [1, 2, 3, 4, 5].into_iter() { - let mut builder = client.new_block(Default::default()).unwrap(); - builder.push_storage_change(vec![nonce], None).unwrap(); - let block = builder.build().unwrap().block; - executor::block_on(client.import(BlockOrigin::Own, block)).unwrap(); - } - - // Expect: only genesis block is cached to schema V1. - assert_eq!( - frontier_backend_client::load_cached_schema::<_>(frontier_backend.as_ref()).unwrap(), - Some(vec![(EthereumStorageSchema::V1, client.genesis_hash())]) - ); - - // Create another block and push a schema change (V2). - let mut builder = client.new_block(Default::default()).unwrap(); - builder - .push_storage_change( - PALLET_ETHEREUM_SCHEMA.to_vec(), - Some(Encode::encode(&EthereumStorageSchema::V2)), - ) - .unwrap(); - let block = builder.build().unwrap().block; - let block_hash = block.header.hash(); - executor::block_on(client.import(BlockOrigin::Own, block)).unwrap(); - - // Give some time to consume and process the import notification stream. - thread::sleep(time::Duration::from_millis(10)); - - // Expect: genesis still cached (V1), latest block cached (V2) - assert_eq!( - frontier_backend_client::load_cached_schema::<_>(frontier_backend.as_ref()).unwrap(), - Some(vec![ - (EthereumStorageSchema::V1, client.genesis_hash()), - (EthereumStorageSchema::V2, block_hash) - ]) - ); - } - - #[test] - fn should_handle_cache_on_multiple_forks() { - let tmp = tempdir().expect("create a temporary directory"); - // Initialize storage with schema V1. - let builder = TestClientBuilder::new().add_extra_storage( - PALLET_ETHEREUM_SCHEMA.to_vec(), - Encode::encode(&EthereumStorageSchema::V1), - ); - let (client, _) = builder.build_with_native_executor::(None); - let mut client = Arc::new(client); - - // Create a temporary frontier secondary DB. - let frontier_backend = open_frontier_backend(tmp.into_path()).unwrap(); - - // Spawn `frontier-schema-cache-task` background task. - let spawner = sp_core::testing::TaskExecutor::new(); - spawner.spawn_essential_blocking( - "frontier-schema-cache-task", - None, - Box::pin(EthTask::ethereum_schema_cache_task( - Arc::clone(&client), - Arc::clone(&frontier_backend), - )), - ); - - // G -> A1. - let mut builder = client.new_block(Default::default()).unwrap(); - builder.push_storage_change(vec![1], None).unwrap(); - let a1 = builder.build().unwrap().block; - let a1_hash = a1.header.hash(); - executor::block_on(client.import(BlockOrigin::Own, a1)).unwrap(); - - // A1 -> A2, we store V2 schema. - let mut builder = client - .new_block_at(&BlockId::Hash(a1_hash), Default::default(), false) - .unwrap(); - builder - .push_storage_change( - PALLET_ETHEREUM_SCHEMA.to_vec(), - Some(Encode::encode(&EthereumStorageSchema::V2)), - ) - .unwrap(); - let a2 = builder.build().unwrap().block; - let a2_hash = a2.header.hash(); - executor::block_on(client.import(BlockOrigin::Own, a2)).unwrap(); - - // Give some time to consume and process the import notification stream. - thread::sleep(time::Duration::from_millis(100)); - - // Expect: genesis with schema V1, A2 with schema V2. - assert_eq!( - frontier_backend_client::load_cached_schema::<_>(frontier_backend.as_ref()).unwrap(), - Some(vec![ - (EthereumStorageSchema::V1, client.genesis_hash()), - (EthereumStorageSchema::V2, a2_hash) - ]) - ); - - // A1 -> B2. A new block on top of A1. - let mut builder = client - .new_block_at(&BlockId::Hash(a1_hash), Default::default(), false) - .unwrap(); - builder.push_storage_change(vec![2], None).unwrap(); - let b2 = builder.build().unwrap().block; - let b2_hash = b2.header.hash(); - executor::block_on(client.import(BlockOrigin::Own, b2)).unwrap(); - - // B2 -> B3, we store V2 schema again. This is the longest chain. - let mut builder = client - .new_block_at(&BlockId::Hash(b2_hash), Default::default(), false) - .unwrap(); - builder - .push_storage_change( - PALLET_ETHEREUM_SCHEMA.to_vec(), - Some(Encode::encode(&EthereumStorageSchema::V2)), - ) - .unwrap(); - let b3 = builder.build().unwrap().block; - let b3_hash = b3.header.hash(); - executor::block_on(client.import(BlockOrigin::Own, b3)).unwrap(); - - // Give some time to consume and process the import notification stream. - thread::sleep(time::Duration::from_millis(100)); - - // Expect: A2 to be retracted, genesis with schema V1, B3 with schema V2. - assert_eq!( - frontier_backend_client::load_cached_schema::<_>(frontier_backend.as_ref()).unwrap(), - Some(vec![ - (EthereumStorageSchema::V1, client.genesis_hash()), - (EthereumStorageSchema::V2, b3_hash) - ]) - ); - - // A1 -> C2, a wild new block on top of A1. - let mut builder = client - .new_block_at(&BlockId::Hash(a1_hash), Default::default(), false) - .unwrap(); - builder - .push_storage_change( - PALLET_ETHEREUM_SCHEMA.to_vec(), - Some(Encode::encode(&EthereumStorageSchema::V2)), - ) - .unwrap(); - builder.push_storage_change(vec![3], None).unwrap(); - let c2 = builder.build().unwrap().block; - let c2_hash = c2.header.hash(); - executor::block_on(client.import(BlockOrigin::Own, c2)).unwrap(); - - // Give some time to consume and process the import notification stream. - thread::sleep(time::Duration::from_millis(100)); - - // Expect: genesis with schema V1, B3 still with schema V2. - // C2 still not best block and not cached. - assert_eq!( - frontier_backend_client::load_cached_schema::<_>(frontier_backend.as_ref()).unwrap(), - Some(vec![ - (EthereumStorageSchema::V1, client.genesis_hash()), - (EthereumStorageSchema::V2, b3_hash) - ]) - ); - - // Make C2 branch the longest chain. - // C2 -> C3 - let mut builder = client - .new_block_at(&BlockId::Hash(c2_hash), Default::default(), false) - .unwrap(); - builder.push_storage_change(vec![2], None).unwrap(); - let c3 = builder.build().unwrap().block; - let c3_hash = c3.header.hash(); - executor::block_on(client.import(BlockOrigin::Own, c3)).unwrap(); - - // C3 -> C4 - let mut builder = client - .new_block_at(&BlockId::Hash(c3_hash), Default::default(), false) - .unwrap(); - builder.push_storage_change(vec![3], None).unwrap(); - let c4 = builder.build().unwrap().block; - executor::block_on(client.import(BlockOrigin::Own, c4)).unwrap(); - - // Give some time to consume and process the import notification stream. - thread::sleep(time::Duration::from_millis(100)); - - // Expect: B2 branch to be retracted, genesis with schema V1, C2 with schema V2. - // C4 became new best, chain reorged, we expect the C2 ancestor to be cached. - assert_eq!( - frontier_backend_client::load_cached_schema::<_>(frontier_backend.as_ref()).unwrap(), - Some(vec![ - (EthereumStorageSchema::V1, client.genesis_hash()), - (EthereumStorageSchema::V2, c2_hash) - ]) - ); - } -} diff --git a/client/rpc/src/eth/filter.rs b/client/rpc/src/eth/filter.rs index b44975acb5..c2687a03ce 100644 --- a/client/rpc/src/eth/filter.rs +++ b/client/rpc/src/eth/filter.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time}; +use std::{marker::PhantomData, sync::Arc, time}; use ethereum::BlockV2 as EthereumBlock; use ethereum_types::{H256, U256}; @@ -28,15 +28,11 @@ use sp_blockchain::HeaderBackend; use sp_core::hashing::keccak_256; use sp_runtime::{ generic::BlockId, - traits::{ - BlakeTwo256, Block as BlockT, Header as HeaderT, NumberFor, One, Saturating, - UniqueSaturatedInto, - }, + traits::{BlakeTwo256, Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto}, }; use fc_rpc_core::{types::*, EthFilterApiServer}; use fp_rpc::{EthereumRuntimeRPCApi, TransactionStatus}; -use fp_storage::EthereumStorageSchema; use crate::{eth::cache::EthBlockDataCacheTask, frontier_backend_client, internal_err}; @@ -236,7 +232,6 @@ where let client = Arc::clone(&self.client); let block_data_cache = Arc::clone(&self.block_data_cache); - let backend = Arc::clone(&self.backend); let max_past_logs = self.max_past_logs; match path { @@ -269,7 +264,6 @@ where let mut ret: Vec = Vec::new(); let _ = filter_range_logs( client.as_ref(), - backend.as_ref(), &block_data_cache, &mut ret, max_past_logs, @@ -310,7 +304,6 @@ where let client = Arc::clone(&self.client); let block_data_cache = Arc::clone(&self.block_data_cache); - let backend = Arc::clone(&self.backend); let max_past_logs = self.max_past_logs; let filter = filter_result?; @@ -339,7 +332,6 @@ where let mut ret: Vec = Vec::new(); let _ = filter_range_logs( client.as_ref(), - backend.as_ref(), &block_data_cache, &mut ret, max_past_logs, @@ -415,7 +407,6 @@ where let _ = filter_range_logs( client.as_ref(), - backend.as_ref(), &block_data_cache, &mut ret, max_past_logs, @@ -431,7 +422,6 @@ where async fn filter_range_logs( client: &C, - backend: &fc_db::Backend, block_data_cache: &EthBlockDataCacheTask, ret: &mut Vec, max_past_logs: u32, @@ -463,52 +453,13 @@ where let address_bloom_filter = FilteredParams::adresses_bloom_filter(&filter.address); let topics_bloom_filter = FilteredParams::topics_bloom_filter(&topics_input); - // Get schema cache. A single read before the block range iteration. - // This prevents having to do an extra DB read per block range iteration to getthe actual schema. - let mut local_cache: BTreeMap, EthereumStorageSchema> = BTreeMap::new(); - if let Ok(Some(schema_cache)) = frontier_backend_client::load_cached_schema::(backend) { - for (schema, hash) in schema_cache { - if let Ok(Some(header)) = client.header(BlockId::Hash(hash)) { - let number = *header.number(); - local_cache.insert(number, schema); - } - } - } - let cache_keys: Vec> = local_cache.keys().cloned().collect(); - let mut default_schema: Option<&EthereumStorageSchema> = None; - if cache_keys.len() == 1 { - // There is only one schema and that's the one we use. - default_schema = local_cache.get(&cache_keys[0]); - } - while current_number <= to { let id = BlockId::Number(current_number); let substrate_hash = client .expect_block_hash_from_id(&id) .map_err(|_| internal_err(format!("Expect block number from id: {}", id)))?; - let schema = match default_schema { - // If there is a single schema, we just assign. - Some(default_schema) => *default_schema, - _ => { - // If there are multiple schemas, we iterate over the - hopefully short - list - // of keys and assign the one belonging to the current_number. - // Because there are more than 1 schema, and current_number cannot be < 0, - // (i - 1) will always be >= 0. - let mut default_schema: Option<&EthereumStorageSchema> = None; - for (i, k) in cache_keys.iter().enumerate() { - if ¤t_number < k { - default_schema = local_cache.get(&cache_keys[i - 1]); - } - } - match default_schema { - Some(schema) => *schema, - // Fallback to DB read. This will happen i.e. when there is no cache - // task configured at service level. - _ => frontier_backend_client::onchain_storage_schema::(client, id), - } - } - }; + let schema = frontier_backend_client::onchain_storage_schema::(client, id); let block = block_data_cache.current_block(schema, substrate_hash).await; diff --git a/template/node/src/service.rs b/template/node/src/service.rs index 35dcbdee30..0ac8cf1651 100644 --- a/template/node/src/service.rs +++ b/template/node/src/service.rs @@ -743,7 +743,7 @@ fn spawn_frontier_tasks( Duration::new(6, 0), client.clone(), backend, - frontier_backend.clone(), + frontier_backend, 3, 0, SyncStrategy::Normal, @@ -767,16 +767,10 @@ fn spawn_frontier_tasks( "frontier-fee-history", None, EthTask::fee_history_task( - client.clone(), + client, overrides, fee_history_cache, fee_history_cache_limit, ), ); - - task_manager.spawn_essential_handle().spawn( - "frontier-schema-cache-task", - None, - EthTask::ethereum_schema_cache_task(client, frontier_backend), - ); }