提问者:小点点

如果上一个实例dag运行处于运行状态,则跳过当前dag运行


我有一个约会,每小时安排一次。让我们说01:00,02:00,03:00。假设02:00am被选中,但是如果01:00am dag run仍在进行中,需要取消02:00am实例。

我正在尝试这个代码。

local_tz = pendulum.timezone("America/Chicago")

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2020, 11, 15, tzinfo=local_tz),
    'run_as_user': user_id
}

dag = DAG(os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
          catchup=False,
          max_active_runs=1
          schedule_interval='0 * * * *',  #schedule_interval='@hourly'
          default_args=default_args
)

def check_prev_dag_run_status(**kwargs):
    curr_dag_id = kwargs['dag'].dag_id
    curr_task_id = kwargs['task'].task_id
    newdate = kwargs['execution_date']
    ti = TaskInstance(curr_dag_id, curr_task_id, newdate)
    state = ti.current_state()
    if state=="running":
        raise ValueError("Not all previous tasks successfully completed")
        
check_success_task = PythonOperator(
    task_id='check_status',
    python_callable= check_prev_dag_run_status,
    provide_context=True,
    dag=dag
)

run_this_0 = BashOperator(
    task_id='run_shell',
        bash_command="ksh runshellscript.ksh",
        execution_timeout=None,
        dag=dag 
  )

我一直收到错误消息

[2020-11-17 12:30:07,337]{taskinstance.py:1150}ERROR-'str'对象没有属性'dag_id'

Traceback(最近一次调用最后):File"/airflow/bd/pyenv/pycdr/lib/python3.7/site-包/airflow/模型/taskinstance.py",第984行,_run_raw_task结果=task_copy.execute(上下文=上下文)

file"/airflow/bd/pyenv/pycdr/lib/python3.7/site-包/airflow/操作员/python_operator.py",第113行,在执行return_value=self.execute_callable()

文件“/aiffair/bd/pyenv/pycdr/lib/python3.7/site packages/aiffair/operators/python_operator.py”,第118行,在execute_ca中

*请建议我

  1. 将参数传递给airflow时缺少的内容。模型。taskinstance。TaskInstance

共2个答案

匿名用户

airflow.models.taskinstance.TaskInstance只接受两个参数,任务execution_date,而不是代码中的3。此外,任务不是task_id而是定义的任务,在您的示例中,我猜是run_this_0。您需要传递上次任务运行的execution_date,而不是当前任务。另外,状态可能不同于跑步,仍然不成功,所以我也会改变这一点。

综上所述,下面的代码可以用来检查上一次DAG运行的run\u this\u 0是否成功:

def check_prev_dag_run_status(**kwargs):
    newdate = kwargs['prev_execution_date']
    ti = TaskInstance(run_this_0, newdate)
    state = ti.current_state()
    if state!="success":
        raise ValueError("Not all previous tasks successfully completed")

匿名用户

为此任务将任务并发设置为1。新的任务将不会运行,除非先前的运行尚未开始。

设置依赖于过去的true。这有一个缺点,如果一个批次失败,下一个批次将无法运行。

使用具有模式重新安排的外部任务传感器来等待早期批处理完成。