Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revert: thumbnail refactor #1221

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 5 additions & 16 deletions handlers/ffmpeg/ffmpeg.go
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/pipeline"
"github.com/livepeer/catalyst-api/thumbnails"
)

type HandlersCollection struct {
Expand Down Expand Up @@ -44,12 +43,6 @@ func (h *HandlersCollection) NewFile() httprouter.Handle {
err error
)
reg := regexp.MustCompile(`[^/]+.m3u8$`)
// job.SegmentingTargetURL comes in the format that Mist wants, looking like:
// protocol://abc@123:s3.com/a/b/c/<something>.m3u8
// but since this endpoint receives both .ts segments and m3u8 updates, we strip off the filename
// and pass the one ffmpeg gives us to UploadToOSURL instead
targetURLBase := reg.ReplaceAllString(job.SegmentingTargetURL, "")

if reg.MatchString(filename) {
// ensure that playlist type in the manifest is set to vod
buf := bytes.Buffer{}
Expand Down Expand Up @@ -84,16 +77,12 @@ func (h *HandlersCollection) NewFile() httprouter.Handle {
errors.WriteHTTPInternalServerError(w, "Error reading body", err)
return
}

go func() {
if job.ThumbnailsTargetURL == nil {
return
}
if err := thumbnails.GenerateThumb(filename, content, job.ThumbnailsTargetURL); err != nil {
log.LogError(job.RequestID, "generate thumb failed", err, "in", path.Join(targetURLBase, filename), "out", job.ThumbnailsTargetURL)
}
}()
}
// job.SegmentingTargetURL comes in the format that Mist wants, looking like:
// protocol://abc@123:s3.com/a/b/c/<something>.m3u8
// but since this endpoint receives both .ts segments and m3u8 updates, we strip off the filename
// and pass the one ffmpeg gives us to UploadToOSURL instead
targetURLBase := reg.ReplaceAllString(job.SegmentingTargetURL, "")

if err := backoff.Retry(func() error {
err := clients.UploadToOSURL(targetURLBase, filename, bytes.NewReader(content), config.SEGMENT_WRITE_TIMEOUT)
Expand Down
10 changes: 6 additions & 4 deletions pipeline/coordinator.go
Expand Up @@ -40,10 +40,11 @@ const (
// Execute the FFMPEG pipeline first and fallback to the external transcoding
// provider on errors.
StrategyFallbackExternal Strategy = "fallback_external"
)

const (
// Only mp4s of maxMP4OutDuration will have MP4s generated for each rendition
maxMP4OutDuration = 2 * time.Minute
maxRecordingMP4Duration = 12 * time.Hour
maxRecordingThumbsDuration = maxRecordingMP4Duration
maxMP4OutDuration = 2 * time.Minute
)

func (s Strategy) IsValid() bool {
Expand Down Expand Up @@ -383,7 +384,8 @@ func ShouldGenerateMP4(sourceURL, mp4TargetUrl *url.URL, fragMp4TargetUrl *url.U
return false
}
// We're currently memory-bound for generating MP4s above a certain file size
// This has been hitting us for long recordings, so do a crude "is it longer than 12 hours?" check and skip the MP4 if it is
// This has been hitting us for long recordings, so do a crude "is it longer than 12 hours?" check and skip the MP4 if it is
const maxRecordingMP4Duration = 12 * time.Hour
if clients.IsHLSInput(sourceURL) && durationSecs > maxRecordingMP4Duration.Seconds() {
return false
}
Expand Down
26 changes: 12 additions & 14 deletions pipeline/ffmpeg.go
Expand Up @@ -71,26 +71,24 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
}
} else {
job.SegmentingTargetURL = job.SourceFile
}
job.SegmentingDone = time.Now()
if job.HlsTargetURL != nil {
f.sendSourcePlayback(job)
}
job.ReportProgress(clients.TranscodeStatusPreparingCompleted, 1)

// don't generate thumbs for very long recordings since it involves downloading segments
if job.InputFileInfo.Duration <= 0 || job.InputFileInfo.Duration > maxRecordingThumbsDuration.Seconds() {
job.ThumbnailsTargetURL = nil
}
if job.ThumbnailsTargetURL != nil {
go func() {
if job.ThumbnailsTargetURL == nil {
return
}
err := thumbnails.GenerateThumbsFromManifest(job.RequestID, job.SegmentingTargetURL, job.ThumbnailsTargetURL)
log.Log(job.RequestID, "generating thumbs VTT")
err := thumbnails.GenerateThumbs(job.RequestID, job.SegmentingTargetURL, job.ThumbnailsTargetURL)
if err != nil {
log.LogError(job.RequestID, "generate thumbs failed", err, "in", job.SegmentingTargetURL, "out", job.ThumbnailsTargetURL)
} else {
log.Log(job.RequestID, "generate thumbs succeeded", "in", job.SegmentingTargetURL, "out", job.ThumbnailsTargetURL)
}
}()
}
job.SegmentingDone = time.Now()
if job.HlsTargetURL != nil {
f.sendSourcePlayback(job)
}
job.ReportProgress(clients.TranscodeStatusPreparingCompleted, 1)

// Transcode Beginning
log.Log(job.RequestID, "Beginning transcoding via FFMPEG/Livepeer pipeline")
Expand Down Expand Up @@ -170,7 +168,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {

// wait for thumbs background process
if job.ThumbnailsTargetURL != nil {
err := thumbnails.GenerateThumbsVTT(job.RequestID, job.SegmentingTargetURL, job.ThumbnailsTargetURL)
err := thumbnails.WaitForThumbs(job.RequestID, job.ThumbnailsTargetURL)
if err != nil {
log.LogError(job.RequestID, "waiting for thumbs failed", err, "out", job.ThumbnailsTargetURL)
} else {
Expand Down
199 changes: 65 additions & 134 deletions thumbnails/thumbnails.go
Expand Up @@ -8,9 +8,6 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cenkalti/backoff/v4"
Expand All @@ -25,180 +22,115 @@ const resolution = "854:480"
const vttFilename = "thumbnails.vtt"
const outputDir = "thumbnails"

// Wait a maximum of 5 mins for thumbnails to finish
var thumbWaitBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 10)

func getMediaManifest(requestID string, input string) (*m3u8.MediaPlaylist, error) {
var (
rc io.ReadCloser
err error
)
func GenerateThumbs(requestID, input string, output *url.URL) error {
inputURL, err := url.Parse(input)
if err != nil {
return err
}
// download and parse the manifest
var rc io.ReadCloser
err = backoff.Retry(func() error {
rc, err = clients.GetFile(context.Background(), requestID, input, nil)
return err
}, clients.DownloadRetryBackoff())
if err != nil {
return nil, fmt.Errorf("error downloading manifest: %w", err)
return fmt.Errorf("error downloading manifest: %w", err)
}
manifest, playlistType, err := m3u8.DecodeFrom(rc, true)
if err != nil {
return nil, fmt.Errorf("failed to decode manifest: %w", err)
return fmt.Errorf("failed to decode manifest: %w", err)
}

if playlistType != m3u8.MEDIA {
return nil, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
return fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported")
}
mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist)
if !ok || mediaPlaylist == nil {
return nil, fmt.Errorf("failed to parse playlist as MediaPlaylist")
return fmt.Errorf("failed to parse playlist as MediaPlaylist")
}
return mediaPlaylist, nil
}

func GenerateThumbsVTT(requestID string, input string, output *url.URL) error {
// download and parse the manifest
mediaPlaylist, err := getMediaManifest(requestID, input)
tempDir, err := os.MkdirTemp(os.TempDir(), "thumbs-*")
if err != nil {
return err
return fmt.Errorf("failed to make temp dir: %w", err)
}
defer os.RemoveAll(tempDir)

const layout = "15:04:05.000"
outputLocation := output.JoinPath(outputDir)
outputLocation := output.JoinPath(outputDir).String()
builder := &bytes.Buffer{}
_, err = builder.WriteString("WEBVTT\n")
if err != nil {
return err
}

var currentTime time.Time
// loop through each segment, generate a vtt entry for it
for _, segment := range mediaPlaylist.GetAllSegments() {
filename, err := thumbFilename(path.Base(segment.URI))
var (
currentTime time.Time
segments = mediaPlaylist.GetAllSegments()
thumbOuts = make([]string, len(segments))
)
// loop through each segment, generate a thumbnail image and upload it to storage
for i, segment := range segments {
thumbOut, err := processSegment(inputURL, segment, tempDir, outputLocation)
if err != nil {
return err
}
// check thumbnail file exists on storage
err = backoff.Retry(func() error {
_, err := clients.GetFile(context.Background(), requestID, outputLocation.JoinPath(filename).String(), nil)
return err
}, thumbWaitBackoff)
if err != nil {
return fmt.Errorf("failed to find thumb %s: %w", filename, err)
}
thumbOuts[i] = thumbOut

start := currentTime.Format(layout)
currentTime = currentTime.Add(time.Duration(segment.Duration) * time.Second)
end := currentTime.Format(layout)
_, err = builder.WriteString(fmt.Sprintf("%s --> %s\n%s\n\n", start, end, filename))
_, err = builder.WriteString(fmt.Sprintf("%s --> %s\n%s\n\n", start, end, path.Base(thumbOut)))
if err != nil {
return err
}
}

// upload VTT file
vttContent := builder.Bytes()
err = backoff.Retry(func() error {
return clients.UploadToOSURLFields(outputLocation.String(), vttFilename, bytes.NewReader(vttContent), time.Minute, &drivers.FileProperties{ContentType: "text/vtt"})
}, clients.UploadRetryBackoff())
if err != nil {
return fmt.Errorf("failed to upload vtt: %w", err)
}
return nil
}

func GenerateThumb(segmentURI string, input []byte, output *url.URL) error {
tempDir, err := os.MkdirTemp(os.TempDir(), "thumbs-*")
if err != nil {
return fmt.Errorf("failed to make temp dir: %w", err)
}
defer os.RemoveAll(tempDir)
outputLocation := output.JoinPath(outputDir)

inFilename := filepath.Join(tempDir, segmentURI)
if err := os.WriteFile(inFilename, input, 0644); err != nil {
return err
// parallelise the thumb uploads
uploadGroup, _ := errgroup.WithContext(context.Background())
uploadGroup.SetLimit(5)
for _, thumbOut := range thumbOuts {
thumbOut := thumbOut
uploadGroup.Go(func() error {
return backoff.Retry(func() error {
// upload thumbnail to storage
fileReader, err := os.Open(thumbOut)
if err != nil {
return err
}
defer fileReader.Close()
err = clients.UploadToOSURL(outputLocation, path.Base(thumbOut), fileReader, 2*time.Minute)
if err != nil {
return fmt.Errorf("failed to upload thumbnail %s: %w", thumbOut, err)
}
return nil
}, clients.UploadRetryBackoff())
})
}

filename, err := thumbFilename(segmentURI)
err = uploadGroup.Wait()
if err != nil {
return err
}

thumbOut := path.Join(tempDir, filename)
if err := processSegment(inFilename, thumbOut); err != nil {
return err
}

err = backoff.Retry(func() error {
// upload thumbnail to storage
fileReader, err := os.Open(thumbOut)
if err != nil {
return err
}
defer fileReader.Close()
err = clients.UploadToOSURL(outputLocation.String(), path.Base(thumbOut), fileReader, 2*time.Minute)
if err != nil {
return fmt.Errorf("failed to upload thumbnail %s: %w", thumbOut, err)
}
return nil
}, clients.UploadRetryBackoff())
err = clients.UploadToOSURLFields(outputLocation, vttFilename, builder, time.Minute, &drivers.FileProperties{ContentType: "text/vtt"})
if err != nil {
return err
return fmt.Errorf("failed to upload vtt: %w", err)
}

return nil
}

func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error {
// parse manifest and generate one thumbnail per segment
mediaPlaylist, err := getMediaManifest(requestID, input)
func processSegment(inputURL *url.URL, segment *m3u8.MediaSegment, tempDir string, outputLocation string) (string, error) {
segURL := inputURL.JoinPath("..", segment.URI)
signed, err := clients.SignURL(segURL)
if err != nil {
return err
return "", fmt.Errorf("error signing segment url %s: %w", segURL, err)
}
inputURL, err := url.Parse(input)
if err != nil {
return err
}

// parallelise the thumb uploads
uploadGroup, _ := errgroup.WithContext(context.Background())
uploadGroup.SetLimit(5)
for _, segment := range mediaPlaylist.GetAllSegments() {
segment := segment
uploadGroup.Go(func() error {
segURL := inputURL.JoinPath("..", segment.URI)
var (
rc io.ReadCloser
err error
)
// save the segment to memory
err = backoff.Retry(func() error {
rc, err = clients.GetFile(context.Background(), requestID, segURL.String(), nil)
return err
}, clients.DownloadRetryBackoff())
if err != nil {
return fmt.Errorf("error downloading manifest: %w", err)
}
bs, err := io.ReadAll(rc)
if err != nil {
return err
}

// generate thumbnail for the segment
return GenerateThumb(path.Base(segment.URI), bs, output)
})
}
return uploadGroup.Wait()
}

func processSegment(input string, thumbOut string) error {
// generate thumbnail
var ffmpegErr bytes.Buffer

err := backoff.Retry(func() error {
thumbOut := path.Join(tempDir, fmt.Sprintf("keyframes_%d.jpg", segment.SeqId))
err = backoff.Retry(func() error {
ffmpegErr = bytes.Buffer{}
return ffmpeg.
Input(input, ffmpeg.KwArgs{"skip_frame": "nokey"}). // only extract key frames
Input(signed, ffmpeg.KwArgs{"skip_frame": "nokey"}). // only extract key frames
Output(
thumbOut,
ffmpeg.KwArgs{
Expand All @@ -210,20 +142,19 @@ func processSegment(input string, thumbOut string) error {
).OverWriteOutput().WithErrorOutput(&ffmpegErr).Run()
}, clients.DownloadRetryBackoff())
if err != nil {
return fmt.Errorf("error running ffmpeg for thumbnails %s [%s]: %w", input, ffmpegErr.String(), err)
return "", fmt.Errorf("error running ffmpeg for thumbnails %s [%s]: %w", segURL, ffmpegErr.String(), err)
}

return nil
return thumbOut, nil
}

var segmentPrefix = "index"
// Wait a maximum of 5 mins for thumbnails to finish
var vttBackoff = backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 10)

func thumbFilename(segmentURI string) (string, error) {
// segmentURI will be index%d.ts
index := strings.TrimSuffix(strings.TrimPrefix(segmentURI, segmentPrefix), ".ts")
i, err := strconv.ParseInt(index, 10, 32)
if err != nil {
return "", fmt.Errorf("thumbFilename failed for %s: %w", segmentURI, err)
}
return fmt.Sprintf("keyframes_%d.jpg", i), nil
func WaitForThumbs(requestID string, output *url.URL) error {
vtt := output.JoinPath(outputDir, vttFilename).String()
return backoff.Retry(func() error {
_, err := clients.GetFile(context.Background(), requestID, vtt, nil)
return err
}, vttBackoff)
}