Skip to content

Commit

Permalink
health: Create API to wait for a stream to become active (#180)
Browse files Browse the repository at this point in the history
* health: Create WaitStarted to wait for stream start

* health: Only flag initialized if not inactive

* health: Move ConditionActive to health package

Let's say that it's been promoted as a first-party
condition of the health package lol

Doesn't really matter much as we want to kill all of this.

* health: Abide to linter

I don't think it reads as good but whatever

* health: Rename WaitStarted to WaitActive

* api: Create /stream/:id/wait-active API
  • Loading branch information
victorges committed Jan 31, 2024
1 parent 4693e48 commit 5ab219c
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 15 deletions.
27 changes: 24 additions & 3 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,19 @@ func (h *apiHandler) streamHealthHandler() chi.Router {
if opts.AuthURL != "" {
router.Use(authorization(opts.AuthURL))
}
router.Use(
streamStatus(healthcore),
regionProxy(opts.RegionalHostFormat, opts.OwnRegion))

regionalMiddlewares := []middleware{
streamStatus(healthcore),
regionProxy(opts.RegionalHostFormat, opts.OwnRegion),
}
h.withMetrics(router, "get_stream_health").
With(regionalMiddlewares...).
MethodFunc("GET", "/health", h.getStreamHealth)
h.withMetrics(router, "stream_health_events").
With(regionalMiddlewares...).
MethodFunc("GET", "/events", h.subscribeEvents)
h.withMetrics(router, "wait_stream_active").
MethodFunc("GET", "/wait-active", h.waitStreamActive)

return router
}
Expand Down Expand Up @@ -558,6 +563,22 @@ func (h *apiHandler) subscribeEvents(rw http.ResponseWriter, r *http.Request) {
}
}

func (h *apiHandler) waitStreamActive(rw http.ResponseWriter, r *http.Request) {
if h.core == nil {
respondError(rw, http.StatusNotImplemented, errors.New("stream healthcore is unavailable"))
return
}

streamID := apiParam(r, streamIDParam)
err := h.core.WaitActive(r.Context(), streamID)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}

rw.WriteHeader(http.StatusNoContent)
}

func makeSSEEventChan(ctx context.Context, pastEvents []data.Event, subscription <-chan data.Event) <-chan jsse.Event {
if subscription == nil {
events := make(chan jsse.Event, len(pastEvents))
Expand Down
24 changes: 24 additions & 0 deletions health/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ func (c *Core) handleSingleEvent(evt data.Event) (err error) {
glog.Warningf("Buffer full for health event subscription, skipping message. streamId=%q, eventTs=%q", streamID, ts)
}
}

for _, cond := range record.LastStatus.Conditions {
if cond.Type != ConditionActive {
continue
}
// We flag the record as initialized unless, from the received events,
// we know for sure that the stream is inactive.
isInactive := cond.Status != nil && !*cond.Status
if !isInactive {
record.FlagInitialized()
}
break
}

return nil
}

Expand Down Expand Up @@ -247,6 +261,16 @@ func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID
return pastEvents, subs, nil
}

func (c *Core) WaitActive(ctx context.Context, manifestID string) error {
// We actually create the record here if it doesn't exist, so that we can
// wait for it to be initialized.
record := c.storage.GetOrCreate(manifestID, c.conditionTypes)
if err := record.WaitInitialized(ctx); err != nil {
return err
}
return nil
}

func getPastEventsLocked(record *Record, lastEvtID *uuid.UUID, from, to *time.Time) ([]data.Event, error) {
fromIdx, toIdx := 0, len(record.PastEvents)
if lastEvtID != nil {
Expand Down
50 changes: 43 additions & 7 deletions health/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type Record struct {
Conditions []data.ConditionType

sync.RWMutex
disposed chan struct{}
initialized chan struct{}
disposed chan struct{}

PastEvents []data.Event
EventsByID map[uuid.UUID]data.Event
Expand All @@ -32,11 +33,43 @@ func NewRecord(id string, conditionTypes []data.ConditionType) *Record {
conditions[i] = data.NewCondition(cond, time.Time{}, nil, nil)
}
return &Record{
ID: id,
Conditions: conditionTypes,
disposed: make(chan struct{}),
EventsByID: map[uuid.UUID]data.Event{},
LastStatus: data.NewHealthStatus(id, conditions),
ID: id,
Conditions: conditionTypes,
initialized: make(chan struct{}),
disposed: make(chan struct{}),
EventsByID: map[uuid.UUID]data.Event{},
LastStatus: data.NewHealthStatus(id, conditions),
}
}

// FlagInitialized will flag the record as initialized. It is meant to be called
// after the first event is processed, meaning the record is not empty anymore.
//
// This is used to allow waiting until a stream is started by creating its
// record in an uninitialized state first and calling `WaitInitialized`. The
// initialization flag is simply a channel that is closed, which will unblock
// all goroutines waiting to receive from it (`WaitInitialized`).
func (r *Record) FlagInitialized() {
if !r.IsInitialized() {
close(r.initialized)
}
}

func (r *Record) IsInitialized() bool {
select {
case <-r.initialized:
return true
default:
return false
}
}

func (r *Record) WaitInitialized(ctx context.Context) error {
select {
case <-r.initialized:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

Expand Down Expand Up @@ -102,7 +135,10 @@ func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)

func (s *RecordStorage) Get(id string) (*Record, bool) {
if saved, ok := s.records.Load(id); ok {
return saved.(*Record), true
// Until Initialize is called, the record is considered inexistent
if record := saved.(*Record); record.IsInitialized() {
return record, true
}
}
return nil, false
}
Expand Down
2 changes: 2 additions & 0 deletions health/reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/livepeer/livepeer-data/pkg/event"
)

const ConditionActive data.ConditionType = "Active"

type Reducer interface {
Bindings() []event.BindingArgs
Conditions() []data.ConditionType
Expand Down
9 changes: 4 additions & 5 deletions health/reducers/stream_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import (
"time"

"github.com/golang/glog"
"github.com/livepeer/livepeer-data/health"
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/livepeer-data/pkg/event"
)

const (
streamStateBindingKey = "stream.state.#"

ConditionActive data.ConditionType = "Active"
)

type ActiveConditionExtraData struct {
Expand All @@ -28,7 +27,7 @@ func (t StreamStateReducer) Bindings() []event.BindingArgs {
}

func (t StreamStateReducer) Conditions() []data.ConditionType {
return []data.ConditionType{ConditionActive}
return []data.ConditionType{health.ConditionActive}
}

func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, evtIface data.Event) (*data.HealthStatus, interface{}) {
Expand All @@ -55,7 +54,7 @@ func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, ev
current = data.NewHealthStatus(current.ID, conditions)
}
for i, cond := range conditions {
if cond.Type == ConditionActive {
if cond.Type == health.ConditionActive {
newCond := data.NewCondition(cond.Type, evt.Timestamp(), &isActive, cond)
newCond.ExtraData = ActiveConditionExtraData{NodeID: evt.NodeID, Region: evt.Region}
conditions[i] = newCond
Expand All @@ -75,7 +74,7 @@ func clearConditions(conditions []*data.Condition) []*data.Condition {
}

func GetLastActiveData(status *data.HealthStatus) ActiveConditionExtraData {
data, ok := status.Condition(ConditionActive).ExtraData.(ActiveConditionExtraData)
data, ok := status.Condition(health.ConditionActive).ExtraData.(ActiveConditionExtraData)
if !ok {
return ActiveConditionExtraData{}
}
Expand Down

0 comments on commit 5ab219c

Please sign in to comment.