Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use errgroup to write segments to disk in background #998

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
125 changes: 57 additions & 68 deletions transcode/transcode.go
Expand Up @@ -8,6 +8,7 @@
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -18,11 +19,12 @@
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/metrics"
"github.com/livepeer/catalyst-api/video"
"golang.org/x/sync/errgroup"
)

const (
UploadTimeout = 5 * time.Minute
SegmentChannelSize = 10
UploadTimeout = 5 * time.Minute
SegmentWritersCount = 10
)

type TranscodeSegmentRequest struct {
Expand Down Expand Up @@ -139,29 +141,32 @@
}
}
renditionList.AddRenditionSegment(maxProfile.Name,
&video.TSegmentList{
SegmentDataTable: make(map[int][]byte),
})
&video.TSegmentList{})

Check warning on line 144 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L144

Added line #L144 was not covered by tests
} else {
for _, profile := range transcodeProfiles {
renditionList.AddRenditionSegment(profile.Name,
&video.TSegmentList{
SegmentDataTable: make(map[int][]byte),
})
&video.TSegmentList{})

Check warning on line 148 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L148

Added line #L148 was not covered by tests
}
}
}

// Create a buffered channel where transcoded segments are sent to be written to disk
segmentChannel := make(chan video.TranscodedSegmentInfo, SegmentChannelSize)

// Create a waitgroup to synchronize when the disk writing goroutine finishes
var wg sync.WaitGroup
var TransmuxStorageDir string
if transcodeRequest.GenerateMP4 {
var err error
// Create folder to hold transmux-ed files in local storage temporarily
TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_")
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}
defer os.RemoveAll(TransmuxStorageDir)

Check warning on line 162 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L155-L162

Added lines #L155 - L162 were not covered by tests
}
segFileWriter := newSegmentFileWriter(TransmuxStorageDir)

// Setup parallel transcode sessions
var jobs *ParallelTranscoding
jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel)
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segFileWriter)
segmentsCount++
if err != nil {
return err
Expand All @@ -174,57 +179,17 @@
return nil
})

var TransmuxStorageDir string
if transcodeRequest.GenerateMP4 {
var err error
// Create folder to hold transmux-ed files in local storage temporarily
TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_")
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}
defer os.RemoveAll(TransmuxStorageDir)

// Start the disk-writing (consumer) goroutine
wg.Add(1)
go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) {
var segmentBatch []video.TranscodedSegmentInfo
defer wg.Done()

// Keep checking for new segments in the buffered channel
for segInfo := range segmentChannel {
segmentBatch = append(segmentBatch, segInfo)
// Begin writing to disk if at-least 50% of buffered channel is full
if len(segmentBatch) >= SegmentChannelSize/2 {
err := video.WriteSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch)
if err != nil {
return
}
segmentBatch = nil
}
}
// Handle any remaining segments after the channel is closed
if len(segmentBatch) > 0 {
err := video.WriteSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch)
if err != nil {
return
}
}
}(TransmuxStorageDir, &renditionList)
}

// Start the transcoding (producer) goroutines
jobs.Start()
if err = jobs.Wait(); err != nil {
// return first error to caller
return outputs, segmentsCount, err
}

// If the disk-writing gorouine was started, then close the segment channel to
// signal that no more segments will be sent. This will be a no-op if MP4s are not requested.
close(segmentChannel)
// Wait for disk-writing goroutine to finish. This will be a no-op if MP4s are not requested.
wg.Wait()
if err = segFileWriter.wait(); err != nil {
return outputs, segmentsCount, fmt.Errorf("error writing segments to disk: %w", err)
}

Check warning on line 192 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L191-L192

Added lines #L191 - L192 were not covered by tests

// Build the manifests and push them to storage
manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip)
Expand Down Expand Up @@ -416,6 +381,37 @@
return outputs, segmentsCount, nil
}

type segmentFileWriter struct {
group errgroup.Group
transmuxTopLevelDir string
}

func newSegmentFileWriter(transmuxTopLevelDir string) *segmentFileWriter {
s := &segmentFileWriter{transmuxTopLevelDir: transmuxTopLevelDir}
s.group.SetLimit(SegmentWritersCount)
return s
}

func (s *segmentFileWriter) wait() error {
return s.group.Wait()
}

func (s *segmentFileWriter) writeSegment(requestID string, renditionName string, segIndex int, segmentData []byte) {
s.group.Go(func() error {
segmentFilename := filepath.Join(s.transmuxTopLevelDir, requestID+"_"+renditionName+"_"+strconv.Itoa(segIndex)+".ts")
segmentFile, err := os.Create(segmentFilename)
if err != nil {
return fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err)
}
defer segmentFile.Close()
_, err = segmentFile.Write(segmentData)
if err != nil {
return fmt.Errorf("error writing segment err: %w", err)
}
return nil

Check warning on line 411 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L399-L411

Added lines #L399 - L411 were not covered by tests
})
}

