Skip to content

Commit

Permalink
Add min acceptable livepeer version as the broadcaster param
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko committed May 9, 2024
1 parent 50998d2 commit 703c92f
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 2 deletions.
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.OrchAddr = flag.String("orchAddr", *cfg.OrchAddr, "Comma-separated list of orchestrators to connect to")
cfg.OrchWebhookURL = flag.String("orchWebhookUrl", *cfg.OrchWebhookURL, "Orchestrator discovery callback URL")
cfg.OrchBlacklist = flag.String("orchBlocklist", "", "Comma-separated list of blocklisted orchestrators")
cfg.OrchMinLivepeerVersion = flag.String("orchMinLivepeerVersion", "", "Minimal go-livepeer version orchestrator should have to be selected")
cfg.SelectRandWeight = flag.Float64("selectRandFreq", *cfg.SelectRandWeight, "Weight of the random factor in the orchestrator selection algorithm")
cfg.SelectStakeWeight = flag.Float64("selectStakeWeight", *cfg.SelectStakeWeight, "Weight of the stake factor in the orchestrator selection algorithm")
cfg.SelectPriceWeight = flag.Float64("selectPriceWeight", *cfg.SelectPriceWeight, "Weight of the price factor in the orchestrator selection algorithm")
Expand Down
4 changes: 4 additions & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type LivepeerConfig struct {
AuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
OrchMinLivepeerVersion *string
TestOrchAvail *bool
}

Expand Down Expand Up @@ -1169,6 +1170,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}

n.Capabilities = core.NewCapabilities(transcoderCaps, core.MandatoryOCapabilities())
if cfg.OrchMinLivepeerVersion != nil {
n.Capabilities.AddMinVersion(core.NewMinVersionContraint(*cfg.OrchMinLivepeerVersion))
}

if drivers.NodeStorage == nil {
// base URI will be empty for broadcasters; that's OK
Expand Down
15 changes: 13 additions & 2 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func (bcast *Capabilities) CompatibleWith(orch *net.Capabilities) bool {
return false
}

// TODO Compare bcast.constraints.minVersion <= orch.Version instead of this mock equality check
if bcast.constraints.minVersion != orch.Version {
return false
}
Expand All @@ -353,7 +354,7 @@ func (c *Capabilities) ToNetCapabilities() *net.Capabilities {
}
c.mutex.Lock()
defer c.mutex.Unlock()
netCaps := &net.Capabilities{Bitstring: c.bitstring, Mandatories: c.mandatories, Version: c.version, Capacities: make(map[uint32]uint32)}
netCaps := &net.Capabilities{Bitstring: c.bitstring, Mandatories: c.mandatories, Version: c.version, Capacities: make(map[uint32]uint32), Constraints: &net.Capabilities_Constraints{MinVersion: c.constraints.minVersion}}
for capability, capacity := range c.capacities {
netCaps.Capacities[uint32(capability)] = uint32(capacity)
}
Expand Down Expand Up @@ -389,8 +390,12 @@ func CapabilitiesFromNetCapabilities(caps *net.Capabilities) *Capabilities {
return coreCaps
}

func NewMinVersionContraint(minVersion string) *Capabilities {
return &Capabilities{constraints: Constraints{minVersion: minVersion}}
}

func NewCapabilities(caps []Capability, m []Capability) *Capabilities {
c := &Capabilities{capacities: make(map[Capability]int)}
c := &Capabilities{capacities: make(map[Capability]int), version: LivepeerVersion}
if len(caps) > 0 {
c.bitstring = NewCapabilityString(caps)
// initialize capacities to 1 by default, mandatory capabilities doesn't have capacities
Expand Down Expand Up @@ -576,3 +581,9 @@ func (bcast *Capabilities) LegacyOnly() bool {
}
return bcast.bitstring.CompatibleWith(legacyCapabilityString)
}

func (bcast *Capabilities) AddMinVersion(capabilities *Capabilities) {
if capabilities.constraints.minVersion != "" {
bcast.constraints.minVersion = capabilities.constraints.minVersion
}
}
1 change: 1 addition & 0 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ func (bsm *BroadcastSessionsManager) shouldSkipVerification(sessions []*Broadcas
}

func NewSessionManager(ctx context.Context, node *core.LivepeerNode, params *core.StreamParameters, sel BroadcastSessionsSelectorFactory) *BroadcastSessionsManager {
params.Capabilities.AddMinVersion(node.Capabilities)
var trustedPoolSize, untrustedPoolSize float64
if node.OrchestratorPool != nil {
trustedPoolSize = float64(node.OrchestratorPool.SizeWith(common.ScoreAtLeast(common.Score_Trusted)))
Expand Down
1 change: 1 addition & 0 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ func gotRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.
func endRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error {
return func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error {
params := streamParams(rtmpStrm.AppData())
params.Capabilities.AddMinVersion(s.LivepeerNode.Capabilities)
if params == nil {
return errMismatchedParams
}
Expand Down

0 comments on commit 703c92f

Please sign in to comment.