Skip to content

Commit

Permalink
Use errgroup to write segment to disk in background
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Dec 4, 2023
1 parent 8414b43 commit a279d65
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 81 deletions.
114 changes: 39 additions & 75 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"golang.org/x/sync/errgroup"
"net/url"
"os"
"path/filepath"
Expand All @@ -22,8 +23,7 @@ import (
)

const (
UploadTimeout = 5 * time.Minute
SegmentChannelSize = 10
UploadTimeout = 5 * time.Minute
)

type TranscodeSegmentRequest struct {
Expand Down Expand Up @@ -54,12 +54,6 @@ type TranscodeSegmentRequest struct {
IsClip bool
}

type TranscodedSegmentInfo struct {
RequestID string
RenditionName string
SegmentIndex int
}

func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) {
log.AddContext(transcodeRequest.RequestID, "source_manifest", transcodeRequest.SourceManifestURL, "stream_name", streamName)
log.Log(transcodeRequest.RequestID, "RunTranscodeProcess (v2) Beginning")
Expand Down Expand Up @@ -155,16 +149,23 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
}
}

// Create a buffered channel where transcoded segments are sent to be written to disk
segmentChannel := make(chan TranscodedSegmentInfo, SegmentChannelSize)

// Create a waitgroup to synchronize when the disk writing goroutine finishes
var wg sync.WaitGroup
var TransmuxStorageDir string
if transcodeRequest.GenerateMP4 {
var err error
// Create folder to hold transmux-ed files in local storage temporarily
TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_")
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}
defer os.RemoveAll(TransmuxStorageDir)
}
segFileWriter := newSegmentFileWriter(TransmuxStorageDir)

// Setup parallel transcode sessions
var jobs *ParallelTranscoding
jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel)
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segFileWriter)
segmentsCount++
if err != nil {
return err
Expand All @@ -177,51 +178,15 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
return nil
})

var TransmuxStorageDir string
if transcodeRequest.GenerateMP4 {
var err error
// Create folder to hold transmux-ed files in local storage temporarily
TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_")
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}
defer os.RemoveAll(TransmuxStorageDir)

// Start the disk-writing (consumer) goroutine
wg.Add(1)
go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) {
var segmentBatch []TranscodedSegmentInfo
defer wg.Done()

// Keep checking for new segments in the buffered channel
for segInfo := range segmentChannel {
segmentBatch = append(segmentBatch, segInfo)
// Begin writing to disk if at-least 50% of buffered channel is full
if len(segmentBatch) >= SegmentChannelSize/2 {
writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch)
segmentBatch = nil
}
}
// Handle any remaining segments after the channel is closed
if len(segmentBatch) > 0 {
writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch)
}
}(TransmuxStorageDir, &renditionList)
}

// Start the transcoding (producer) goroutines
jobs.Start()
if err = jobs.Wait(); err != nil {
// return first error to caller
return outputs, segmentsCount, err
}

// If the disk-writing gorouine was started, then close the segment channel to
// signal that no more segments will be sent. This will be a no-op if MP4s are not requested.
close(segmentChannel)
// Wait for disk-writing goroutine to finish. This will be a no-op if MP4s are not requested.
wg.Wait()
segFileWriter.wait()

Check failure on line 189 in transcode/transcode.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `segFileWriter.wait` is not checked (errcheck)

// Build the manifests and push them to storage
manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip)
Expand Down Expand Up @@ -400,27 +365,35 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
return outputs, segmentsCount, nil
}

func writeSegmentsToDisk(transmuxTopLevelDir string, renditionList *video.TRenditionList, segmentBatch []TranscodedSegmentInfo) (int64, error) {
for _, segInfo := range segmentBatch {
type segmentFileWriter struct {
group errgroup.Group
transmuxTopLevelDir string
}

func newSegmentFileWriter(transmuxTopLevelDir string) *segmentFileWriter {
s := &segmentFileWriter{transmuxTopLevelDir: transmuxTopLevelDir}
s.group.SetLimit(5)
return s
}

// All accesses to renditionList and segmentList is protected by a mutex behind the scenes
segmentList := renditionList.GetSegmentList(segInfo.RenditionName)
segmentData := segmentList.GetSegment(segInfo.SegmentIndex)
segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts")
func (s *segmentFileWriter) wait() error {
return s.group.Wait()
}

func (s *segmentFileWriter) writeSegment(requestID string, renditionName string, segIndex int, segmentData []byte) {
s.group.Go(func() error {
segmentFilename := filepath.Join(s.transmuxTopLevelDir, requestID+"_"+renditionName+"_"+strconv.Itoa(segIndex)+".ts")
segmentFile, err := os.Create(segmentFilename)
if err != nil {
return 0, fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err)
return fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err)
}
defer segmentFile.Close()
_, err = segmentFile.Write(segmentData)
if err != nil {
return 0, fmt.Errorf("error writing segment err: %w", err)
return fmt.Errorf("error writing segment err: %w", err)
}
// "Delete" buffered segment data from memory in hopes the garbage-collector releases it
segmentList.RemoveSegmentData(segInfo.SegmentIndex)

}
return 0, nil
return nil
})
}

func uploadMp4Files(basePath *url.URL, mp4OutputFiles []string, prefix string) ([]video.OutputVideoFile, error) {
Expand Down Expand Up @@ -496,7 +469,7 @@ func transcodeSegment(
transcodedStats []*video.RenditionStats,
renditionList *video.TRenditionList,
broadcaster clients.BroadcasterClient,
segmentChannel chan<- TranscodedSegmentInfo,
segFilewriter *segmentFileWriter,
) error {
start := time.Now()

Expand Down Expand Up @@ -580,16 +553,7 @@ func transcodeSegment(
// exists in the renditionList which contains only profiles for which mp4s will
// be generated i.e. all profiles for mp4 inputs and only highest quality
// rendition for hls inputs like recordings.
segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData)

// send this transcoded segment to the segment channel so that it can be written
// to disk in parallel
segmentChannel <- TranscodedSegmentInfo{
RequestID: transcodeRequest.RequestID,
RenditionName: transcodedSegment.Name, // Use actual rendition name
SegmentIndex: segment.Index, // Use actual segment index
}

segFilewriter.writeSegment(transcodeRequest.RequestID, transcodedSegment.Name, segment.Index, transcodedSegment.MediaData)
}
}

Expand Down
6 changes: 0 additions & 6 deletions video/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) {
s.mu.Unlock()
}

func (s *TSegmentList) RemoveSegmentData(segIdx int) {
s.mu.Lock()
s.SegmentDataTable[segIdx] = []byte{}
s.mu.Unlock()
}

func (s *TSegmentList) GetSegment(segIdx int) []byte {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit a279d65

Please sign in to comment.