使用PySpark,我有一些代码可以运行一系列查询。
for index, query in enumerate(query_map):
spark_dataframe.filter(
query).write.csv('s3://OutputBucket/Csvs/Query_{}'.format(index)
我是spark的新手,但我知道每个分区都在将单独的csv文件写入名为Query.[index]
的目录。现在我想收集这些文件并将它们放入一个数据框中。
import boto3
import pandas
s3 = boto3.resource('s3')
my_bucket = s3.Bucket("OutputBucket")
#Get all csv names
csvs = [
"s3://OutputBucket/Csvs/"+\
str(i.key) for i in my_bucket.objects.filter(Prefix='Query/')]
to_concat = []
#Turn them into a dataframe
for csv in csvs:
try:
to_put_in.append(pandas.read_csv(csv))
except pandas.errors.EmptyDataError:
pass
#Join dataframe
my_big_dataframe = pandas.concat(to_concat)
问题是Pyspark写了很多空文件。所以我的代码花了很多时间试图读取一个空的csv文件,结果抛出了一个异常。
据我所知,df_火花。toPandas()
函数违背了spark的目的,因为它将spark放入驱动程序内存,并且没有利用每个分区的IO并行化。使用合并
也违背了spark的目的。因此,写一堆CSV,然后手动读取它们并不是一个糟糕的主意。
我的问题是,是否有一种方法可以跳过pyspark编写的空csv文件:
>
也许boto3可以先按大小排序,然后迭代,直到我们清空文件?
有什么办法在PySpark做到这一点而不击败pyspark?
几个月前,我遇到了类似的问题。用过这样的东西
# get the number of non-empty partitions in dataframe df
numNonEmptyPartitions = (df.rdd.glom().map(lambda x: 1 if len(x)>0 else 0).
reduce(lambda x,y: x+y))
df = df.coalesce(numNonEmptyPartitions)
现在,您将拥有所有非空分区。