Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent start trigger initialization in scheduler #39585

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

Lee-W
Copy link
Member

@Lee-W Lee-W commented May 13, 2024

Why

During #38674, I introduced a logic that might run user code in the scheduler here. This PR intent to propose a new logic that could avoid this.

What

  • Introduce a StartTriggerArgs class which should contain trigger_cls (or maybe it should be renamed as trigger_cls_path?), tirgger_kwargs, timeout, next_method which are what we need when we run an operator in deferrable mode.
    • airflow.models.abstractoperator might not be the best module to put this data class but might need some suggestion

Operator authors will now need to set the start_trigger_args this one to start execution directly from triggerer.

from __future__ import annotations

from datetime import timedelta
from typing import Any

from airflow.models.baseoperator import BaseOperator, StartTriggerArgs
from airflow.triggers.temporal import TimeDeltaTrigger


class AsyncOperator(BaseOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.testing.SuccessTrigger",
        trigger_kwargs=None,
        next_method="execute_complete",
        timeout=None,
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_trigger_args.trigger_kwargs = {}

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")

If the trigger_kwargs is set to None and not updated in __init__ then this task won't be deferred from starting. This is designed this way so that an operator can decide whether to start the execution from the triggerer. e.g.,

class AsyncOperator(BaseOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.testing.SuccessTrigger",
        trigger_kwargs=None,
        next_method="execute_complete",
        timeout=None,
    )

    def __init__(self, *args, start_from_trigger=False, **kwargs):
        super().__init__(*args, **kwargs)
        if start_from_trigger is True:
            self.start_trigger_args.trigger_kwargs = {}

    def execute_complete(self, context, event=None) -> None:
        self.log.info("execute complete")

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 7 times, most recently from d621388 to 657a1cf Compare May 14, 2024 15:29
@Lee-W Lee-W changed the title fix(baseoperator): change start_trigger into start_trigger_cls and start_trigger_kwargs Prevent start trigger initialization in scheduler May 14, 2024
@Lee-W Lee-W marked this pull request as ready for review May 14, 2024 15:38
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch from 657a1cf to 926b3d3 Compare May 14, 2024 15:42
@dstandish
Copy link
Contributor

maybe we should mark this experimental.

trigger_kwargs being None vs {} does not seem like an explicit / obvious enough way to decide whether to start from trigger. why wouldn't we just look at the start_from_trigger boolean`?

@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch from 926b3d3 to ec0a768 Compare May 15, 2024 02:27
@Lee-W
Copy link
Member Author

Lee-W commented May 15, 2024

maybe we should mark this experimental.

I'm ok with it. maybe wait for others' comment? if we're to mark it as experimental, where should I do so?

trigger_kwargs being None vs {} does not seem like an explicit / obvious enough way to decide whether to start from trigger. why wouldn't we just look at the start_from_trigger boolean`?

In the previous PR, @uranusjr and I discussed whether we could infer start_from_trigger through existing args. I do not have a strong opinion on this one, but create a commit with start_from_trigger so that even if we don't want it, I can still revert it easily

@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 4 times, most recently from 529ecc1 to 12fb69d Compare May 17, 2024 12:59
@@ -143,10 +143,12 @@ The ``self.defer`` call raises the ``TaskDeferred`` exception, so it can work an
Triggering Deferral from Start
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: address #38674 (comment)

@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 2 times, most recently from 445274d to dac10f5 Compare May 20, 2024 11:58
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch 2 times, most recently from 4b0ac4a to 5f99ce7 Compare May 28, 2024 07:50
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch from 31fab0c to 4be767c Compare May 28, 2024 09:50
@Lee-W Lee-W force-pushed the remove-start-trigger-initialization-from-scheduler branch from 4be767c to 3dcace0 Compare May 29, 2024 02:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants