所以我有一个问题,在写入分区Parquet文件时,DataFrame中的某些行会被丢弃。
以下是我的步骤:
mode=append
阅读的第一步按预期工作,没有解析问题。对于质量检查,我执行以下操作:
对于date='2012-11-22'
的特定分区,对CSV文件、加载的DataFrame和parque文件执行计数。
以下是一些使用pyspark重现的代码:
logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema()')
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted
我也试着写HDFS,结果是一样的。这发生在多个分区上。默认/空分区中没有记录。上面的logs_df是准确无误的。
我尝试的第二个实验是编写一个未分区的parque文件。上述代码的唯一区别是省略了的分区By()
:
logs_df. write.parque('s3://…/logs_2012_parquet/',mode='append')
如上所述加载此镶木地板集并运行计数产生了date='2012-11-22'
和其他日期的5000的正确结果。将模式设置为覆盖
或不设置(使用默认值)会导致相同的数据丢失。
我的环境是:
我将非常感谢修复或解决方法或使用Spark转换为拼花文件的其他方式。
谢谢,
编辑:我无法重现第二个实验。所以让我们说分区和未分区在写入Parquet或JSON时似乎都会丢弃记录。
所以谜团肯定在于模式定义。然而,出乎意料的是,它不是日期或时间戳。事实上是布尔值。
我从Redshift导出了CSV,它将布尔写成t
和f
。当我检查推断的模式时,这些字段被标记为字符串类型。在CSV文件中使用true
和false
的简单测试将它们识别为布尔值。
所以我以为日期和时间戳解析会像往常一样出错,但这是布尔值。吸取了教训。
谢谢你的指点。