我使用Apache Beam 2.13.0与GCP数据流运行器。
我有一个从批处理管道流式摄取到BigQuery的问题:
PCollection<BigQueryInsertError> stageOneErrors =
destinationTableSelected
.apply("Write BQ Attempt 1",
BigQueryIO.<KV<TableDestination, TableRow>>write()
.withMethod(STREAMING_INSERTS)
.to(new KVTableDestination())
.withFormatFunction(new KVTableRow())
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND))
.getFailedInsertsWithErr();
错误:
Shutting down JVM after 8 consecutive periods of measured GC thrashing.
Memory is used/total/max = 15914/18766/18766 MB,
GC last/max = 99.17/99.17 %, #pushbacks=0, gc thrashing=true.
Heap dump not written.
相同的代码在流模式下正常工作(如果省略了显式方法设置)。
该代码适用于相当小的数据集(少于200万条记录)。超过250万失败。
从表面上看,这似乎是一个类似于这里描述的问题:在连续8次测量GC后关闭JVM
创建单独的问题以添加其他详细信息。
我能做些什么来解决这个问题吗?看起来问题出在BigQueryIO组件本身-GroupBy键失败。
包含GroupByKey的转换的问题在于,它将等到接收到当前窗口的所有数据后再进行分组。
在流式传输模式下,这通常很好,因为传入元素被窗口化到单独的窗口中,因此GroupByKey仅对一小块数据进行操作。
然而,在Batch模式下,当前窗口是全局窗口,这意味着GroupByKey将等待整个输入数据集被读取和接收,然后才开始执行分组。如果输入数据集很大,那么您的worker将运行内存溢出,这解释了您在这里看到的内容。
这就提出了一个问题:为什么在处理Batch数据时使用BigQuery Streaming插入?流式插入相对昂贵(与免费的批量相比!)并且比批量导入具有更小的配额/限制:即使您解决了您看到的问题,在Bigquery本身中可能还有更多问题有待发现。
在与支持人员和开发人员进行广泛讨论后,已告知不鼓励从批处理管道使用BigQuery流入口,并且目前(从2.13.0开始)不支持。