Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

image api #961

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/livepeer/catalyst-api/handlers/misttriggers"
"github.com/livepeer/catalyst-api/log"
mistapiconnector "github.com/livepeer/catalyst-api/mapic"
"github.com/livepeer/catalyst-api/metrics"
"github.com/livepeer/catalyst-api/middleware"
"github.com/livepeer/catalyst-api/pipeline"
"github.com/livepeer/go-api-client"
Expand Down Expand Up @@ -113,6 +114,14 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
),
),
)
router.GET("/api/image/:playbackID/thumbnail.jpg",
middleware.LogAndMetrics(metrics.Metrics.ImageAPIDurationSec)(
withAuth(
cli.APIToken,
handlers.NewImageHandler(cli.PrivateBucketURLs).Handle,
),
),
)

// Public handler to propagate an event to all Catalyst nodes
router.POST("/api/events", withLogging(eventsHandler.Events()))
Expand Down
1 change: 1 addition & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Cli struct {
APIServer string
SourceOutput string
PrivateBucketURLs []*url.URL
PublicBucketURLs []*url.URL
ExternalTranscoder string
VodPipelineStrategy string
MetricsDBConnectionString string
Expand Down
199 changes: 199 additions & 0 deletions handlers/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package handlers

import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"time"

"github.com/grafov/m3u8"
"github.com/julienschmidt/httprouter"
caterrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/metrics"
"github.com/livepeer/catalyst-api/playback"
ffmpeg "github.com/u2takey/ffmpeg-go"
)

type ImageHandler struct {
PublicBucketURLs []*url.URL
}

func NewImageHandler(urls []*url.URL) *ImageHandler {
return &ImageHandler{
PublicBucketURLs: urls,
}

Check warning on line 32 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L29-L32

Added lines #L29 - L32 were not covered by tests
}

func (p *ImageHandler) Handle(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
timeString := req.URL.Query().Get("time")
time, err := strconv.ParseFloat(timeString, 64)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
caterrs.WriteHTTPBadRequest(w, "failed to parse time", nil)
return
}
width, err := parseResolution(req, "width", 320)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
caterrs.WriteHTTPBadRequest(w, "failed to parse width", nil)
return
}

Check warning on line 48 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L45-L48

Added lines #L45 - L48 were not covered by tests
height, err := parseResolution(req, "height", 240)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
caterrs.WriteHTTPBadRequest(w, "failed to parse height", nil)
return
}

Check warning on line 54 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L51-L54

Added lines #L51 - L54 were not covered by tests

playbackID := params.ByName("playbackID")
if playbackID == "" {
caterrs.WriteHTTPBadRequest(w, "playbackID was empty", nil)
return
}

Check warning on line 60 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L58-L60

Added lines #L58 - L60 were not covered by tests

err = p.handle(w, playbackID, time, width, height)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
switch {
case errors.Is(err, caterrs.ObjectNotFoundError):
caterrs.WriteHTTPNotFound(w, "not found", nil)
default:
caterrs.WriteHTTPInternalServerError(w, "internal server error", nil)

Check warning on line 69 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L68-L69

Added lines #L68 - L69 were not covered by tests
}
}
}

func parseResolution(req *http.Request, key string, defaultVal int64) (int64, error) {
val := req.URL.Query().Get(key)
if val == "" {
return defaultVal, nil
}
return strconv.ParseInt(val, 10, 32)

Check warning on line 79 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L79

Added line #L79 was not covered by tests
}

func (p *ImageHandler) handle(w http.ResponseWriter, playbackID string, t float64, width int64, height int64) error {
var (
err error
segmentFile string
dir = playbackID
start = time.Now()
)

// download master playlist
fileInfoReader, err := playback.OsFetch(p.PublicBucketURLs, dir, "index.m3u8", "")
if err != nil {
return fmt.Errorf("failed to read manifest: %w", err)
}

manifest, _, err := m3u8.DecodeFrom(fileInfoReader.Body, true)
if err != nil {
return fmt.Errorf("failed to decode manifest: %w", err)
}

Check warning on line 99 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L98-L99

Added lines #L98 - L99 were not covered by tests
masterPlaylist, ok := manifest.(*m3u8.MasterPlaylist)
if !ok || masterPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MasterPlaylist")
}

Check warning on line 103 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L102-L103

Added lines #L102 - L103 were not covered by tests
if len(masterPlaylist.Variants) < 1 {
return fmt.Errorf("no renditions found")
}

Check warning on line 106 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L105-L106

Added lines #L105 - L106 were not covered by tests

// download rendition playlist
renditionUri := masterPlaylist.Variants[0].URI
fileInfoReader, err = playback.OsFetch(p.PublicBucketURLs, dir, renditionUri, "")
if err != nil {
return fmt.Errorf("failed to read manifest: %w", err)
}

