zhouhao 5 rokov pred
rodič
commit
43eee59781

+ 14 - 16
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
@@ -27,10 +28,8 @@ import org.jetlinks.community.elastic.search.parser.QueryParamTranslateService;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoSink;
+import reactor.core.publisher.*;
+import reactor.core.scheduler.Schedulers;
 
 import javax.annotation.PreDestroy;
 import java.time.Duration;
@@ -115,20 +114,19 @@ public class DefaultElasticSearchService implements ElasticSearchService {
 
     //@PostConstruct
     public void init() {
-
-        FluxUtils.bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
-            1000, 2000, Duration.ofSeconds(3))
+        //这里的警告都输出到控制台,输入到slf4j可能会造成日志递归.
+        FluxUtils.bufferRate(
+            Flux.<Buffer>create(sink -> this.sink = sink),
+            1000,
+            2000,
+            Duration.ofSeconds(3))
+            .onBackpressureBuffer(512,
+                drop -> System.err.println("无法处理更多索引请求!"),
+                BufferOverflowStrategy.DROP_OLDEST)
             .flatMap(this::doSave)
-            .doOnNext((len) -> {
-                //System.out.println(len);
-                log.debug("保存ES数据成功:{}", len);
-            })
-            .onErrorContinue((err, obj) -> {
-                //打印到控制台以免递归调用ES导致崩溃
-                System.err.println(org.hswebframework.utils.StringUtils.throwable2String(err));
-            })
+            .doOnNext((len) -> log.debug("保存ES数据成功,数量:{}", len))
+            .onErrorContinue((err, obj) -> System.err.println(org.hswebframework.utils.StringUtils.throwable2String(err)))
             .subscribe();
-
     }
 
     @AllArgsConstructor