Skip to content

Commit

Permalink
Image API
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Feb 15, 2024
1 parent 452977f commit 2596867
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 1 deletion.
6 changes: 6 additions & 0 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
for _, path := range [...]string{"/asset/hls/:playbackID/*file", "/webrtc/:playbackID"} {
router.OPTIONS(path, playback)
}
image := middleware.LogAndMetrics(metrics.Metrics.ImageRequestDurationSec)(
withCORS(
handlers.NewImageHandler(cli.PublicBucketURLs).Handle,
),
)
router.GET("/asset/image/:playbackID", image)

// Handling incoming playback redirection requests
redirectHandler := withLogging(withCORS(geoHandlers.RedirectHandler()))
Expand Down
1 change: 1 addition & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Cli struct {
APIServer string
SourceOutput string
PrivateBucketURLs []*url.URL
PublicBucketURLs []*url.URL
ExternalTranscoder string
VodPipelineStrategy string
MetricsDBConnectionString string
Expand Down
163 changes: 163 additions & 0 deletions handlers/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package handlers

import (
"errors"
"fmt"
"github.com/grafov/m3u8"
"github.com/julienschmidt/httprouter"
caterrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/playback"
ffmpeg "github.com/u2takey/ffmpeg-go"
"io"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
)

type ImageHandler struct {
PublicBucketURLs []*url.URL
}

func NewImageHandler(urls []*url.URL) *ImageHandler {
return &ImageHandler{
PublicBucketURLs: urls,
}

Check warning on line 28 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L25-L28

Added lines #L25 - L28 were not covered by tests
}

func (p *ImageHandler) Handle(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
timeString := req.URL.Query().Get("time")
time, err := strconv.ParseFloat(timeString, 64)
if err != nil {
caterrs.WriteHTTPBadRequest(w, "failed to parse time", nil)
return
}

playbackID := params.ByName("playbackID")
if playbackID == "" {
caterrs.WriteHTTPBadRequest(w, "playbackID was empty", nil)
return
}

Check warning on line 43 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L41-L43

Added lines #L41 - L43 were not covered by tests

err = p.handle(w, playbackID, time)
if err != nil {
log.LogNoRequestID("image API error", "err", err)
switch {
case errors.Is(err, caterrs.ObjectNotFoundError):
caterrs.WriteHTTPNotFound(w, "not found", nil)
default:
caterrs.WriteHTTPInternalServerError(w, "internal server error", nil)

Check warning on line 52 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}
}
}

func (p *ImageHandler) handle(w http.ResponseWriter, playbackID string, time float64) error {
var (
err error
segmentFile string
dir = playbackID
)

fileInfoReader, err := playback.OsFetch(p.PublicBucketURLs, dir, "index.m3u8", "")
if err != nil {
return fmt.Errorf("failed to read manifest: %w", err)
}

// download master playlist
manifest, _, err := m3u8.DecodeFrom(fileInfoReader.Body, true)
if err != nil {
return fmt.Errorf("failed to decode manifest: %w", err)
}

Check warning on line 73 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L72-L73

Added lines #L72 - L73 were not covered by tests
masterPlaylist, ok := manifest.(*m3u8.MasterPlaylist)
if !ok || masterPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MasterPlaylist")
}

Check warning on line 77 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L76-L77

Added lines #L76 - L77 were not covered by tests
if len(masterPlaylist.Variants) < 1 {
return fmt.Errorf("no renditions found")
}

Check warning on line 80 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L79-L80

Added lines #L79 - L80 were not covered by tests

renditionUri := masterPlaylist.Variants[0].URI
fileInfoReader, err = playback.OsFetch(p.PublicBucketURLs, dir, renditionUri, "")
if err != nil {
return fmt.Errorf("failed to read manifest: %w", err)
}

Check warning on line 86 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L85-L86

Added lines #L85 - L86 were not covered by tests
dir = filepath.Join(dir, filepath.Dir(renditionUri))
manifest, _, err = m3u8.DecodeFrom(fileInfoReader.Body, true)
if err != nil {
return fmt.Errorf("failed to decode manifest: %w", err)
}

Check warning on line 91 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L90-L91

Added lines #L90 - L91 were not covered by tests
mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MediaPlaylist")
}

