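New to Reactor, so I apologize if this isn't clear. I have a class that runs a recurring process every 5 minutes (using Flux.interval(Duration.ofMinutes(5))). Every 5 minutes, I ssh into some of our on-prem Linux machines, execute a bash script, and record the output. I then save the output to a DB. This works fine for a while, but then I get: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached.
The Java program runs in a Docker container, and when I look at the logs, I see io-executor-thread-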
@PostConstruct
private void pollMachines() {
    Flux.interval(Duration.ofMinutes(5))
        .map(this::getAllMachines)
        .flatMap(name -> process1(name)
            .map(this::process2)
            .doOnError(throwable -> LOGGER.info("Some error happened with {}", name))
            .onErrorResume(throwable -> setOfflineStatus(name))
        )
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
}
Here is the exception:
07:03:49.854 [io-executor-thread-256] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
backend | reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
backend | Caused by: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
backend | at java.base/java.lang.Thread.start0(Native Method)
backend | at java.base/java.lang.Thread.start(Thread.java:798)
backend | at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:937)
backend | at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1354)
backend | at io.micronaut.scheduling.instrument.InstrumentedExecutor.execute(InstrumentedExecutor.java:42)
backend | at java.base/java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1714)
backend | at java.base/java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1931)
backend | at io.micronaut.data.runtime.operations.ExecutorAsyncOperations.update(ExecutorAsyncOperations.java:152)
backend | at io.micronaut.data.runtime.operations.ExecutorReactiveOperations.lambda$update$12(ExecutorReactiveOperations.java:194)
backend | at io.micronaut.core.async.publisher.CompletableFuturePublisher$CompletableFutureSubscription.request(CompletableFuturePublisher.java:78)
backend | at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
backend | at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
backend | at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:238)
backend | at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
backend | at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
backend | at io.micronaut.core.async.publisher.CompletableFuturePublisher.subscribe(CompletableFuturePublisher.java:49)
backend | at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:63)
backend | at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
backend | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
backend | at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
backend | at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
backend | at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
backend | at io.micronaut.core.async.publisher.Publishers$1.doOnNext(Publishers.java:248)
backend | at io.micronaut.core.async.subscriber.CompletionAwareSubscriber.onNext(CompletionAwareSubscriber.java:56)
backend | at io.micronaut.core.async.publisher.CompletableFuturePublisher$CompletableFutureSubscription.lambda$request$0(CompletableFuturePublisher.java:89)
backend | at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
backend | at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
backend | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
backend | at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
backend | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
backend | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
backend | at java.base/java.lang.Thread.run(Thread.java:829)
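Is there any way to avoid this?
Edit: After polling 20 or so machines on 1 thread, the next polling session starts on another thread.
Edit 2: Removed process3. Assume I only run 2 processes per machine.
Edit 3: If I may rephrase the question: is there a way to run the above process every 5 minutes without hitting the OOM exception?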
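
For context on the trace: it fails inside ThreadPoolExecutor.addWorker, and the thread name has reached io-executor-thread-256, which suggests the executor keeps adding threads rather than reusing a fixed set. The following is a minimal sketch of that behavior using only java.util.concurrent. The class and method names (PoolGrowthDemo, largestPoolSize) are hypothetical, the latch is a stand-in for a blocking ssh or DB call, and it is an illustration only: it does not use Reactor or Micronaut and is not the code from this application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    // Submits `tasks` jobs that all block at the same time, then reports the
    // largest number of threads the pool held at any point.
    // Assumes `pool` is backed by ThreadPoolExecutor, which is true for
    // Executors.newCachedThreadPool() and Executors.newFixedThreadPool(n).
    public static int largestPoolSize(ExecutorService pool, int tasks) throws Exception {
        CountDownLatch release = new CountDownLatch(1);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(pool.submit(() -> {
                release.await(); // stand-in for a blocking ssh or DB call
                return null;
            }));
        }
        release.countDown(); // let every blocked task finish
        for (Future<?> f : futures) {
            f.get(10, TimeUnit.SECONDS);
        }
        int largest = ((ThreadPoolExecutor) pool).getLargestPoolSize();
        pool.shutdown();
        return largest;
    }

    public static void main(String[] args) throws Exception {
        // Unbounded cached pool: a new thread for every task that finds no idle worker.
        System.out.println("cached: " + largestPoolSize(Executors.newCachedThreadPool(), 50));
        // Fixed pool: at most 4 threads, the remaining tasks wait in the queue.
        System.out.println("fixed:  " + largestPoolSize(Executors.newFixedThreadPool(4), 50));
    }
}
```

In this sketch the cached pool ends up with one thread per concurrently blocked task, while the fixed pool never exceeds its configured size. Whether the same cause applies here depends on how the io executor is configured in this application, which is an assumption on top of what the trace shows.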