Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow multiple environment decorators #1785

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def _to_job(self, node):
env_deco = [deco for deco in node.decorators if deco.name == "environment"]
env = {}
if env_deco:
env = env_deco[0].attributes["vars"].copy()
type(env_deco[0]).merge_vars(env_deco, env)

# The below if/else block handles "input paths".
# Input Paths help manage dataflow across the graph.
Expand Down
8 changes: 3 additions & 5 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1226,11 +1226,9 @@ def _container_templates(self):
# (2) Metaflow runtime specific environment variables
# (3) @kubernetes, @argo_workflows_internal bookkeeping environment
# variables
env = dict(
[deco for deco in node.decorators if deco.name == "environment"][
0
].attributes["vars"]
)
env_deco = [deco for deco in node.decorators if deco.name == "environment"]
env = {}
type(env_deco[0]).merge_vars(env_deco, env)

# Temporary passing of *some* environment variables. Do not rely on this
# mechanism as it will be removed in the near future
Expand Down
5 changes: 2 additions & 3 deletions metaflow/plugins/aws/batch/batch_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,9 @@ def echo(msg, stream="stderr", batch_id=None, **kwargs):
]

env_deco = [deco for deco in node.decorators if deco.name == "environment"]
env = {}
if env_deco:
env = env_deco[0].attributes["vars"]
else:
env = {}
type(env_deco[0]).merge_vars(env_deco, env)

# Add the environment variables related to the input-paths argument
if split_vars:
Expand Down
2 changes: 1 addition & 1 deletion metaflow/plugins/aws/step_functions/step_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ def _batch(self, node):
env_deco = [deco for deco in node.decorators if deco.name == "environment"]
env = {}
if env_deco:
env = env_deco[0].attributes["vars"].copy()
type(env_deco[0]).merge_vars(env_deco, env)

# add METAFLOW_S3_ENDPOINT_URL
if S3_ENDPOINT_URL is not None:
Expand Down
24 changes: 21 additions & 3 deletions metaflow/plugins/environment_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,28 @@ class EnvironmentDecorator(StepDecorator):

name = "environment"
defaults = {"vars": {}}
allow_multiple = True

def step_init(
self, flow, graph, step_name, decorators, environment, flow_datastore, logger
):
self.logger = logger

def runtime_step_cli(
self, cli_args, retry_count, max_user_code_retries, ubf_context
):
cli_args.env.update(
{key: str(value) for key, value in self.attributes["vars"].items()}
)
self.merge_vars([self], cli_args.env)

@classmethod
def merge_vars(cls, decorators, dest):
"""Merge variables from a list of environment decorators into an existing dictionary."""
for deco in decorators:
for key, value in deco.attributes["vars"].items():
if key in dest and value != dest[key]:
deco.logger(
"Overwriting value {} for environment variable {} with new value {}".format(
dest[key], key, value
)
)
dest[key] = value
dest.update(deco.attributes["vars"])
2 changes: 1 addition & 1 deletion metaflow/plugins/kubernetes/kubernetes_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def echo(msg, stream="stderr", job_id=None, **kwargs):
env = {}
env_deco = [deco for deco in node.decorators if deco.name == "environment"]
if env_deco:
env = env_deco[0].attributes["vars"]
type(env_deco[0]).merge_vars(env_deco, env)

# Set input paths.
input_paths = kwargs.get("input_paths")
Expand Down