func uploadMp4Files(basePath *url.URL, mp4OutputFiles []string, prefix string) ([]video.OutputVideoFile, error) {
var mp4OutputsPre []video.OutputVideoFile
// e. Upload all mp4 related output files
Expand Down Expand Up @@ -489,7 +485,7 @@
transcodedStats []*video.RenditionStats,
renditionList *video.TRenditionList,
broadcaster clients.BroadcasterClient,
segmentChannel chan<- video.TranscodedSegmentInfo,
segFilewriter *segmentFileWriter,
) error {
start := time.Now()

Expand Down Expand Up @@ -569,20 +565,13 @@
// get inner segments table from outer rendition table
segmentsList := renditionList.GetSegmentList(transcodedSegment.Name)
if segmentsList != nil {
// add new entry for segment # and corresponding byte stream if the profile
// add new entry for segment # if the profile

Check warning on line 568 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L568

Added line #L568 was not covered by tests
// exists in the renditionList which contains only profiles for which mp4s will
// be generated i.e. all profiles for mp4 inputs and only highest quality
// rendition for hls inputs like recordings.
segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData)

// send this transcoded segment to the segment channel so that it can be written
// to disk in parallel
segmentChannel <- video.TranscodedSegmentInfo{
RequestID: transcodeRequest.RequestID,
RenditionName: transcodedSegment.Name, // Use actual rendition name
SegmentIndex: segment.Index, // Use actual segment index
}
segmentsList.AddSegment(segment.Index)

Check warning on line 572 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L572

Added line #L572 was not covered by tests

segFilewriter.writeSegment(transcodeRequest.RequestID, transcodedSegment.Name, segment.Index, transcodedSegment.MediaData)

Check warning on line 574 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L574

Added line #L574 was not covered by tests
}
}

Expand Down
64 changes: 7 additions & 57 deletions video/media.go
@@ -1,11 +1,7 @@
package video

import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
)

Expand Down Expand Up @@ -33,36 +29,19 @@ import (
*/

type TSegmentList struct {
mu sync.Mutex
SegmentDataTable map[int][]byte
mu sync.Mutex
SegmentList []int
}

func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) {
s.mu.Lock()
s.SegmentDataTable[segIdx] = data
s.mu.Unlock()
}

func (s *TSegmentList) RemoveSegmentData(segIdx int) {
s.mu.Lock()
s.SegmentDataTable[segIdx] = []byte{}
s.mu.Unlock()
}

func (s *TSegmentList) GetSegment(segIdx int) []byte {
func (s *TSegmentList) AddSegment(segIdx int) {
s.mu.Lock()
defer s.mu.Unlock()
return s.SegmentDataTable[segIdx]
s.SegmentList = append(s.SegmentList, segIdx)
}

func (s *TSegmentList) GetSortedSegments() []int {
segmentsTable := s.SegmentDataTable
segments := make([]int, 0, len(segmentsTable))
for k := range segmentsTable {
segments = append(segments, k)
}
sort.Ints(segments)
return segments
sort.Ints(s.SegmentList)
return s.SegmentList
}

type TRenditionList struct {
Expand All @@ -72,8 +51,8 @@ type TRenditionList struct {

func (r *TRenditionList) AddRenditionSegment(rendName string, sList *TSegmentList) {
r.mu.Lock()
defer r.mu.Unlock()
r.RenditionSegmentTable[rendName] = sList
r.mu.Unlock()
}

func (r *TRenditionList) GetSegmentList(rendName string) *TSegmentList {
Expand All @@ -92,32 +71,3 @@ type RenditionStats struct {
ManifestLocation string
BitsPerSecond uint32
}

type TranscodedSegmentInfo struct {
RequestID string
RenditionName string
SegmentIndex int
}

func WriteSegmentsToDisk(transmuxTopLevelDir string, renditionList *TRenditionList, segmentBatch []TranscodedSegmentInfo) error {
for _, segInfo := range segmentBatch {

// All accesses to renditionList and segmentList is protected by a mutex behind the scenes
segmentList := renditionList.GetSegmentList(segInfo.RenditionName)
segmentData := segmentList.GetSegment(segInfo.SegmentIndex)
segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts")
segmentFile, err := os.Create(segmentFilename)
if err != nil {
return fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err)
}
defer segmentFile.Close()
_, err = segmentFile.Write(segmentData)
if err != nil {
return fmt.Errorf("error writing segment err: %w", err)
}
// "Delete" buffered segment data from memory in hopes the garbage-collector releases it
segmentList.RemoveSegmentData(segInfo.SegmentIndex)

}
return nil
}