Sfoglia il codice sorgente

merge from jetlinks-pro

zhou-hao 4 anni fa
parent
commit
02facde6ff

+ 198 - 90
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java

@@ -1,7 +1,9 @@
 package org.jetlinks.community.elastic.search.service.reactive;
 
-import lombok.AllArgsConstructor;
+import com.alibaba.fastjson.JSON;
+import io.netty.util.internal.ObjectPool;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
@@ -14,6 +16,8 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.core.CountRequest;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.search.SearchHit;
@@ -30,9 +34,11 @@ import org.jetlinks.community.elastic.search.service.ElasticSearchService;
 import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
 import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
 import org.reactivestreams.Publisher;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
+import org.springframework.util.unit.DataSize;
 import reactor.core.publisher.BufferOverflowStrategy;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -40,10 +46,12 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
+import reactor.util.retry.Retry;
 
 import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -55,13 +63,14 @@ import java.util.stream.Collectors;
 @Service("elasticSearchService")
 @Slf4j
 @DependsOn("reactiveElasticsearchClient")
+@ConfigurationProperties(prefix = "elasticsearch")
 public class ReactiveElasticSearchService implements ElasticSearchService {
 
     private final ReactiveElasticsearchClient restClient;
 
     private final ElasticSearchIndexManager indexManager;
 
-    FluxSink<Buffer> sink;
+    private FluxSink<Buffer> sink;
 
     public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
         true, true, false, false
@@ -94,14 +103,17 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
                     .flatMap(entry -> createSearchRequest(entry, index))
                     .doOnNext(request::add)
                     .then(Mono.just(request))
-                    .flatMapMany(searchRequest -> restClient.multiSearch(searchRequest)
+                    .flatMapMany(searchRequest -> restClient
+                        .multiSearch(searchRequest)
                         .flatMapMany(response -> Flux.fromArray(response.getResponses()))
                         .flatMap(item -> {
                             if (item.isFailure()) {
                                 log.warn(item.getFailureMessage(), item.getFailure());
                                 return Mono.empty();
                             }
-                            return Flux.fromIterable(translate((map) -> mapper.apply(indexMetadata.getT1().convertFromElastic(map)), item.getResponse()));
+                            return Flux
+                                .fromIterable(translate((map) -> mapper
+                                    .apply(indexMetadata.getT1().convertFromElastic(map)), item.getResponse()));
                         }))
                     ;
             });
@@ -122,13 +134,16 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
     @Override
     public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
         return this.doQuery(index, queryParam)
-            .flatMap(tp2 ->
-                convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
-                    .collectList()
-                    .filter(CollectionUtils::isNotEmpty)
-                    .map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits().value, list, queryParam))
-            )
-            .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
+                   .flatMap(tp2 -> this // tp2 is the tuple produced by doQuery; both parts feed convertQueryResult
+                       .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
+                       .collectList()
+                       .filter(CollectionUtils::isNotEmpty) // an empty page becomes an empty Mono so the fallback below applies
+                       .map(list -> PagerResult.of((int) tp2 // the total hit count is cast to int for the pager
+                           .getT2()
+                           .getHits()
+                           .getTotalHits().value, list, queryParam))
+                   )
+                   .switchIfEmpty(Mono.fromSupplier(PagerResult::empty)); // no rows upstream: answer with PagerResult.empty()
     }
 
     private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList,
@@ -147,8 +162,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
                 }
                 return mapper
                     .apply(Optional
-                        .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
-                        .convertFromElastic(hitMap));
+                               .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
+                               .convertFromElastic(hitMap));
             });
 
     }
@@ -193,14 +208,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
     @Override
     public <T> Mono<Void> commit(String index, T payload) {
-        sink.next(new Buffer(index, payload));
+        sink.next(Buffer.of(index, payload)); // hand the record to the buffering sink; the returned Mono does not track the eventual write
         return Mono.empty();
     }
 
     @Override
     public <T> Mono<Void> commit(String index, Collection<T> payload) {
         for (T t : payload) {
-            sink.next(new Buffer(index, t));
+            sink.next(Buffer.of(index, t)); // one Buffer per element, each pushed to the buffering sink individually
         }
         return Mono.empty();
     }