Check warning on line 95 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L94-L95

Added lines #L94 - L95 were not covered by tests

// find the segment required
currentTime := 0.0
extractTime := 0.0
for _, segment := range mediaPlaylist.GetAllSegments() {
currentTime += segment.Duration
if currentTime > time {
segmentFile = segment.URI
extractTime = time - currentTime + segment.Duration
break
}
}
if segmentFile == "" {
return fmt.Errorf("playbackID media not found: %w", caterrs.ObjectNotFoundError)
}

tmpDir, err := os.MkdirTemp(os.TempDir(), "image-api-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}

Check warning on line 115 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L114-L115

Added lines #L114 - L115 were not covered by tests
defer os.RemoveAll(tmpDir)

// download the segment
fileInfoReader, err = playback.OsFetch(p.PublicBucketURLs, dir, segmentFile, "")
if err != nil {
return fmt.Errorf("failed to get media: %w", err)
}

Check warning on line 122 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L121-L122

Added lines #L121 - L122 were not covered by tests
segBytes, err := io.ReadAll(fileInfoReader.Body)
if err != nil {
return fmt.Errorf("failed to get bytes: %w", err)
}

Check warning on line 126 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L125-L126

Added lines #L125 - L126 were not covered by tests

inputFile := path.Join(tmpDir, "in.ts")
if err = os.WriteFile(inputFile, segBytes, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}

Check warning on line 131 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L130-L131

Added lines #L130 - L131 were not covered by tests
outputFile := path.Join(tmpDir, "out.jpg")

// TODO disable ffmpeg verbose output
err = ffmpeg.
Input(inputFile).
Output(
outputFile,
ffmpeg.KwArgs{
"ss": fmt.Sprintf("00:00:%d", int64(extractTime)),
"vframes": "1",
// TODO change resolution to a queryparam
"vf": "scale=320:240:force_original_aspect_ratio=decrease",
},
).OverWriteOutput().ErrorToStdOut().Run()
if err != nil {
return fmt.Errorf("ffmpeg failed: %w", err)
}

Check warning on line 148 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L147-L148

Added lines #L147 - L148 were not covered by tests

bs, err := os.ReadFile(outputFile)
if err != nil {
return err
}

Check warning on line 153 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L152-L153

Added lines #L152 - L153 were not covered by tests

w.WriteHeader(http.StatusOK)
count, err := w.Write(bs)
if err != nil {
log.LogNoRequestID("image handler failed to write response", "err", err)

Check warning on line 158 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L158

Added line #L158 was not covered by tests
} else {
log.LogNoRequestID("image handler wrote response", "count", count)
}
return nil
}
98 changes: 98 additions & 0 deletions handlers/image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package handlers

import (
"context"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

"github.com/julienschmidt/httprouter"
"github.com/stretchr/testify/require"
"gopkg.in/vansante/go-ffprobe.v2"
)

