Skip to content

Commit

Permalink
Use errgroup to write segments to disk in background
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Feb 9, 2024
1 parent 1ba6bd7 commit e211fe0
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 226 deletions.
125 changes: 57 additions & 68 deletions transcode/transcode.go
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -18,11 +19,12 @@ import (
"github.com/livepeer/catalyst-api/log"
"github.com/livepeer/catalyst-api/metrics"
"github.com/livepeer/catalyst-api/video"
"golang.org/x/sync/errgroup"
)

const (
UploadTimeout = 5 * time.Minute
SegmentChannelSize = 10
UploadTimeout = 5 * time.Minute
SegmentWritersCount = 10
)

type TranscodeSegmentRequest struct {
Expand Down Expand Up @@ -139,29 +141,32 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
}
}
renditionList.AddRenditionSegment(maxProfile.Name,
&video.TSegmentList{
SegmentDataTable: make(map[int][]byte),
})
&video.TSegmentList{})

Check warning on line 144 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L144

Added line #L144 was not covered by tests
} else {
for _, profile := range transcodeProfiles {
renditionList.AddRenditionSegment(profile.Name,
&video.TSegmentList{
SegmentDataTable: make(map[int][]byte),
})
&video.TSegmentList{})

Check warning on line 148 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L148

Added line #L148 was not covered by tests
}
}
}

// Create a buffered channel where transcoded segments are sent to be written to disk
segmentChannel := make(chan video.TranscodedSegmentInfo, SegmentChannelSize)

// Create a waitgroup to synchronize when the disk writing goroutine finishes
var wg sync.WaitGroup
var TransmuxStorageDir string
if transcodeRequest.GenerateMP4 {
var err error
// Create folder to hold transmux-ed files in local storage temporarily
TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_")
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}
defer os.RemoveAll(TransmuxStorageDir)

Check warning on line 162 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L155-L162

Added lines #L155 - L162 were not covered by tests
}
segFileWriter := newSegmentFileWriter(TransmuxStorageDir)

// Setup parallel transcode sessions
var jobs *ParallelTranscoding
jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel)
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segFileWriter)
segmentsCount++
if err != nil {
return err
Expand All @@ -174,57 +179,17 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
return nil
})

var TransmuxStorageDir string
if transcodeRequest.GenerateMP4 {
var err error
// Create folder to hold transmux-ed files in local storage temporarily
TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_")
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}
defer os.RemoveAll(TransmuxStorageDir)

// Start the disk-writing (consumer) goroutine
wg.Add(1)
go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) {
var segmentBatch []video.TranscodedSegmentInfo
defer wg.Done()

// Keep checking for new segments in the buffered channel
for segInfo := range segmentChannel {
segmentBatch = append(segmentBatch, segInfo)
// Begin writing to disk once at least SegmentChannelSize/2 segments have been batched
if len(segmentBatch) >= SegmentChannelSize/2 {
err := video.WriteSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch)
if err != nil {
return
}
segmentBatch = nil
}
}
// Handle any remaining segments after the channel is closed
if len(segmentBatch) > 0 {
err := video.WriteSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch)
if err != nil {
return
}
}
}(TransmuxStorageDir, &renditionList)
}

// Start the transcoding (producer) goroutines
jobs.Start()
if err = jobs.Wait(); err != nil {
// return first error to caller
return outputs, segmentsCount, err
}

// If the disk-writing goroutine was started, then close the segment channel to
// signal that no more segments will be sent. This will be a no-op if MP4s are not requested.
close(segmentChannel)
// Wait for disk-writing goroutine to finish. This will be a no-op if MP4s are not requested.
wg.Wait()
if err = segFileWriter.wait(); err != nil {
return outputs, segmentsCount, fmt.Errorf("error writing segments to disk: %w", err)
}

Check warning on line 192 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L191-L192

Added lines #L191 - L192 were not covered by tests

// Build the manifests and push them to storage
manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip)
Expand Down Expand Up @@ -416,6 +381,37 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
return outputs, segmentsCount, nil
}

