提问者:小点点

气流-子DAG中长时间运行的任务在一小时后标记为失败


我有一个长时间运行的步骤(通常约2小时,但根据运行的设备不同而有所不同)气流中的子DAG。低于1.7。1.3,该步骤将持续导致气流-736,并且当所有步骤成功时,子DAG将在“运行”状态下停止。我们可以解决这个问题,因为通过在数据库中手动将SubDAG操作符标记为成功(而不是运行),在SubDAG之后没有步骤。

我们现在正在测试气流1.8.1,通过执行以下操作进行升级:

  1. 关闭我们的调度器和工人
  2. 通过pip,卸载气流并安装apache-airflow(版本1.8.1)
  3. 运行气流升级db
  4. 运行气流调度器和工作人员

在系统未被触动的情况下,同样的DAG现在在长时间运行的任务达到1小时标记后(奇怪的是,不是3600秒之后-它可以在小时标记后30到90秒)出现100%的失败,并显示消息“Executor reports task instance finished(failed)”虽然任务说它正在运行。任务是否在外部终止?“。但是,任务本身继续在辅助进程上运行。尽管实际任务运行良好,但调度程序基于数据库错误地认为任务失败(请参见此jobs.py行)之间存在分歧。

我已经确认,在airflow数据库的task_实例表中,状态是“失败的”。因此,我想知道当任务本身仍在运行时,是什么将任务状态设置为失败的。

以下是触发问题的示例dag:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)}

def define_sub(dag, step_name, sleeptime):
    op = BashOperator(
        task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag
    )
    return dag

def gen_sub_dag(parent_name, step_name, sleeptime):
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
    define_sub(sub, step_name, sleeptime)
    return sub

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)

long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)

共1个答案

匿名用户

如果您确实使用Celery和Redis运行,请查看Celery的可见性超时设置,并将其增加到任务的预期结束时间之外。

虽然我们将芹菜配置为延迟确认任务,但它仍然存在任务消失的问题。我们认为这是芹菜中的一种病菌。