Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create option to filter Os by min livepeer version used #3050

Merged
merged 12 commits into from
May 13, 2024
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.OrchAddr = flag.String("orchAddr", *cfg.OrchAddr, "Comma-separated list of orchestrators to connect to")
cfg.OrchWebhookURL = flag.String("orchWebhookUrl", *cfg.OrchWebhookURL, "Orchestrator discovery callback URL")
cfg.OrchBlacklist = flag.String("orchBlocklist", "", "Comma-separated list of blocklisted orchestrators")
cfg.OrchMinLivepeerVersion = flag.String("orchMinLivepeerVersion", *cfg.OrchMinLivepeerVersion, "Minimal go-livepeer version orchestrator should have to be selected")
cfg.SelectRandWeight = flag.Float64("selectRandFreq", *cfg.SelectRandWeight, "Weight of the random factor in the orchestrator selection algorithm")
cfg.SelectStakeWeight = flag.Float64("selectStakeWeight", *cfg.SelectStakeWeight, "Weight of the stake factor in the orchestrator selection algorithm")
cfg.SelectPriceWeight = flag.Float64("selectPriceWeight", *cfg.SelectPriceWeight, "Weight of the price factor in the orchestrator selection algorithm")
Expand Down
10 changes: 8 additions & 2 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
AuthWebhookURL *string
OrchWebhookURL *string
OrchBlacklist *string
OrchMinLivepeerVersion *string
TestOrchAvail *bool
}

Expand Down Expand Up @@ -230,6 +231,7 @@
// API
defaultAuthWebhookURL := ""
defaultOrchWebhookURL := ""
defaultMinLivepeerVersion := ""

Check warning on line 234 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L234

Added line #L234 was not covered by tests

// Flags
defaultTestOrchAvail := true
Expand Down Expand Up @@ -314,8 +316,9 @@
FVfailGsKey: &defaultFVfailGsKey,

// API
AuthWebhookURL: &defaultAuthWebhookURL,
OrchWebhookURL: &defaultOrchWebhookURL,
AuthWebhookURL: &defaultAuthWebhookURL,
OrchWebhookURL: &defaultOrchWebhookURL,
OrchMinLivepeerVersion: &defaultMinLivepeerVersion,

Check warning on line 321 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L319-L321

Added lines #L319 - L321 were not covered by tests

// Flags
TestOrchAvail: &defaultTestOrchAvail,
Expand Down Expand Up @@ -849,7 +852,7 @@
if mfv == nil {
panic(fmt.Errorf("-maxFaceValue must be a valid integer, but %v provided. Restart the node with a different valid value for -maxFaceValue", *cfg.MaxFaceValue))
return
} else {

Check warning on line 855 in cmd/livepeer/starter/starter.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

if block ends with a return statement, so drop this else and outdent its block
n.SetMaxFaceValue(mfv)
}

Expand Down Expand Up @@ -1169,6 +1172,9 @@
}

n.Capabilities = core.NewCapabilities(transcoderCaps, core.MandatoryOCapabilities())
if cfg.OrchMinLivepeerVersion != nil {
n.Capabilities.SetMinVersionConstraint(*cfg.OrchMinLivepeerVersion)

Check warning on line 1176 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L1175-L1176

Added lines #L1175 - L1176 were not covered by tests
}

if drivers.NodeStorage == nil {
// base URI will be empty for broadcasters; that's OK
Expand Down
58 changes: 54 additions & 4 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@
import (
"errors"
"fmt"

"sync"

"github.com/Masterminds/semver/v3"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-tools/drivers"
"github.com/livepeer/lpms/ffmpeg"
)

type Capability int
type CapabilityString []uint64
type Constraints struct{}
type Constraints struct {
minVersion string
}
type Capabilities struct {
bitstring CapabilityString
mandatories CapabilityString
version string
constraints Constraints
capacities map[Capability]int
mutex sync.Mutex
Expand Down Expand Up @@ -316,6 +320,34 @@
return &Capabilities{bitstring: NewCapabilityString(capList)}, nil
}

func (bcast *Capabilities) LivepeerVersionCompatibleWith(orch *net.Capabilities) bool {
if bcast == nil || orch == nil || bcast.constraints.minVersion == "" {
// should not happen, but just in case, return true by default
return true
}
if orch.Version == "" || orch.Version == "undefined" {
// Orchestrator/Transcoder version is not set, so it's incompatible
return false
}

minVer, err := semver.NewVersion(bcast.constraints.minVersion)
if err != nil {
glog.Warningf("error while parsing minVersion: %v", err)
return true
}
ver, err := semver.NewVersion(orch.Version)
if err != nil {
glog.Warningf("error while parsing version: %v", err)
return false
}

// Ignore prerelease versions as in go-livepeer we actually define post-release suffixes
minVerNoSuffix, _ := minVer.SetPrerelease("")
verNoSuffix, _ := ver.SetPrerelease("")

return !verNoSuffix.LessThan(&minVerNoSuffix)
}

func (bcast *Capabilities) CompatibleWith(orch *net.Capabilities) bool {
// Ensure bcast and orch are compatible with one another.

Expand All @@ -325,6 +357,9 @@
// cf. common.CapabilityComparator
return false
}
if !bcast.LivepeerVersionCompatibleWith(orch) {
return false
}

// For now, check this:
// ( orch.mandatories AND bcast.bitstring ) == orch.mandatories &&
Expand All @@ -346,7 +381,7 @@
}
c.mutex.Lock()
defer c.mutex.Unlock()
netCaps := &net.Capabilities{Bitstring: c.bitstring, Mandatories: c.mandatories, Capacities: make(map[uint32]uint32)}
netCaps := &net.Capabilities{Bitstring: c.bitstring, Mandatories: c.mandatories, Version: c.version, Capacities: make(map[uint32]uint32), Constraints: &net.Capabilities_Constraints{MinVersion: c.constraints.minVersion}}
for capability, capacity := range c.capacities {
netCaps.Capacities[uint32(capability)] = uint32(capacity)
}
Expand All @@ -361,6 +396,8 @@
bitstring: caps.Bitstring,
mandatories: caps.Mandatories,
capacities: make(map[Capability]int),
version: caps.Version,
constraints: Constraints{minVersion: caps.Constraints.GetMinVersion()},
}
if caps.Capacities == nil || len(caps.Capacities) == 0 {
// build capacities map if not present (struct received from previous versions)
Expand All @@ -381,7 +418,7 @@
}

