Skip to content

Commit

Permalink
Image API
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Feb 14, 2024
1 parent 1ba6bd7 commit ffb1cac
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 0 deletions.
6 changes: 6 additions & 0 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
for _, path := range [...]string{"/asset/hls/:playbackID/*file", "/webrtc/:playbackID"} {
router.OPTIONS(path, playback)
}
image := middleware.LogAndMetrics(metrics.Metrics.ImageRequestDurationSec)(
withCORS(
handlers.NewImageHandler(cli.PublicBucketURLs).Handle,
),
)
router.GET("/image/hls/:playbackID/:secs", image)

// Handling incoming playback redirection requests
redirectHandler := withLogging(withCORS(geoHandlers.RedirectHandler()))
Expand Down
1 change: 1 addition & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Cli struct {
APIServer string
SourceOutput string
PrivateBucketURLs []*url.URL
PublicBucketURLs []*url.URL
ExternalTranscoder string
VodPipelineStrategy string
MetricsDBConnectionString string
Expand Down
173 changes: 173 additions & 0 deletions handlers/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package handlers

import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/grafov/m3u8"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/clients"
caterrs "github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/go-tools/drivers"
ffmpeg "github.com/u2takey/ffmpeg-go"
)

type ImageHandler struct {
PublicBucketURLs []*url.URL
}

func NewImageHandler(urls []*url.URL) *ImageHandler {
return &ImageHandler{
PublicBucketURLs: urls,
}

Check warning on line 33 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L30-L33

Added lines #L30 - L33 were not covered by tests
}

func (p *ImageHandler) Handle(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
timeString := req.URL.Query().Get("time")
time, err := strconv.ParseFloat(timeString, 64)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("failed to parse time"))
return
}

Check warning on line 43 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L40-L43

Added lines #L40 - L43 were not covered by tests

playbackID := params.ByName("playbackID")
if playbackID == "" {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("playbackID was empty"))
return
}

Check warning on line 50 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L47-L50

Added lines #L47 - L50 were not covered by tests

err = p.handle(w, playbackID, time)
if err != nil {
log.Println(err)
// TODO convert err to status code
w.WriteHeader(500)
}

Check warning on line 57 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L54-L57

Added lines #L54 - L57 were not covered by tests
}

func (p *ImageHandler) handle(w http.ResponseWriter, playbackID string, time float64) error {
var (
page drivers.PageInfo
err error
manifestFile string
segmentFile string
bucketLocation *url.URL
)
// list the contents of the playbackID folder in storage and check for an HLS manifest
for _, bucket := range p.PublicBucketURLs {
bucketLocation = bucket.JoinPath(playbackID, "/")
page, err = clients.ListOSURL(context.Background(), bucketLocation.String())
if err == nil {
break
}
// if this is the final bucket in the list then the error set here will be used in the final return
var awsErr awserr.Error
if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey ||
strings.Contains(err.Error(), "no such file") {
err = fmt.Errorf("invalid request: %w %w", caterrs.ObjectNotFoundError, err)
} else {
err = fmt.Errorf("failed to get file for playback: %w", err)
}

Check warning on line 82 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L76-L82

Added lines #L76 - L82 were not covered by tests
}
if err != nil {
return err
}

Check warning on line 86 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L85-L86

Added lines #L85 - L86 were not covered by tests
for _, file := range page.Files() {
if strings.HasSuffix(file.Name, "m3u8") {
manifestFile = file.Name
break
}
}
if manifestFile == "" {
return fmt.Errorf("playbackID not found: %w", caterrs.ObjectNotFoundError)
}

Check warning on line 95 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L94-L95

Added lines #L94 - L95 were not covered by tests

// find the segment required
fileInfoReader, err := clients.GetOSURL(bucketLocation.JoinPath(manifestFile).String(), "")
if err != nil {
return fmt.Errorf("failed to read manifest: %w", err)
}

Check warning on line 101 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L100-L101

Added lines #L100 - L101 were not covered by tests
manifest, _, err := m3u8.DecodeFrom(fileInfoReader.Body, true)
if err != nil {
return err
}

Check warning on line 105 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L104-L105

Added lines #L104 - L105 were not covered by tests
// TODO check if master playlist

mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return fmt.Errorf("failed to parse playlist as MediaPlaylist")
}

Check warning on line 111 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L110-L111

Added lines #L110 - L111 were not covered by tests
currentTime := 0.0
for _, segment := range mediaPlaylist.GetAllSegments() {
currentTime += segment.Duration
if currentTime > time {
segmentFile = segment.URI
break
}
}
if segmentFile == "" {
return fmt.Errorf("playbackID media not found: %w", caterrs.ObjectNotFoundError)
}

Check warning on line 122 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L121-L122

Added lines #L121 - L122 were not covered by tests

