zhou-hao 5 gadi atpakaļ
vecāks
revīzija
4faa6ab211

+ 13 - 4
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java

@@ -150,13 +150,22 @@ public class DefaultElasticSearchService implements ElasticSearchService {
 
     //@PostConstruct
     public void init() {
+        //最小间隔
+        int flushRate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
+        //缓冲最大数量
+        int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
+        //缓冲超时时间
+        Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
+        //缓冲背压
+        int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", 64);
+
         //这里的警告都输出到控制台,输入到slf4j可能会造成日志递归.
         FluxUtils.bufferRate(
             Flux.<Buffer>create(sink -> this.sink = sink),
-            Integer.getInteger("elasticsearch.flush.rate", 1000),
-            Integer.getInteger("elasticsearch.buffer.size", 2000),
-            Duration.ofSeconds(3))
-            .onBackpressureBuffer(512,
+            flushRate,
+            bufferSize,
+            bufferTimeout)
+            .onBackpressureBuffer(bufferBackpressure,
                 drop -> System.err.println("无法处理更多索引请求!"),
                 BufferOverflowStrategy.DROP_OLDEST)
             .flatMap(this::doSave)