Skip to content

Commit

Permalink
task/runner: Add metrics on task latency and result (#157)
Browse files Browse the repository at this point in the history
* task: Create metric for task results count

* task: Move log & metric to publishTaskResult

Need to use the correct value of the unretriable field

* task: Fix humanization of some Catalyst errors

Including keeping the unretriable flag from Catalyst

* task: Add metric for task step duration

* task/runner: Remove error already handled as invalidVideo
  • Loading branch information
victorges committed Feb 28, 2023
1 parent 05a32ea commit c4d56d3
Showing 1 changed file with 83 additions and 19 deletions.
102 changes: 83 additions & 19 deletions task/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math"
"runtime/debug"
"strconv"
"strings"
"time"

Expand All @@ -17,6 +18,8 @@ import (
"github.com/livepeer/livepeer-data/pkg/data"
"github.com/livepeer/livepeer-data/pkg/event"
"github.com/livepeer/task-runner/clients"
"github.com/livepeer/task-runner/metrics"
"github.com/prometheus/client_golang/prometheus"
amqp "github.com/rabbitmq/amqp091-go"
)

Expand All @@ -35,7 +38,7 @@ var (
// without including extraneous error information from Catalyst
errInvalidVideo = UnretriableError{errors.New("invalid video file codec or container, check your input file against the input codec and container support matrix")}
// TODO(yondonfu): Add link in this error message to a page with the input codec/container support matrix
errProbe = UnretriableError{errors.New("failed to probe or open file, check your input file against the input codec and container support matrix")}
errProbe = UnretriableError{errors.New("failed to probe or open file, check your input file against the input codec and container support matrix")}
)

var (
Expand All @@ -46,8 +49,25 @@ var (
"transcode": TaskTranscode,
"transcode-file": TaskTranscodeFile,
}

errInternalProcessingError = errors.New("internal error processing file")
taskFatalErrorInfo = &data.ErrorInfo{Message: errInternalProcessingError.Error(), Unretriable: true}

taskResultCount = metrics.Factory.NewCounterVec(
prometheus.CounterOpts{
Name: metrics.FQName("task_result_count"),
Help: "Breakdown of task execution results by task type and status (success, error, unretriable_error)",
},
[]string{"task_type", "status"},
)
taskStepDurationSec = metrics.Factory.NewHistogramVec(
prometheus.HistogramOpts{
Name: metrics.FQName("task_step_duration_sec"),
Help: "Time spent executing a task by task type, step and result flags",
Buckets: []float64{5, 15, 60, 300, 900, 3600},
},
[]string{"task_type", "step", "finished", "errored"},
)
)

type TaskHandlerOutput struct {
Expand Down Expand Up @@ -259,14 +279,19 @@ func (r *runner) handleAMQPMessage(msg amqp.Delivery) (err error) {
}
}()

startTime := time.Now()
output, err := r.handleTask(ctx, task)
if err == nil && output != nil && output.Continue {

willContinue := err == nil && output != nil && output.Continue
taskStepDurationSec.
WithLabelValues(task.Type, task.Step, strconv.FormatBool(!willContinue), strconv.FormatBool(err != nil)).
Observe(time.Since(startTime).Seconds())

if willContinue {
glog.Infof("Task handler will continue task async type=%q id=%s output=%+v", task.Type, task.ID, output)
return nil
}

glog.Infof("Task handler processed task type=%q id=%s output=%+v error=%q unretriable=%v", task.Type, task.ID, output, err, IsUnretriable(err))

// return the error directly so that if publishing the result fails we nack the message to try again
return r.publishTaskResult(task, output, err)
}
Expand Down Expand Up @@ -426,18 +451,29 @@ func (r *runner) scheduleTaskStep(taskID, step string, input interface{}) error
return nil
}

