Skip to content

Commit

Permalink
ethclient/lightclient: add canonical tail fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed May 13, 2024
1 parent aa764f9 commit fe7568d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/bltest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ loop:
} else {
log.Error("BlockByHash", "hash", head.ParentHash, "error", err)
}
num := big.NewInt(2)
num := big.NewInt(10)
num.Sub(head.Number, num)
if block, err := client.BlockByNumber(ctx, num); err == nil {
log.Info("BlockByNumber", "number", num, "block.Hash", block.Hash(), "block.Number", block.Number(), "len(block.Transactions)", len(block.Transactions()))
Expand Down
55 changes: 48 additions & 7 deletions ethclient/lightclient/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/beacon/light"
Expand All @@ -43,20 +44,24 @@ type canonicalChain struct {
newHeadCb func(common.Hash)

head, finality *btypes.ExecutionHeader
recent map[uint64]common.Hash // nil until initialized
recentTail uint64 // if recent != nil then recent hashes are available from recentTail to head
recent map[uint64]common.Hash // nil until initialized
recentTail uint64 // if recent != nil then recent hashes are available from recentTail to head
tailFetchCh chan struct{}
finalized *lru.Cache[uint64, common.Hash] // finalized but not recent hashes
requests *requestMap[uint64, common.Hash] // requested; neither recent nor finalized
}

func newCanonicalChain(headTracker *light.HeadTracker, blocksAndHeaders *blocksAndHeaders, newHeadCb func(common.Hash)) *canonicalChain {
return &canonicalChain{
c := &canonicalChain{
headTracker: headTracker,
blocksAndHeaders: blocksAndHeaders,
newHeadCb: newHeadCb,
finalized: lru.NewCache[uint64, common.Hash](10000),
requests: newRequestMap[uint64, common.Hash](nil),
tailFetchCh: make(chan struct{}),
}
go c.tailFetcher()
return c
}

// Process implements request.Module in order to get notified about new heads.
Expand All @@ -75,6 +80,36 @@ func (c *canonicalChain) Process(requester request.Requester, events []request.E
}
}

func (c *canonicalChain) tailFetcher() { //TODO stop
for {
c.lock.Lock()
var (
tailNum uint64
tailHash common.Hash
)
if c.recent != nil {
tailNum, tailHash = c.recentTail, c.recent[c.recentTail]
}
needTail := tailNum
for _, reqNum := range c.requests.allKeys() {
if reqNum < needTail {
needTail = reqNum
}
}
c.lock.Unlock()
if needTail < tailNum { //TODO check recentCanonicalLength
log.Debug("Fetching tail headers", "have", tailNum, "need", needTail)
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
//TODO parallel fetch by number
if header, err := c.blocksAndHeaders.getHeader(ctx, tailHash); err == nil {
c.addRecentTail(header)
}
} else {
<-c.tailFetchCh
}
}
}

func (c *canonicalChain) getHash(ctx context.Context, number uint64) (common.Hash, error) {
c.lock.Lock()
if hash, ok := c.recent[number]; ok {
Expand All @@ -87,6 +122,11 @@ func (c *canonicalChain) getHash(ctx context.Context, number uint64) (common.Has
}
req := c.requests.request(number)
c.lock.Unlock()
select {
case c.tailFetchCh <- struct{}{}:
default:
}
defer req.release()
return req.getResult(ctx)
}

Expand All @@ -98,7 +138,7 @@ func (c *canonicalChain) setHead(head *btypes.ExecutionHeader) bool {
if c.head != nil && c.head.BlockHash() == headHash {
return false
}
if c.recent == nil || c.head == nil || c.head.BlockNumber()+1 != headNum || headHash != head.ParentHash() {
if c.recent == nil || c.head == nil || c.head.BlockNumber()+1 != headNum || c.head.BlockHash() != head.ParentHash() {
c.recent = make(map[uint64]common.Hash)
if headNum > 0 {
c.recent[headNum-1] = head.ParentHash()
Expand All @@ -117,6 +157,7 @@ func (c *canonicalChain) setHead(head *btypes.ExecutionHeader) bool {
c.recentTail++
}
c.requests.tryDeliver(headNum, headHash)
log.Debug("SetHead", "recentTail", c.recentTail, "head", headNum)
return true
}

Expand Down Expand Up @@ -186,12 +227,12 @@ func (c *canonicalChain) resolveBlockNumber(number *big.Int) (uint64, *btypes.Ex
}

func (c *canonicalChain) blockNumberToHash(ctx context.Context, number *big.Int) (common.Hash, error) {
num, header, err := c.resolveBlockNumber(number)
num, pheader, err := c.resolveBlockNumber(number)
if err != nil {
return common.Hash{}, err
}
if header != nil {
return header.BlockHash(), nil
if pheader != nil {
return pheader.BlockHash(), nil
}
return c.getHash(ctx, num)
}
Expand Down
12 changes: 12 additions & 0 deletions ethclient/lightclient/request_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (rm *requestMap[K, V]) request(key K) *mappedRequest[K, V] {
deliveredCh: make(chan struct{}),
cancelFn: cancelFn,
}
rm.requests[key] = r
if rm.requestFn != nil {
go func() {
result, err := rm.requestFn(ctx, key)
Expand All @@ -69,6 +70,17 @@ func (rm *requestMap[K, V]) has(key K) bool {
return ok
}

func (rm *requestMap[K, V]) allKeys() []K {
rm.lock.Lock()
defer rm.lock.Unlock()

keys := make([]K, 0, len(rm.requests))
for key := range rm.requests {
keys = append(keys, key)
}
return keys
}

// should only be called with validated results of successful requests
func (rm *requestMap[K, V]) tryDeliver(key K, result V) {
rm.lock.Lock()
Expand Down

0 comments on commit fe7568d

Please sign in to comment.