@@ -208,8 +223,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
     @Override
     public <T> Mono<Void> commit(String index, Publisher<T> data) {
         return Flux.from(data)
-            .flatMap(d -> commit(index, d))
-            .then();
+                   .flatMap(d -> commit(index, d)) // commit every emitted element under the same index
+                   .then(); // completes when the source publisher and all inner commits have completed
     }
 
     @Override
@@ -220,10 +235,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
     @Override
     public <T> Mono<Void> save(String index, Publisher<T> data) {
         return Flux.from(data)
-            .map(v -> new Buffer(index, v))
-            .collectList()
-            .flatMap(this::doSave)
-            .then();
+                   .map(v -> Buffer.of(index, v)) // wrap each element in a Buffer for the given index
+                   .collectList() // gather the whole publisher into one list
+                   .flatMap(this::doSave) // write that list through doSave
+                   .then(); // discard the count returned by doSave
     }
 
     @Override
@@ -236,43 +251,121 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
         sink.complete();
     }
 
-    //@PostConstruct
-    public void init() {
+    @Getter
+    @Setter
+    private BufferConfig buffer = new BufferConfig(); // buffering and retry settings; presumably bound via the class-level @ConfigurationProperties - confirm
+
+    @Getter
+    @Setter
+    public static class BufferConfig { // settings holder for write buffering and retry; field initializers supply the defaults
         // minimum interval
-        int flushRate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
+        private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000); // unit is not visible here; presumably milliseconds - confirm against FluxUtils.bufferRate
         // maximum number of buffered items
-        int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
+        private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
         // buffer timeout
-        Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
-        // buffer backpressure
-        int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", 64);
-
-        FluxUtils.bufferRate(
-            Flux.<Buffer>create(sink -> this.sink = sink),
-            flushRate,
-            bufferSize,
-            bufferTimeout)
-            .onBackpressureBuffer(bufferBackpressure,
-                drop -> System.err.println("无法处理更多索引请求!"),
-                BufferOverflowStrategy.DROP_OLDEST)
+        private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3)); // the system property value is read as seconds
+        // limit on the number of items piled up under backpressure.
+        private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
+            .getRuntime()
+            .availableProcessors()); // default: the number of available processors
+        // maximum buffered bytes
+        private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB")); // "15MB" unless the system property is set
+
+        // maximum number of retries
+        private int maxRetry = 3;
+        // retry interval
+        private Duration minBackoff = Duration.ofSeconds(3);
+    }
+
+    //@PostConstruct
+    public void init() { // builds and subscribes the write pipeline: sink -> bufferRate batches -> bounded backpressure buffer -> doSave
+        int flushRate = buffer.rate;
+        int bufferSize = buffer.bufferSize;
+        Duration bufferTimeout = buffer.bufferTimeout;
+        int bufferBackpressure = buffer.bufferBackpressure;
+        long bufferBytes = buffer.bufferBytes.toBytes();
+        AtomicLong bufferedBytes = new AtomicLong(); // running byte estimate, reset whenever a batch is emitted
+
+        FluxUtils
+            .bufferRate(Flux.<Buffer>create(sink -> this.sink = sink), // keep the FluxSink in a field for the commit/shutdown methods
+                        flushRate,
+                        bufferSize,
+                        bufferTimeout,
+                        (b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes) // true once the estimate reaches bufferBytes; presumably cuts the batch early - confirm in FluxUtils.bufferRate
+            .doOnNext(buf -> bufferedBytes.set(0))
+            .onBackpressureBuffer(bufferBackpressure, drop -> { // drop is one whole batch evicted by DROP_OLDEST
+                // TODO: 2020/11/25 persist the dropped data to local disk
+                drop.forEach(Buffer::release);
+                System.err.println("elasticsearch无法处理更多索引请求!丢弃数据数量:" + drop.size());
+            }, BufferOverflowStrategy.DROP_OLDEST)
+            .publishOn(Schedulers.boundedElastic(), bufferBackpressure)
             .flatMap(buffers -> {
                 long time = System.currentTimeMillis();
-                return this
-                    .doSave(buffers)
-                    .doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time)))
-                    .onErrorContinue((err, obj) -> {
-                        // Errors here are printed to the console; sending them to slf4j could cause recursive logging.
-                        System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err));
-                    });
+                return Mono.create(sink -> { // this lambda's sink is the MonoSink and shadows the FluxSink field
+                    try {
+                        this
+                            .doSave(buffers)
+                            .doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time)))
+                            .doOnError((err) -> {
+                                // Errors here are printed to the console; sending them to slf4j could cause recursive logging.
+                                System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err));
+                            })
+                            .doFinally((s) -> sink.success()) // complete the outer Mono on any termination signal, so a save error is not passed downstream
+                            .subscribe();
+                    } catch (Exception e) { // NOTE(review): a synchronous failure is swallowed here without being printed
+                        sink.success();
+                    }
+                });
             })
