मैं एक डीएजी स्थापित करने की कोशिश कर रहा हूं जहां हर मिनट एक कार्य चलाया जाता है, और फिर 5 वें मिनट (1 मिनट के कार्य से ठीक पहले) पर एक और कार्य चलाया जाता है। यह वास्तव में सिर्फ परीक्षण है, मैं इतने कम अंतराल में नौकरी चलाने की योजना नहीं बना रहा हूं।

नेत्रहीन, मेरा डीएजी इस तरह दिखता है:

DAG

और कोड ही इस तरह है:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9)
}

now = datetime.now()
minute_check = now.minute % 5

dag = DAG(
    dag_id='test3',
    default_args=default_args,
    schedule_interval='* * * * *',
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
    max_active_runs=99
        )

def check_minute():
    if minute_check == 0:
        return "branch_fiveminute"
    else:
        return "branch_minute"

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=check_minute,
    trigger_rule='all_done',
    dag=dag)

branch_minute = BashOperator(
    task_id='branch_minute',
    bash_command='test1min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_fiveminute = BashOperator(
    task_id='branch_fiveminute',
    bash_command='test5min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_task.set_downstream(branch_minute)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)

मुझे जो समस्या हो रही है, वह यह है कि 5 वें मिनट में, एयरफ्लो 1 मिनट के कार्य को छोड़ देता है:

enter image description here

मैंने बिना ज्यादा सफलता के ट्रिगर_रूल सेटिंग्स के साथ खेलने की कोशिश की है।

कोई विचार क्या गलत है? अगर यह मायने रखता है तो मैं एयरफ्लो 1.10 का उपयोग कर रहा हूं।

1
Georgi Raychev 18 अक्टूबर 2018, 09:31

1 उत्तर

सबसे बढ़िया उत्तर

चूंकि आप 5 मिनट के कार्य के लिए एक अलग निष्पादन पथ का अनुसरण करते हैं, इसलिए एक मिनट का कार्य छोड़ दिया जाता है। यह आरेख से थोड़ा काउंटर सहज है लेकिन निष्पादन के साथ केवल 1 पथ है।

तो आपको क्या करना है कि शुरुआत में शाखा है, एक पथ झूठे के लिए एक डमी ऑपरेटर की ओर जाता है और एक पथ 5 मिनट के कार्य की ओर जाता है, हालांकि 5 मिनट का कार्य और डमी ऑपरेटर दोनों 1 मिनट में आगे बढ़ेंगे कार्य।

इस तरह डमी कार्य छोड़ दिया जाता है लेकिन निष्पादन प्रवाह 1 मिनट के कार्य में समाप्त हो जाता है, भले ही निष्पादन पथ का चयन किया गया हो।

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator  import DummyOperator
from airflow.operators.bash_operator   import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9)
}

now = datetime.now()
minute_check = now.minute % 5

dag = DAG(
    dag_id='test3',
    default_args=default_args,
    schedule_interval='* * * * *',
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
    max_active_runs=99
        )

def check_minute():
    if minute_check == 0:
        return "branch_fiveminute"
    else:
        return "branch_false_1"

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=check_minute,
    trigger_rule='all_done',
    dag=dag)

branch_minute = BashOperator(
    task_id='branch_minute',
    bash_command='test1min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_fiveminute = BashOperator(
    task_id='branch_fiveminute',
    bash_command='test5min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_false_1 = DummyOperator( task_id= "branch_false_1", dag=dag )

branch_task.set_downstream(branch_false_1)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)
branch_false_1.set_downstream(branch_minute)
1
Georgi Raychev 18 अक्टूबर 2018, 08:32