Как установить два DAG в воздушном потоке с помощью ExternalTaskSensor?

cimujo спросил: 31 июля 2018 в 09:48 в: triggers

У меня два DAG:

DAG_CPS

dag = DAG(
  'DAG_CPS',
  default_args=default_args,
  dagrun_timeout=timedelta(hours=2),
  schedule_interval=None,
  max_active_runs=1
) 
tmp1_cap_pes_sap = PostgresOperatorWithTemplatedParams(
task_id='tmp1_cap_pes_sap',
sql='./SQL/A2050.sql',
postgres_conn_id='xxxx',
dag=dag) 
...

DAG_SAS

dag = DAG(
'DAG_SAS',
default_args=default_args,
dagrun_timeout=timedelta(hours=2),
schedule_interval=None,
max_active_runs=1
)wait_for_DAG_CPS = ExternalTaskSensor(
task_id='wait_for_DAG_CPS',
external_dag_id='DAG_CPS',
external_task_id='tmp1_cap_pes_sap',
execution_delta=None,
execution_date_fn=None,
dag=dag)

Я запускаю обе группы DAG вручную из Интернета, задача tmp1_cap_pes_sap заканчивается нормально

Attribute       Value
dag_id          DAG_CPS
duration        None
end_date        2018-08-24 11:04:28.177221
execution_date  2018-08-24 11:04:18.113031

, но в DAG_SAS я получаю следующий журнал и никогда не запускаю

[2018-08-24 11:03:55,592] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:03:55,592] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ... 
[2018-08-24 11:04:55,642] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:04:55,641] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ... 
[2018-08-24 11:05:55,718] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:05:55,717] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ... 
[2018-08-24 11:06:55,799] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:06:55,797] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ... 
[2018-08-24 11:07:55,853] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:07:55,853] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ... 

что не так в моем коде?

Решено

спасибо на @Alessandro Cosentino за помощь мне. Это код после его исправления, в основном он никогда не будет работать, если я запустил DAG вручную

DAG_CPS

default_args = {
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'retries': 2,
'retry_delay': timedelta(minutes=1)
}dag = DAG(
'DAG_CPS',
default_args=default_args,
dagrun_timeout=timedelta(minutes=5),
schedule_interval='*/10 * * * *',
max_active_runs=1
)

DAG_SAS

dag = DAG(
'DAG_SAS',
default_args=default_args,
dagrun_timeout=timedelta(minutes=5),
schedule_interval='*/10 * * * *',
max_active_runs=1
)

1 ответ

Есть решение
Alessandro Cosentino ответил: 31 июля 2018 в 02:35

Поскольку вы запускаете задачи вручную, они будут выполняться с другим execution_date, поэтому ExternalTaskSensor не обнаруживает завершение первой задачи группы обеспечения доступности баз данных.

Попробуйте запустить их по одному и тому же расписанию и посмотрите, работает ли он.

Я полагаю, что это проблема из-за существования execution_delta и execution_date_fn аргументы, которые есть на самом деле для синхронизации двух DAG. Смотрите документацию о поведении этих двух аргументов.

cimujo ответил: 03 августа 2018 в 11:09
спасибо @Alessandro Cosentino, это работает, это мой новый код DAG_CPS default_args = {'depen_on_past': False, 'start_date': airflow.utils.dates.days_ago (2), 'email_on_failure': False, 'email_on_retry': False, 'retries': 2, 'retry_delay': timedelta (minutes = 1)} dag = DAG ('DAG_CPS', default_args = default_args, dagrun_timeout = timedelta (minutes = 5), schedule_interval = '/ 10', max_active_runs = 1) DAG_SAS dag = DAG ('DAG_SAS', default_args = default_args, dagrun_timeout = timedelta (минут = 5), schedule_interval = '/ 10', max_active_runs = 1)
Alessandro Cosentino ответил: 03 августа 2018 в 11:23
@cimujo Круто, не забудьте принять ответ, если вы удовлетворены решением.
Alessandro Cosentino ответил: 03 августа 2018 в 11:29
@cimujo Также лучше, если вместо того, чтобы комментировать, вы редактируете вопрос, добавляя раздел с новым кодом, чтобы он был правильно отформатирован ;-)