Asked by: 小点点

Apache Beam Java SDK SparkRunner error when writing Parquet


I am using Apache Beam with Java. I am trying to read a csv file and write it out in parquet format with the SparkRunner, on a pre-deployed Spark environment, using local mode. Everything works fine with the DirectRunner, but the SparkRunner does not work at all. I am using the maven shade plugin to build a fat jar.

The code is as follows:

Java:

public class ImportCSVToParquet{
// omitted
                File csv = new File(filePath);
                PCollection<String> vals = pipeline.apply(TextIO.read().from(filePath));

                String parquetFilename = csv.getName().replaceFirst("csv", "parquet");
                String outputLocation = FolderConventions.getRawFilePath(confETL.getHdfsRoot(), parquetFilename);

                PCollection<GenericRecord> processed = vals.apply(ParDo.of(new ProcessFiles.GenericRecordFromCsvFn()))
                        .setCoder(AvroCoder.of(new Config().getTransactionSchema()));

                LOG.info("Processed file will be written to: " + outputLocation);
                processed.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(conf.getTransactionSchema())).to(outputLocation));


        pipeline.run().waitUntilFinish();


}

POM dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-spark</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-parquet</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
</dependencies>

Spark-submit script:

spark-submit \
--class package.ImportCSVToParquet \
--master local[*] \
--executor-cores 2 \
--executor-memory 2g \
--driver-memory 2g \
--driver-cores 2 \
--conf spark.sql.codegen.wholeStage=false \
--conf spark.wholeStage.codegen=false \
--conf spark.sql.shuffle.partitions=2005 \
--conf spark.driver.maxResultSize=2g \
--conf spark.executor.memoryOverhead=4048 \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35" \
--conf "spark.driver.extraJavaOptions=-Djava.io.tmpdir=/path-to-tmp/" \
--conf "spark.driver.extraClassPath=./" \
--jars path-to-jar \
/path-to-jar "$@"

I get the following error:

2019-08-07 13:37:49 ERROR Executor:91 - Exception in task 3.0 in stage 0.0 (TID 3)
org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:176)
        at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
        at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
        at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.open(ParquetIO.java:304)
        at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.prepareWrite(FileIO.java:1359)
        at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:937)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn.processElement(WriteFiles.java:533)

It looks like the job completes the reading and the transformations, but fails when it tries to write to the file system. I am not using HDFS at the moment. Any ideas?


2 Answers

Anonymous user

I am sure that ParquetIO depends on Parquet 1.10, the release that added a "hadoop-neutral" API to the parquet file readers/writers.

Spark 2.2.3 depends on Parquet 1.8.2, which does not have the builder(...) constructor that Beam's ParquetIO uses, as confirmed by the exception.
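
One quick way to confirm which Parquet version a given Spark installation actually ships is to list its jars directory; SPARK_HOME and the exact jar names below are assumptions about a typical installation:

    # List the Parquet jars bundled with the local Spark installation
    ls "$SPARK_HOME/jars" | grep parquet
    # Spark 2.2.x typically shows parquet-*-1.8.2 jars, while Spark 2.4.x shows parquet-*-1.10.0 jars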

If possible, the simplest solution is to upgrade to Spark 2.4, which bumps the Parquet version to 1.10.0.

If you cannot upgrade the Spark version, there are a few techniques for overriding the jars that Spark brings in:


  • You can set spark.(driver|executor).userClassPathFirst to true, which places the classes from your fat jar ahead of the jars provided by Spark. This might work, or it might introduce new dependency conflicts (see the spark-submit sketch after this list).

  • You can try replacing parquet-xx-1.8.2.jar in your local Spark installation with parquet-xx-1.10.0 (assuming they are drop-in replacements). If this works, you can apply the same strategy to a Spark job on a cluster by setting the spark.yarn.jars property when submitting the job (also covered in the sketch below).

  • You can try shading Beam's ParquetIO and its Parquet dependencies inside your fat jar.
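
    A minimal sketch of how the first two options might be passed to spark-submit, reusing the class, jar placeholder, and master settings from the question; the flag values and the HDFS path are assumptions, not something verified against this job:

        # Option 1 sketch: prefer classes from the fat jar over the jars Spark ships.
        # These flags are experimental and may introduce new conflicts.
        spark-submit \
          --class package.ImportCSVToParquet \
          --master local[*] \
          --conf spark.driver.userClassPathFirst=true \
          --conf spark.executor.userClassPathFirst=true \
          /path-to-jar "$@"

        # Option 2 sketch (cluster variant on YARN): point spark.yarn.jars at a jars
        # directory in which the parquet-*-1.8.2 jars were swapped for 1.10.0.
        # The hdfs:///spark-jars/ location is hypothetical.
        spark-submit \
          --class package.ImportCSVToParquet \
          --master yarn \
          --conf "spark.yarn.jars=hdfs:///spark-jars/*.jar" \
          /path-to-jar "$@"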

    Edit: This is a known issue, BEAM-5164.

    Edit (workaround):

    Following the instructions with some modifications, I managed to make it work with Spark 2.2.3:


  • I used the Scala 2.11 dependencies and set them to provided scope.

  • I added the following three relocations to the maven-shade-plugin configuration:

      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <filters>
    
    ... unchanged ...
    
              </filters>
              <relocations>
                <relocation>
                  <pattern>org.apache.parquet</pattern>
                  <shadedPattern>shaded.org.apache.parquet</shadedPattern>
                </relocation>
                <!-- Some packages are shaded already, and on the original spark classpath. Shade them more. -->
                <relocation>
                  <pattern>shaded.parquet</pattern>
                  <shadedPattern>reshaded.parquet</shadedPattern>
                </relocation>
                <relocation>
                  <pattern>org.apache.avro</pattern>
                  <shadedPattern>shaded.org.apache.avro</shadedPattern>
                </relocation>
              </relocations>
            </configuration>
            <executions>
    
    ... unchanged ...
    
            </executions>
          </plugin>
        </plugins>
      </build>
    

Anonymous user

Do not use spark.driver.userClassPathFirst and spark.executor.userClassPathFirst, because they are still experimental. Instead, use spark.driver.extraClassPath and spark.executor.extraClassPath.

Definition from the official documentation: "Extra classpath entries to prepend to the classpath of the driver."

  • "Prepend", as in, it is placed in front of Spark's core classpath.

Example:

    --conf spark.driver.extraClassPath=C:\Users\Khalid\Documents\Projects\libs\jackson-annotations-2.6.0.jar;C:\Users\Khalid\Documents\Projects\libs\jackson-core-2.6.0.jar;C:\Users\Khalid\Documents\Projects\libs\jackson-databind-2.6.0.jar

This solved my problem (a conflict between the Jackson version I wanted to use and the one Spark was using).
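
Adapted to the Parquet conflict in the question above, on a Linux installation the same flags might look roughly like this; the /opt/libs paths and the exact jar names are assumptions (entries are separated with ':' rather than ';' on Linux):

    # Hypothetical sketch: put Parquet 1.10.0 jars ahead of Spark's bundled 1.8.2 ones
    spark-submit \
      --class package.ImportCSVToParquet \
      --master local[*] \
      --conf spark.driver.extraClassPath=/opt/libs/parquet-hadoop-1.10.0.jar:/opt/libs/parquet-avro-1.10.0.jar \
      --conf spark.executor.extraClassPath=/opt/libs/parquet-hadoop-1.10.0.jar:/opt/libs/parquet-avro-1.10.0.jar \
      /path-to-jar "$@"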

Hope it helps.