मैं काम करने के लिए एयरफ्लो एक्सटर्नल टास्क सेंसर प्राप्त करने की कोशिश कर रहा हूं, लेकिन अभी तक इसे पूरा करने में सक्षम नहीं हूं, यह हमेशा अटका हुआ लगता है और कभी खत्म नहीं होता है इसलिए डीएजी अगले कार्य पर आगे बढ़ सकता है।

यहां वह कोड है जिसका मैं परीक्षण करने के लिए उपयोग कर रहा हूं:


DEFAULT_ARGS = {
    'owner': 'NAME',
    'depends_on_past': False,
    'start_date': datetime(2019, 9, 9),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

external_watch_dag = DAG(
    'DAG-External_watcher-Test',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

start_op = DummyOperator(
    task_id='start_op',
    dag=external_watch_dag
)


trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag
)

external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_delta=timedelta(minutes=-1),
    # execution_date_fn=datetime(2019, 9, 25),
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)

end_op = DummyOperator(
    task_id='end_op',
    dag=external_watch_dag
)

start_op >> trigger_external >> external_watch_op >> end_op
# start_op >> [external_watch_op, trigger_external]
# external_watch_op >> end_op


# Below is the setup for the dummy DAG that is called above by the Trigger and watched by the TaskSensor
dummy_dag = DAG(
    'DAG-Dummy',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

dummy_task = BashOperator(
    task_id='dummy_task',
    bash_command='sleep 10',
    dag=dummy_dag
)

मैंने इस कोड को कई तरीकों से बदलने की कोशिश की है लेकिन बाहरी टास्क सेंसर के साथ कोई सफलता नहीं मिली है।

क्या कोई इस समस्या को हल करने और बाहरी टास्क सेंसर को ठीक से काम करने के बारे में जानता है? मैंने यह भी पढ़ा है कि बाहरी टास्क सेंसर का उपयोग करते समय शेड्यूलिंग अंतराल के माध्यम से समस्याएं उत्पन्न हो सकती हैं, क्या यह संभव है कि इस मुद्दे का हिस्सा यह है कि डीएजी दोनों में schedule_interval=None है?

मुझे यह दोनों डीएजी के साथ काम करने के लिए मिला था जो एक ही schedule_interval पर सेट थे, लेकिन यह उत्पादन में काम नहीं करेगा। लक्ष्य है कि मुख्य DAG, external-watch-dag को नियमित समय पर रखा जाए और DAG-Dummy को चलाने के दौरान DAG के साथ ट्रिगर किया जाए -डमी में ही schedule_interval=None है।

कोई भी मदद बहुत ही सराहनीय होगी।

3
mattc-7 25 सितंबर 2019, 18:05

1 उत्तर

डिफ़ॉल्ट रूप से ExternalTaskSensor उसी निष्पादन तिथि के साथ external_dag_id की निगरानी करेगा जो सेंसर DAG करता है। execution_delta के साथ आप सेंसर डैग और बाहरी डैग के बीच एक टाइम डेल्टा सेट कर सकते हैं ताकि यह मॉनिटर करने के लिए सही execution_date की तलाश कर सके। यह बहुत अच्छा काम करता है जब दोनों डैग एक शेड्यूल में चलाए जाते हैं क्योंकि आप वास्तव में इस टाइमडेल्टा को जानते हैं।

समस्या: जब कोई डैग मैन्युअल रूप से या किसी अन्य डैग द्वारा ट्रिगर किया जाता है, तो आप निश्चित रूप से इन दोनों में से किसी भी डैग की सटीक निष्पादन तिथि को नहीं जान सकते हैं।

समाधान: क्योंकि आप TriggerDagRunOperator का उपयोग कर रहे हैं, आप execution_date पैरामीटर सेट कर सकते हैं। यह सुनिश्चित करेगा कि आपके डैग और बाहरी डैग से निष्पादन की तारीख समान है। docs< से /ए>:

एक्ज़ीक्यूशन_डेट (str या datetime.datetime) - डैग के लिए एक्ज़ीक्यूशन की तारीख (टेम्पलेट)

तो आपका कोड इस तरह दिखेगा:

trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag,
    execution_date="{{ execution_date }}",  # Use the template to get the current execution date
)
external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)
0
pablosjv 6 अक्टूबर 2020, 16:32