Asked by: 小点点

Unable to pass a DataFrame between Airflow tasks


Passing a pandas DataFrame between Airflow tasks fails. I tried the following code:

try:
    from datetime import timedelta, datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    import pandas as pd
except Exception as e:
    print(e)

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
    'email': ['test@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}


def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    context['ti'].xcom_push(key='df', value=df)


def process_type(**context):
    df = context.get("ti").xcom_pull(key="df")
    print(df)

dag = DAG(dag_id="DAG-READ-CSV", schedule_interval="@once", default_args=default_args, catchup=False)

read_file = PythonOperator(task_id="read_file", python_callable=read_file, dag=dag)

process_type = PythonOperator(task_id="process_title", python_callable=process_type, dag=dag)

read_file

Error details:

*** Reading local file: /opt/airflow/logs/DAG-READ-CSV/read_file/2021-05-15T02:16:50.764650+00:00/1.log
[2021-05-15 02:16:52,600] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [queued]>
[2021-05-15 02:16:52,618] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [queued]>
[2021-05-15 02:16:52,619] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-15 02:16:52,620] {taskinstance.py:1069} INFO - Starting attempt 1 of 1
[2021-05-15 02:16:52,621] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-15 02:16:52,629] {taskinstance.py:1089} INFO - Executing <Task(PythonOperator): read_file> on 2021-05-15T02:16:50.764650+00:00
[2021-05-15 02:16:52,634] {standard_task_runner.py:52} INFO - Started process 527 to run task
[2021-05-15 02:16:52,639] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'DAG-READ-CSV', 'read_file', '2021-05-15T02:16:50.764650+00:00', '--job-id', '197', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/airflow_practice_7_practice_read_csv.py', '--cfg-path', '/tmp/tmp5jr0dror', '--error-file', '/tmp/tmpn9o4ulj3']
[2021-05-15 02:16:52,644] {standard_task_runner.py:77} INFO - Job 197: Subtask read_file
[2021-05-15 02:16:52,696] {logging_mixin.py:104} INFO - Running <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [running]> on host f526bca85af4
[2021-05-15 02:16:52,745] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=test@gmail.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=DAG-READ-CSV
AIRFLOW_CTX_TASK_ID=read_file
AIRFLOW_CTX_EXECUTION_DATE=2021-05-15T02:16:50.764650+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-15T02:16:50.764650+00:00
[2021-05-15 02:16:52,766] {xcom.py:238} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-05-15 02:16:52,767] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/airflow_practice_7_practice_read_csv.py", line 23, in read_file
    context['ti'].xcom_push(key='df', value=df)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1928, in xcom_push
    session=session,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 88, in set
    value = XCom.serialize_value(value)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 235, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'DataFrame' is not JSON serializable
[2021-05-15 02:16:52,772] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=DAG-READ-CSV, task_id=read_file, execution_date=20210515T021650, start_date=20210515T021652, end_date=20210515T021652
[2021-05-15 02:16:52,814] {local_task_job.py:146} INFO - Task exited with return code 1

1 Answer

Anonymous user

As suggested in the comments, you can use something like Redis as a cache. To store the DataFrame in such a data structure store, you can serialize your df with a tool like pyarrow.

import redis
import pandas as pd
import pyarrow as pa

# Redis connection (redis_host, redis_port and redis_db are your connection settings)
# and a pyarrow serialization context. The context is named pa_context so it is not
# shadowed by the Airflow task context (**context) inside the callables below.
redis_conn = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
pa_context = pa.default_serialization_context()


def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    # Serialize the DataFrame with pyarrow and store the bytes in Redis
    redis_conn.set("name", pa_context.serialize(df).to_buffer().to_pybytes())


def process_type(**context):
    # Fetch the bytes from Redis and deserialize them back into a DataFrame
    df = pa_context.deserialize(redis_conn.get("name"))
    print(df)


If you never delete the key in the last task it can stick around, so I suggest giving it a unique name, for example by appending kwargs['ts_nodash'] to it, and then deleting the key in the last task.
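A minimal sketch of that idea, reusing the redis_conn and pa_context objects from the snippet above; the "netflix_titles_" key prefix is just a hypothetical name for illustration:

def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    # ts_nodash comes from the Airflow task context, so the key is unique per DAG run
    key = "netflix_titles_" + context["ts_nodash"]
    redis_conn.set(key, pa_context.serialize(df).to_buffer().to_pybytes())


def process_type(**context):
    key = "netflix_titles_" + context["ts_nodash"]
    df = pa_context.deserialize(redis_conn.get(key))
    print(df)
    # Delete the key in the last task so stale entries do not pile up in Redis
    redis_conn.delete(key)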