提问者:小点点

在Python中逐行写入拼花地板


我在异步循环中获取消息,并从每条消息中解析row,它是字典。我想将这些行写入parque。为了实现这一点,我做了以下操作:

fields = [('A', pa.float64()), ('B', pa.float64()), ('C', pa.float64()), ('D', pa.float64())]
schema = pa.schema(fields)
pqwriter = pq.ParquetWriter('sample.parquet', schema=schema, compression='gzip')

#async cycle starts here
async for message in messages:
   row = {'A': message[1], 'B': message[2], 'C': message[3], 'D': message[4]}
   table = pa.Table.from_pydict(row)
   pqwriter.write_table(table)
#end of async cycle
pqwriter.close()

一切都很完美,但是生成的parquet-file大小约为5 Mb,而如果我执行对csv-file的写入,我的文件大小约为200 Kb。我已经检查了数据类型是否相同(csv的列是floatt,parquet的列是floats)

为什么在相同的数据下,我的镶木地板比csv大得多?


共2个答案

匿名用户

Parquet是一种列式格式,经过优化可用于写入批次数据。它不适用于逐行写入数据。

它不太适合您的用例。您可能希望以更合适的格式(例如avro、csv)编写中间数据行,然后将数据批量转换为parquet。

匿名用户

我取得了如下预期的结果:

chunksize = 1e6
data = []
fields = #list of tuples
schema = pa.schema(fields)

with pq.ParquetWriter('my_parquet', schema=schema) as writer:
#async cycle starts here
rows = #dict with structure as in fields
data.extend(rows)

if len(data)>chunksize:
   data = pd.DataFrame(data)
   table = pa.Table.from_pandas(data, schema=schema)
   writer.write_table(table)
   data = []
#end of async cycle
if len(data)!=0:
   data = pd.DataFrame(data)
   table = pa.Table.from_pandas(data, schema=schema)
   writer.write_table(table)
writer.close()

这段代码实际上满足了我的需要。