Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix external containers #72

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions worker/container.go
Expand Up @@ -17,8 +17,8 @@ const (

type RunnerContainer struct {
RunnerContainerConfig

Client *ClientWithResponses
Capacity int
Client *ClientWithResponses
}

type RunnerEndpoint struct {
Expand Down Expand Up @@ -61,7 +61,7 @@ func NewRunnerContainer(ctx context.Context, cfg RunnerContainerConfig) (*Runner
return nil, err
}

cctx, cancel := context.WithTimeout(ctx, cfg.containerTimeout)
cctx, cancel := context.WithTimeout(context.Background(), cfg.containerTimeout)
if err := runnerWaitUntilReady(cctx, client, pollingInterval); err != nil {
cancel()
return nil, err
Expand All @@ -70,6 +70,7 @@ func NewRunnerContainer(ctx context.Context, cfg RunnerContainerConfig) (*Runner

return &RunnerContainer{
RunnerContainerConfig: cfg,
Capacity: 1,
Client: client,
}, nil
}
Expand Down
50 changes: 36 additions & 14 deletions worker/worker.go
Expand Up @@ -40,22 +40,24 @@ func (sb EnvValue) String() string {
type OptimizationFlags map[string]EnvValue

type Worker struct {
manager *DockerManager
manager *DockerManager
noManagedContainers bool

externalContainers map[string]*RunnerContainer
mu *sync.Mutex
}

func NewWorker(containerImageID string, gpus []string, modelDir string) (*Worker, error) {
func NewWorker(containerImageID string, gpus []string, modelDir string, noManagedContainers bool) (*Worker, error) {
manager, err := NewDockerManager(containerImageID, gpus, modelDir)
if err != nil {
return nil, err
}

return &Worker{
manager: manager,
externalContainers: make(map[string]*RunnerContainer),
mu: &sync.Mutex{},
manager: manager,
noManagedContainers: noManagedContainers,
externalContainers: make(map[string]*RunnerContainer),
mu: &sync.Mutex{},
}, nil
}

Expand Down Expand Up @@ -218,7 +220,11 @@ func (w *Worker) Warm(ctx context.Context, pipeline string, modelID string, endp
}

name := dockerContainerName(pipeline, modelID)
slog.Info("Starting external container", slog.String("name", name), slog.String("modelID", modelID))
if endpoint.URL != "" {
name = cfg.Endpoint.URL
slog.Info("name of container: ", slog.String("url", cfg.Endpoint.URL))
}
slog.Info("Starting external container", slog.String("name", name), slog.String("pipeline", pipeline), slog.String("modelID", modelID))
w.externalContainers[name] = rc

return nil
Expand Down Expand Up @@ -258,25 +264,41 @@ func (w *Worker) HasCapacity(pipeline, modelID string) bool {
func (w *Worker) borrowContainer(ctx context.Context, pipeline, modelID string) (*RunnerContainer, error) {
w.mu.Lock()

name := dockerContainerName(pipeline, modelID)
rc, ok := w.externalContainers[name]
if ok {
w.mu.Unlock()
// We allow concurrent in-flight requests for external containers and assume that it knows
// how to handle them
return rc, nil
for key, rc := range w.externalContainers {
if rc.Pipeline == pipeline && rc.ModelID == modelID {
// The current implementation of ai-runner containers does not have a queue so only do one request at a time to each container
if rc.Capacity > 0 {
slog.Info("selecting container to run request", slog.Int("type", int(rc.Type)), slog.Int("capacity", rc.Capacity), slog.String("url", rc.Endpoint.URL))
w.externalContainers[key].Capacity -= 1
w.mu.Unlock()
return rc, nil
}
}
}

w.mu.Unlock()

if w.noManagedContainers {
return nil, errors.New("no runners available")
}

return w.manager.Borrow(ctx, pipeline, modelID)
}

func (w *Worker) returnContainer(rc *RunnerContainer) {
slog.Info("returning container to be available", slog.Int("type", int(rc.Type)), slog.Int("capacity", rc.Capacity), slog.String("url", rc.Endpoint.URL))

switch rc.Type {
case Managed:
w.manager.Return(rc)
case External:
// Noop because we allow concurrent in-flight requests for external containers
w.mu.Lock()
defer w.mu.Unlock()
//free external container for next request
for key, _ := range w.externalContainers {
if w.externalContainers[key].Endpoint.URL == rc.Endpoint.URL {
w.externalContainers[key].Capacity += 1
}
}
}
}