Asked by: 小点点

Reading parquet in MLRun: "Unable to infer schema for Parquet. It must be specified manually."


I ran into this problem when I ingested/wrote data into a FeatureSet (part of the MLRun FeatureStore) and then read the data back via PySpark (the data appears to be invalid parquet). See the exception:

AnalysisException                         Traceback (most recent call last)
<ipython-input-8-a8c688f9ceb5> in <module>
----> 1 newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
      2 newDF1.show()

/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
    299                        int96RebaseMode=int96RebaseMode)
    300 
--> 301         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    302 
    303     def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,

/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1320         answer = self.gateway_client.send_command(command)
   1321         return_value = get_return_value(
-> 1322             answer, self.gateway_client, self.target_id, self.name)
   1323 
   1324         for temp_arg in temp_args:

/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Unable to infer schema for Parquet. It must be specified manually.

See the key part of the source code (which generates the exception):

...
feature_set1 = fstore.FeatureSet(name="FS-ingest", entities=[fstore.Entity('app'), fstore.Entity('id')], engine="spark", timestamp_key='time')
feature_set1.set_targets(targets=[ParquetTarget(name="s1", partitioned=False), NoSqlTarget(name="s2")], with_defaults=False)
feature_set1.save()
fstore.ingest(f"store://feature-sets/{project_name}/FS-ingest", sparkDF, spark_context=spark, overwrite=True)
...
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
newDF1.show()

Has anyone seen a similar issue?

Note: the parquet path contains parquet files (all of them valid), which means the ingestion was successful.


1 Answer

Anonymous user

The source code (the parquet usage) contains a mistake. The FeatureSet uses two targets, online and offline storage, and in this case spark.read.parquet also hits the online store, whose format is not parquet. I see two possible solutions.
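To see why schema inference fails, it helps to list the FeatureSet root first. A minimal sketch (assuming the v3io connector is already configured in this Spark session; it goes through Spark's Hadoop FileSystem API via py4j):

# List the FeatureSet root. Expect two subdirectories: the offline target
# (.../FS-ingest/parquet) and the online target (.../FS-ingest/nosql);
# the latter is not parquet data, which breaks schema inference.
root = spark._jvm.org.apache.hadoop.fs.Path(
    f"v3io://projects/{project_name}/FeatureStore/FS-ingest"
)
fs = root.getFileSystem(spark._jsc.hadoopConfiguration())
for status in fs.listStatus(root):
    print(status.getPath())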

1. Update the parquet read path

This is the simplest way to fix the problem: extend the current path with /parquet, so that only the offline target is read. See the updated code:

...
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest/parquet")
newDF1.show()
...
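Instead of hard-coding the /parquet suffix, the path can also be resolved from the feature set itself. A sketch, assuming the feature_set1 object from the question is still at hand and that FeatureSet.get_target_path resolves the stored target location:

# Resolve the offline (parquet) target path from the FeatureSet definition
# rather than spelling out the directory layout by hand.
target_path = feature_set1.get_target_path(name="s1")
newDF1 = spark.read.parquet(target_path)
newDF1.show()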

2. Remove the online/NoSql target

This updates the FeatureSet definition (removing NoSqlTarget(name="s2")) while keeping the spark.read.parquet part unchanged. See the updated code:

...
feature_set1 = fstore.FeatureSet(name="FS-ingest", entities=[fstore.Entity('app'), fstore.Entity('id')], engine="spark", timestamp_key='time')
feature_set1.set_targets(targets=[ParquetTarget(name="s1", partitioned=False)], with_defaults=False)
feature_set1.save()

newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
newDF1.show()
...
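One caveat: if the feature set was already ingested with both targets, the leftover nosql directory may still exist on disk, so a re-ingest is needed before the read succeeds. A sketch, reusing the ingest call from the question (overwrite=True is assumed to replace the previously written target data):

# Re-ingest so that only the parquet target is written under the FeatureSet root.
fstore.ingest(f"store://feature-sets/{project_name}/FS-ingest", sparkDF, spark_context=spark, overwrite=True)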

BTW: the same solutions also apply to this different exception, which describes the problem more precisely (given the distinct paths of the online and offline stores):

Py4JJavaError: An error occurred while calling o3233.parquet.
: java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
    v3io://projects/spark-parquet-test2/featurestore/FS-ingest/nosql/sets/FS-ingest/1674747966078_84
    v3io://projects/spark-parquet-test2/featurestore/FS-ingest/parquet/sets/FS-ingest/1674747966078_84

If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:178)
    at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:110)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:158)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:73)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)