我们有一个使用动态目标(dynamic destinations)将数据流式写入 BigQuery 的 Dataflow 管道,它的作业日志中出现了大量异常,全部都是 javax.net.ssl.SSLException: Connection reset 异常。
我在下面提供了堆栈跟踪,但想知道这是否会导致数据丢失?
这类错误很多(每天数百个),并且似乎总是与以下 worker 日志同时出现:
Execution of work for computation 'P6' on key '-REDACTED-' failed with uncaught
exception. Work will be retried locally.
我是否可以据此得出结论:这项工作已在 worker 本地被成功重试,并没有数据丢失,只是作业日志中仍然记录了该异常?
仅凭查看各种不同的日志,很难弄清楚哪些错误真正导致了数据丢失,这让我非常困惑。
示例堆栈跟踪:
Error message from worker: java.lang.RuntimeException: javax.net.ssl.SSLException: Connection reset
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:954)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:994)
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:375)
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69)
org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271)
Caused by: javax.net.ssl.SSLException: Connection reset
java.base/sun.security.ssl.Alert.createSSLException(Alert.java:127)
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:350)
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:293)
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:288)
java.base/sun.security.ssl.SSLSocketImpl.handleException(SSLSocketImpl.java:1581)
java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:979)
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552)
java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609)
java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696)
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510)
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164)
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
java.base/java.io.FilterInputStream.read(FilterInputStream.java:107)
com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274)
com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40)
java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232)
java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252)
com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369)
com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48)
com.google.api.client.json.JsonParser.parse(JsonParser.java:363)
com.google.api.client.json.JsonParser.parse(JsonParser.java:335)
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79)
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73)
com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:878)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketException: Connection reset
java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354)
java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:963)
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
java.base/sun.net.www.http.ChunkedInputStream.readAheadBlocking(ChunkedInputStream.java:552)
java.base/sun.net.www.http.ChunkedInputStream.readAhead(ChunkedInputStream.java:609)
java.base/sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:696)
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3510)
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:164)
java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
java.base/java.io.FilterInputStream.read(FilterInputStream.java:107)
com.google.common.io.ByteStreams.exhaust(ByteStreams.java:274)
com.google.api.client.http.ConsumingInputStream.close(ConsumingInputStream.java:40)
java.base/java.util.zip.InflaterInputStream.close(InflaterInputStream.java:232)
java.base/java.util.zip.GZIPInputStream.close(GZIPInputStream.java:137)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._closeInput(UTF8StreamJsonParser.java:252)
com.fasterxml.jackson.core.base.ParserBase.close(ParserBase.java:369)
com.google.api.client.json.jackson2.JacksonParser.close(JacksonParser.java:48)
com.google.api.client.json.JsonParser.parse(JsonParser.java:363)
com.google.api.client.json.JsonParser.parse(JsonParser.java:335)
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:79)
com.google.api.client.json.JsonObjectParser.parseAndClose(JsonObjectParser.java:73)
com.google.api.client.http.HttpResponse.parseAs(HttpResponse.java:456)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:878)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1342)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
这些错误不应导致数据丢失,因为抛出异常时,对应的 bundle(数据束)会被重试。