Passing a pandas DataFrame between Airflow tasks fails. I tried the following code:
try:
    from datetime import timedelta, datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    import pandas as pd
except Exception as e:
    print(e)

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
    'email': ['test@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    context['ti'].xcom_push(key='df', value=df)

def process_type(**context):
    df = context.get("ti").xcom_pull(key="df")
    print(df)
dag = DAG(dag_id="DAG-READ-CSV", schedule_interval="@once", default_args=default_args, catchup=False)
read_file = PythonOperator(task_id="read_file", python_callable=read_file, dag=dag)
process_type = PythonOperator(task_id="process_title", python_callable=process_type, dag=dag)
read_file >> process_type
Error details:
*** Reading local file: /opt/airflow/logs/DAG-READ-CSV/read_file/2021-05-15T02:16:50.764650+00:00/1.log
[2021-05-15 02:16:52,600] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [queued]>
[2021-05-15 02:16:52,618] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [queued]>
[2021-05-15 02:16:52,619] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2021-05-15 02:16:52,620] {taskinstance.py:1069} INFO - Starting attempt 1 of 1
[2021-05-15 02:16:52,621] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2021-05-15 02:16:52,629] {taskinstance.py:1089} INFO - Executing <Task(PythonOperator): read_file> on 2021-05-15T02:16:50.764650+00:00
[2021-05-15 02:16:52,634] {standard_task_runner.py:52} INFO - Started process 527 to run task
[2021-05-15 02:16:52,639] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'DAG-READ-CSV', 'read_file', '2021-05-15T02:16:50.764650+00:00', '--job-id', '197', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/airflow_practice_7_practice_read_csv.py', '--cfg-path', '/tmp/tmp5jr0dror', '--error-file', '/tmp/tmpn9o4ulj3']
[2021-05-15 02:16:52,644] {standard_task_runner.py:77} INFO - Job 197: Subtask read_file
[2021-05-15 02:16:52,696] {logging_mixin.py:104} INFO - Running <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [running]> on host f526bca85af4
[2021-05-15 02:16:52,745] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=test@gmail.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=DAG-READ-CSV
AIRFLOW_CTX_TASK_ID=read_file
AIRFLOW_CTX_EXECUTION_DATE=2021-05-15T02:16:50.764650+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-15T02:16:50.764650+00:00
[2021-05-15 02:16:52,766] {xcom.py:238} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-05-15 02:16:52,767] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/airflow_practice_7_practice_read_csv.py", line 23, in read_file
context['ti'].xcom_push(key='df', value=df)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1928, in xcom_push
session=session,
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 88, in set
value = XCom.serialize_value(value)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 235, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
o.__class__.__name__)
TypeError: Object of type 'DataFrame' is not JSON serializable
[2021-05-15 02:16:52,772] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=DAG-READ-CSV, task_id=read_file, execution_date=20210515T021650, start_date=20210515T021652, end_date=20210515T021652
[2021-05-15 02:16:52,814] {local_task_job.py:146} INFO - Task exited with return code 1
As suggested in the comments, you can use something like Redis as a cache. To store the DataFrame in such a data-structure store, you can serialize the df with a tool like pyarrow:
import redis
import pyarrow as pa
import pandas as pd

# redis_host, redis_port and redis_db are placeholders for your Redis connection details
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
# keep the pyarrow serialization context under its own name so it does not
# shadow the Airflow task context passed in as **context
pa_context = pa.default_serialization_context()

def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    # serialize the DataFrame with pyarrow and store the raw bytes in Redis
    r.set("name", pa_context.serialize(df).to_buffer().to_pybytes())

def process_type(**context):
    # read the bytes back from Redis and deserialize into a DataFrame
    df = pa_context.deserialize(r.get("name"))
    print(df)
If you never delete the key, it will stay in Redis after the run, so I suggest making the key unique per run, for example by appending kwargs['ts_nodash'] to the name, and deleting it in the last task that uses it, as in the sketch below.
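A minimal sketch of that idea, assuming the r connection and pa_context from the snippet above; the "netflix_titles_" key prefix is just an illustrative name:

def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    # ts_nodash (e.g. 20210515T021650) makes the key unique per DAG run
    key = "netflix_titles_" + context["ts_nodash"]
    r.set(key, pa_context.serialize(df).to_buffer().to_pybytes())

def process_type(**context):
    key = "netflix_titles_" + context["ts_nodash"]
    df = pa_context.deserialize(r.get(key))
    print(df)
    # the last task that needs the DataFrame removes the key so it does not linger in Redis
    r.delete(key)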