Selaa lähdekoodia

优化elasticsearch保存

zhou-hao 5 vuotta sitten
vanhempi
commit
183458fb9d

+ 33 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java

@@ -3,9 +3,11 @@ package org.jetlinks.community.elastic.search.service;
 import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -122,6 +124,25 @@ public class DefaultElasticSearchService implements ElasticSearchService {
             .then();
     }
 
+    @Override
+    public <T> Mono<Void> save(String index, T payload) {
+        return save(index, Mono.just(payload));
+    }
+
+    @Override
+    public <T> Mono<Void> save(String index, Publisher<T> data) {
+        return Flux.from(data)
+            .map(v -> new Buffer(index, v))
+            .collectList()
+            .flatMap(this::doSave)
+            .then();
+    }
+
+    @Override
+    public <T> Mono<Void> save(String index, Collection<T> payload) {
+        return save(index, Flux.fromIterable(payload));
+    }
+
     @PreDestroy
     public void shutdown() {
         sink.complete();
@@ -196,10 +217,21 @@ public class DefaultElasticSearchService implements ElasticSearchService {
                 lst.forEach(request::add);
                 return ReactorActionListener.<BulkResponse>mono(listener ->
                     restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener)
-                );
+                ).doOnNext(this::checkResponse);
             }).thenReturn(buffers.size());
     }
 
+    @SneakyThrows
+    protected void checkResponse(BulkResponse response) {
+        if (response.hasFailures()) {
+            for (BulkItemResponse item : response.getItems()) {
+                if (item.isFailed()) {
+                    throw item.getFailure().getCause();
+                }
+            }
+        }
+    }
+
     private <T> PagerResult<T> translatePageResult(Function<Map<String, Object>, T> mapper, QueryParam param, SearchResponse response) {
         long total = response.getHits().getTotalHits();
         return PagerResult.of((int) total, translate(mapper, response), param);

+ 6 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java

@@ -26,6 +26,12 @@ public interface ElasticSearchService {
 
     <T> Mono<Void> commit(String index, Publisher<T> data);
 
+    <T> Mono<Void> save(String index,  T payload);
+
+    <T> Mono<Void> save(String index,  Collection<T> payload);
+
+    <T> Mono<Void> save(String index, Publisher<T> data);
+
     default <T> Flux<T> query(String index, QueryParam queryParam, Class<T> type) {
         return query(index, queryParam, map -> FastBeanCopier.copy(map, type));
     }