Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) memory optimizations #994

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 93 additions & 17 deletions transcode/transcode.go
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -22,7 +23,7 @@ import (

const (
UploadTimeout = 5 * time.Minute
TransmuxStorageDir = "/tmp/transmux_stage"
SegmentChannelSize = 10
)

type TranscodeSegmentRequest struct {
Expand Down Expand Up @@ -53,6 +54,12 @@ type TranscodeSegmentRequest struct {
IsClip bool
}

type TranscodedSegmentInfo struct {
RequestID string
RenditionName string
SegmentIndex int
}

func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) {
log.AddContext(transcodeRequest.RequestID, "source_manifest", transcodeRequest.SourceManifestURL, "stream_name", streamName)
log.Log(transcodeRequest.RequestID, "RunTranscodeProcess (v2) Beginning")
Expand Down Expand Up @@ -148,9 +155,16 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
}
}

// Create a buffered channel where transcoded segments are sent to be written to disk
segmentChannel := make(chan TranscodedSegmentInfo, SegmentChannelSize)

// Create a waitgroup to synchronize when the disk writing goroutine finishes
var wg sync.WaitGroup

// Setup parallel transcode sessions
var jobs *ParallelTranscoding
jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster)
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel)
segmentsCount++
if err != nil {
return err
Expand All @@ -162,12 +176,55 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
}
return nil
})

var TransmuxStorageDir string
if transcodeRequest.GenerateMP4 {
var err error
// Create folder to hold transmux-ed files in local storage temporarily
TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_")
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}
defer os.RemoveAll(TransmuxStorageDir)

// Start the disk-writing (consumer) goroutine
wg.Add(1)
go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) {
var segmentBatch []TranscodedSegmentInfo
defer wg.Done()

// Keep checking for new segments in the buffered channel
for segInfo := range segmentChannel {
segmentBatch = append(segmentBatch, segInfo)
// Begin writing to disk if at-least 50% of buffered channel is full
if len(segmentBatch) >= SegmentChannelSize/2 {
writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch)
fmt.Println("XXX: writing to disk here because > 10", segInfo.RenditionName, segInfo.SegmentIndex)
segmentBatch = nil
}
}
// Handle any remaining segments after the channel is closed
if len(segmentBatch) > 0 {
writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch)
fmt.Println("XXX: writing to disk here after channel close")
}
}(TransmuxStorageDir, &renditionList)
}

// Start the transcoding (producer) goroutines
jobs.Start()
if err = jobs.Wait(); err != nil {
// return first error to caller
return outputs, segmentsCount, err
}

// If the disk-writing gorouine was started, then close the segment channel to
// signal that no more segments will be sent. This will be a no-op if MP4s are not requested.
close(segmentChannel)
// Wait for disk-writing goroutine to finish. This will be a no-op if MP4s are not requested.
wg.Wait()

