I have been trying to use the ElasticsearchIO API in an Apache Beam pipeline, but I am unable to connect to Elasticsearch. Any help would be great.
My JAR versions:
org.apache.beam:beam-sdks-java-core:2.37.0
org.apache.beam:beam-sdks-java-io-elasticsearch:2.37.0
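My Elasticsearch version is: 8.1.2
The documentation says only v2.x is supported.
Link: https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html
Can't I use the latest version of Elasticsearch? Has anyone managed to get it working with the latest version?
My pipeline: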
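import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
// EndsWithFilter and DummyFileProcessor are my own classes (not shown here).
public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    PCollection<FileIO.ReadableFile> files = pipeline
        .apply(FileIO.match().filepattern("/path_to_files/**")
            .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
        .apply(FileIO.readMatches());
    PCollection<FileIO.ReadableFile> filteredFiles = files.apply(Filter.by(new EndsWithFilter("Dummy.txt")));
    filteredFiles
        .apply(ParDo.of(new DummyFileProcessor()))
        .apply(ElasticsearchIO.write().withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration
                .create(new String[] {"http://****:9200"}, "***")
                .withUsername("***").withPassword("***")));
    pipeline.run().waitUntilFinish();
  }
}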
Please find the stack trace below. It says the Elasticsearch version could not be retrieved:
Exception in thread "main" org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 13 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2592)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:839)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:259)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2579)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
... 1 more
> Task :IO-TextIO-TextIO_Read:Test.main() FAILED
Execution failed for task ':IO-TextIO-TextIO_Read:Test.main()'.
I just found the issue below, which reports that ElasticsearchIO does not work with Elasticsearch 8.*:
https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/367
When I downgraded Elasticsearch to 7.13.2, it worked fine.
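
Since the failure only surfaces inside the DoFn setup, a small pre-flight check on the cluster version can make the mismatch obvious before the pipeline starts. The following is a minimal sketch in plain Java, not part of Beam: the class name `EsVersionCheck`, its methods, and the set of accepted major versions are all hypothetical, and the set contains only 7 because that is the one version observed to work above. Adjust it to whatever your connector version actually supports.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not a Beam API: checks a cluster version string
// (e.g. the "version.number" field returned by GET / on the cluster).
class EsVersionCheck {

    // Assumption: only major version 7 is treated as working, based on the
    // observation that 7.13.2 worked and 8.1.2 did not with Beam 2.37.0.
    static final Set<Integer> ASSUMED_SUPPORTED_MAJORS =
            new HashSet<>(Arrays.asList(7));

    // Extracts the leading major version from a string such as "8.1.2".
    static int majorVersion(String versionNumber) {
        int dot = versionNumber.indexOf('.');
        String major = dot < 0 ? versionNumber : versionNumber.substring(0, dot);
        return Integer.parseInt(major.trim());
    }

    // True if the major version is in the assumed-supported set.
    static boolean isAssumedSupported(String versionNumber) {
        return ASSUMED_SUPPORTED_MAJORS.contains(majorVersion(versionNumber));
    }

    public static void main(String[] args) {
        for (String v : new String[] {"8.1.2", "7.13.2"}) {
            System.out.println(v + " -> major " + majorVersion(v)
                    + ", assumed supported: " + isAssumedSupported(v));
        }
    }
}
```

Calling `isAssumedSupported` with the cluster's reported version before `pipeline.run()` and throwing a descriptive exception on `false` would replace the opaque "Cannot get Elasticsearch version" failure with an explicit message about the version mismatch.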