我有一个约会,每小时安排一次。让我们说01:00,02:00,03:00。假设02:00am被选中,但是如果01:00am dag run仍在进行中,需要取消02:00am实例。
我正在尝试这个代码。
local_tz = pendulum.timezone("America/Chicago")
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2020, 11, 15, tzinfo=local_tz),
'run_as_user': user_id
}
dag = DAG(os.path.basename(__file__).replace(".pyc", "").replace(".py", ""),
catchup=False,
max_active_runs=1
schedule_interval='0 * * * *', #schedule_interval='@hourly'
default_args=default_args
)
def check_prev_dag_run_status(**kwargs):
curr_dag_id = kwargs['dag'].dag_id
curr_task_id = kwargs['task'].task_id
newdate = kwargs['execution_date']
ti = TaskInstance(curr_dag_id, curr_task_id, newdate)
state = ti.current_state()
if state=="running":
raise ValueError("Not all previous tasks successfully completed")
check_success_task = PythonOperator(
task_id='check_status',
python_callable= check_prev_dag_run_status,
provide_context=True,
dag=dag
)
run_this_0 = BashOperator(
task_id='run_shell',
bash_command="ksh runshellscript.ksh",
execution_timeout=None,
dag=dag
)
我一直收到错误消息
[2020-11-17 12:30:07,337]{taskinstance.py:1150}ERROR-'str'对象没有属性'dag_id'
Traceback(最近一次调用最后):File"/airflow/bd/pyenv/pycdr/lib/python3.7/site-包/airflow/模型/taskinstance.py",第984行,_run_raw_task结果=task_copy.execute(上下文=上下文)
file"/airflow/bd/pyenv/pycdr/lib/python3.7/site-包/airflow/操作员/python_operator.py",第113行,在执行return_value=self.execute_callable()
文件“/aiffair/bd/pyenv/pycdr/lib/python3.7/site packages/aiffair/operators/python_operator.py”,第118行,在execute_ca中
*请建议我
airflow.models.taskinstance.TaskInstance
只接受两个参数,任务
和execution_date
,而不是代码中的3。此外,任务不是task_id而是定义的任务,在您的示例中,我猜是run_this_0。您需要传递上次任务运行的execution_date
,而不是当前任务。另外,状态可能不同于跑步,仍然不成功,所以我也会改变这一点。
综上所述,下面的代码可以用来检查上一次DAG运行的run\u this\u 0
是否成功:
def check_prev_dag_run_status(**kwargs):
newdate = kwargs['prev_execution_date']
ti = TaskInstance(run_this_0, newdate)
state = ti.current_state()
if state!="success":
raise ValueError("Not all previous tasks successfully completed")
为此任务将任务并发设置为1。新的任务将不会运行,除非先前的运行尚未开始。
设置依赖于过去的true。这有一个缺点,如果一个批次失败,下一个批次将无法运行。
使用具有模式重新安排的外部任务传感器来等待早期批处理完成。