Skip to content

Commit

Permalink
Initialize round by any B/O who has the initializeRound flag set to t…
Browse files Browse the repository at this point in the history
…rue (#3029)
  • Loading branch information
leszko committed Apr 24, 2024
1 parent 9305333 commit e9cbadb
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 282 deletions.
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.MaxGasPrice = flag.Int("maxGasPrice", *cfg.MaxGasPrice, "Maximum gas price (priority fee + base fee) for ETH transactions in wei, 40 Gwei = 40000000000")
cfg.EthController = flag.String("ethController", *cfg.EthController, "Protocol smart contract address")
cfg.InitializeRound = flag.Bool("initializeRound", *cfg.InitializeRound, "Set to true if running as a transcoder and the node should automatically initialize new rounds")
cfg.InitializeRoundMaxDelay = flag.Duration("initializeRoundMaxDelay", *cfg.InitializeRoundMaxDelay, "Maximum delay to wait before initializing a round")
cfg.TicketEV = flag.String("ticketEV", *cfg.TicketEV, "The expected value for PM tickets")
cfg.MaxFaceValue = flag.String("maxFaceValue", *cfg.MaxFaceValue, "set max ticket face value in WEI")
// Broadcaster max acceptable ticket EV
Expand Down
205 changes: 104 additions & 101 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,76 +74,77 @@ const (
)

type LivepeerConfig struct {
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool
Orchestrator *bool
Transcoder *bool
Broadcaster *bool
OrchSecret *string
TranscodingOptions *string
MaxAttempts *int
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
TestTranscoder *bool
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
MinGasPrice *int64
MaxGasPrice *int
InitializeRound *bool
TicketEV *string
MaxFaceValue *string
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerBroadcaster *string
BlockPollingInterval *int
Redeemer *bool
RedeemerAddr *string
Reward *bool
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
Objectstore *string
Recordstore *string
FVfailGsBucket *string
FVfailGsKey *string
AuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
TestOrchAvail *bool
Network *string
RtmpAddr *string
CliAddr *string
HttpAddr *string
ServiceAddr *string
OrchAddr *string
VerifierURL *string
EthController *string
VerifierPath *string
LocalVerify *bool
HttpIngest *bool
Orchestrator *bool
Transcoder *bool
Broadcaster *bool
OrchSecret *string
TranscodingOptions *string
MaxAttempts *int
SelectRandWeight *float64
SelectStakeWeight *float64
SelectPriceWeight *float64
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Netint *string
TestTranscoder *bool
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
EthOrchAddr *string
EthUrl *string
TxTimeout *time.Duration
MaxTxReplacements *int
GasLimit *int
MinGasPrice *int64
MaxGasPrice *int
InitializeRound *bool
InitializeRoundMaxDelay *time.Duration
TicketEV *string
MaxFaceValue *string
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerBroadcaster *string
BlockPollingInterval *int
Redeemer *bool
RedeemerAddr *string
Reward *bool
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
MetadataQueueUri *string
MetadataAmqpExchange *string
MetadataPublishTimeout *time.Duration
Datadir *string
Objectstore *string
Recordstore *string
FVfailGsBucket *string
FVfailGsKey *string
AuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
TestOrchAvail *bool
}

// DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process.
Expand Down Expand Up @@ -190,6 +191,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultMaxGasPrice := 0
defaultEthController := ""
defaultInitializeRound := false
defaultInitializeRoundMaxDelay := 30 * time.Second
defaultTicketEV := "8000000000"
defaultMaxFaceValue := "0"
defaultMaxTicketEV := "3000000000000"
Expand Down Expand Up @@ -264,36 +266,37 @@ func DefaultLivepeerConfig() LivepeerConfig {
TestTranscoder: &defaultTestTranscoder,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
EthPassword: &defaultEthPassword,
EthKeystorePath: &defaultEthKeystorePath,
EthOrchAddr: &defaultEthOrchAddr,
EthUrl: &defaultEthUrl,
TxTimeout: &defaultTxTimeout,
MaxTxReplacements: &defaultMaxTxReplacements,
GasLimit: &defaultGasLimit,
MaxGasPrice: &defaultMaxGasPrice,
EthController: &defaultEthController,
InitializeRound: &defaultInitializeRound,
TicketEV: &defaultTicketEV,
MaxFaceValue: &defaultMaxFaceValue,
MaxTicketEV: &defaultMaxTicketEV,
MaxTotalEV: &defaultMaxTotalEV,
DepositMultiplier: &defaultDepositMultiplier,
MaxPricePerUnit: &defaultMaxPricePerUnit,
PixelsPerUnit: &defaultPixelsPerUnit,
PriceFeedAddr: &defaultPriceFeedAddr,
AutoAdjustPrice: &defaultAutoAdjustPrice,
PricePerBroadcaster: &defaultPricePerBroadcaster,
BlockPollingInterval: &defaultBlockPollingInterval,
Redeemer: &defaultRedeemer,
RedeemerAddr: &defaultRedeemerAddr,
Monitor: &defaultMonitor,
MetricsPerStream: &defaultMetricsPerStream,
MetricsExposeClientIP: &defaultMetricsExposeClientIP,
MetadataQueueUri: &defaultMetadataQueueUri,
MetadataAmqpExchange: &defaultMetadataAmqpExchange,
MetadataPublishTimeout: &defaultMetadataPublishTimeout,
EthAcctAddr: &defaultEthAcctAddr,
EthPassword: &defaultEthPassword,
EthKeystorePath: &defaultEthKeystorePath,
EthOrchAddr: &defaultEthOrchAddr,
EthUrl: &defaultEthUrl,
TxTimeout: &defaultTxTimeout,
MaxTxReplacements: &defaultMaxTxReplacements,
GasLimit: &defaultGasLimit,
MaxGasPrice: &defaultMaxGasPrice,
EthController: &defaultEthController,
InitializeRound: &defaultInitializeRound,
InitializeRoundMaxDelay: &defaultInitializeRoundMaxDelay,
TicketEV: &defaultTicketEV,
MaxFaceValue: &defaultMaxFaceValue,
MaxTicketEV: &defaultMaxTicketEV,
MaxTotalEV: &defaultMaxTotalEV,
DepositMultiplier: &defaultDepositMultiplier,
MaxPricePerUnit: &defaultMaxPricePerUnit,
PixelsPerUnit: &defaultPixelsPerUnit,
PriceFeedAddr: &defaultPriceFeedAddr,
AutoAdjustPrice: &defaultAutoAdjustPrice,
PricePerBroadcaster: &defaultPricePerBroadcaster,
BlockPollingInterval: &defaultBlockPollingInterval,
Redeemer: &defaultRedeemer,
RedeemerAddr: &defaultRedeemerAddr,
Monitor: &defaultMonitor,
MetricsPerStream: &defaultMetricsPerStream,
MetricsExposeClientIP: &defaultMetricsExposeClientIP,
MetadataQueueUri: &defaultMetadataQueueUri,
MetadataAmqpExchange: &defaultMetadataAmqpExchange,
MetadataPublishTimeout: &defaultMetadataPublishTimeout,

// Ingest:
HttpIngest: &defaultHttpIngest,
Expand Down Expand Up @@ -975,7 +978,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if *cfg.InitializeRound {
// Start round initializer
// The node will only initialize rounds if it in the upcoming active set for the round
initializer := eth.NewRoundInitializer(n.Eth, timeWatcher)
initializer := eth.NewRoundInitializer(n.Eth, timeWatcher, *cfg.InitializeRoundMaxDelay)
go func() {
if err := initializer.Start(); err != nil {
serviceErr <- err
Expand Down
95 changes: 23 additions & 72 deletions eth/roundinitializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package eth

import (
"math/big"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/golang/glog"
)
Expand All @@ -29,20 +30,22 @@ type timeWatcher interface {
// This selection process is purely a client side implementation that attempts to minimize on-chain transaction collisions, but
// collisions are still possible if initialization transactions are submitted by parties that are not using this selection process
type RoundInitializer struct {
client LivepeerEthClient
tw timeWatcher
quit chan struct{}
maxDelay time.Duration
client LivepeerEthClient
tw timeWatcher
quit chan struct{}

nextRoundStartL1Block *big.Int
mu sync.Mutex
}

// NewRoundInitializer creates a RoundInitializer instance
func NewRoundInitializer(client LivepeerEthClient, tw timeWatcher) *RoundInitializer {
func NewRoundInitializer(client LivepeerEthClient, tw timeWatcher, maxDelay time.Duration) *RoundInitializer {
return &RoundInitializer{
client: client,
tw: tw,
quit: make(chan struct{}),
maxDelay: maxDelay,
client: client,
tw: tw,
quit: make(chan struct{}),
}
}

Expand Down Expand Up @@ -104,23 +107,23 @@ func (r *RoundInitializer) tryInitialize() error {
r.mu.Lock()
defer r.mu.Unlock()

currentL1Blk := r.tw.LastSeenL1Block()
lastInitializedL1BlkHash := r.tw.LastInitializedL1BlockHash()

epochSeed := r.currentEpochSeed(currentL1Blk, r.nextRoundStartL1Block, lastInitializedL1BlkHash)

ok, err := r.shouldInitialize(epochSeed)
if err != nil {
return err
if r.tw.LastSeenL1Block().Cmp(r.nextRoundStartL1Block) < 0 {
// Round already initialized
return nil
}

// Noop if the caller should not initialize the round
if !ok {
return nil
if r.maxDelay > 0 {
randDelay := time.Duration(rand.Int63n(int64(r.maxDelay)))
glog.Infof("Waiting %v before attempting to initialize round", randDelay)
time.Sleep(randDelay)

if r.tw.LastSeenL1Block().Cmp(r.nextRoundStartL1Block) < 0 {
glog.Infof("Round is already initialized, not initializing")
return nil
}
}

currentRound := new(big.Int).Add(r.tw.LastInitializedRound(), big.NewInt(1))

glog.Infof("New round - preparing to initialize round to join active set, current round is %d", currentRound)

tx, err := r.client.InitializeRound()
Expand All @@ -136,55 +139,3 @@ func (r *RoundInitializer) tryInitialize() error {

return nil
}

func (r *RoundInitializer) shouldInitialize(epochSeed *big.Int) (bool, error) {
transcoders, err := r.client.TranscoderPool()
if err != nil {
return false, err
}

numActive := big.NewInt(int64(len(transcoders)))

// Should not initialize if the upcoming active set is empty
if numActive.Cmp(big.NewInt(0)) == 0 {
return false, nil
}

// Find the caller's rank in the upcoming active set
rank := int64(-1)
maxRank := numActive.Int64()
caller := r.client.Account().Address
for i := int64(0); i < maxRank; i++ {
if transcoders[i].Address == caller {
rank = i
break
}
}

// Should not initialize if the caller is not in the upcoming active set
if rank == -1 {
return false, nil
}

// Use the seed to select a position within the active set
selection := new(big.Int).Mod(epochSeed, numActive)
// Should not initialize if the selection does not match the caller's rank in the active set
if selection.Int64() != int64(rank) {
return false, nil
}

// If the selection matches the caller's rank the caller should initialize the round
return true, nil
}

// Returns the seed used to select a round initializer in the current epoch for the current round
// This seed is not meant to be unpredictable. The only requirement for the seed is that it is calculated the same way for each
// party running the round initializer
func (r *RoundInitializer) currentEpochSeed(currentL1Block, roundStartL1Block *big.Int, lastInitializedL1BlkHash [32]byte) *big.Int {
epochNum := new(big.Int).Sub(currentL1Block, roundStartL1Block)
epochNum.Div(epochNum, epochL1Blocks)

// The seed for the current epoch is calculated as:
// keccak256(lastInitializedL1BlkHash | epochNum)
return crypto.Keccak256Hash(append(lastInitializedL1BlkHash[:], epochNum.Bytes()...)).Big()
}

0 comments on commit e9cbadb

Please sign in to comment.