Recently, I've been trying to coordinate two Airflow DAGs so that one runs, on its own schedule, only if the other DAG (which runs daily) has completed successfully.
In today's tutorial, I will walk you through the use case and demonstrate how to achieve the desired behavior in three different ways: two using the ExternalTaskSensor and one using a custom approach with the PythonOperator.
Now let's start with our use case involving two Airflow DAGs.
The first DAG, my_daily_dag, runs every day at 5 am UTC.
from datetime import datetime, timedelta
from pathlib import Path

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_daily_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 5 * * *',  # At 5:00 AM every day
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')
The second DAG, my_hourly_dag, runs every hour between 6 am and 8 pm UTC.
from datetime import datetime, timedelta
from pathlib import Path

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')
In our use case, we would like my_hourly_dag to run only if my_daily_dag has been executed successfully within the current date. If not, my_hourly_dag should be skipped. It is important to mention here that we do not want my_hourly_dag to be triggered as soon as my_daily_dag is successful. That would be possible with TriggerDagRun…
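Before walking through the three implementations in detail, here is a minimal sketch of what a sensor-based check inside my_hourly_dag could roughly look like. Treat it as a preview under assumptions: the task id wait_for_my_daily_dag and the helper _to_daily_logical_date are illustrative names I am introducing here, and the mapping between the two DAGs' logical dates is inferred from the cron schedules above, so it may need adjusting for your Airflow version and timetable semantics.
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

def _to_daily_logical_date(logical_date, **kwargs):
    # Illustrative helper (assumption): map an hourly run's logical date to the
    # logical date of the daily run that executed on the same calendar day.
    # With the cron schedules above, the daily run executing on day D at 05:00
    # carries the logical date D-1 05:00, while the hourly runs of day D carry
    # logical dates from D-1 20:00 up to D 19:00.
    base = logical_date if logical_date.hour >= 20 else logical_date - timedelta(days=1)
    return base.replace(hour=5, minute=0, second=0, microsecond=0)

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    schedule_interval='0 6-20 * * *',
    max_active_runs=1,
) as dag:
    wait_for_daily = ExternalTaskSensor(
        task_id='wait_for_my_daily_dag',
        external_dag_id='my_daily_dag',
        external_task_id=None,       # wait on the whole DAG run, not a single task
        allowed_states=['success'],
        execution_date_fn=_to_daily_logical_date,
        mode='reschedule',           # release the worker slot between pokes
        soft_fail=True,              # skip (rather than fail) if the daily run never succeeds
        timeout=60 * 60,             # stop waiting after one hour
    )

    wait_for_daily >> DummyOperator(task_id='dummy_task')
With soft_fail=True, a sensor that times out is marked as skipped rather than failed, and the downstream task is skipped along with it, which is close to the "omit the hourly run" behavior we want. We will now look at the actual approaches one by one.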