Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/catalyst: implement stack of configuration files #680

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Expand Up @@ -118,7 +118,7 @@ ENV CATALYST_DOWNLOADER_PATH=/usr/local/bin \

RUN mkdir /data

CMD ["/usr/local/bin/catalyst", "--", "/usr/local/bin/MistController", "-c", "/etc/livepeer/full-stack.json"]
CMD ["/usr/local/bin/livepeer-catalyst", "--", "/usr/local/bin/MistController", "-c", "/etc/livepeer/full-stack.json"]

FROM ${FROM_LOCAL_PARENT} AS box-local

Expand Down
10 changes: 7 additions & 3 deletions Makefile
Expand Up @@ -113,9 +113,11 @@ livepeer-api-pkg:
box-kill:
[[ "$$KILL" == "true" ]] && docker exec catalyst pkill -f /usr/local/bin/$(BIN) || echo "Not restarting $(BIN), use KILL=true if you want that"

.PHONY: catalyst
catalyst:
go build -o ./bin/catalyst ./cmd/catalyst/catalyst.go
catalyst: livepeer-catalyst

.PHONY: livepeer-catalyst
livepeer-catalyst:
go build -o ./bin/livepeer-catalyst ./cmd/catalyst/catalyst.go

.PHONY: download
download:
Expand Down Expand Up @@ -214,6 +216,8 @@ box-dev: scripts
-v $$(realpath config):/etc/livepeer:ro \
-v $$(realpath ./coredumps):$$(realpath ./coredumps) \
-e CORE_DUMP_DIR=$$(realpath ./coredumps) \
-e CATALYST_CONFIG=/etc/livepeer/full-stack.yaml \
-e CATALYST_DOWNLOAD=false \
$(shell for line in $$(cat .env 2>/dev/null || echo ''); do printf -- "-e $$line "; done) \
--rm \
-it \
Expand Down
44 changes: 34 additions & 10 deletions cmd/catalyst/catalyst.go
Expand Up @@ -2,9 +2,11 @@ package main

import (
"os"
"os/exec"
"syscall"

"github.com/livepeer/catalyst/cmd/downloader/cli"
"github.com/livepeer/catalyst/cmd/catalyst/cli"
"github.com/livepeer/catalyst/cmd/catalyst/config"
"github.com/livepeer/catalyst/cmd/downloader/downloader"
"github.com/livepeer/catalyst/cmd/downloader/types"
glog "github.com/magicsong/color-glog"
Expand All @@ -21,22 +23,44 @@ func main() {
glog.Fatalf("error parsing cli flags: %s", err)
return
}
err = downloader.Run(cliFlags)
if cliFlags.Download {
err = downloader.Run(cliFlags)
if err != nil {
glog.Fatalf("error running downloader: %s", err)
}
}
if !cliFlags.Exec {
return
}
err = execNext(cliFlags)
if err != nil {
glog.Fatalf("error running downloader: %s", err)
glog.Fatalf("error executing MistController: %s", err)
}
execNext(cliFlags)
}

