Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AI] Text to video pipeline support #3002

Open
wants to merge 3 commits into
base: ai-video
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions ai/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,25 @@ func (w *FileWorker) ImageToVideo(ctx context.Context, req worker.ImageToVideoMu
return &resp, nil
}

// TextToVideo returns a canned text-to-video response read from disk.
//
// The file path is looked up under the "text-to-video" key of w.files; the
// file's contents are decoded as JSON into a worker.VideoResponse. Neither
// ctx nor req is consulted. An error is returned when no file is registered
// for the key, when the file cannot be read, or when it is not valid JSON
// for the response type.
func (w *FileWorker) TextToVideo(ctx context.Context, req worker.TextToVideoJSONRequestBody) (*worker.VideoResponse, error) {
	path, found := w.files["text-to-video"]
	if !found {
		return nil, errors.New("text-to-video response file not found")
	}

	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return nil, readErr
	}

	out := new(worker.VideoResponse)
	if decodeErr := json.Unmarshal(raw, out); decodeErr != nil {
		return nil, decodeErr
	}
	return out, nil
}

// Warm is a no-op for FileWorker: it ignores ctx, containerName and modelID
// and always returns nil.
func (w *FileWorker) Warm(ctx context.Context, containerName, modelID string) error {
return nil
}
Expand Down
12 changes: 12 additions & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,18 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
constraints[core.Capability_ImageToVideo].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_ImageToVideo, config.ModelID, big.NewRat(config.PricePerUnit, config.PixelsPerUnit))
case "text-to-video":
_, ok := constraints[core.Capability_TextToVideo]
if !ok {
aiCaps = append(aiCaps, core.Capability_TextToVideo)
constraints[core.Capability_TextToVideo] = &core.Constraints{
Models: make(map[string]*core.ModelConstraint),
}
}

constraints[core.Capability_TextToVideo].Models[config.ModelID] = modelConstraint

n.SetBasePriceForCap("default", core.Capability_TextToVideo, config.ModelID, big.NewRat(config.PricePerUnit, config.PixelsPerUnit))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
TextToImage(context.Context, worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error)
ImageToImage(context.Context, worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error)
ImageToVideo(context.Context, worker.ImageToVideoMultipartRequestBody) (*worker.VideoResponse, error)
TextToVideo(context.Context, worker.TextToVideoJSONRequestBody) (*worker.VideoResponse, error)

Check failure on line 18 in core/ai.go

View workflow job for this annotation

GitHub Actions / Build binaries for linux-cpu-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 18 in core/ai.go

View workflow job for this annotation

GitHub Actions / Build binaries for darwin-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 18 in core/ai.go

View workflow job for this annotation

GitHub Actions / Build binaries for linux-gpu-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 18 in core/ai.go

View workflow job for this annotation

GitHub Actions / Build binaries for darwin-arm64

undefined: worker.TextToVideoJSONRequestBody
Warm(context.Context, string, string, worker.RunnerEndpoint) error
Stop(context.Context) error
}
Expand Down
3 changes: 3 additions & 0 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
Capability_TextToImage
Capability_ImageToImage
Capability_ImageToVideo
Capability_TextToVideo
)

var CapabilityNameLookup = map[Capability]string{
Expand Down Expand Up @@ -104,6 +105,7 @@ var CapabilityNameLookup = map[Capability]string{
Capability_TextToImage: "Text to image",
Capability_ImageToImage: "Image to image",
Capability_ImageToVideo: "Image to video",
Capability_TextToVideo: "Text to video",
}

var CapabilityTestLookup = map[Capability]CapabilityTest{
Expand Down Expand Up @@ -192,6 +194,7 @@ func OptionalCapabilities() []Capability {
Capability_TextToImage,
Capability_ImageToImage,
Capability_ImageToVideo,
Capability_TextToVideo,
}
}

Expand Down
82 changes: 82 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@
return orch.node.imageToVideo(ctx, req)
}

// TextToVideo forwards a text-to-video request to the orchestrator's node and
// returns the result of orch.node.textToVideo unchanged.
//
// NOTE(review): the build annotations captured inside this span report
// worker.TextToVideoJSONRequestBody as undefined on every platform —
// presumably the ai-worker dependency has to be bumped to a version that
// declares it; confirm before merging.
func (orch *orchestrator) TextToVideo(ctx context.Context, req worker.TextToVideoJSONRequestBody) (*worker.ImageResponse, error) {

Check failure on line 120 in core/orchestrator.go

View workflow job for this annotation

GitHub Actions / Build binaries for linux-cpu-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 120 in core/orchestrator.go

View workflow job for this annotation

GitHub Actions / Build binaries for darwin-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 120 in core/orchestrator.go

View workflow job for this annotation

GitHub Actions / Build binaries for linux-gpu-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 120 in core/orchestrator.go

View workflow job for this annotation

GitHub Actions / Build binaries for darwin-arm64

undefined: worker.TextToVideoJSONRequestBody
return orch.node.textToVideo(ctx, req)
}

func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Payment, manifestID ManifestID) error {
if orch.node == nil || orch.node.Recipient == nil {
return nil
Expand Down Expand Up @@ -931,6 +935,84 @@
return n.AIWorker.ImageToImage(ctx, req)
}

// textToVideo runs a text-to-video job and returns the URLs of the encoded
// results.
//
// It asks n.AIWorker for frames, requires exactly one batch of frames in the
// response, transcodes each batch of frame URLs into an MP4 segment with
// n.transcodeFrames, stores the first segment of each result through
// res.OS.SaveData under a random "<id>.mp4" name, and returns the resulting
// URIs in a worker.ImageResponse (one worker.Media per batch, carrying the
// seed of the batch's first frame when the batch is non-empty).
//
// Defaults applied when the request leaves the field nil: 7 fps, height 576,
// width 1024.
//
// An error is returned when the worker call fails, when the number of frame
// batches is not 1, when transcoding reports an error, or when saving the
// segment fails.
func (n *LivepeerNode) textToVideo(ctx context.Context, req worker.TextToVideoJSONRequestBody) (*worker.ImageResponse, error) {

Check failure on line 938 in core/orchestrator.go

View workflow job for this annotation

GitHub Actions / Build binaries for linux-cpu-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 938 in core/orchestrator.go

View workflow job for this annotation

GitHub Actions / Build binaries for darwin-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 938 in core/orchestrator.go

View workflow job for this annotation

GitHub Actions / Build binaries for linux-gpu-amd64

undefined: worker.TextToVideoJSONRequestBody

Check failure on line 938 in core/orchestrator.go

View workflow job for this annotation

GitHub Actions / Build binaries for darwin-arm64

undefined: worker.TextToVideoJSONRequestBody
// We might support generating more than one video in the future (i.e. multiple input images/prompts)
numVideos := 1

// Generate frames
start := time.Now()
resp, err := n.AIWorker.TextToVideo(ctx, req)
if err != nil {
return nil, err
}

// Each entry of resp.Frames is one batch of frames, i.e. one output video.
if len(resp.Frames) != numVideos {
return nil, fmt.Errorf("unexpected number of text-to-video outputs expected=%v actual=%v", numVideos, len(resp.Frames))
}

// The measured duration covers the worker call plus the batch-count check above.
took := time.Since(start)
clog.V(common.DEBUG).Infof(ctx, "Generating frames took=%v", took)

sessionID := string(RandomManifestID())
// Input frame rate: 7 fps unless the request specifies one.
framerate := 7
if req.Fps != nil {
framerate = *req.Fps
}
// NOTE(review): a negative req.Fps would wrap around in the uint conversion
// below — confirm the value is validated before it reaches this point.
inProfile := ffmpeg.VideoProfile{
Framerate: uint(framerate),
FramerateDen: 1,
}
// Output resolution: 1024x576 unless the request overrides width/height.
height := 576
if req.Height != nil {
height = *req.Height
}
width := 1024
if req.Width != nil {
width = *req.Width
}
outProfile := ffmpeg.VideoProfile{
Name: "text-to-video",
Resolution: fmt.Sprintf("%vx%v", width, height),
Bitrate: "6000k",
Format: ffmpeg.FormatMP4,
}
// HACK: Re-use worker.ImageResponse to return results
// Transcode frames into segments.
videos := make([]worker.Media, len(resp.Frames))
for i, batch := range resp.Frames {
// Create slice of frame urls for a batch
urls := make([]string, len(batch))
for j, frame := range batch {
urls[j] = frame.Url
}

// Transcode slice of frame urls into a segment
res := n.transcodeFrames(ctx, sessionID, urls, inProfile, outProfile)
if res.Err != nil {
return nil, res.Err
}

// Assume only single rendition right now
// NOTE(review): Segments[0] is indexed without a length check — confirm
// transcodeFrames always yields at least one segment when res.Err is nil.
seg := res.TranscodeData.Segments[0]
name := fmt.Sprintf("%v.mp4", RandomManifestID())
segData := bytes.NewReader(seg.Data)
uri, err := res.OS.SaveData(ctx, name, segData, nil, 0)
if err != nil {
return nil, err
}

videos[i] = worker.Media{
Url: uri,
}

// The video inherits the seed of the first frame in its batch.
if len(batch) > 0 {
videos[i].Seed = batch[0].Seed
}
}

return &worker.ImageResponse{Images: videos}, nil
}

func (n *LivepeerNode) imageToVideo(ctx context.Context, req worker.ImageToVideoMultipartRequestBody) (*worker.ImageResponse, error) {
// We might support generating more than one video in the future (i.e. multiple input images/prompts)
numVideos := 1
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
go.uber.org/goleak v1.3.0
golang.org/x/net v0.19.0
google.golang.org/grpc v1.57.1
google.golang.org/protobuf v1.31.0
pgregory.net/rapid v1.1.0
)

Expand Down Expand Up @@ -197,7 +198,6 @@ require (
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -532,12 +532,6 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n
github.com/libp2p/go-netroute v0.2.0/go.mod h1:Vio7LTzZ+6hoT4CMZi5/6CpY3Snzh2vgZhWgxMNwlQI=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/livepeer/ai-worker v0.0.0-20240213170524-1c860ab6e412 h1:zc6XbU0KvJ8C2jDdsZP8IkWeKN0TCQMIUyl3fOEydoU=
github.com/livepeer/ai-worker v0.0.0-20240213170524-1c860ab6e412/go.mod h1:JvUlcQktSgkEfzotuelfw9OpjGi2qZTW/3tWB/klb/c=
github.com/livepeer/ai-worker v0.0.0-20240214164547-fbdca26a9b2d h1:YpBW6wqwpQ9ffgXPnCEoMbsQEbCT5G2AbTb91jrD6BU=
github.com/livepeer/ai-worker v0.0.0-20240214164547-fbdca26a9b2d/go.mod h1:JvUlcQktSgkEfzotuelfw9OpjGi2qZTW/3tWB/klb/c=
github.com/livepeer/ai-worker v0.0.0-20240214223314-3d355743417a h1:LZwlUatQLU7rkICGMmca71DE67iGhln/QttAehEs8LA=
github.com/livepeer/ai-worker v0.0.0-20240214223314-3d355743417a/go.mod h1:JvUlcQktSgkEfzotuelfw9OpjGi2qZTW/3tWB/klb/c=
github.com/livepeer/ai-worker v0.0.0-20240220213200-59b5b237cd8b h1:zCQv/A3Pafdr/NutU7zLChkF+XcAdzY0ObjN1E4nr44=
github.com/livepeer/ai-worker v0.0.0-20240220213200-59b5b237cd8b/go.mod h1:JvUlcQktSgkEfzotuelfw9OpjGi2qZTW/3tWB/klb/c=
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b h1:VQcnrqtCA2UROp7q8ljkh2XA/u0KRgVv0S1xoUvOweE=
Expand Down
38 changes: 38 additions & 0 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func startAIServer(lp lphttp) error {
lp.transRPC.Handle("/text-to-image", oapiReqValidator(lp.TextToImage()))
lp.transRPC.Handle("/image-to-image", oapiReqValidator(lp.ImageToImage()))
lp.transRPC.Handle("/image-to-video", oapiReqValidator(lp.ImageToVideo()))
lp.transRPC.Handle("/text-to-video", oapiReqValidator(lp.TextToVideo()))

return nil
}
Expand Down Expand Up @@ -107,6 +108,23 @@ func (h *lphttp) ImageToVideo() http.Handler {
})
}

// TextToVideo returns the HTTP handler for text-to-video requests.
//
// The handler attaches the client's remote address to the request context,
// decodes the JSON request body into a worker.TextToVideoJSONRequestBody and
// hands it to handleAIRequest together with the orchestrator. A body that
// cannot be decoded, or that carries no model ID, is answered with
// 400 Bad Request and is not processed further.
func (h *lphttp) TextToVideo() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		orch := h.orchestrator

		remoteAddr := getRemoteAddr(r)
		ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr)

		var req worker.TextToVideoJSONRequestBody
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			respondWithError(w, err.Error(), http.StatusBadRequest)
			return
		}

		// ModelId is a pointer that handleAIRequest dereferences without a
		// nil check; reject the request here instead of panicking there.
		if req.ModelId == nil {
			respondWithError(w, "missing model ID", http.StatusBadRequest)
			return
		}

		handleAIRequest(ctx, w, r, orch, req)
	})
}

func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, orch Orchestrator, req interface{}) {
payment, err := getPayment(r.Header.Get(paymentHeader))
if err != nil {
Expand Down Expand Up @@ -182,6 +200,26 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
// The # of frames outputted by stable-video-diffusion-img2vid-xt models
frames := int64(25)

outPixels = height * width * int64(frames)
case worker.TextToVideoJSONRequestBody:
cap = core.Capability_TextToVideo
modelID = *v.ModelId
submitFn = func(ctx context.Context) (*worker.ImageResponse, error) {
return orch.TextToVideo(ctx, v)
}

// TODO: The orchestrator should require the broadcaster to always specify a height and width
height := int64(576)
if v.Height != nil {
height = int64(*v.Height)
}
width := int64(1024)
if v.Width != nil {
width = int64(*v.Width)
}
// The # of frames outputted by stable-video-diffusion-img2vid-xt models
frames := int64(25)

outPixels = height * width * int64(frames)
default:
respondWithError(w, "Unknown request type", http.StatusBadRequest)
Expand Down