Skip to content

Commit

Permalink
ethclient/lightclient: SubscribeNewHead
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Mar 1, 2024
1 parent 10b06fb commit 27e8458
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 10 deletions.
22 changes: 15 additions & 7 deletions ethclient/lightclient/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type canonicalChain struct {
lock sync.Mutex
headTracker *light.HeadTracker
blocksAndHeaders *blocksAndHeaders
newHeadCb func(*types.Header)

head, finality *types.Header
recent map[uint64]common.Hash // nil until initialized
Expand All @@ -46,27 +47,30 @@ type canonicalChain struct {
requests *requestMap[uint64, common.Hash] // requested; neither recent nor finalized
}

func newCanonicalChain(headTracker *light.HeadTracker, blocksAndHeaders *blocksAndHeaders) *canonicalChain {
func newCanonicalChain(headTracker *light.HeadTracker, blocksAndHeaders *blocksAndHeaders, newHeadCb func(*types.Header)) *canonicalChain {
return &canonicalChain{
headTracker: headTracker,
blocksAndHeaders: blocksAndHeaders,
newHeadCb: newHeadCb,
finalized: lru.NewCache[uint64, common.Hash](10000),
requests: newRequestMap[uint64, common.Hash](),
}
}

// Process implements request.Module in order to get notified about new heads.
func (c *canonicalChain) Process(requester request.Requester, events []request.Event) {
if optimistic, ok := c.headTracker.ValidatedOptimistic(); ok {
head := optimistic.Attested.ExecHeader()
c.setHead(head)
c.blocksAndHeaders.addHeader(head)
}
if finality, ok := c.headTracker.ValidatedFinality(); ok {
finalized := finality.Finalized.ExecHeader()
c.setFinality(finalized)
c.blocksAndHeaders.addHeader(finalized)
}
if optimistic, ok := c.headTracker.ValidatedOptimistic(); ok {
head := optimistic.Attested.ExecHeader()
c.blocksAndHeaders.addHeader(head)
if c.setHead(head) {
c.newHeadCb(head)
}
}
}

func (c *canonicalChain) getHash(ctx context.Context, number uint64) (common.Hash, error) {
Expand All @@ -84,11 +88,14 @@ func (c *canonicalChain) getHash(ctx context.Context, number uint64) (common.Has
return c.requests.waitForValue(ctx, number, ch)
}

func (c *canonicalChain) setHead(head *types.Header) {
func (c *canonicalChain) setHead(head *types.Header) bool {
c.lock.Lock()
defer c.lock.Unlock()

headNum, headHash := head.Number.Uint64(), head.Hash()
if c.head != nil && c.head.Hash() == headHash {
return false
}
if c.recent == nil || c.head == nil || c.head.Number.Uint64()+1 != headNum || headHash != head.ParentHash {
c.recent = make(map[uint64]common.Hash)
if headNum > 0 {
Expand All @@ -108,6 +115,7 @@ func (c *canonicalChain) setHead(head *types.Header) {
c.recentTail++
}
c.requests.deliver(headNum, headHash, nil)
return true
}

func (c *canonicalChain) setFinality(finality *types.Header) {
Expand Down
47 changes: 44 additions & 3 deletions ethclient/lightclient/lightclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"math/big"
ssync "sync"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/beacon/light"
Expand All @@ -35,6 +36,8 @@ type Client struct {
scheduler *request.Scheduler
canonicalChain *canonicalChain
blocksAndHeaders *blocksAndHeaders
headSubLock ssync.Mutex
headSubs map[*headSub]struct{}
}

func NewClient(config light.ClientConfig, db ethdb.Database, rpcClient *rpc.Client) *Client {
Expand All @@ -47,12 +50,13 @@ func NewClient(config light.ClientConfig, db ethdb.Database, rpcClient *rpc.Clie
//chainHeadFeed := new(event.Feed)
scheduler := request.NewScheduler()
blocksAndHeaders := newBlocksAndHeaders(rpcClient)
canonicalChain := newCanonicalChain(headTracker, blocksAndHeaders)
client := &Client{
scheduler: scheduler,
blocksAndHeaders: blocksAndHeaders,
canonicalChain: canonicalChain,
headSubs: make(map[*headSub]struct{}),
}
canonicalChain := newCanonicalChain(headTracker, blocksAndHeaders, client.broadcastNewHead)
client.canonicalChain = canonicalChain

checkpointInit := sync.NewCheckpointInit(committeeChain, config.Checkpoint)
forwardSync := sync.NewForwardUpdateSync(committeeChain)
Expand Down Expand Up @@ -143,5 +147,42 @@ func (c *Client) TransactionInBlock(ctx context.Context, blockHash common.Hash,
}

func (c *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return nil, nil //TODO
sub := &headSub{
client: c,
headCh: ch,
errCh: make(chan error, 1),
}
c.headSubLock.Lock()
c.headSubs[sub] = struct{}{}
c.headSubLock.Unlock()
return sub, nil
}

func (c *Client) broadcastNewHead(head *types.Header) {
c.headSubLock.Lock()
for sub := range c.headSubs {
sub.headCh <- head
}
c.headSubLock.Unlock()
}

func (c *Client) unsubscribeNewHead(sub *headSub) {
c.headSubLock.Lock()
delete(c.headSubs, sub)
c.headSubLock.Unlock()
}

type headSub struct {
client *Client
headCh chan<- *types.Header
errCh chan error
}

func (h *headSub) Unsubscribe() {
h.client.unsubscribeNewHead(h)
close(h.errCh)
}

func (h *headSub) Err() <-chan error {
return h.errCh
}
122 changes: 122 additions & 0 deletions ethclient/lightclient/request_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package lightclient

import (
"context"
"sync"
)

type valueAndError[V any] struct {
value V
err error
}

type objectRequest[V any] struct {
pending map[chan valueAndError[V]]struct{}
cancelFn func()
}

type requestMap[K comparable, V any] struct {
lock sync.Mutex
requests map[K]*objectRequest[V]
}

func newRequestMap[K comparable, V any]() *requestMap[K, V] {
return &requestMap[K, V]{
requests: make(map[K]*objectRequest[V]),
}
}

func (r *requestMap[K, V]) add(key K) (chan valueAndError[V], bool) {
r.lock.Lock()
defer r.lock.Unlock()

ch := make(chan valueAndError[V], 1)
req, ok := r.requests[key]
if !ok {
req = &objectRequest[V]{pending: make(map[chan valueAndError[V]]struct{})}
}
req.pending[ch] = struct{}{}
return ch, !ok
}

func (r *requestMap[K, V]) has(key K) bool {
r.lock.Lock()
defer r.lock.Unlock()

_, ok := r.requests[key]
return ok
}

func (r *requestMap[K, V]) requestContext(key K) context.Context {
r.lock.Lock()
defer r.lock.Unlock()

ctx, cancelFn := context.WithCancel(context.Background())
if req, ok := r.requests[key]; ok {
req.cancelFn = cancelFn
} else {
cancelFn()
}
return ctx
}

func (r *requestMap[K, V]) deliver(key K, value V, err error) {
r.lock.Lock()
defer r.lock.Unlock()

req, ok := r.requests[key]
if !ok {
return
}
for ch := range req.pending {
ch <- valueAndError[V]{value: value, err: err}
}
delete(r.requests, key)
if req.cancelFn != nil {
req.cancelFn()
}
}

func (r *requestMap[K, V]) remove(key K, ch chan valueAndError[V]) {
r.lock.Lock()
defer r.lock.Unlock()

req, ok := r.requests[key]
if !ok {
return
}
delete(req.pending, ch)
if len(req.pending) == 0 {
delete(r.requests, key)
if req.cancelFn != nil {
req.cancelFn()
}
}
}

func (r *requestMap[K, V]) waitForValue(ctx context.Context, key K, ch chan valueAndError[V]) (V, error) {
var empty V
select {
case v := <-ch:
return v.value, v.err
case <-ctx.Done():
r.remove(key, ch)
return empty, ctx.Err()
}
}

0 comments on commit 27e8458

Please sign in to comment.