// Done! Move on to the provided next application, if it exists.
func execNext(cliFlags types.CliFlags) {
if len(cliFlags.ExecCommand) == 0 {
// Nothing to do.
return
func execNext(cliFlags types.CliFlags) error {
jsonBytes, err := config.HandleConfigStack(cliFlags.ConfigStack)
if err != nil {
return err
}
f, err := os.CreateTemp("", "catalyst-generated-*.json")
if err != nil {
return err
}
_, err = f.Write(jsonBytes)
if err != nil {
return err
}
glog.Infof("downloader complete, now we will exec %v", cliFlags.MistController)
binary, err := exec.LookPath(cliFlags.MistController)
if err != nil {
return err
}
glog.Infof("downloader complete, now we will exec %v", cliFlags.ExecCommand)
execErr := syscall.Exec(cliFlags.ExecCommand[0], cliFlags.ExecCommand, os.Environ())
args := []string{binary, "-c", f.Name()}
execErr := syscall.Exec(binary, args, os.Environ())
if execErr != nil {
glog.Fatalf("error running next command: %s", execErr)
}
return nil
}
49 changes: 49 additions & 0 deletions cmd/catalyst/cli/cli.go
@@ -0,0 +1,49 @@
package cli

import (
"flag"
"fmt"
"os"

downloaderCli "github.com/livepeer/catalyst/cmd/downloader/cli"
"github.com/livepeer/catalyst/cmd/downloader/constants"
"github.com/livepeer/catalyst/cmd/downloader/types"
"github.com/peterbourgon/ff/v3"
)

// GetCliFlags reads command-line arguments and generates a struct
// with useful values set after parsing the same.
func GetCliFlags(buildFlags types.BuildFlags) (types.CliFlags, error) {
cliFlags := types.CliFlags{}
flag.Set("logtostderr", "true")
vFlag := flag.Lookup("v")
fs := flag.NewFlagSet(constants.AppName, flag.ExitOnError)

downloaderCli.AddDownloaderFlags(fs, &cliFlags)

fs.StringVar(&cliFlags.MistController, "mist-controller", "MistController", "Path to MistController binary to exec when done")
fs.BoolVar(&cliFlags.Exec, "exec", true, "Exec MistController when (optional) update is complete")
fs.StringVar(&cliFlags.ConfigStack, "config", "/etc/livepeer/catalyst.yaml", "Path to multiple Catalyst config files to use. Can contain multiple entries e.g. /conf1:/conf2")

version := fs.Bool("version", false, "Get version information")

if *version {
fmt.Printf("catalyst version: %s\n", buildFlags.Version)
os.Exit(0)
}

ff.Parse(
fs, os.Args[1:],
ff.WithConfigFileParser(ff.PlainParser),
ff.WithEnvVarPrefix("CATALYST"),
ff.WithEnvVarSplit(","),
)
flag.CommandLine.Parse(nil)
vFlag.Value.Set(cliFlags.Verbosity)

err := downloaderCli.ValidateFlags(&cliFlags)
if err != nil {
return cliFlags, err
}
return cliFlags, err
}
111 changes: 111 additions & 0 deletions cmd/catalyst/config/config.go
@@ -0,0 +1,111 @@
package config

import (
"encoding/json"
"fmt"
"os"
"strings"

"github.com/icza/dyno"
"gopkg.in/yaml.v2"
)

// takes /path1:/path2:/path3 and returns JSON bytes
func HandleConfigStack(configPaths string) ([]byte, error) {
var err error
merged := map[string]any{}
filePaths := strings.Split(configPaths, ":")
for _, filePath := range filePaths {
contents, err := readYAMLFile(filePath)
// todo: handle missing file case (allowed as long as we have some)
if err != nil {
return []byte{}, fmt.Errorf("error handling config file %s: %w", filePath, err)
}
merged = mergeMaps(merged, contents)
}
config, err := optionalMap(merged, "config")
if err != nil {
return nil, err
}
protocols, err := optionalMap(config, "protocols")
if err != nil {
return nil, err
}
protocolArray := []map[string]any{}
for k, v := range protocols {
if v == nil {
continue
}
vMap, ok := v.(map[string]any)
if !ok {
return nil, fmt.Errorf("unable to convert protocol '%s' to a string map", k)
}
protocolArray = append(protocolArray, vMap)
}
config["protocols"] = protocolArray
jsonBytes, err := json.MarshalIndent(merged, "", " ")
if err != nil {
return nil, err
}
return jsonBytes, nil
}

// Returns a new map merging source into dest
// Merges any map[string]any maps that are present
// Overwrites everything else
func mergeMaps(dest, source map[string]any) map[string]any {
merged := map[string]any{}
// Start with a shallow copy of `dest`
for k, v := range dest {
merged[k] = v
}
for newKey, newValue := range source {
oldValue, has := merged[newKey]
if !has {
merged[newKey] = newValue
continue
}
newMap, newOk := newValue.(map[string]any)
oldMap, oldOk := oldValue.(map[string]any)
if newOk && oldOk {
// Both maps. Merge em!
merged[newKey] = mergeMaps(oldMap, newMap)
continue
}
// One or both is not a map, just copy over the new value
merged[newKey] = newValue
}
return merged
}

func readYAMLFile(filePath string) (map[string]any, error) {
var conf map[any]any
dat, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
err = yaml.Unmarshal(dat, &conf)
if err != nil {
return nil, err
}
jsonConf := dyno.ConvertMapI2MapS(conf)
jsonMap, ok := jsonConf.(map[string]any)
if !ok {
return nil, fmt.Errorf("unable to convert config to a string map")
}
return jsonMap, nil
}

// Return a (mutable) reference to the map at the given key, returning an empty one if none present
func optionalMap(parent map[string]any, key string) (map[string]any, error) {
child, ok := parent[key]
if !ok {
child = map[string]any{}
parent[key] = child
}
childMap, ok := child.(map[string]any)
if !ok {
return nil, fmt.Errorf("unable to convert '%s' to a string map", key)
}
return childMap, nil
}
89 changes: 89 additions & 0 deletions cmd/catalyst/config/config_test.go
@@ -0,0 +1,89 @@
package config

import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"

"github.com/icza/dyno"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)

func randPath(t *testing.T) string {
randBytes := make([]byte, 16)
rand.Read(randBytes)
return filepath.Join(t.TempDir(), hex.EncodeToString(randBytes)+".yaml")
}

func toFiles(t *testing.T, strs ...string) string {
paths := []string{}
for _, content := range strs {
filepath := randPath(t)
os.WriteFile(filepath, []byte(content), 0644)
paths = append(paths, filepath)
}
return strings.Join(paths, ":")
}

func yamlToJson(t *testing.T, yamlStr string) string {

Check warning on line 33 in cmd/catalyst/config/config_test.go

View workflow job for this annotation

GitHub Actions / Test the catalyst project

func yamlToJson should be yamlToJSON
var yamlStruct map[any]any
err := yaml.Unmarshal([]byte(yamlStr), &yamlStruct)
require.NoError(t, err)
jsonStruct := dyno.ConvertMapI2MapS(yamlStruct)
jsonBytes, err := json.Marshal(jsonStruct)
require.NoError(t, err)
return string(jsonBytes)
}

func TestMerge(t *testing.T) {
confStack := toFiles(t, conf1, conf2, conf3)
jsonBytes, err := HandleConfigStack(confStack)
require.NoError(t, err)
require.JSONEq(t, yamlToJson(t, mergedConf), string(jsonBytes))
}

var conf1 = `
foo: conf1
some-map:
opt1: cool
config:
protocols:
example-protocol:
protocol-number: 15
protocol-boolean: true
protocol-string: foobar
removed-protocol:
connector: asdf
`

var conf2 = `
foo: conf2
`

var conf3 = `
foo: conf3
some-map:
opt2: lmao
config:
protocols:
example-protocol:
protocol-string: override
removed-protocol: null
`

var mergedConf = `
foo: conf3
some-map:
opt1: cool
opt2: lmao
config:
protocols:
- protocol-number: 15
protocol-boolean: true
protocol-string: override
`