Check warning on line 113 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L112-L113

Added lines #L112 - L113 were not covered by tests
dir = filepath.Join(dir, filepath.Dir(renditionUri))
manifest, _, err = m3u8.DecodeFrom(fileInfoReader.Body, true)
if err != nil {
return fmt.Errorf("failed to decode manifest: %w", err)
}

Check warning on line 118 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L117-L118

Added lines #L117 - L118 were not covered by tests
mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MediaPlaylist")
}

Check warning on line 122 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L121-L122

Added lines #L121 - L122 were not covered by tests

// find the segment required
currentTime := 0.0
extractTime := 0.0
for _, segment := range mediaPlaylist.GetAllSegments() {
currentTime += segment.Duration
if currentTime > t {
segmentFile = segment.URI
extractTime = t - currentTime + segment.Duration
break
}
}
if segmentFile == "" {
return fmt.Errorf("playbackID media not found: %w", caterrs.ObjectNotFoundError)
}

tmpDir, err := os.MkdirTemp(os.TempDir(), "image-api-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}

Check warning on line 142 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L141-L142

Added lines #L141 - L142 were not covered by tests
defer os.RemoveAll(tmpDir)

// download the segment
fileInfoReader, err = playback.OsFetch(p.PublicBucketURLs, dir, segmentFile, "")
if err != nil {
return fmt.Errorf("failed to get media: %w", err)
}

Check warning on line 149 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L148-L149

Added lines #L148 - L149 were not covered by tests
segBytes, err := io.ReadAll(fileInfoReader.Body)
if err != nil {
return fmt.Errorf("failed to get bytes: %w", err)
}

Check warning on line 153 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L152-L153

Added lines #L152 - L153 were not covered by tests

inputFile := path.Join(tmpDir, "in.ts")
if err = os.WriteFile(inputFile, segBytes, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}

Check warning on line 158 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L157-L158

Added lines #L157 - L158 were not covered by tests
outputFile := path.Join(tmpDir, "out.jpg")

metrics.Metrics.ImageAPIDownloadDurationSec.WithLabelValues().Observe(time.Since(start).Seconds())

// extract image
extractStart := time.Now()
defer func() {
metrics.Metrics.ImageAPIExtractDurationSec.WithLabelValues().Observe(time.Since(extractStart).Seconds())
}()

var ffmpegErr bytes.Buffer
err = ffmpeg.
Input(inputFile).
Output(
outputFile,
ffmpeg.KwArgs{
"ss": fmt.Sprintf("00:00:%d", int64(extractTime)),
"vframes": "1",
"vf": fmt.Sprintf("scale=%d:%d:force_original_aspect_ratio=decrease", width, height),
},
).OverWriteOutput().WithErrorOutput(&ffmpegErr).Run()
if err != nil {
return fmt.Errorf("ffmpeg failed [%s]: %w", ffmpegErr.String(), err)
}

Check warning on line 182 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L181-L182

Added lines #L181 - L182 were not covered by tests

bs, err := os.ReadFile(outputFile)
if err != nil {
return err
}

Check warning on line 187 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L186-L187

Added lines #L186 - L187 were not covered by tests

w.Header().Set("content-type", "image/jpg")
w.Header().Set("content-length", strconv.Itoa(len(bs)))
w.WriteHeader(http.StatusOK)
count, err := w.Write(bs)
if err != nil {
log.LogNoRequestID("image handler failed to write response", "err", err)

Check warning on line 194 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L194

Added line #L194 was not covered by tests
} else {
log.LogNoRequestID("image handler wrote response", "count", count)
}
return nil
}
98 changes: 98 additions & 0 deletions handlers/image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package handlers

import (
"context"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

"github.com/julienschmidt/httprouter"
"github.com/stretchr/testify/require"
"gopkg.in/vansante/go-ffprobe.v2"
)

