core/state: finish prefetching async and process storage updates async
karalabe committed May 3, 2024
1 parent f166ce1 commit f5ec2e7
Showing 4 changed files with 79 additions and 103 deletions.
22 changes: 19 additions & 3 deletions core/state/state_object.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"maps"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -33,6 +34,14 @@ import (
"github.com/holiman/uint256"
)

// hasherPool holds a pool of hashers used by state objects during concurrent
// trie updates.
var hasherPool = sync.Pool{
New: func() interface{} {
return crypto.NewKeccakState()
},
}
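
As a side note, here is a minimal runnable sketch of the borrow/return discipline this pool enables. The names (demoPool, demoHash) are hypothetical, and plain golang.org/x/crypto/sha3 stands in for the internal crypto.KeccakState:

```go
package main

import (
	"fmt"
	"hash"
	"sync"

	"golang.org/x/crypto/sha3"
)

var demoPool = sync.Pool{
	New: func() interface{} { return sha3.NewLegacyKeccak256() },
}

func demoHash(data []byte) []byte {
	h := demoPool.Get().(hash.Hash) // borrow a hasher, or allocate via New
	defer demoPool.Put(h)           // hand it back for other goroutines
	h.Reset()                       // pooled hashers retain previous state
	h.Write(data)
	return h.Sum(nil)
}

func main() {
	fmt.Printf("%x\n", demoHash([]byte("storage-key")))
}
```

Each concurrent updateTrie call gets a private hasher this way, instead of all of them funneling through the single StateDB-owned one.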

type Storage map[common.Hash]common.Hash

func (s Storage) Copy() Storage {
@@ -314,6 +323,9 @@ func (s *stateObject) updateTrie() (Trie, error) {
// Insert all the pending storage updates into the trie
usedStorage := make([][]byte, 0, len(s.pendingStorage))

hasher := hasherPool.Get().(crypto.KeccakState)
defer hasherPool.Put(hasher)

// Perform trie updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
//
@@ -342,26 +354,30 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageUpdated += 1
s.db.StorageUpdated.Add(1)
} else {
deletions = append(deletions, key)
}
// Cache the mutated storage slots until commit
if storage == nil {
s.db.storagesLock.Lock()
if storage = s.db.storages[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.storages[s.addrHash] = storage
}
s.db.storagesLock.Unlock()
}
khash := crypto.HashData(s.db.hasher, key[:])
khash := crypto.HashData(hasher, key[:])
storage[khash] = encoded // encoded will be nil if it's deleted

// Cache the original value of mutated storage slots
if origin == nil {
s.db.storagesLock.Lock()
if origin = s.db.storagesOrigin[s.address]; origin == nil {
origin = make(map[common.Hash][]byte)
s.db.storagesOrigin[s.address] = origin
}
s.db.storagesLock.Unlock()
}
// Track the original value of slot only if it's mutated first time
if _, ok := origin[khash]; !ok {
@@ -381,7 +397,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageDeleted += 1
s.db.StorageDeleted.Add(1)
}
// If no slots were touched, issue a warning as we shouldn't have done all
// the above work in the first place
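
Worth noting on the hunks above: with updateTrie now running for many accounts at once, the shared s.db.storages and s.db.storagesOrigin maps are lazily initialized under s.db.storagesLock. A standalone sketch of that pattern, with a hypothetical registry type and [32]byte keys standing in for common.Hash; only the outer lookup is serialized, as each inner map is afterwards written by a single account's goroutine:

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the StateDB-side maps guarded
// by storagesLock in the real code.
type registry struct {
	mu    sync.Mutex
	inner map[[32]byte]map[[32]byte][]byte
}

// bucket lazily allocates the per-account map under the lock, so two
// goroutines racing on the same (or different) accounts never corrupt
// the outer map.
func (r *registry) bucket(addrHash [32]byte) map[[32]byte][]byte {
	r.mu.Lock()
	defer r.mu.Unlock()
	b := r.inner[addrHash]
	if b == nil {
		b = make(map[[32]byte][]byte)
		r.inner[addrHash] = b
	}
	return b
}

func main() {
	r := &registry{inner: make(map[[32]byte]map[[32]byte][]byte)}
	var addr, slot [32]byte
	slot[0] = 1
	r.bucket(addr)[slot] = []byte{0x2a}
	fmt.Println(len(r.bucket(addr))) // 1
}
```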
77 changes: 34 additions & 43 deletions core/state/statedb.go
@@ -24,6 +24,7 @@ import (
"slices"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -92,10 +93,12 @@ type StateDB struct {

// These maps hold the state changes (including the corresponding
// original value) that occurred in this **block**.
accounts       map[common.Hash][]byte    // The mutated accounts in 'slim RLP' encoding
accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding

storages       map[common.Hash]map[common.Hash][]byte    // The mutated slots in prefix-zero trimmed rlp format
storagesOrigin map[common.Address]map[common.Hash][]byte // The original value of mutated slots in prefix-zero trimmed rlp format
storagesLock   sync.Mutex                                // Mutex protecting the maps during concurrent updates/commits

// This map holds 'live' objects, which will get modified while
// processing a state transition.
@@ -161,9 +164,9 @@ type StateDB struct {
TrieDBCommits time.Duration

AccountUpdated int
StorageUpdated int
StorageUpdated atomic.Int64
AccountDeleted int
StorageDeleted int
StorageDeleted atomic.Int64

// Testing hooks
onCommit func(states *triestate.Set) // Hook invoked when commit is performed
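
StorageUpdated and StorageDeleted switch from plain ints to atomic.Int64 here because the storage-update goroutines introduced by this commit bump them concurrently; a plain increment would be a data race. A tiny sketch of the pattern (the counter is hypothetical, not go-ethereum code):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var (
		counter atomic.Int64 // stand-in for StateDB.StorageUpdated
		wg      sync.WaitGroup
	)
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				counter.Add(1) // mirrors s.db.StorageUpdated.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(counter.Load()) // always 8000; reset with counter.Store(0)
}
```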
@@ -210,7 +213,7 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
if s.prefetcher != nil {
s.prefetcher.terminate()
s.prefetcher.terminate(false)
s.prefetcher.report()
s.prefetcher = nil
}
@@ -223,7 +226,7 @@ func (s *StateDB) StartPrefetcher(namespace string) {
// from the gathered metrics.
func (s *StateDB) StopPrefetcher() {
if s.prefetcher != nil {
s.prefetcher.terminate()
s.prefetcher.terminate(false)
s.prefetcher.report()
s.prefetcher = nil
}
@@ -542,9 +545,6 @@ func (s *StateDB) GetTransientState(addr common.Address, key common.Hash) common

// updateStateObject writes the given object to the trie.
func (s *StateDB) updateStateObject(obj *stateObject) {
// Track the amount of time wasted on updating the account from the trie
defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())

// Encode the account and update the account trie
addr := obj.Address()
if err := s.trie.UpdateAccount(addr, &obj.data); err != nil {
@@ -573,10 +573,6 @@ func (s *StateDB) updateStateObject(obj *stateObject) {

// deleteStateObject removes the given object from the state trie.
func (s *StateDB) deleteStateObject(addr common.Address) {
// Track the amount of time wasted on deleting the account from the trie
defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())

// Delete the account from the trie
if err := s.trie.DeleteAccount(addr); err != nil {
s.setError(fmt.Errorf("deleteStateObject (%x) error: %v", addr[:], err))
}
@@ -835,48 +831,40 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Finalise all the dirty storage states and write them into the tries
s.Finalise(deleteEmptyObjects)

// If there was a trie prefetcher operating, terminate it (blocking until
// all tasks finish) and then proceed with the trie hashing.
var subfetchers chan *subfetcher
// If there was a trie prefetcher operating, terminate it async so that the
// individual storage tries can be updated as soon as the disk load finishes.
if s.prefetcher != nil {
subfetchers = s.prefetcher.terminateAsync()
s.prefetcher.terminate(true)
defer func() {
s.prefetcher.report()
s.prefetcher = nil // Pre-byzantium, unset any used up prefetcher
}()
}
// Although naively it makes sense to retrieve the account trie and then do
// the contract storage and account updates sequentially, that short circuits
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefetches just a few more milliseconds of time
// to pull useful data from disk.
start := time.Now()

updated := make(map[common.Address]struct{})
if subfetchers != nil {
for f := range subfetchers {
if op, ok := s.mutations[f.addr]; ok {
if !op.applied && !op.isDelete() {
s.stateObjects[f.addr].updateRoot()
}
updated[f.addr] = struct{}{}
}
}
}
// Process all storage updates concurrently. The state object update root
// method will internally call a blocking trie fetch from the prefetcher,
// so there's no need to explicitly wait for the prefetchers to finish.
var (
start = time.Now()
workers errgroup.Group
)
for addr, op := range s.mutations {
if op.applied {
continue
}
if op.isDelete() {
if op.applied || op.isDelete() {
continue
}
s.stateObjects[addr].updateRoot()
obj := s.stateObjects[addr] // closure for the task runner below
workers.Go(func() error {
obj.updateRoot()
return nil
})
}
workers.Wait()
s.StorageUpdates += time.Since(start)

// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
start = time.Now()

if s.prefetcher != nil {
if trie, err := s.prefetcher.trie(common.Hash{}, s.originalRoot); err != nil {
log.Error("Failed to retrieve account pre-fetcher trie", "err", err)
@@ -916,6 +904,8 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
s.deleteStateObject(deletedAddr)
s.AccountDeleted += 1
}
s.AccountUpdates += time.Since(start)

if s.prefetcher != nil {
s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
}
@@ -1258,15 +1248,16 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er
return common.Hash{}, err
}
accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
storageUpdatedMeter.Mark(s.StorageUpdated.Load())
accountDeletedMeter.Mark(int64(s.AccountDeleted))
storageDeletedMeter.Mark(int64(s.StorageDeleted))
storageDeletedMeter.Mark(s.StorageDeleted.Load())
accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
s.AccountUpdated, s.AccountDeleted = 0, 0
s.StorageUpdated, s.StorageDeleted = 0, 0
s.StorageUpdated.Store(0)
s.StorageDeleted.Store(0)

// If snapshotting is enabled, update the snapshot tree with this new version
if s.snap != nil {
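
The IntermediateRoot hunks above are the heart of the change: instead of draining a channel of finished subfetchers, every pending account's updateRoot is launched on an errgroup.Group, and each task blocks inside the prefetcher until its trie arrives, so workers.Wait() is the only synchronization needed. A minimal sketch of that fan-out shape (updateOne is hypothetical):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// updateOne stands in for stateObject.updateRoot: it blocks until its
// data is ready, then performs its trie work.
func updateOne(addr string) {
	time.Sleep(10 * time.Millisecond) // pretend to wait on the prefetcher
	fmt.Println("updated", addr)
}

func main() {
	var workers errgroup.Group
	for _, addr := range []string{"0xaa", "0xbb", "0xcc"} {
		addr := addr // capture the loop variable (pre-Go 1.22 semantics)
		workers.Go(func() error {
			updateOne(addr)
			return nil
		})
	}
	workers.Wait() // every storage root is final past this point
}
```

The `obj := s.stateObjects[addr]` line in the diff serves the same purpose as the capture here: pinning the per-iteration value before handing it to the closure.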
78 changes: 25 additions & 53 deletions core/state/trie_prefetcher.go
@@ -76,52 +76,23 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
}
}

// terminate iterates over all the subfetchers, waiting on any that still spin.
func (p *triePrefetcher) terminate() {
// terminate iterates over all the subfetchers and issues a termination request
// to all of them. Depending on the async parameter, the method will either block
// until all subfetchers spin down, or return immediately.
func (p *triePrefetcher) terminate(async bool) {
// Short circuit if the fetcher is already closed
select {
case <-p.term:
return
default:
}
// Terminate all sub-fetchers synchronously and close the main fetcher
// Terminate all sub-fetchers, sync or async, depending on the request
for _, fetcher := range p.fetchers {
fetcher.terminate()
fetcher.terminate(async)
}
close(p.term)
}

// terminateAsync iterates over all the subfetchers and terminates them async,
// feeding each into a result channel as they finish.
func (p *triePrefetcher) terminateAsync() chan *subfetcher {
// Short circuit if the fetcher is already closed
select {
case <-p.term:
return nil
default:
}
// Terminate all the sub-fetchers asynchronously and feed them into a result
// channel as they finish
var (
res = make(chan *subfetcher, len(p.fetchers))
pend sync.WaitGroup
)
for _, fetcher := range p.fetchers {
pend.Add(1)
go func(f *subfetcher) {
f.terminate()
res <- f
pend.Done()
}(fetcher)
}
go func() {
pend.Wait()
close(res)
}()
close(p.term)
return res
}

// report aggregates the pre-fetching and usage metrics and reports them.
func (p *triePrefetcher) report() {
if !metrics.Enabled {
@@ -173,17 +144,19 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
return fetcher.schedule(keys)
}

// trie returns the trie matching the root hash, or nil if either the fetcher
// is terminated or the trie is not available.
// trie returns the trie matching the root hash, blocking until the fetcher of
// the given trie terminates. If no fetcher exists for the request, nil will be
// returned.
func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) (Trie, error) {
// Bail if no trie was prefetched for this root
fetcher := p.fetchers[p.trieID(owner, root)]
if fetcher == nil {
log.Warn("Prefetcher missed to load trie", "owner", owner, "root", root)
log.Error("Prefetcher missed to load trie", "owner", owner, "root", root)
p.deliveryMissMeter.Mark(1)
return nil, nil
}
return fetcher.peek()
// Subfetcher exists, retrieve its trie
return fetcher.peek(), nil
}

// used marks a batch of state items used to allow creating statistics as to
@@ -269,27 +242,26 @@ func (sf *subfetcher) schedule(keys [][]byte) error {

// peek retrieves the fetcher's trie, populated with any pre-fetched data. The
// returned trie will be a shallow copy, so modifying it will break subsequent
// peeks for the original data.
//
// This method can only be called after closing the subfetcher.
func (sf *subfetcher) peek() (Trie, error) {
// Ensure the subfetcher finished operating on its trie
select {
case <-sf.term:
default:
return nil, errNotTerminated
}
return sf.trie, nil
// peeks for the original data. The method will block until all the scheduled
// data has been loaded and the fetcher terminated.
func (sf *subfetcher) peek() Trie {
// Block until the fertcher terminates, then retrieve the trie
<-sf.term
return sf.trie
}

// terminate waits for the subfetcher to finish its tasks, after which it tears
// down all the internal background loaders.
func (sf *subfetcher) terminate() {
// terminate requests the subfetcher to stop accepting new tasks and spin down
// as soon as everything is loaded. Depending on the async parameter, the method
// will either block until all disk loads finish or return immediately.
func (sf *subfetcher) terminate(async bool) {
select {
case <-sf.stop:
default:
close(sf.stop)
}
if async {
return
}
<-sf.term
}

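
The subfetcher's shutdown protocol, visible in the last two hunks, is a classic two-channel pattern: closing stop requests termination (idempotently, via the select), the worker closes term once its loads are drained, terminate(async) either returns immediately or blocks on term, and peek simply waits on term before reading the trie. A toy reproduction of the protocol, with a hypothetical loader type:

```go
package main

import (
	"fmt"
	"time"
)

// loader is a hypothetical miniature of the subfetcher.
type loader struct {
	stop   chan struct{} // closed to request shutdown
	term   chan struct{} // closed by the worker once it has wound down
	result string
}

func newLoader() *loader {
	l := &loader{stop: make(chan struct{}), term: make(chan struct{})}
	go func() {
		defer close(l.term)               // signal termination to peek/terminate
		<-l.stop                          // wait for a shutdown request
		time.Sleep(10 * time.Millisecond) // drain the remaining workload
		l.result = "trie"
	}()
	return l
}

// terminate mirrors subfetcher.terminate: close stop at most once, then
// optionally wait for the worker to finish.
func (l *loader) terminate(async bool) {
	select {
	case <-l.stop:
	default:
		close(l.stop)
	}
	if async {
		return
	}
	<-l.term
}

// peek mirrors subfetcher.peek: block until the worker is done, then
// read its result.
func (l *loader) peek() string {
	<-l.term
	return l.result
}

func main() {
	l := newLoader()
	l.terminate(true)     // request shutdown without blocking
	fmt.Println(l.peek()) // blocks until the loader has spun down
}
```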
5 changes: 1 addition & 4 deletions core/state/trie_prefetcher_test.go
@@ -53,10 +53,7 @@ func TestUseAfterTerminate(t *testing.T) {
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err != nil {
t.Errorf("Prefetch failed before terminate: %v", err)
}
if _, err := prefetcher.trie(common.Hash{}, db.originalRoot); err == nil {
t.Errorf("Trie retrieval succeeded before terminate")
}
prefetcher.terminate()
prefetcher.terminate(false)

if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err == nil {
t.Errorf("Prefetch succeeded after terminate: %v", err)
