zhouhao пре 4 година
родитељ
комит
3a183774b1

+ 28 - 15
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java

@@ -44,6 +44,7 @@ import reactor.util.function.Tuples;
 import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -247,24 +248,36 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
         //缓冲背压
         int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", 64);
 
-        FluxUtils.bufferRate(
-            Flux.<Buffer>create(sink -> this.sink = sink),
-            flushRate,
-            bufferSize,
-            bufferTimeout)
-            .onBackpressureBuffer(bufferBackpressure,
-                drop -> System.err.println("无法处理更多索引请求!"),
-                BufferOverflowStrategy.DROP_OLDEST)
+        FluxUtils
+            .bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
+                        flushRate,
+                        bufferSize,
+                        bufferTimeout)
+            .onBackpressureBuffer(bufferBackpressure, drop -> {
+                System.err.println("elasticsearch无法处理更多索引请求!丢弃数据数量:" + drop.size());
+            }, BufferOverflowStrategy.DROP_OLDEST)
+            .publishOn(Schedulers.boundedElastic(), bufferBackpressure)
             .flatMap(buffers -> {
                 long time = System.currentTimeMillis();
-                return this
-                    .doSave(buffers)
-                    .doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time)))
-                    .onErrorContinue((err, obj) -> {
-                        //这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
-                        System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err));
-                    });
+                return Mono.create(sink -> {
+                    try {
+                        this
+                            .doSave(buffers)
+                            .doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time)))
+                            .doOnError((err) -> {
+                                //这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
+                                System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err));
+                            })
+                            .doFinally((s) -> sink.success())
+                            .subscribe();
+                    } catch (Exception e) {
+                        sink.success();
+                    }
+                });
             })
+            .onErrorResume((err) -> Mono
+                .fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" +
+                                                           org.hswebframework.utils.StringUtils.throwable2String(err))))
             .subscribe();
     }