func NewCapabilities(caps []Capability, m []Capability) *Capabilities {
c := &Capabilities{capacities: make(map[Capability]int)}
c := &Capabilities{capacities: make(map[Capability]int), version: LivepeerVersion}
if len(caps) > 0 {
c.bitstring = NewCapabilityString(caps)
// initialize capacities to 1 by default, mandatory capabilities doesn't have capacities
Expand Down Expand Up @@ -567,3 +604,16 @@
}
return bcast.bitstring.CompatibleWith(legacyCapabilityString)
}

func (bcast *Capabilities) SetMinVersionConstraint(minVersionConstraint string) {
if bcast != nil {
bcast.constraints.minVersion = minVersionConstraint
}
}

func (bcast *Capabilities) MinVersionConstraint() string {
if bcast != nil {
return bcast.constraints.minVersion

Check warning on line 616 in core/capabilities.go

View check run for this annotation

Codecov / codecov/patch

core/capabilities.go#L614-L616

Added lines #L614 - L616 were not covered by tests
}
return ""

Check warning on line 618 in core/capabilities.go

View check run for this annotation

Codecov / codecov/patch

core/capabilities.go#L618

Added line #L618 was not covered by tests
}
103 changes: 103 additions & 0 deletions core/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,20 @@ func TestCapability_CompatibleWithNetCap(t *testing.T) {
orch = NewCapabilities(nil, nil)
bcast = NewCapabilities(nil, []Capability{1})
assert.True(bcast.CompatibleWith(orch.ToNetCapabilities()))

// broadcaster is not compatible with orchestrator - old O's version
orch = NewCapabilities(nil, nil)
bcast = NewCapabilities(nil, nil)
bcast.constraints.minVersion = "0.4.1"
orch.version = "0.4.0"
assert.False(bcast.CompatibleWith(orch.ToNetCapabilities()))

// broadcaster is not compatible with orchestrator - the same version
orch = NewCapabilities(nil, nil)
bcast = NewCapabilities(nil, nil)
bcast.constraints.minVersion = "0.4.1"
orch.version = "0.4.1"
assert.True(bcast.CompatibleWith(orch.ToNetCapabilities()))
}