// Build the manifests and push them to storage
manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip)
if err != nil {
Expand All @@ -193,14 +250,6 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st

var concatFiles []string
for rendition, segments := range renditionList.RenditionSegmentTable {
// Create folder to hold transmux-ed files in local storage temporarily
TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_")
if err != nil && !os.IsExist(err) {
log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err)
return outputs, segmentsCount, err
}
defer os.RemoveAll(TransmuxStorageDir)

// Create a single .ts file for a given rendition by concatenating all segments in order
if rendition == "low-bitrate" {
// skip mp4 generation for low-bitrate profile
Expand All @@ -210,14 +259,8 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
concatFiles = append(concatFiles, concatTsFileName)
defer os.Remove(concatTsFileName)

// For now, use the stream based concat for clipping only and file based concat for everything else.
// Eventually, all mp4 generation can be moved to stream based concat once proven effective.
var totalBytes int64
if transcodeRequest.IsClip {
totalBytes, err = video.ConcatTS(concatTsFileName, segments, true)
} else {
totalBytes, err = video.ConcatTS(concatTsFileName, segments, false)
}
totalBytes, err = video.ConcatTS(concatTsFileName, segments, true)
if err != nil {
log.Log(transcodeRequest.RequestID, "error concatenating .ts", "file", concatTsFileName, "err", err)
continue
Expand Down Expand Up @@ -359,6 +402,29 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
return outputs, segmentsCount, nil
}

func writeSegmentsToDisk(transmuxTopLevelDir string, renditionList *video.TRenditionList, segmentBatch []TranscodedSegmentInfo) (int64, error) {
for _, segInfo := range segmentBatch {

// All accesses to renditionList and segmentList is protected by a mutex behind the scenes
segmentList := renditionList.GetSegmentList(segInfo.RenditionName)
segmentData := segmentList.GetSegment(segInfo.SegmentIndex)
segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts")
segmentFile, err := os.Create(segmentFilename)
if err != nil {
return 0, fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err)
}
defer segmentFile.Close()
_, err = segmentFile.Write(segmentData)
if err != nil {
return 0, fmt.Errorf("error writing segment err: %w", err)
}
// "Delete" buffered segment data from memory in hopes the garbage-collector releases it
segmentList.RemoveSegmentData(segInfo.SegmentIndex)

}
return 0, nil
}

func uploadMp4Files(basePath *url.URL, mp4OutputFiles []string, prefix string) ([]video.OutputVideoFile, error) {
var mp4OutputsPre []video.OutputVideoFile
// e. Upload all mp4 related output files
Expand Down Expand Up @@ -432,6 +498,7 @@ func transcodeSegment(
transcodedStats []*video.RenditionStats,
renditionList *video.TRenditionList,
broadcaster clients.BroadcasterClient,
segmentChannel chan<- TranscodedSegmentInfo,
) error {
start := time.Now()

Expand Down Expand Up @@ -516,6 +583,15 @@ func transcodeSegment(
// be generated i.e. all profiles for mp4 inputs and only highest quality
// rendition for hls inputs like recordings.
segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData)

// send this transcoded segment to the segment channel so that it can be written
// to disk in parallel
segmentChannel <- TranscodedSegmentInfo{
RequestID: transcodeRequest.RequestID,
RenditionName: transcodedSegment.Name, // Use actual rendition name
SegmentIndex: segment.Index, // Use actual segment index
}

}
}

Expand Down
6 changes: 6 additions & 0 deletions video/media.go
Expand Up @@ -39,6 +39,12 @@ func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) {
s.mu.Unlock()
}

func (s *TSegmentList) RemoveSegmentData(segIdx int) {
s.mu.Lock()
s.SegmentDataTable[segIdx] = []byte{}
s.mu.Unlock()
}

func (s *TSegmentList) GetSegment(segIdx int) []byte {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
25 changes: 16 additions & 9 deletions video/transmux.go
Expand Up @@ -138,19 +138,26 @@ func ConcatTS(tsFileName string, segmentsList *TSegmentList, useStreamBasedConca
}()

// Write each segment to disk and add segment filename to the text file
for segName, segData := range segmentsList.GetSortedSegments() {
for segName, _ := range segmentsList.GetSortedSegments() {
// Open a new file to write each segment to disk
segmentFilename := fileBaseWithoutExt + "_" + strconv.Itoa(segName) + ".ts"
segmentFile, err := os.Create(segmentFilename)
/* segmentFile, err := os.Create(segmentFilename)
if err != nil {
return totalBytes, fmt.Errorf("error creating individual segment file (%s) err: %w", segmentFilename, err)
}
defer segmentFile.Close()
// Write the segment data to disk
segBytes, err := segmentFile.Write(segmentsList.SegmentDataTable[segData])
if err != nil {
return totalBytes, fmt.Errorf("error writing segment %d err: %w", segName, err)
}
*/
fileInfo, err := os.Stat(segmentFilename)
if err != nil {
return totalBytes, fmt.Errorf("error creating individual segment file (%s) err: %w", segmentFilename, err)
}
defer segmentFile.Close()
// Write the segment data to disk
segBytes, err := segmentFile.Write(segmentsList.SegmentDataTable[segData])
if err != nil {
return totalBytes, fmt.Errorf("error writing segment %d err: %w", segName, err)
return totalBytes, fmt.Errorf("error stat segment %d err: %w", segName, err)
}
segBytes := fileInfo.Size()

segmentFilenames = append(segmentFilenames, segmentFilename)
totalBytes = totalBytes + int64(segBytes)
// Add filename to the text file
Expand Down