+            .onErrorResume((err) -> Mono // any error that still reaches this point is printed instead of propagated
+                .fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" +
+                                                           org.hswebframework.utils.StringUtils.throwable2String(err))))
             .subscribe();
     }
 
-    @AllArgsConstructor
+    // Use an object pool for Buffer instances to reduce GC overhead
+    static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
+
     @Getter
     static class Buffer {
         String index;
-        Object payload;
+        String id; // string form of the payload's "id" entry, or null when there is none
+        String payload; // the record serialized to a JSON string
+        final ObjectPool.Handle<Buffer> handle; // used by release() to recycle this instance; null when created outside the pool
+
+        public Buffer(ObjectPool.Handle<Buffer> handle) {
+            this.handle = handle;
+        }
+
+        public static Buffer of(String index, Object payload) { // obtains a Buffer (pooled when possible) and fills it from the payload
+            Buffer buffer;
+            try {
+                buffer = pool.get();
+            } catch (Exception e) { // pool lookup failed: fall back to a plain instance without a handle
+                buffer = new Buffer(null);
+            }
+            buffer.index = index;
+            Map<String, Object> data = payload instanceof Map
+                ? ((Map) payload) : // a Map payload is used as-is (raw cast, so key/value types are unchecked)
+                FastBeanCopier.copy(payload, HashMap::new); // anything else is copied into a new HashMap
+            Object id = data.get("id");
+            buffer.id = id == null ? null : String.valueOf(id);
+            buffer.payload = JSON.toJSONString(data);
+            return buffer;
+        }
+
+        void release() { // clears all fields and recycles the instance through its handle, if it has one
+            this.index = null;
+            this.id = null;
+            this.payload = null;
+            if (null != handle) {
+                handle.recycle(this);
+            }
+        }
+
+        int numberOfBytes() { // size estimate: two bytes per char of the JSON string, 0 when payload is null
+            return payload == null ? 0 : payload.length() * 2;
+        }
     }
 
 
