Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: limit argo workflow name length #1608

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
104 changes: 97 additions & 7 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,38 @@ def cli():
help="Argo Workflow name. The flow name is used instead if "
"this option is not specified.",
)
@click.option(
"--old-name-length",
default=False,
is_flag=True,
type=bool,
help="Use old limit of 253 characters for Argo Workflow Template names.",
)
@click.pass_obj
def argo_workflows(obj, name=None):
def argo_workflows(obj, name=None, old_name_length=False):
check_python_version(obj)
obj.check(obj.graph, obj.flow, obj.environment, pylint=obj.pylint)
(
obj.workflow_name,
obj.token_prefix,
obj.is_project,
) = resolve_workflow_name(obj, name)
workflow_name, token_prefix, is_project = old_resolve_workflow_name(obj, name)
if not old_name_length and len(workflow_name) > 63:
# check for existing deployment with old name.
# Error out as we're trying to deploy with a new name and ask for user action to resolve the issue.
existing_workflow = ArgoWorkflows.get_existing_deployment(workflow_name)
if existing_workflow:
raise ArgoWorkflowsNameTooLong(
"Can not perform command with new name length limit (63) as an existing deployment\n"
"*%s* was found with a name that is too long.\n\n"
"You can target this deployment with "
"*argo-workflows --old-name-length [command]* to use the old length limit (253).\n"
"Delete the old deployment in order to avoid this error in the future."
% workflow_name
)
workflow_name, token_prefix, is_project = resolve_workflow_name(obj, name)

(obj.workflow_name, obj.token_prefix, obj.is_project) = (
workflow_name,
token_prefix,
is_project,
)


@argo_workflows.command(help="Deploy a new version of this workflow to Argo Workflows.")
Expand Down Expand Up @@ -318,7 +341,12 @@ def check_metadata_service_version(obj):
)


def resolve_workflow_name(obj, name):
def old_resolve_workflow_name(obj, name):
# Legacy case; Workflows were deployed with the maxLength of Kubernetes metadata name values,
# but this prevents launching flows with long names through Argo UI,
# as the workflow name needs to fit into a Kubernetes label.
#
# We need to take care of existing deployments that have a name that is too long.
project = current.get("project_name")
obj._is_workflow_name_modified = False
if project:
Expand Down Expand Up @@ -377,6 +405,68 @@ def resolve_workflow_name(obj, name):
return workflow_name, token_prefix.lower(), is_project


def resolve_workflow_name(obj, name):
project = current.get("project_name")
obj._is_workflow_name_modified = False
if project:
if name:
raise MetaflowException(
"--name is not supported for @projects. Use --branch instead."
)
workflow_name = current.project_flow_name
project_branch = to_bytes(".".join((project, current.branch_name)))
token_prefix = (
"mfprj-%s"
% to_unicode(base64.b32encode(sha1(project_branch).digest()))[:16]
)
is_project = True
# Argo Workflow names shouldn't be longer than 63 characters, so we truncate
# by default. Also, while project and branch allow for underscores, Argo
# Workflows doesn't (DNS Subdomain names as defined in RFC 1123) - so we will
# remove any underscores as well as convert the name to lower case.
# Also remove + and @ as not allowed characters, which can be part of the
# project branch due to using email addresses as user names.
#
# TODO: there is a slight chance that the truncation will end up producing a name with
# .- which is not a valid subdomain name.
if len(workflow_name) > 63:
name_hash = to_unicode(
base64.b32encode(sha1(to_bytes(workflow_name)).digest())
)[:8].lower()
workflow_name = "%s-%s" % (workflow_name[:52], name_hash)
obj._is_workflow_name_modified = True
if not VALID_NAME.search(workflow_name):
workflow_name = sanitize_for_argo(workflow_name)
obj._is_workflow_name_modified = True
else:
if name and not VALID_NAME.search(name):
raise MetaflowException(
"Name '%s' contains invalid characters. The "
"name must consist of lower case alphanumeric characters, '-' or '.'"
", and must start and end with an alphanumeric character." % name
)

workflow_name = name if name else current.flow_name
token_prefix = workflow_name
is_project = False

if len(workflow_name) > 63:
msg = (
"The full name of the workflow:\n*%s*\nis longer than 63 "
"characters.\n\n"
"To deploy this workflow to Argo Workflows, please "
"assign a shorter name\nusing the option\n"
"*argo-workflows --name <name> create*." % workflow_name
)
raise ArgoWorkflowsNameTooLong(msg)

if not VALID_NAME.search(workflow_name):
workflow_name = sanitize_for_argo(workflow_name)
obj._is_workflow_name_modified = True

return workflow_name, token_prefix.lower(), is_project


def make_flow(
obj,
token,
Expand Down
132 changes: 132 additions & 0 deletions test/unit/test_argo_workflows_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import pytest
import contextlib

from metaflow.plugins.argo.argo_workflows_cli import (
resolve_workflow_name,
old_resolve_workflow_name,
ArgoWorkflowsNameTooLong,
)
from metaflow import current
from metaflow.current import Current


class DummyObject(object):
# For testing workflow name generation
def __init__(self, name) -> None:
self.name = name


@contextlib.contextmanager
def mocked_current(flow, project=None, branch=None):
prev = current._flow_name
current._flow_name = flow
if project is not None:
current._update_env(
{
"project_name": project,
"branch_name": branch,
"project_flow_name": "%s.%s.%s" % (project, branch, flow),
}
)
yield
# cleanup
try:
current._flow_name = prev
delattr(Current, "project_name")
delattr(Current, "branch_name")
delattr(Current, "project_flow_name")
except Exception:
pass


# Test 253 character length names


@pytest.mark.parametrize(
"name,expected_name",
[
("test-flow", "test-flow"),
("test.flow.with.dots", "test.flow.with.dots"),
],
)
def test_old_workflow_names_with_name(name, expected_name):
_obj = DummyObject("test")
workflow_name, token_prefix, is_project = old_resolve_workflow_name(_obj, name)
assert expected_name == workflow_name


@pytest.mark.parametrize(
"flow, project, branch ,name,expected_name",
[
("flow", "project", "user.test", None, "project.user.test.flow"),
("TestFlow", None, None, None, "testflow"),
],
)
def test_old_workflow_names_with_object(flow, project, branch, name, expected_name):
with mocked_current(flow, project, branch):
obj = DummyObject(name)
workflow_name, token_prefix, is_project = old_resolve_workflow_name(obj, name)
assert expected_name == workflow_name


def test_old_workflow_names_errors():
long_name = 254 * "a"
obj = DummyObject("test")
with pytest.raises(ArgoWorkflowsNameTooLong):
old_resolve_workflow_name(obj, long_name)

with pytest.raises(ArgoWorkflowsNameTooLong):
with mocked_current(long_name):
old_resolve_workflow_name(obj, None)

with mocked_current(long_name, "project_a", "user.b"):
# should truncate instead
workflow_name, _, _ = old_resolve_workflow_name(obj, None)
assert "projecta.user.b" in workflow_name
assert len(workflow_name) == 250


# Test 63 character length names
@pytest.mark.parametrize(
"obj,name,expected_name",
[
("test-flow", "test-flow"),
("test.flow.with.dots", "test.flow.with.dots"),
("TestFlow", "testflow"),
],
)
def test_new_workflow_names_with_object(name, expected_name):
_obj = DummyObject("test")
workflow_name, token_prefix, is_project = resolve_workflow_name(_obj, name)
assert expected_name == workflow_name


@pytest.mark.parametrize(
"flow, project, branch ,name,expected_name",
[
("flow", "project", "user.test", None, "project.user.test.flow"),
("TestFlow", None, None, None, "testflow"),
],
)
def test_new_workflow_names_with_object(flow, project, branch, name, expected_name):
with mocked_current(flow, project, branch):
obj = DummyObject(name)
workflow_name, token_prefix, is_project = resolve_workflow_name(obj, name)
assert expected_name == workflow_name


def test_new_workflow_names_errors():
long_name = 64 * "a"
obj = DummyObject("test")
with pytest.raises(ArgoWorkflowsNameTooLong):
resolve_workflow_name(obj, long_name)

with pytest.raises(ArgoWorkflowsNameTooLong):
with mocked_current(long_name):
resolve_workflow_name(obj, None)

with mocked_current(long_name, "project_a", "user.b"):
# should truncate instead
workflow_name, _, _ = resolve_workflow_name(obj, None)
assert "projecta.user.b" in workflow_name
assert len(workflow_name) == 60