提问者:小点点

ApacheBeam Elasticsearch chIO不适用于最新的elasticsearch


我一直在尝试在apache光束管道中使用Elasticsearch chIOAPI。我无法连接到elasticsearch。任何帮助都会很棒。

我的JAR版本:

org. apache2.0-sdkjava-core:2.37.0

org. apache2.0-sdk s-java-io-弹性搜索:2.37.0

我的弹性版本是:8.1.2

留档说只支持v2. x

链接:https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html

我不能使用最新版本的弹性吗?有人尝试过让它使用最新版本吗?

我的管道:

public class Test {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();

        org.apache.beam.sdk.Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(options);

        PCollection<FileIO.ReadableFile> files = pipeline.apply(
                        FileIO.match().filepattern("/path_to_files/**")
                                .continuously(
                                        Duration.standardSeconds(10),
                                        Watch.Growth.never()))
                .apply(FileIO.readMatches());

        PCollection<FileIO.ReadableFile> filteredFiles =  files.apply(Filter.by(new EndsWithFilter("Dummy.txt")));
        
        filteredFiles.apply(ParDo.of(new DummyFileProcessor())).apply(ElasticsearchIO.write().withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration.create(new String[]{ "http://****:9200" }, "***").withUsername("***").withPassword("***")));

        pipeline.run().waitUntilFinish();
    }
}

请找到Stacktrace,上面说找不到弹性搜索版本

Exception in thread "main" org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2592)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:839)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:259)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:2579)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.setup(ElasticsearchIO.java:1647)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:107)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version

    at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load(DoFnLifecycleManager.java:92)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.runners.direct.DoFnLifecycleManager.get(DoFnLifecycleManager.java:62)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:131)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:81)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:162)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:122)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version

Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed

    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    ... 1 more

> Task :IO-TextIO-TextIO_Read:Test.main() FAILED

Execution failed for task ':IO-TextIO-TextIO_Read:Test.main()'.

共1个答案

匿名用户

我刚刚发现下面这个问题,其中Elasticsearch chIO不适用于Elasticsearch 8。*

https://github.com/GoogleCloudPlatform/DataflowTemplates/issues/367

当我把它降级到7.13.2时,它工作得很好