func TestImageHandler_Handle(t *testing.T) {
wd, err := os.Getwd()
require.NoError(t, err)
handler := &ImageHandler{
PublicBucketURLs: []*url.URL{{Scheme: "file", Path: wd + "/../test"}},
}
tests := []struct {
name string
time string
playbackID string
expectedStatus int
}{
{
name: "first segment",
time: "5",
},
{
name: "second segment",
time: "21",
},
{
name: "final segment",
time: "29",
},
{
name: "out of bounds",
time: "30",
expectedStatus: http.StatusNotFound,
},
{
name: "invalid time",
time: "",
expectedStatus: http.StatusBadRequest,
},
{
name: "playbackID not found",
time: "29",
playbackID: "foo",
expectedStatus: http.StatusNotFound,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "?time="+tt.time, nil)
require.NoError(t, err)

if tt.playbackID == "" {
tt.playbackID = "fixtures" // just use the fixtures directory for testing
}
handler.Handle(w, req, []httprouter.Param{{
Key: "playbackID",
Value: tt.playbackID,
}})
resp := w.Result()
if tt.expectedStatus == 0 {
tt.expectedStatus = 200
}
require.Equal(t, tt.expectedStatus, resp.StatusCode)

if tt.expectedStatus != 200 {
return
}
respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

outfile, err := os.CreateTemp(os.TempDir(), "out*.jpg")
require.NoError(t, err)
defer os.Remove(outfile.Name())
_, err = outfile.Write(respBytes)
require.NoError(t, err)
log.Println(outfile.Name())
probeData, err := ffprobe.ProbeURL(context.Background(), outfile.Name())
require.NoError(t, err)
require.Equal(t, "image2", probeData.Format.FormatName)
require.Len(t, probeData.Streams, 1)
require.Greater(t, probeData.Streams[0].Width, 0)
require.Greater(t, probeData.Streams[0].Height, 0)
})
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func main() {
fs.StringVar(&cli.APIToken, "api-token", "IAmAuthorized", "Auth header value for API access")
fs.StringVar(&cli.SourceOutput, "source-output", "", "URL for the video source segments used if source_segments is not defined in the upload request")
config.URLSliceVarFlag(fs, &cli.PrivateBucketURLs, "private-bucket", "", "URL for the private media bucket")
config.URLSliceVarFlag(fs, &cli.PublicBucketURLs, "public-bucket", "", "URL for the public media bucket")
fs.StringVar(&cli.ExternalTranscoder, "external-transcoder", "", "URL for the external transcoder to be used by the pipeline coordinator. Only 1 implementation today for AWS MediaConvert which should be in the format: mediaconvert://key-id:key-secret@endpoint-host?region=aws-region&role=iam-role&s3_aux_bucket=s3://bucket")
fs.StringVar(&cli.VodPipelineStrategy, "vod-pipeline-strategy", string(pipeline.StrategyCatalystFfmpegDominance), "Which strategy to use for the VOD pipeline")
fs.StringVar(&cli.MetricsDBConnectionString, "metrics-db-connection-string", "", "Connection string to use for the metrics Postgres DB. Takes the form: host=X port=X user=X password=X dbname=X")
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type CatalystAPIMetrics struct {
SerfEventBufferSize prometheus.Gauge
AccessControlRequestCount *prometheus.CounterVec
AccessControlRequestDurationSec *prometheus.SummaryVec
ImageRequestDurationSec *prometheus.SummaryVec // TODO

JobsInFlight prometheus.Gauge
HTTPRequestsInFlight prometheus.Gauge
Expand Down
7 changes: 6 additions & 1 deletion playback/playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/url"
"path/filepath"
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -103,6 +104,10 @@ func appendAccessKey(uri, gatingParam, gatingParamName string) (string, error) {
}

func osFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.FileInfoReader, error) {
return OsFetch(buckets, filepath.Join("hls", playbackID), file, byteRange)
}

func OsFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.FileInfoReader, error) {
if len(buckets) < 1 {
return nil, errors.New("playback failed, no private buckets configured")
}
Expand All @@ -113,7 +118,7 @@ func osFetch(buckets []*url.URL, playbackID, file, byteRange string) (*drivers.F
)
// try all private buckets until object is found or return error
for _, bucket := range buckets {
osURL := bucket.JoinPath("hls").JoinPath(playbackID).JoinPath(file)
osURL := bucket.JoinPath(playbackID).JoinPath(file)
f, err = clients.GetOSURL(osURL.String(), byteRange)
if err == nil {
// object found successfully so return early
Expand Down
4 changes: 4 additions & 0 deletions test/fixtures/index.m3u8
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:PROGRAM-ID=0,BANDWIDTH=1036798,RESOLUTION=1280x720,NAME="0-720p0"
tiny.m3u8

0 comments on commit 2596867

Please sign in to comment.