提问者:小点点

在使用一个任务运行dag时


default_args = {
"owner": "airflow_admin",
"start_date": datetime(2017, 8, 29),
"schedule_interval": "0 5 * * *"
 }
with DAG(
'my_first_dag',
catchup=False,
default_args=default_args,
schedule_interval=DAG_SCHEDULE_INTERVAL,
) as dag:

task1 = BashOperator(
task_id='id1',
bash_command=bash_.format('db1', 'tab1'))

task1

有没有人知道如何启动一个dag与一个任务请,上面的代码不工作,dag启动正确


共1个答案

匿名用户

确保您的缩进是正确的。 您的任务应在DAG上下文管理器的范围内定义,否则必须为任务提供DAG参数:

尝试:


default_args = {
    "owner": "airflow_admin",
    "start_date": datetime(2017, 8, 29),
    "schedule_interval": "0 5 * * *"
}
with DAG('my_first_dag', catchup=False, default_args=default_args) as dag:
    task1 = BashOperator(
        task_id='id1',
        bash_command=bash_.format('db1', 'tab1'))

或者不使用上下文管理器:

default_args = {
    "owner": "airflow_admin",
    "start_date": datetime(2017, 8, 29),
    "schedule_interval": "0 5 * * *"
}
dag = DAG('my_first_dag', catchup=False, default_args=default_args)
task1 = BashOperator(
        task_id='id1',
        bash_command=bash_.format('db1', 'tab1'),
        dag=dag)