tmpDir, err := os.MkdirTemp(os.TempDir(), "image-api-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}

Check warning on line 127 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L126-L127

Added lines #L126 - L127 were not covered by tests
defer os.RemoveAll(tmpDir)

// download the segment
fileInfoReader, err = clients.GetOSURL(bucketLocation.JoinPath(segmentFile).String(), "")
if err != nil {
return fmt.Errorf("failed to get media: %w", err)
}

Check warning on line 134 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L133-L134

Added lines #L133 - L134 were not covered by tests
segBytes, err := io.ReadAll(fileInfoReader.Body)
if err != nil {
return fmt.Errorf("failed to get bytes: %w", err)
}

Check warning on line 138 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L137-L138

Added lines #L137 - L138 were not covered by tests

inputFile := path.Join(tmpDir, "in.ts")
if err = os.WriteFile(inputFile, segBytes, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}

Check warning on line 143 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L142-L143

Added lines #L142 - L143 were not covered by tests
outputFile := path.Join(tmpDir, "out.jpg")

err = ffmpeg.
Input(inputFile).
Output(
outputFile,
ffmpeg.KwArgs{
"ss": fmt.Sprintf("00:00:0%d", int64(currentTime-time)),
"vframes": "1",
"vf": "scale=320:240:force_original_aspect_ratio=decrease",
},
).OverWriteOutput().ErrorToStdOut().Run()
if err != nil {
return fmt.Errorf("ffmpeg failed: %w", err)
}

Check warning on line 158 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L157-L158

Added lines #L157 - L158 were not covered by tests

bs, err := os.ReadFile(outputFile)
if err != nil {
return err
}

Check warning on line 163 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L162-L163

Added lines #L162 - L163 were not covered by tests

w.WriteHeader(http.StatusOK)
count, err := w.Write(bs)
if err != nil {
log.Println("image handler failed to write response", err)

Check warning on line 168 in handlers/image.go

View check run for this annotation

Codecov / codecov/patch

handlers/image.go#L168

Added line #L168 was not covered by tests
} else {
log.Println("image handler wrote response", count)
}
return nil
}
51 changes: 51 additions & 0 deletions handlers/image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package handlers

import (
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

"github.com/julienschmidt/httprouter"
"github.com/stretchr/testify/require"
)

func TestImageHandler_Handle(t *testing.T) {
wd, err := os.Getwd()
require.NoError(t, err)
handler := &ImageHandler{
PublicBucketURLs: []*url.URL{{Scheme: "file", Path: wd + "/../test"}},
}
tests := []struct {
name string
}{
{
name: "happy",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "?time=5", nil)
require.NoError(t, err)
handler.Handle(w, req, []httprouter.Param{{
Key: "playbackID",
Value: "fixtures", // just use the fixtures directory for testing
}})
resp := w.Result()
log.Println(resp.StatusCode)
respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

outfile, err := os.CreateTemp(os.TempDir(), "out*.jpg")
require.NoError(t, err)
defer os.Remove(outfile.Name())
_, err = outfile.Write(respBytes)
require.NoError(t, err)
log.Println(outfile.Name())
})
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func main() {
fs.StringVar(&cli.APIToken, "api-token", "IAmAuthorized", "Auth header value for API access")
fs.StringVar(&cli.SourceOutput, "source-output", "", "URL for the video source segments used if source_segments is not defined in the upload request")
config.URLSliceVarFlag(fs, &cli.PrivateBucketURLs, "private-bucket", "", "URL for the private media bucket")
config.URLSliceVarFlag(fs, &cli.PublicBucketURLs, "public-bucket", "", "URL for the public media bucket")
fs.StringVar(&cli.ExternalTranscoder, "external-transcoder", "", "URL for the external transcoder to be used by the pipeline coordinator. Only 1 implementation today for AWS MediaConvert which should be in the format: mediaconvert://key-id:key-secret@endpoint-host?region=aws-region&role=iam-role&s3_aux_bucket=s3://bucket")
fs.StringVar(&cli.VodPipelineStrategy, "vod-pipeline-strategy", string(pipeline.StrategyCatalystFfmpegDominance), "Which strategy to use for the VOD pipeline")
fs.StringVar(&cli.MetricsDBConnectionString, "metrics-db-connection-string", "", "Connection string to use for the metrics Postgres DB. Takes the form: host=X port=X user=X password=X dbname=X")
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type CatalystAPIMetrics struct {
SerfEventBufferSize prometheus.Gauge
AccessControlRequestCount *prometheus.CounterVec
AccessControlRequestDurationSec *prometheus.SummaryVec
ImageRequestDurationSec *prometheus.SummaryVec

JobsInFlight prometheus.Gauge
HTTPRequestsInFlight prometheus.Gauge
Expand Down

0 comments on commit ffb1cac

Please sign in to comment.