func TestImageHandler_Handle(t *testing.T) {
wd, err := os.Getwd()
require.NoError(t, err)
handler := &ImageHandler{
PublicBucketURLs: []*url.URL{{Scheme: "file", Path: wd + "/../test"}},
}
tests := []struct {
name string
time string
playbackID string
expectedStatus int
}{
{
name: "first segment",
time: "5",
},
{
name: "second segment",
time: "21",
},
{
name: "final segment",
time: "29",
},
{
name: "out of bounds",
time: "30",
expectedStatus: http.StatusNotFound,
},
{
name: "invalid time",
time: "",
expectedStatus: http.StatusBadRequest,
},
{
name: "playbackID not found",
time: "29",
playbackID: "foo",
expectedStatus: http.StatusNotFound,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "?time="+tt.time, nil)
require.NoError(t, err)

if tt.playbackID == "" {
tt.playbackID = "fixtures" // just use the fixtures directory for testing
}
handler.Handle(w, req, []httprouter.Param{{
Key: "playbackID",
Value: tt.playbackID,
}})
resp := w.Result()
if tt.expectedStatus == 0 {
tt.expectedStatus = 200
}
require.Equal(t, tt.expectedStatus, resp.StatusCode)

if tt.expectedStatus != 200 {
return
}
respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

outfile, err := os.CreateTemp(os.TempDir(), "out*.jpg")
require.NoError(t, err)
defer os.Remove(outfile.Name())
_, err = outfile.Write(respBytes)
require.NoError(t, err)
log.Println(outfile.Name())
probeData, err := ffprobe.ProbeURL(context.Background(), outfile.Name())
require.NoError(t, err)
require.Equal(t, "image2", probeData.Format.FormatName)
require.Len(t, probeData.Streams, 1)
require.Greater(t, probeData.Streams[0].Width, 0)
require.Greater(t, probeData.Streams[0].Height, 0)
})
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func main() {
fs.StringVar(&cli.APIToken, "api-token", "IAmAuthorized", "Auth header value for API access")
fs.StringVar(&cli.SourceOutput, "source-output", "", "URL for the video source segments used if source_segments is not defined in the upload request")
config.URLSliceVarFlag(fs, &cli.PrivateBucketURLs, "private-bucket", "", "URL for the private media bucket")
config.URLSliceVarFlag(fs, &cli.PublicBucketURLs, "public-bucket", "", "URL for the public media bucket")
fs.StringVar(&cli.ExternalTranscoder, "external-transcoder", "", "URL for the external transcoder to be used by the pipeline coordinator. Only 1 implementation today for AWS MediaConvert which should be in the format: mediaconvert://key-id:key-secret@endpoint-host?region=aws-region&role=iam-role&s3_aux_bucket=s3://bucket")
fs.StringVar(&cli.VodPipelineStrategy, "vod-pipeline-strategy", string(pipeline.StrategyCatalystFfmpegDominance), "Which strategy to use for the VOD pipeline")
fs.StringVar(&cli.MetricsDBConnectionString, "metrics-db-connection-string", "", "Connection string to use for the metrics Postgres DB. Takes the form: host=X port=X user=X password=X dbname=X")
Expand Down
14 changes: 14 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type CatalystAPIMetrics struct {
SerfEventBufferSize prometheus.Gauge
AccessControlRequestCount *prometheus.CounterVec
AccessControlRequestDurationSec *prometheus.SummaryVec
ImageAPIDurationSec *prometheus.SummaryVec
ImageAPIDownloadDurationSec *prometheus.SummaryVec
ImageAPIExtractDurationSec *prometheus.SummaryVec

JobsInFlight prometheus.Gauge
HTTPRequestsInFlight prometheus.Gauge
Expand Down Expand Up @@ -112,6 +115,9 @@ func NewMetrics() *CatalystAPIMetrics {
Name: "access_control_request_duration_seconds",
Help: "The latency of the access control requests",
}, []string{"allowed", "playbackID"}),
ImageAPIDurationSec: durationSummary("image_api_response_duration", "Total time taken to process Image API request", "success", "status_code", "version"),
ImageAPIDownloadDurationSec: durationSummary("image_api_download_duration", "Time taken to download media from storage while generating an image"),
ImageAPIExtractDurationSec: durationSummary("image_api_extract_duration", "Time taken to generate image"),

// Clients metrics
TranscodingStatusUpdate: ClientMetrics{
Expand Down Expand Up @@ -212,4 +218,12 @@ func NewMetrics() *CatalystAPIMetrics {
return m
}

func durationSummary(name, help string, labelNames ...string) *prometheus.SummaryVec {
return promauto.NewSummaryVec(prometheus.SummaryOpts{
Name: name,
Help: help,
Objectives: map[float64]float64{0.5: 0.05, 0.8: 0.01, 0.95: 0.01},
}, labelNames)
}

var Metrics = NewMetrics()
7 changes: 6 additions & 1 deletion playback/playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/url"
"path/filepath"
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -103,6 +104,10 @@ func appendAccessKey(uri, gatingParam, gatingParamName string) (string, error) {
}

func osFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.FileInfoReader, error) {
return OsFetch(buckets, filepath.Join("hls", playbackID), file, byteRange)
}

func OsFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.FileInfoReader, error) {
if len(buckets) < 1 {
return nil, errors.New("playback failed, no private buckets configured")
}
Expand All @@ -113,7 +118,7 @@ func osFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.F
)
// try all private buckets until object is found or return error
for _, bucket := range buckets {
osURL := bucket.JoinPath("hls").JoinPath(playbackID).JoinPath(file)
osURL := bucket.JoinPath(playbackID).JoinPath(file)
f, err = clients.GetOSURL(osURL.String(), byteRange)
if err == nil {
// object found successfully so return early
Expand Down