func TestCapability_RoundTrip_Net(t *testing.T) {
Expand Down Expand Up @@ -474,3 +488,92 @@ func TestCapabilities_LegacyCheck(t *testing.T) {

assert.Len(legacyCapabilities, legacyLen) // sanity check no modifications
}

func TestLiveeerVersionCompatibleWith(t *testing.T) {
tests := []struct {
name string
broadcasterMinVersion string
transcoderVersion string
expected bool
}{
{
name: "broadcaster required version is the same as the transcoder version",
broadcasterMinVersion: "0.4.1",
transcoderVersion: "0.4.1",
expected: true,
},
{
name: "broadcaster required version is less than the transcoder version",
broadcasterMinVersion: "0.4.0",
transcoderVersion: "0.4.1",
expected: true,
},
{
name: "broadcaster required version is more than the transcoder version",
broadcasterMinVersion: "0.4.2",
transcoderVersion: "0.4.1",
expected: false,
},
{
name: "broadcaster required version is the same as the transcoder dirty version",
broadcasterMinVersion: "0.4.1",
transcoderVersion: "0.4.1-b3278dce-dirty",
expected: true,
},
{
name: "broadcaster required version is before the transcoder dirty version",
broadcasterMinVersion: "0.4.0",
transcoderVersion: "0.4.1-b3278dce-dirty",
expected: true,
},
{
name: "broadcaster required version is after the transcoder dirty version",
broadcasterMinVersion: "0.4.2",
transcoderVersion: "0.4.1-b3278dce-dirty",
expected: false,
},
{
name: "broadcaster required version is empty",
broadcasterMinVersion: "",
transcoderVersion: "0.4.1",
expected: true,
},
{
name: "both versions are undefined",
broadcasterMinVersion: "",
transcoderVersion: "",
expected: true,
},
{
name: "transcoder version is empty",
broadcasterMinVersion: "0.4.0",
transcoderVersion: "",
expected: false,
},
{
name: "transcoder version is undefined",
broadcasterMinVersion: "0.4.0",
transcoderVersion: "undefined",
expected: false,
},
{
name: "unparsable broadcaster's min version",
broadcasterMinVersion: "nonparsablesemversion",
transcoderVersion: "0.4.1",
expected: true,
},
{
name: "unparsable transcoder's version",
broadcasterMinVersion: "0.4.1",
transcoderVersion: "nonparsablesemversion",
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bCapabilities := &Capabilities{constraints: Constraints{minVersion: tt.broadcasterMinVersion}}
tCapabilities := &Capabilities{version: tt.transcoderVersion}
assert.Equal(t, tt.expected, bCapabilities.LivepeerVersionCompatibleWith(tCapabilities.ToNetCapabilities()))
})
}
}
2 changes: 1 addition & 1 deletion core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewLivepeerNode(e eth.LivepeerEthClient, wd string, dbh *common.DB) (*Livep
AutoAdjustPrice: true,
SegmentChans: make(map[ManifestID]SegmentChan),
segmentMutex: &sync.RWMutex{},
Capabilities: &Capabilities{capacities: map[Capability]int{}},
Capabilities: &Capabilities{capacities: map[Capability]int{}, version: LivepeerVersion},
priceInfo: make(map[string]*AutoConvertedPrice),
StorageConfigs: make(map[string]*transcodeConfig),
storageMutex: &sync.RWMutex{},
Expand Down
19 changes: 18 additions & 1 deletion core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ func TestSelectTranscoder(t *testing.T) {
strm := &StubTranscoderServer{manager: m, WithholdResults: false}
strm2 := &StubTranscoderServer{manager: m}

LivepeerVersion = "0.4.1"
capabilities := NewCapabilities(DefaultCapabilities(), []Capability{})
LivepeerVersion = "undefined"

richCapabilities := NewCapabilities(append(DefaultCapabilities(), Capability_HEVC_Encode), []Capability{})
allCapabilities := NewCapabilities(append(DefaultCapabilities(), OptionalCapabilities()...), []Capability{})

Expand All @@ -259,7 +262,7 @@ func TestSelectTranscoder(t *testing.T) {
go func() { m.Manage(strm, 1, capabilities.ToNetCapabilities()) }()
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
go func() { m.Manage(strm2, 1, richCapabilities.ToNetCapabilities()); wg.Done() }()
time.Sleep(1 * time.Millisecond) // allow time for second stream to register
time.Sleep(1 * time.Millisecond) // allow time for second stream to register e for third stream to register

assert.NotNil(m.liveTranscoders[strm])
assert.NotNil(m.liveTranscoders[strm2])
Expand Down Expand Up @@ -341,6 +344,20 @@ func TestSelectTranscoder(t *testing.T) {
assert.Equal(1, t1.load)
m.completeStreamSession(testSessionId)
assert.Equal(0, t1.load)

// assert one transcoder with the correct Livepeer version is selected
minVersionCapabilities := NewCapabilities(DefaultCapabilities(), []Capability{})
minVersionCapabilities.SetMinVersionConstraint("0.4.0")
currentTranscoder, err = m.selectTranscoder(testSessionId, minVersionCapabilities)
assert.Nil(err)
m.completeStreamSession(testSessionId)

// assert no transcoders available for min version higher than any transcoder
minVersionHighCapabilities := NewCapabilities(DefaultCapabilities(), []Capability{})
minVersionHighCapabilities.SetMinVersionConstraint("0.4.2")
currentTranscoder, err = m.selectTranscoder(testSessionId, minVersionHighCapabilities)
assert.NotNil(err)
m.completeStreamSession(testSessionId)
}

func TestCompleteStreamSession(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,9 @@ func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string, caps *Cap
findCompatibleTranscoder := func(rtm *RemoteTranscoderManager) int {
for i := len(rtm.remoteTranscoders) - 1; i >= 0; i-- {
// no capabilities = default capabilities, all transcoders must support them
if caps == nil || caps.bitstring.CompatibleWith(rtm.remoteTranscoders[i].capabilities.bitstring) {
if caps == nil ||
(caps.bitstring.CompatibleWith(rtm.remoteTranscoders[i].capabilities.bitstring) &&
caps.LivepeerVersionCompatibleWith(rtm.remoteTranscoders[i].capabilities.ToNetCapabilities())) {
return i
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/Masterminds/semver/v3 v3.2.1 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.11.1 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.1 h1:hJ3s7GbWlGK4YVV92sO88BQSyF4ZLVy7/awqOlPxFbA=
Expand Down