提问者:小点点

Apache Beam的数据存储性能较差


我在数据存储写入速度方面遇到了巨大的性能问题。大多数时候它保持在100个元素/秒以下。

当使用数据存储客户端(com.google.cloud:google-cloud-datastore)在本地机器上标记写入速度并并行运行批处理写入时,我能够达到大约2600个元素/秒的速度。

我已经使用JavaAPI设置了一个简单的Apache Beam管道。这是它的图表:

以下是在没有Datastore节点的情况下运行时的速度:

这种方式要快得多。这一切都指向DatastoreV1。写入是此管道中的瓶颈——通过没有写入节点的管道速度和DatastoreV1的墙壁时间来判断。写入与其他节点的墙壁时间相比。

我试图解决这个问题的方法:

•增加初始工作人员的数量(尝试1和10,没有明显差异)。一段时间后(可能在前2个节点完成处理之后),数据存储将写入次数减少到1。基于此,我怀疑DatastoreIO. v1().write()没有并行运行其工作人员。为什么?

确保一切都在同一个位置运行:GCP项目、数据流管道工作人员

尝试使用不同的实体密钥生成策略(根据这篇文章)。目前使用这种方法:Key. Builder keyBuilder=DatastoreHelper.makeKey(“一些类型”,UUID.随机UUID().toString());。我不完全确定这会生成足够均匀分布的密钥,但我想即使不是,性能也不应该这么低?

请注意,我无法使用提供的Apache Beam

查看DatastoreV1。写入节点日志:

它每隔大约5秒推送500个实体的批次,这不是很快。

总的来说,它看起来像DatastoreIO. v1().write()速度很慢,它的工作人员没有并行运行。知道如何解决这个问题或者可能是什么原因吗?


共1个答案

匿名用户

我不应该对这个问题置之不理。

在联系了GCP支持后,我得到了一个建议,认为原因可能是TextIO.Read节点从压缩(gzip)文件中读取。显然这是一个不可并行化的操作。事实上,在切换到源的未压缩文件后,性能有所提高。

建议的另一个解决方案是在从源代码读取后运行手动修复管道。这意味着向管道中的项目添加任意键,按任意键分组,然后删除任意键。它也有效。这种方法归结为以下代码:

管道代码:

pipeline.apply(TextIO.read().from("foo").withCompression(Compression.GZIP)  
        .apply(ParDo.of(new PipelineRepartitioner.AddArbitraryKey<>()))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new PipelineRepartitioner.RemoveArbitraryKey<>()))
        /* further transforms */ 

帮助类:

public class PipelineRepartitioner<T> {
    public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            for (T s : c.element().getValue()) {
                c.output(s);
            }
        }
    }
}

我在Apache Beam Jira上看到了与该问题相关的票证,因此将来可以修复。