Skip to content

Commit

Permalink
core/state: update storage objects concurrently to one another
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed May 2, 2024
1 parent fbf6238 commit 8bb6a49
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 12 deletions.
23 changes: 20 additions & 3 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"maps"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -33,6 +34,14 @@ import (
"github.com/holiman/uint256"
)

// hasherPool holds a pool of hashers used by state objects during concurrent
// trie updates.
var hasherPool = sync.Pool{
New: func() interface{} {
return crypto.NewKeccakState()
},
}

type Storage map[common.Hash]common.Hash

func (s Storage) Copy() Storage {
Expand Down Expand Up @@ -307,6 +316,9 @@ func (s *stateObject) updateTrie() (Trie, error) {
// Insert all the pending storage updates into the trie
usedStorage := make([][]byte, 0, len(s.pendingStorage))

hasher := hasherPool.Get().(crypto.KeccakState)
defer hasherPool.Put(hasher)

// Perform trie updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
//
Expand Down Expand Up @@ -335,26 +347,31 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageUpdated += 1
s.db.StorageUpdated.Add(1)
} else {
deletions = append(deletions, key)
}
khash := crypto.HashData(hasher, key[:])

// Cache the mutated storage slots until commit
if storage == nil {
s.db.storagesLock.Lock()
if storage = s.db.storages[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.storages[s.addrHash] = storage
}
s.db.storagesLock.Unlock()
}
khash := crypto.HashData(s.db.hasher, key[:])
storage[khash] = encoded // encoded will be nil if it's deleted

// Cache the original value of mutated storage slots
if origin == nil {
s.db.storagesLock.Lock()
if origin = s.db.storagesOrigin[s.address]; origin == nil {
origin = make(map[common.Hash][]byte)
s.db.storagesOrigin[s.address] = origin
}
s.db.storagesLock.Unlock()
}
// Track the original value of slot only if it's mutated first time
if _, ok := origin[khash]; !ok {
Expand All @@ -374,7 +391,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageDeleted += 1
s.db.StorageDeleted.Add(1)
}
// If no slots were touched, issue a warning as we shouldn't have done all
// the above work in the first place
Expand Down
30 changes: 21 additions & 9 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"slices"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -92,10 +93,12 @@ type StateDB struct {

// These maps hold the state changes (including the corresponding
// original value) that occurred in this **block**.
accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding
accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding
accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding

storages map[common.Hash]map[common.Hash][]byte // The mutated slots in prefix-zero trimmed rlp format
accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding
storagesOrigin map[common.Address]map[common.Hash][]byte // The original value of mutated slots in prefix-zero trimmed rlp format
storagesLock sync.Mutex // Lock protecting concurrent updates to the storage maps

// This map holds 'live' objects, which will get modified while
// processing a state transition.
Expand Down Expand Up @@ -161,9 +164,9 @@ type StateDB struct {
TrieDBCommits time.Duration

AccountUpdated int
StorageUpdated int
AccountDeleted int
StorageDeleted int
StorageUpdated atomic.Int64
StorageDeleted atomic.Int64

// Testing hooks
onCommit func(states *triestate.Set) // Hook invoked when commit is performed
Expand Down Expand Up @@ -857,16 +860,24 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefetches just a few more milliseconds of time
// to pull useful data from disk.
start := time.Now()
var (
start = time.Now()
workers errgroup.Group
)
for addr, op := range s.mutations {
if op.applied {
continue
}
if op.isDelete() {
continue
}
s.stateObjects[addr].updateRoot()
obj := s.stateObjects[addr] // closure for the goroutine below
workers.Go(func() error {
obj.updateRoot()
return nil
})
}
workers.Wait()
s.StorageUpdates += time.Since(start)

// Now we're about to start to write changes to the trie. The trie is so far
Expand Down Expand Up @@ -1251,15 +1262,16 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er
return common.Hash{}, err
}
accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
storageUpdatedMeter.Mark(s.StorageUpdated.Load())
accountDeletedMeter.Mark(int64(s.AccountDeleted))
storageDeletedMeter.Mark(int64(s.StorageDeleted))
storageDeletedMeter.Mark(s.StorageDeleted.Load())
accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
s.AccountUpdated, s.AccountDeleted = 0, 0
s.StorageUpdated, s.StorageDeleted = 0, 0
s.StorageUpdated.Store(0)
s.StorageDeleted.Store(0)

// If snapshotting is enabled, update the snapshot tree with this new version
if s.snap != nil {
Expand Down

0 comments on commit 8bb6a49

Please sign in to comment.