我在异步循环中获取消息,并从每条消息中解析row
,它是字典。我想将这些行写入parque。为了实现这一点,我做了以下操作:
fields = [('A', pa.float64()), ('B', pa.float64()), ('C', pa.float64()), ('D', pa.float64())]
schema = pa.schema(fields)
pqwriter = pq.ParquetWriter('sample.parquet', schema=schema, compression='gzip')
#async cycle starts here
async for message in messages:
row = {'A': message[1], 'B': message[2], 'C': message[3], 'D': message[4]}
table = pa.Table.from_pydict(row)
pqwriter.write_table(table)
#end of async cycle
pqwriter.close()
一切都很完美,但是生成的parquet-file大小约为5 Mb,而如果我执行对csv-file的写入,我的文件大小约为200 Kb。我已经检查了数据类型是否相同(csv的列是floatt,parquet的列是floats)
为什么在相同的数据下,我的镶木地板比csv大得多?
Parquet是一种列式格式,经过优化可用于写入批次数据。它不适用于逐行写入数据。
它不太适合您的用例。您可能希望以更合适的格式(例如avro、csv)编写中间数据行,然后将数据批量转换为parquet。
我取得了如下预期的结果:
chunksize = 1e6
data = []
fields = #list of tuples
schema = pa.schema(fields)
with pq.ParquetWriter('my_parquet', schema=schema) as writer:
#async cycle starts here
rows = #dict with structure as in fields
data.extend(rows)
if len(data)>chunksize:
data = pd.DataFrame(data)
table = pa.Table.from_pandas(data, schema=schema)
writer.write_table(table)
data = []
#end of async cycle
if len(data)!=0:
data = pd.DataFrame(data)
table = pa.Table.from_pandas(data, schema=schema)
writer.write_table(table)
writer.close()
这段代码实际上满足了我的需要。