I hit this problem when I ingest/write data into a FeatureSet (part of the MLRun FeatureStore) and then read the data back via PySpark (the data looks like invalid parquet). See the exception:
AnalysisException Traceback (most recent call last)
<ipython-input-8-a8c688f9ceb5> in <module>
----> 1 newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
2 newDF1.show()
/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
299 int96RebaseMode=int96RebaseMode)
300
--> 301 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
302
303 def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
1320 answer = self.gateway_client.send_command(command)
1321 return_value = get_return_value(
-> 1322 answer, self.gateway_client, self.target_id, self.name)
1323
1324 for temp_arg in temp_args:
/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise
AnalysisException: Unable to infer schema for Parquet. It must be specified manually.
See the relevant part of the source code (the part that triggers the exception):
...
feature_set1 = fstore.FeatureSet(name="FS-ingest", entities=[fstore.Entity('app'), fstore.Entity('id')], engine="spark", timestamp_key='time')
feature_set1.set_targets(targets=[ParquetTarget(name="s1", partitioned=False), NoSqlTarget(name="s2")], with_defaults=False)
feature_set1.save()
fstore.ingest(f"store://feature-sets/{project_name}/FS-ingest", sparkDF, spark_context=spark, overwrite=True)
...
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
newDF1.show()
Have you seen a similar issue?
Note: the parquet path does contain parquet files (all of them valid), so the ingest itself succeeded.
The source code (the parquet usage) contains a mistake. The FeatureSet uses two targets, an online and an offline store, and in this case spark.read.parquet also picks up the online store, whose format is not parquet. I see two possible solutions.
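You can confirm this by listing the FeatureSet root. Below is a diagnostic sketch using the Hadoop FileSystem API through py4j (it assumes the v3io Hadoop connector is already configured in your Spark session, as it is in the Iguazio environment); it should print both a parquet/ and a nosql/ subdirectory under the same root:

root = f"v3io://projects/{project_name}/FeatureStore/FS-ingest"
jvm = spark.sparkContext._jvm
# build a Hadoop Path and get the FileSystem that handles the v3io:// scheme
hadoop_path = jvm.org.apache.hadoop.fs.Path(root)
fs = hadoop_path.getFileSystem(spark._jsc.hadoopConfiguration())
for status in fs.listStatus(hadoop_path):
    print(status.getPath())  # expect .../FS-ingest/nosql and .../FS-ingest/parquet

Spark's parquet reader scans this whole tree, trips over the nosql/ files, and therefore cannot infer a schema.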
1. Update the parquet read
This is the simplest fix: extend the current path with /parquet so the read only touches the offline target. See the updated code:
...
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest/parquet")
newDF1.show()
...
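A variant of the same fix: instead of hard-coding the /parquet suffix, you can ask MLRun for the offline target path. This is a sketch assuming MLRun's get_feature_set and get_target_path APIs behave this way in your MLRun version:

# load the stored FeatureSet and resolve the path of the parquet target "s1"
fset = fstore.get_feature_set(f"store://feature-sets/{project_name}/FS-ingest")
offline_path = fset.get_target_path(name="s1")
newDF1 = spark.read.parquet(offline_path)
newDF1.show()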
2. Remove the online/NoSql target
This updates the FeatureSet definition (removing NoSqlTarget(name="s2")) and keeps the spark.read.parquet part as it was. See the updated code:
...
feature_set1 = fstore.FeatureSet(name="FS-ingest", entities=[fstore.Entity('app'), fstore.Entity('id')], engine="spark", timestamp_key='time')
feature_set1.set_targets(targets=[ParquetTarget(name="s1", partitioned=False)], with_defaults=False)
feature_set1.save()
newDF1 = spark.read.parquet(f"v3io://projects/{project_name}/FeatureStore/FS-ingest")
newDF1.show()
...
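Note that without the NoSql target the feature set can no longer serve online lookups. If you still need online access, you can keep both targets and instead read through MLRun's offline-features API, which resolves the correct target path by itself. A minimal sketch, assuming the standard FeatureVector / get_offline_features flow (the vector name here is made up for the example):

# build a feature vector over all features of the set and materialize it
vector = fstore.FeatureVector("fs-ingest-vec", ["FS-ingest.*"])
resp = fstore.get_offline_features(vector)
df = resp.to_dataframe()  # pandas DataFrame with the ingested features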
BTW: The same solutions also apply to this different exception, which describes the problem more precisely (note the distinct paths for the online and offline stores):
Py4JJavaError: An error occurred while calling o3233.parquet.
: java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
v3io://projects/spark-parquet-test2/featurestore/FS-ingest/nosql/sets/FS-ingest/1674747966078_84
v3io://projects/spark-parquet-test2/featurestore/FS-ingest/parquet/sets/FS-ingest/1674747966078_84
If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:178)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:110)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:158)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:73)
at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)