Ver código fonte

优化es保存

zhouhao 5 anos atrás
pai
commit
b693d0bc98

+ 24 - 40
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java

@@ -171,47 +171,31 @@ public class DefaultElasticSearchService implements ElasticSearchService {
 
     protected Mono<Integer> doSave(Collection<Buffer> buffers) {
         return Flux.fromIterable(buffers)
-            .collect(Collectors.groupingBy(Buffer::getIndex))
-            .flatMapMany(map -> Flux
-                .fromIterable(map.entrySet())
-                .flatMap(e ->
-                    this.getIndexForSave(e.getKey())
-                        .zipWith(indexManager.getIndexMetadata(e.getKey()), (index, metadata) -> Tuples.of(index, metadata, e.getValue()))))
-            .map(entry -> {
-                String index = entry.getT1();
-                BulkRequest bulkRequest = new BulkRequest(index, "_doc");
-                for (Buffer buffer : entry.getT3()) {
-                    IndexRequest request = new IndexRequest();
-                    Object o = JSON.toJSON(buffer.getPayload());
-                    if (o instanceof Map) {
-                        request.source(ElasticSearchConverter.convertDataToElastic((Map<String, Object>) o, entry.getT2().getProperties()));
-                    } else {
-                        request.source(o.toString(), XContentType.JSON);
-                    }
-                    bulkRequest.add(request);
-                }
-                entry.getT3().clear();
-                return bulkRequest;
-            })
-            .flatMap(bulkRequest ->
-                Mono.create(sink ->
-                    restClient.getWriteClient()
-                        .bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
-                            @Override
-                            public void onResponse(BulkResponse responses) {
-                                if (responses.hasFailures()) {
-                                    sink.error(new RuntimeException("保存ElasticSearch数据失败:" + responses.buildFailureMessage()));
-                                    return;
-                                }
-                                sink.success(1);
+            .groupBy(Buffer::getIndex)
+            .flatMap(group -> {
+                String index = group.key();
+                return this.getIndexForSave(index)
+                    .zipWith(indexManager.getIndexMetadata(index))
+                    .flatMapMany(tp2 ->
+                        group.map(buffer -> {
+                            IndexRequest request = new IndexRequest(tp2.getT1(),"_doc");
+                            Object o = JSON.toJSON(buffer.getPayload());
+                            if (o instanceof Map) {
+                                request.source(tp2.getT2().convertToElastic((Map<String, Object>) o));
+                            } else {
+                                request.source(o.toString(), XContentType.JSON);
                             }
-
-                            @Override
-                            public void onFailure(Exception e) {
-                                sink.error(e);
-                            }
-                        })))
-            .then(Mono.just(buffers.size()));
+                            return request;
+                        }));
+            })
+            .collectList()
+            .flatMap(lst -> {
+                BulkRequest request = new BulkRequest();
+                lst.forEach(request::add);
+                return ReactorActionListener.<BulkResponse>mono(listener -> {
+                    restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener);
+                });
+            }).thenReturn(buffers.size());
     }
 
     private <T> PagerResult<T> translatePageResult(Function<Map<String, Object>, T> mapper, QueryParam param, SearchResponse response) {