@@ -292,33 +385,48 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
     protected Mono<Integer> doSave(Collection<Buffer> buffers) {
         return Flux.fromIterable(buffers)
-            .groupBy(Buffer::getIndex)
-            .flatMap(group -> {
-                String index = group.key();
-                return this.getIndexForSave(index)
-                    .zipWith(indexManager.getIndexMetadata(index))
-                    .flatMapMany(tp2 ->
-                        group.map(buffer -> {
-                            Map<String, Object> data = FastBeanCopier.copy(buffer.getPayload(), HashMap::new);
-
-                            IndexRequest request;
-                            if (data.get("id") != null) {
-                                request = new IndexRequest(tp2.getT1()).type("_doc").id(String.valueOf(data.get("id")));
-                            } else {
-                                request = new IndexRequest(tp2.getT1()).type("_doc");
-                            }
-                            request.source(tp2.getT2().convertToElastic(data));
-                            return request;
-                        }));
-            })
-            .collectList()
-            .filter(CollectionUtils::isNotEmpty)
-            .flatMap(lst -> {
-                BulkRequest request = new BulkRequest();
-                lst.forEach(request::add);
-                return restClient.bulk(request);
-            })
-            .thenReturn(buffers.size());
+                   .groupBy(Buffer::getIndex,Integer.MAX_VALUE) // group by target index; the second argument is the groupBy prefetch
+                   .flatMap(group -> {
+                       String index = group.key();
+                       return this
+                           .getIndexForSave(index) // its result is used as the IndexRequest target below
+                           .flatMapMany(realIndex -> group
+                               .map(buffer -> {
+                                   try {
+                                       IndexRequest request;
+                                       if (buffer.id != null) { // reuse the id captured from the payload when there is one
+                                           request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
+                                       } else {
+                                           request = new IndexRequest(realIndex).type("_doc");
+                                       }
+                                       request.source(buffer.payload, XContentType.JSON); // sent as the stored JSON string; the removed convertToElastic step is no longer applied here
+                                       return request;
+                                   } finally {
+                                       buffer.release(); // the request already holds id and payload, so the Buffer is released
+                                   }
+                               }));
+                   })
+                   .collectList()
+                   .filter(CollectionUtils::isNotEmpty) // nothing to write: skip the bulk call
+                   .flatMap(lst -> {
+                       BulkRequest request = new BulkRequest();
+                       request.timeout(TimeValue.timeValueSeconds(9)); // bulk request timeout: 9 seconds
+                       lst.forEach(request::add);
+                       return restClient
+                           .bulk(request)
+                           .as(save -> {
+                               if (buffer.maxRetry > 0) { // "buffer" here is the BufferConfig field, not a Buffer item
+                                   return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
+                               }
+                               return save;
+                           });
+                   })
+                   .doOnNext(response -> {
+                       if (response.hasFailures()) { // per-item failures are only printed, not raised as an error
+                           System.err.println(response.buildFailureMessage());
+                       }
+                   })
+                   .thenReturn(buffers.size()); // on successful completion, emit the number of input buffers
     }
 
     @SneakyThrows
@@ -334,14 +442,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
     private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
         return Arrays.stream(response.getHits().getHits())
-            .map(hit -> {
-                Map<String, Object> hitMap = hit.getSourceAsMap();
-                if (StringUtils.isEmpty(hitMap.get("id"))) {
-                    hitMap.put("id", hit.getId());
-                }
-                return mapper.apply(hitMap);
-            })
-            .collect(Collectors.toList());
+                     .map(hit -> { // convert every search hit into a T through the supplied mapper
+                         Map<String, Object> hitMap = hit.getSourceAsMap();
+                         if (StringUtils.isEmpty(hitMap.get("id"))) { // source has no usable "id": fall back to the hit's own id
+                             hitMap.put("id", hit.getId());
+                         }
+                         return mapper.apply(hitMap);
+                     })
+                     .collect(Collectors.toList());
     }
 
     private Flux<SearchHit> doSearch(SearchRequest request) {
@@ -374,12 +482,12 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
         SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
         return Flux.fromIterable(indexes)
-            .flatMap(index -> getIndexForSearch(index.getIndex()))
-            .collectList()
-            .map(indexList ->
-                new SearchRequest(indexList.toArray(new String[0]))
-                    .source(builder)
-                    .indicesOptions(indexOptions));
+                   .flatMap(index -> getIndexForSearch(index.getIndex()))
+                   .collectList()
+                   .map(indexList ->
+                            new SearchRequest(indexList.toArray(new String[0]))
+                                .source(builder)
+                                .indicesOptions(indexOptions));
     }
 
     protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
@@ -396,9 +504,9 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
 
         SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
         return Flux.fromIterable(indexes)
-            .flatMap(index -> getIndexForSearch(index.getIndex()))
-            .collectList()
-            .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
+                   .flatMap(index -> getIndexForSearch(index.getIndex()))
+                   .collectList()
+                   .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
     }
 
     private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {