Fixed issue of new DAG getting old dataset events. #39603
base: main
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
tests/jobs/test_scheduler_job.py
Outdated
@@ -3776,6 +3776,11 @@ def test_create_dag_runs_datasets(self, session, dag_maker):
    dataset1 = Dataset(uri="ds1")
    dataset2 = Dataset(uri="ds2")

    # Create DAG before the arrival of dataset events.
Should this be part of its own, new test?
test_new_dagrun_ignores_old_datasets
or similar?
@RNHTTR I've added a new unit test. Please review it and let me know if it needs any adjustments.
The unit test checks that a newly added DAG only gets the dataset event that arrived after it was created (event3), and not the earlier dataset events (event1 and event2).
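As a plain-Python sketch of that intent (not the actual Airflow test — the `Event` class, names, and timestamps below are invented for illustration), the newly created consumer DAG should only see events newer than its registration time:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

# Hypothetical stand-in for DatasetEvent; names and timestamps are invented.
@dataclass
class Event:
    name: str
    timestamp: datetime

dag_created_at = datetime(2024, 5, 1, 12, 0)  # when the consumer DAG is registered

events = [
    Event("event1", dag_created_at - timedelta(hours=2)),  # arrived before the DAG existed
    Event("event2", dag_created_at - timedelta(hours=1)),  # arrived before the DAG existed
    Event("event3", dag_created_at + timedelta(hours=1)),  # arrived after the DAG was created
]

# Mirror of the intended behaviour: with no previous DAG run, only events
# newer than the DAG's registration time should trigger it.
visible = [e.name for e in events if e.timestamp > dag_created_at]
print(visible)  # ['event3']
```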
tests/jobs/test_scheduler_job.py
Outdated
@@ -3811,20 +3816,40 @@ def test_create_dag_runs_datasets(self, session, dag_maker):
    )
    session.add(event2)

    # Create a third event, creation time is more recent, but data interval is even older
Given that this is reproducible with a single, previous DAG run having a dataset event, shouldn't the test only require 3 total DAG runs?
- One DAG run from the original DAG (let's call this Run A)
- One more DAG run (Run B) after the new DAG is introduced, which triggers the new DAG's DAG run which should only have dataset info from Run B.
The newly designed test case addresses the scenario you mentioned. Let me know if I'm missing anything.
cc @uranusjr
To be honest I don't really consider the current behaviour (in 2.9.1) a bug, but I can see why people expect it this way. I think we probably need an entry in
Also, the tests need improvements, as mentioned in the reviews above.
airflow/jobs/scheduler_job_runner.py
Outdated
if previous_dag_run:
    dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)
else:
    dataset_event_filters.append(
        DatasetEvent.timestamp >= DagScheduleDatasetReference.created_at
    )
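The branch above can be sketched as a plain-Python helper (a simplification, not the actual SQLAlchemy filter; the function name and arguments are invented): events must be newer than the previous run's execution date when one exists, otherwise newer than the time the DAG's dataset schedule reference was created.

```python
from datetime import datetime

def dataset_event_cutoff(previous_run_execution_date, schedule_ref_created_at):
    """Return the timestamp a dataset event must exceed to count for the next run.

    Hypothetical helper mirroring the branch logic in the diff; not Airflow code.
    """
    if previous_run_execution_date is not None:
        # Subsequent runs: only events since the last run matter.
        return previous_run_execution_date
    # First run of a newly added DAG: only events since it was registered matter.
    return schedule_ref_created_at

created = datetime(2024, 5, 1, 12, 0)   # when the schedule reference was created
last_run = datetime(2024, 5, 2, 12, 0)  # execution date of a previous DAG run

print(dataset_event_cutoff(None, created) == created)       # True
print(dataset_event_cutoff(last_run, created) == last_run)  # True
```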
Hmm, just wondering if the difference between >= and > is something we should worry about.
Nope, moved it back to >.
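For what the >= vs > distinction changes, here is a minimal illustration (timestamps invented): the two operators only diverge for an event whose timestamp exactly equals the reference time.

```python
from datetime import datetime, timedelta

created_at = datetime(2024, 5, 1, 12, 0, 0)
timestamps = [
    created_at - timedelta(minutes=5),  # before the cutoff
    created_at,                         # exactly at the cutoff (the edge case)
    created_at + timedelta(minutes=5),  # after the cutoff
]

inclusive = [ts for ts in timestamps if ts >= created_at]  # keeps the boundary event
exclusive = [ts for ts in timestamps if ts > created_at]   # drops the boundary event

print(len(inclusive), len(exclusive))  # 2 1
```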
@@ -3793,6 +3793,10 @@ def test_create_dag_runs_datasets(self, session, dag_maker):
    dataset1 = Dataset(uri="ds1")
    dataset2 = Dataset(uri="ds2")

    with dag_maker(dag_id="datasets-consumer-single", schedule=[dataset1]):
Why is this block being moved?
assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one() is not None
# dag2 should not be triggered since it depends on both dataset 1 and 2
assert session.query(DagRun).filter(DagRun.dag_id == dag2.dag_id).one_or_none() is None
assert session.query(DagRun).filter(DagRun.dag_id == dag3.dag_id).one_or_none() is None
# dag3 DDRQ record should be deleted since the dag run was triggered
assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one_or_none() is None
assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag2.dag_id).one_or_none() is None

assert dag2.get_last_dagrun().creating_job_id == scheduler_job.id
Should there be any changes to this test (test_create_dag_runs_datasets) at all?
# Create DAG after dataset events.
with dag_maker(dag_id="datasets-consumer", schedule=[dataset]):
    pass
dag = dag_maker.dag
Can you make this and following references to dag be consumer_dag, just to make it more obvious?
# we don't have __eq__ defined on DatasetEvent because... given the fact that in the future
# we may register events from other systems, dataset_id + timestamp might not be enough PK

Suggested change:

# __eq__ isn't defined on DatasetEvent

IMO this is sufficient.
If a new dataset-triggered DAG is created for a dataset that already exists (i.e. the dataset already has dataset events), the DAG sees all dataset events, starting from the very first event for that dataset.
Fixes: #39456