// segmentFileWriter writes transcoded segments to individual files under
// transmuxTopLevelDir, running the writes on goroutines owned by group.
type segmentFileWriter struct {
// group runs the file-writing goroutines; wait() returns the result of group.Wait().
group errgroup.Group
// transmuxTopLevelDir is the directory in which the segment files are created.
transmuxTopLevelDir string
}

// newSegmentFileWriter builds a segmentFileWriter that creates its files under
// transmuxTopLevelDir and limits its errgroup to SegmentWritersCount goroutines.
func newSegmentFileWriter(transmuxTopLevelDir string) *segmentFileWriter {
	writer := new(segmentFileWriter)
	writer.transmuxTopLevelDir = transmuxTopLevelDir
	// Cap how many write goroutines may be active at the same time.
	writer.group.SetLimit(SegmentWritersCount)
	return writer
}

// wait blocks until all writes started through writeSegment have returned and
// passes back whatever error the underlying errgroup reports.
func (s *segmentFileWriter) wait() error {
	groupErr := s.group.Wait()
	return groupErr
}

func (s *segmentFileWriter) writeSegment(requestID string, renditionName string, segIndex int, segmentData []byte) {
s.group.Go(func() error {
segmentFilename := filepath.Join(s.transmuxTopLevelDir, requestID+"_"+renditionName+"_"+strconv.Itoa(segIndex)+".ts")
segmentFile, err := os.Create(segmentFilename)
if err != nil {
return fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err)
}
defer segmentFile.Close()
_, err = segmentFile.Write(segmentData)
if err != nil {
return fmt.Errorf("error writing segment err: %w", err)
}
return nil

Check warning on line 411 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L399-L411

Added lines #L399 - L411 were not covered by tests
})
}

func uploadMp4Files(basePath *url.URL, mp4OutputFiles []string, prefix string) ([]video.OutputVideoFile, error) {
var mp4OutputsPre []video.OutputVideoFile
// e. Upload all mp4 related output files
Expand Down Expand Up @@ -489,7 +485,7 @@ func transcodeSegment(
transcodedStats []*video.RenditionStats,
renditionList *video.TRenditionList,
broadcaster clients.BroadcasterClient,
segmentChannel chan<- video.TranscodedSegmentInfo,
segFilewriter *segmentFileWriter,
) error {
start := time.Now()

Expand Down Expand Up @@ -569,20 +565,13 @@ func transcodeSegment(
// get inner segments table from outer rendition table
segmentsList := renditionList.GetSegmentList(transcodedSegment.Name)
if segmentsList != nil {
// add new entry for segment # and corresponding byte stream if the profile
// add new entry for segment # if the profile

Check warning on line 568 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L568

Added line #L568 was not covered by tests
// exists in the renditionList which contains only profiles for which mp4s will
// be generated i.e. all profiles for mp4 inputs and only highest quality
// rendition for hls inputs like recordings.
segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData)

// send this transcoded segment to the segment channel so that it can be written
// to disk in parallel
segmentChannel <- video.TranscodedSegmentInfo{
RequestID: transcodeRequest.RequestID,
RenditionName: transcodedSegment.Name, // Use actual rendition name
SegmentIndex: segment.Index, // Use actual segment index
}
segmentsList.AddSegment(segment.Index)

Check warning on line 572 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L572

Added line #L572 was not covered by tests

segFilewriter.writeSegment(transcodeRequest.RequestID, transcodedSegment.Name, segment.Index, transcodedSegment.MediaData)

Check warning on line 574 in transcode/transcode.go

View check run for this annotation

Codecov / codecov/patch

transcode/transcode.go#L574

Added line #L574 was not covered by tests
}
}

Expand Down
64 changes: 7 additions & 57 deletions video/media.go
@@ -1,11 +1,7 @@
package video

import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
)

Expand Down Expand Up @@ -33,36 +29,19 @@ import (
*/

type TSegmentList struct {
mu sync.Mutex
SegmentDataTable map[int][]byte
mu sync.Mutex
SegmentList []int
}

func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) {
s.mu.Lock()
s.SegmentDataTable[segIdx] = data
s.mu.Unlock()
}

func (s *TSegmentList) RemoveSegmentData(segIdx int) {
s.mu.Lock()
s.SegmentDataTable[segIdx] = []byte{}
s.mu.Unlock()
}

func (s *TSegmentList) GetSegment(segIdx int) []byte {
func (s *TSegmentList) AddSegment(segIdx int) {
s.mu.Lock()
defer s.mu.Unlock()
return s.SegmentDataTable[segIdx]
s.SegmentList = append(s.SegmentList, segIdx)
}

func (s *TSegmentList) GetSortedSegments() []int {
segmentsTable := s.SegmentDataTable
segments := make([]int, 0, len(segmentsTable))
for k := range segmentsTable {
segments = append(segments, k)
}
sort.Ints(segments)
return segments
sort.Ints(s.SegmentList)
return s.SegmentList
}

type TRenditionList struct {
Expand All @@ -72,8 +51,8 @@ type TRenditionList struct {

// AddRenditionSegment stores sList in RenditionSegmentTable under the key
// rendName while holding the list's mutex.
func (r *TRenditionList) AddRenditionSegment(rendName string, sList *TSegmentList) {
	r.mu.Lock()
	// Release the lock exactly once; unlocking an already unlocked sync.Mutex
	// is a fatal run-time error.
	defer r.mu.Unlock()
	r.RenditionSegmentTable[rendName] = sList
}

func (r *TRenditionList) GetSegmentList(rendName string) *TSegmentList {
Expand All @@ -92,32 +71,3 @@ type RenditionStats struct {
ManifestLocation string
BitsPerSecond uint32
}

// TranscodedSegmentInfo identifies one transcoded segment that is to be
// written to disk by WriteSegmentsToDisk.
type TranscodedSegmentInfo struct {
// RequestID is used as the prefix of the segment's file name.
RequestID string
// RenditionName selects the segment list to read from and is part of the file name.
RenditionName string
// SegmentIndex is the segment's key within its segment list and the file name suffix.
SegmentIndex int
}

// WriteSegmentsToDisk writes every segment named in segmentBatch to its own
// file, <transmuxTopLevelDir>/<RequestID>_<RenditionName>_<SegmentIndex>.ts.
// The bytes are fetched from renditionList, and after a successful write the
// buffered data for that segment is cleared via RemoveSegmentData.
//
// It stops at the first failure and returns an error if a rendition has no
// segment list or if a file cannot be created, written or closed. Segments
// written before the failure stay on disk.
func WriteSegmentsToDisk(transmuxTopLevelDir string, renditionList *TRenditionList, segmentBatch []TranscodedSegmentInfo) error {
	for _, segInfo := range segmentBatch {
		// All accesses to renditionList and segmentList are protected by a mutex behind the scenes
		segmentList := renditionList.GetSegmentList(segInfo.RenditionName)
		if segmentList == nil {
			// Guard against a rendition that was never registered rather than
			// dereferencing a nil list.
			return fmt.Errorf("error writing segment err: no segment list for rendition %q", segInfo.RenditionName)
		}
		segmentData := segmentList.GetSegment(segInfo.SegmentIndex)

		segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts")
		segmentFile, err := os.Create(segmentFilename)
		if err != nil {
			return fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err)
		}

		_, writeErr := segmentFile.Write(segmentData)
		// Close inside the iteration: a defer here would keep every file of the
		// batch open until the function returns and would drop Close errors.
		closeErr := segmentFile.Close()
		if writeErr != nil {
			return fmt.Errorf("error writing segment err: %w", writeErr)
		}
		if closeErr != nil {
			return fmt.Errorf("error closing segment file err: %w", closeErr)
		}

		// "Delete" buffered segment data from memory in hopes the garbage-collector releases it
		segmentList.RemoveSegmentData(segInfo.SegmentIndex)
	}
	return nil
}

0 comments on commit e211fe0

Please sign in to comment.