Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/state: update storage objects concurrently to one another #29696

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 20 additions & 3 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"maps"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -33,6 +34,14 @@ import (
"github.com/holiman/uint256"
)

// hasherPool holds a pool of hashers used by state objects during concurrent
// trie updates.
var hasherPool = sync.Pool{
New: func() interface{} {
return crypto.NewKeccakState()
},
}

type Storage map[common.Hash]common.Hash

func (s Storage) Copy() Storage {
Expand Down Expand Up @@ -307,6 +316,9 @@ func (s *stateObject) updateTrie() (Trie, error) {
// Insert all the pending storage updates into the trie
usedStorage := make([][]byte, 0, len(s.pendingStorage))

hasher := hasherPool.Get().(crypto.KeccakState)
defer hasherPool.Put(hasher)

// Perform trie updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
//
Expand Down Expand Up @@ -335,26 +347,31 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageUpdated += 1
s.db.StorageUpdated.Add(1)
} else {
deletions = append(deletions, key)
}
khash := crypto.HashData(hasher, key[:])

// Cache the mutated storage slots until commit
if storage == nil {
s.db.storagesLock.Lock()
if storage = s.db.storages[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.storages[s.addrHash] = storage
}
s.db.storagesLock.Unlock()
}
khash := crypto.HashData(s.db.hasher, key[:])
storage[khash] = encoded // encoded will be nil if it's deleted

// Cache the original value of mutated storage slots
if origin == nil {
s.db.storagesLock.Lock()
if origin = s.db.storagesOrigin[s.address]; origin == nil {
origin = make(map[common.Hash][]byte)
s.db.storagesOrigin[s.address] = origin
}
s.db.storagesLock.Unlock()
}
// Track the original value of slot only if it's mutated first time
if _, ok := origin[khash]; !ok {
Expand All @@ -374,7 +391,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageDeleted += 1
s.db.StorageDeleted.Add(1)
}
// If no slots were touched, issue a warning as we shouldn't have done all
// the above work in the first place
Expand Down
30 changes: 21 additions & 9 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"slices"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -92,10 +93,12 @@ type StateDB struct {

// These maps hold the state changes (including the corresponding
// original value) that occurred in this **block**.
accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding
accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding
accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding

storages map[common.Hash]map[common.Hash][]byte // The mutated slots in prefix-zero trimmed rlp format
accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding
storagesOrigin map[common.Address]map[common.Hash][]byte // The original value of mutated slots in prefix-zero trimmed rlp format
storagesLock sync.Mutex // Lock protecting concurrent updates to the storage maps

// This map holds 'live' objects, which will get modified while
// processing a state transition.
Expand Down Expand Up @@ -161,9 +164,9 @@ type StateDB struct {
TrieDBCommits time.Duration

AccountUpdated int
StorageUpdated int
AccountDeleted int
StorageDeleted int
StorageUpdated atomic.Int64
StorageDeleted atomic.Int64

// Testing hooks
onCommit func(states *triestate.Set) // Hook invoked when commit is performed
Expand Down Expand Up @@ -857,16 +860,24 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefetches just a few more milliseconds of time
// to pull useful data from disk.
start := time.Now()
var (
start = time.Now()
workers errgroup.Group
)
for addr, op := range s.mutations {
if op.applied {
continue
}
if op.isDelete() {
continue
}
s.stateObjects[addr].updateRoot()
obj := s.stateObjects[addr] // closure for the goroutine below
workers.Go(func() error {
obj.updateRoot()
return nil
})
}
workers.Wait()
s.StorageUpdates += time.Since(start)

// Now we're about to start to write changes to the trie. The trie is so far
Expand Down Expand Up @@ -1251,15 +1262,16 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er
return common.Hash{}, err
}
accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
storageUpdatedMeter.Mark(s.StorageUpdated.Load())
accountDeletedMeter.Mark(int64(s.AccountDeleted))
storageDeletedMeter.Mark(int64(s.StorageDeleted))
storageDeletedMeter.Mark(s.StorageDeleted.Load())
accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
s.AccountUpdated, s.AccountDeleted = 0, 0
s.StorageUpdated, s.StorageDeleted = 0, 0
s.StorageUpdated.Store(0)
s.StorageDeleted.Store(0)

// If snapshotting is enabled, update the snapshot tree with this new version
if s.snap != nil {
Expand Down