func (r *runner) publishTaskResult(task data.TaskInfo, output *TaskHandlerOutput, resultErr error) error {
if r.HumanizeErrors {
resultErr = humanizeError(resultErr)
}
func (r *runner) publishTaskResult(task data.TaskInfo, output *TaskHandlerOutput, rawErr error) error {
errInfo := makeErrorInfo(rawErr)

taskResultCount.WithLabelValues(task.Type, taskResultStatusLabel(errInfo)).Inc()
glog.Infof("Task handler processed task type=%q id=%s output=%+v error=%q humanError=%q unretriable=%v",
task.Type, task.ID, output, errInfo.RawError, errInfo.HumanError, errInfo.Unretriable)

var body *data.TaskResultEvent
if resultErr != nil {
body = data.NewTaskResultEvent(task, errorInfo(resultErr), nil)
if errInfo.RawError != nil {
evtErrInfo := &data.ErrorInfo{
Message: errInfo.RawError.Error(),
Unretriable: errInfo.Unretriable,
}
if r.HumanizeErrors {
evtErrInfo.Message = errInfo.HumanError.Error()
}
body = data.NewTaskResultEvent(task, evtErrInfo, nil)
} else if output != nil {
body = data.NewTaskResultEvent(task, errorInfo(resultErr), output.TaskOutput)
body = data.NewTaskResultEvent(task, nil, output.TaskOutput)
} else {
return errors.New("output or resultErr must be non-nil")
}

key := fmt.Sprintf("task.result.%s.%s", task.Type, task.ID)
if err := r.publishLogged(task, r.ExchangeName, key, body); err != nil {
return fmt.Errorf("error publishing task result event: %w", err)
Expand Down Expand Up @@ -481,11 +517,29 @@ func (r *runner) Shutdown(ctx context.Context) error {
return r.amqp.Shutdown(ctx)
}

func errorInfo(err error) *data.ErrorInfo {
// taskErrInfo groups the different views of a task error that are needed when
// reporting a task result.
type taskErrInfo struct {
	// RawError is the error exactly as it was handed to makeErrorInfo; it is
	// nil when the task did not fail.
	RawError error
	// HumanError is the result of passing RawError through humanizeError.
	HumanError error
	// Unretriable is true when either RawError or HumanError is unretriable.
	Unretriable bool
}

// makeErrorInfo builds the taskErrInfo describing err: the raw error itself,
// its humanized form as returned by humanizeError, and an unretriable flag that
// is set when either the raw or the humanized error is unretriable. A warning
// is logged when the raw error is unretriable but its humanized form is not.
func makeErrorInfo(err error) taskErrInfo {
if err == nil {
// NOTE(review): `return nil` does not match the taskErrInfo return type and
// looks like a leftover from the previous errorInfo function — confirm.
return nil
return taskErrInfo{}
}
humanErr := humanizeError(err)

if IsUnretriable(err) && !IsUnretriable(humanErr) {
// catch if we ever create a human error that loses the (opt-in) unretriable
// flag of the original error. we still consider the original error below.
glog.Warningf("Error is unretriable but humanized error is retriable. originalErr=%q humanErr=%q", err, humanErr)
}
unretriable := IsUnretriable(err) || IsUnretriable(humanErr)

return taskErrInfo{
RawError: err,
HumanError: humanErr,
Unretriable: unretriable,
}
// NOTE(review): unreachable after the return above, and typed as
// *data.ErrorInfo; presumably another leftover from errorInfo — confirm.
return &data.ErrorInfo{Message: err.Error(), Unretriable: IsUnretriable(err)}
}

// Caller should check if err is a CatalystError first
Expand Down Expand Up @@ -543,20 +597,20 @@ func humanizeCatalystError(err error) error {
}
}
if strings.Contains(errMsg, "error running ffprobe") && strings.Contains(errMsg, "exit status 1") {
return errInvalidVideo
return errInvalidVideo
}
if strings.Contains(errMsg, "failed probe/open") {
return errProbe
}

// Livepeer pipeline errors
if strings.Contains(errMsg, "unsupported input pixel format") {
return errors.New("unsupported input pixel format, must be 'yuv420p' or 'yuvj420p'")
} else if strings.Contains(errMsg, "Unsupported video input") {
// TODO(yondonfu): This check probably does not work because the error message will be lowercased
return errors.New("unsupported file format")
return UnretriableError{errors.New("unsupported input pixel format, must be 'yuv420p' or 'yuvj420p'")}
}

if IsUnretriable(err) {
return UnretriableError{errInternalProcessingError}
}
return errInternalProcessingError
}

Expand Down Expand Up @@ -633,6 +687,16 @@ func publishLoggedRaw(producer event.AMQPProducer, task data.TaskInfo, exchange,
return nil
}

// taskResultStatusLabel maps a task's error info to the value used for the
// "status" label of the task result counter: "success" when there is no raw
// error, "unretriable_error" when the error is flagged unretriable, and
// "error" otherwise.
func taskResultStatusLabel(errInfo taskErrInfo) string {
	switch {
	case errInfo.RawError == nil:
		// Absence of a raw error wins, regardless of the Unretriable flag.
		return "success"
	case errInfo.Unretriable:
		return "unretriable_error"
	default:
		return "error"
	}
}

// taskResultMessageKey returns the message key for the result of the task with
// the given type and ID, in the form "task.result.<ttype>.<id>".
func taskResultMessageKey(ttype, id string) string {
	const keyFormat = "task.result.%s.%s"
	key := fmt.Sprintf(keyFormat, ttype, id)
	return key
}

0 comments on commit c4d56d3

Please sign in to comment.