Quellcode durchsuchen

优化ElasticsearchClient,增加兼容性

zhou-hao vor 3 Jahren
Ursprung
Commit
ea91293b31

+ 234 - 117
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java

@@ -1,7 +1,10 @@
 package org.jetlinks.community.elastic.search.service.reactive;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import io.vavr.Function3;
+import lombok.Generated;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.methods.HttpGet;
@@ -69,6 +72,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.tasks.TaskId;
 import org.reactivestreams.Publisher;
 import org.springframework.data.elasticsearch.client.ClientLogger;
@@ -154,8 +158,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Boolean> ping(HttpHeaders headers) {
 
         return sendRequest(new MainRequest(), requestCreator.ping(), RawActionResponse.class, headers) //
-            .map(response -> response.statusCode().is2xxSuccessful()) //
-            .onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
+                                                                                                       .map(response -> response
+                                                                                                           .statusCode()
+                                                                                                           .is2xxSuccessful()) //
+                                                                                                       .onErrorResume(NoReachableHostException.class, error -> Mono
+                                                                                                           .just(false))
+                                                                                                       .next();
     }
 
     /*
@@ -166,7 +174,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<MainResponse> info(HttpHeaders headers) {
 
         return sendRequest(new MainRequest(), requestCreator.info(), MainResponse.class, headers) //
-            .next();
+                                                                                                  .next();
     }
 
     /*
@@ -177,9 +185,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
 
         return sendRequest(getRequest, requestCreator.get(), GetResponse.class, headers) //
-            .filter(GetResponse::isExists) //
-            .map(DefaultReactiveElasticsearchClient::getResponseToGetResult) //
-            .next();
+                                                                                         .filter(GetResponse::isExists) //
+                                                                                         .map(DefaultReactiveElasticsearchClient::getResponseToGetResult) //
+                                                                                         .next();
     }
 
     /*
@@ -204,8 +212,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
 
         return sendRequest(getRequest, requestCreator.exists(), RawActionResponse.class, headers) //
-            .map(response -> response.statusCode().is2xxSuccessful()) //
-            .next();
+                                                                                                  .map(response -> response
+                                                                                                      .statusCode()
+                                                                                                      .is2xxSuccessful()) //
+                                                                                                  .next();
     }
 
     /*
@@ -243,7 +253,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
 
         return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) //
-            .publishNext();
+                                                                                                  .publishNext();
     }
 
     /*
@@ -255,12 +265,26 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         searchRequest.source().trackTotalHits(true);
         searchRequest.source().size(0);
         searchRequest.source().fetchSource(false);
-        return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
-            .map(SearchResponse::getHits) //
-            .map(searchHits -> searchHits.getTotalHits().value) //
+        return sendRequest(searchRequest, this::buildSearchRequest, SearchResponse.class, headers)
+            .map(SearchResponse::getHits)
+            .map(searchHits -> searchHits.getTotalHits().value)
             .next();
     }
 
+    protected Request buildSearchRequest(SearchRequest request) {
+        //兼容6.x版本es
+        if (version.before(Version.V_7_0_0) && request
+            .source()
+            .trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
+            Request req = requestCreator.search().apply(request);
+            JSONObject json = JSON.parseObject(requestBodyToString(req));
+            json.put("track_total_hits", true);
+            req.setJsonEntity(json.toJSONString());
+            return req;
+        }
+        return requestCreator.search().apply(request);
+    }
+
     /*
      * (non-Javadoc)
      * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
@@ -269,8 +293,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {
 
         return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
-            .map(SearchResponse::getHits) //
-            .flatMap(Flux::fromIterable);
+                                                                                                  .map(SearchResponse::getHits) //
+                                                                                                  .flatMap(Flux::fromIterable);
     }
 
     /*
@@ -287,8 +311,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         searchRequest.source().trackTotalHits(false);
 
         return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
-            .map(SearchResponse::getAggregations) //
-            .flatMap(Flux::fromIterable);
+                                                                                                  .map(SearchResponse::getAggregations) //
+                                                                                                  .flatMap(Flux::fromIterable);
     }
 
     /*
@@ -327,34 +351,36 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
         return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
 
-            scrollState -> {
+                              scrollState -> {
 
-                Flux<SearchHit> searchHits = inbound.<SearchResponse>handle((searchResponse, sink) -> {
+                                  Flux<SearchHit> searchHits = inbound
+                                      .<SearchResponse>handle((searchResponse, sink) -> {
 
-                    scrollState.updateScrollId(searchResponse.getScrollId());
-                    if (isEmpty(searchResponse.getHits())) {
+                                          scrollState.updateScrollId(searchResponse.getScrollId());
+                                          if (isEmpty(searchResponse.getHits())) {
 
-                        inbound.onComplete();
-                        outbound.onComplete();
+                                              inbound.onComplete();
+                                              outbound.onComplete();
 
-                    } else {
+                                          } else {
 
-                        sink.next(searchResponse);
+                                              sink.next(searchResponse);
 
-                        SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
-                            .scroll(scrollTimeout);
-                        request.next(searchScrollRequest);
-                    }
+                                              SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
+                                                  .scroll(scrollTimeout);
+                                              request.next(searchScrollRequest);
+                                          }
 
-                }).map(SearchResponse::getHits) //
-                    .flatMap(Flux::fromIterable);
+                                      })
+                                      .map(SearchResponse::getHits) //
+                                      .flatMap(Flux::fromIterable);
 
-                return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
+                                  return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
 
-            },
-            state -> cleanupScroll(headers, state), //
-            (state, error) -> cleanupScroll(headers, state), //
-            state -> cleanupScroll(headers, state)); //
+                              },
+                              state -> cleanupScroll(headers, state), //
+                              (state, error) -> cleanupScroll(headers, state), //
+                              state -> cleanupScroll(headers, state)); //
     }
 
     private static boolean isEmpty(@Nullable SearchHits hits) {
@@ -382,21 +408,21 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
 
         return sendRequest(deleteRequest, requestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) //
-            .publishNext();
+                                                                                                               .publishNext();
     }
 
     static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) {
         XContentType requestContentType = indexRequest.getContentType();
         if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) {
             throw new IllegalArgumentException("Unsupported content-type found for request with content-type ["
-                + requestContentType + "], only JSON and SMILE are supported");
+                                                   + requestContentType + "], only JSON and SMILE are supported");
         }
         if (xContentType == null) {
             return requestContentType;
         }
         if (requestContentType != xContentType) {
             throw new IllegalArgumentException("Mismatching content-type found for request with content-type ["
-                + requestContentType + "], previous requests have content-type [" + xContentType + ']');
+                                                   + requestContentType + "], previous requests have content-type [" + xContentType + ']');
         }
         return xContentType;
     }
@@ -542,7 +568,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     @Override
     public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
         return sendRequest(bulkRequest, this::convertBulk, BulkResponse.class, headers) //
-            .publishNext();
+                                                                                        .publishNext();
     }
 
     // --> INDICES
@@ -555,9 +581,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest request) {
         return sendRequest(request, requestCreator.indexExists()
             , RawActionResponse.class, headers) //
-            .map(response -> response.statusCode().is2xxSuccessful())
-            .onErrorReturn(false)
-            .next();
+                                                .map(response -> response.statusCode().is2xxSuccessful())
+                                                .onErrorReturn(false)
+                                                .next();
     }
 
     /*
@@ -568,7 +594,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Void> deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
 
         return sendRequest(request, requestCreator.indexDelete(), AcknowledgedResponse.class, headers) //
-            .then();
+                                                                                                       .then();
     }
 
     /*
@@ -582,7 +608,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
             request.addParameter("include_type_name", "true");
             return request;
         }), AcknowledgedResponse.class, headers) //
-            .then();
+                                                 .then();
     }
 
     /*
@@ -601,7 +627,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
                     return r;
                 }),
             AcknowledgedResponse.class, headers) //
-            .then();
+                                                 .then();
     }
 
     /*
@@ -612,7 +638,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Void> closeIndex(HttpHeaders headers, CloseIndexRequest closeIndexRequest) {
 
         return sendRequest(closeIndexRequest, requestCreator.indexClose(), AcknowledgedResponse.class, headers) //
-            .then();
+                                                                                                                .then();
     }
 
     /*
@@ -623,7 +649,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Void> refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest) {
 
         return sendRequest(refreshRequest, requestCreator.indexRefresh(), RefreshResponse.class, headers) //
-            .then();
+                                                                                                          .then();
     }
 
     /*
@@ -636,7 +662,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         return sendRequest(putMappingRequest
             , requestCreator.putMapping()
             , AcknowledgedResponse.class, headers) //
-            .then();
+                                                   .then();
     }
 
     /*
@@ -647,7 +673,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<Void> flushIndex(HttpHeaders headers, FlushRequest flushRequest) {
 
         return sendRequest(flushRequest, requestCreator.flushIndex(), FlushResponse.class, headers) //
-            .then();
+                                                                                                    .then();
     }
 
     /*
@@ -658,24 +684,24 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     public Mono<ClientResponse> execute(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback callback) {
 
         return this.hostProvider.getActive(HostProvider.Verification.LAZY) //
-            .flatMap(callback::doWithClient) //
-            .onErrorResume(throwable -> {
+                                .flatMap(callback::doWithClient) //
+                                .onErrorResume(throwable -> {
 
-                if (throwable instanceof ConnectException) {
+                                    if (throwable instanceof ConnectException) {
 
-                    return hostProvider.getActive(HostProvider.Verification.ACTIVE) //
-                        .flatMap(callback::doWithClient);
-                }
+                                        return hostProvider.getActive(HostProvider.Verification.ACTIVE) //
+                                                           .flatMap(callback::doWithClient);
+                                    }
 
-                return Mono.error(throwable);
-            });
+                                    return Mono.error(throwable);
+                                });
     }
 
     @Override
     public Mono<org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Status> status() {
 
         return hostProvider.clusterInfo() //
-            .map(it -> new ClientStatus(it.getNodes()));
+                           .map(it -> new ClientStatus(it.getNodes()));
     }
 
     // --> Private Response helpers
@@ -683,35 +709,52 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     private static GetResult getResponseToGetResult(GetResponse response) {
 
         return new GetResult(response.getIndex(), response.getType(), response.getId(), response.getSeqNo(),
-            response.getPrimaryTerm(), response.getVersion(), response.isExists(), response.getSourceAsBytesRef(),
-            response.getFields(), null);
+                             response.getPrimaryTerm(), response.getVersion(), response.isExists(), response.getSourceAsBytesRef(),
+                             response.getFields(), null);
     }
 
     // -->
 
-    private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request, Function<Req, Request> converter,
-                                                                     Class<Resp> responseType, HttpHeaders headers) {
-        return sendRequest(converter.apply(request), responseType, headers);
+    private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request,
+                                                                     Function<Req, Request> converter,
+                                                                     Class<Resp> responseType,
+                                                                     HttpHeaders headers) {
+        return sendRequest(request, converter, responseType, headers, DefaultReactiveElasticsearchClient::doDecode);
+    }
+
+    private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request,
+                                                                     Function<Req, Request> converter,
+                                                                     Class<Resp> responseType,
+                                                                     HttpHeaders headers,
+                                                                     Function3<ClientResponse, Class<Resp>, String, Mono<Resp>> decoder) {
+        return sendRequest(converter.apply(request), responseType, headers, decoder);
     }
 
-    private <Resp> Flux<Resp> sendRequest(Request request, Class<Resp> responseType, HttpHeaders headers) {
+    private <Resp> Flux<Resp> sendRequest(Request request,
+                                          Class<Resp> responseType,
+                                          HttpHeaders headers,
+                                          Function3<ClientResponse, Class<Resp>, String, Mono<Resp>> decoder) {
 
         String logId = ClientLogger.newLogId();
 
         return execute(webClient -> sendRequest(webClient, logId, request, headers))
-            .flatMapMany(response -> readResponseBody(logId, request, response, responseType));
+            .flatMapMany(response -> readResponseBody(logId, request, response, responseType, decoder));
     }
 
     private Mono<ClientResponse> sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
 
-        WebClient.RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
+        WebClient.RequestBodySpec requestBodySpec = webClient
+            .method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
             .uri(builder -> {
 
                 builder = builder.path(request.getEndpoint());
 
                 if (!ObjectUtils.isEmpty(request.getParameters())) {
-                    for (Map.Entry<String, String> entry : request.getParameters().entrySet()) {
-                        builder = builder.queryParam(entry.getKey(), entry.getValue());
+                    for (Map.Entry<String, String> entry : request
+                        .getParameters()
+                        .entrySet()) {
+                        builder = builder.queryParam(entry.getKey(), entry
+                            .getValue());
                     }
                 }
                 return builder.build();
@@ -725,8 +768,14 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
                 // and now those that might be set on the request.
                 if (request.getOptions() != null) {
 
-                    if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) {
-                        request.getOptions().getHeaders().forEach(it -> theHeaders.add(it.getName(), it.getValue()));
+                    if (!ObjectUtils.isEmpty(request
+                                                 .getOptions()
+                                                 .getHeaders())) {
+                        request
+                            .getOptions()
+                            .getHeaders()
+                            .forEach(it -> theHeaders.add(it.getName(), it
+                                .getValue()));
                     }
                 }
 
@@ -741,34 +790,41 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
             Lazy<String> body = bodyExtractor(request);
 
-            ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(),
-                body::get);
+            ClientLogger.logRequest(logId, request
+                                        .getMethod()
+                                        .toUpperCase(), request.getEndpoint(), request.getParameters(),
+                                    body::get);
 
             requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
             requestBodySpec.body(Mono.fromSupplier(body), String.class);
         } else {
-            ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters());
+            ClientLogger.logRequest(logId, request
+                .getMethod()
+                .toUpperCase(), request.getEndpoint(), request.getParameters());
         }
 
         return requestBodySpec //
-            .exchange() //
-            .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build());
+                               .exchange() //
+                               .onErrorReturn(ConnectException.class, ClientResponse
+                                   .create(HttpStatus.SERVICE_UNAVAILABLE)
+                                   .build());
     }
 
     private Lazy<String> bodyExtractor(Request request) {
 
-        return Lazy.of(() -> {
+        return Lazy.of(() -> requestBodyToString(request));
+    }
 
-            try {
-                return EntityUtils.toString(request.getEntity());
-            } catch (IOException e) {
-                throw new RequestBodyEncodingException("Error encoding request", e);
-            }
-        });
+    @SneakyThrows
+    private String requestBodyToString(Request request) {
+        return EntityUtils.toString(request.getEntity());
     }
 
-    private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response,
-                                                        Class<T> responseType) {
+    private <T> Publisher<? extends T> readResponseBody(String logId,
+                                                        Request request,
+                                                        ClientResponse response,
+                                                        Class<T> responseType,
+                                                        Function3<ClientResponse, Class<T>, String, Mono<T>> decoder) {
 
         if (RawActionResponse.class.equals(responseType)) {
 
@@ -789,53 +845,79 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         }
 
         return response.body(BodyExtractors.toMono(byte[].class)) //
-            .map(it -> new String(it, StandardCharsets.UTF_8)) //
-            .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
-            .flatMap(content -> doDecode(response, responseType, content));
+                       .map(it -> new String(it, StandardCharsets.UTF_8)) //
+                       .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
+                       .flatMap(content -> decoder.apply(response, responseType, content));
     }
 
+
     private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) {
 
-        String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
+        String mediaType = response
+            .headers()
+            .contentType()
+            .map(MediaType::toString)
+            .orElse(XContentType.JSON.mediaType());
 
         try {
 
             Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class);
-
+            if (fromXContent == null) {
+                fromXContent = ReflectionUtils.findMethod(responseType, "fromXContext", XContentParser.class);
+            }
             return Mono.justOrEmpty(responseType
-                .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content))));
+                                        .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content))));
 
         } catch (Throwable errorParseFailure) { // cause elasticsearch also uses AssertionError
 
             try {
                 return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content)));
             } catch (Exception e) {
-
                 return Mono
-                    .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
+                    .error(new ElasticsearchStatusException(content,
+                                                            RestStatus.fromCode(response.statusCode().value()),
+                                                            errorParseFailure));
             }
         }
     }
 
     private static XContentParser createParser(String mediaType, String content) throws IOException {
+        XContentType type = XContentType.fromMediaTypeOrFormat(mediaType);
+        if (type == null) {
+            throw new IOException(content);
+        }
         return XContentType.fromMediaTypeOrFormat(mediaType) //
-            .xContent() //
-            .createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()),
-                DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
+                           .xContent() //
+                           .createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()),
+                                         DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
     }
 
     private <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
 
         int statusCode = response.statusCode().value();
         RestStatus status = RestStatus.fromCode(statusCode);
-        String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
-
-        return response.body(BodyExtractors.toMono(byte[].class)) //
+        String mediaType = response
+            .headers()
+            .contentType()
+            .map(MediaType::toString)
+            .orElse(XContentType.JSON.mediaType());
+
+        return response
+            .body(BodyExtractors.toMono(byte[].class)) //
+            .switchIfEmpty(Mono.error(
+                () -> new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
+                                                                     request.getMethod(),
+                                                                     request.getEndpoint(),
+                                                                     statusCode),
+                                                       status)))
             .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
             .flatMap(content -> contentOrError(content, mediaType, status))
             .flatMap(unused -> Mono
-                .error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.",
-                    request.getMethod(), request.getEndpoint(), statusCode), status)));
+                .error(() -> new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.",
+                                                                            request.getMethod(),
+                                                                            request.getEndpoint(),
+                                                                            statusCode),
+                                                              status)));
     }
 
     private <T> Publisher<? extends T> handleClientError(String logId, Request request, ClientResponse response,
@@ -843,13 +925,17 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
         int statusCode = response.statusCode().value();
         RestStatus status = RestStatus.fromCode(statusCode);
-        String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
+        String mediaType = response
+            .headers()
+            .contentType()
+            .map(MediaType::toString)
+            .orElse(XContentType.JSON.mediaType());
 
         return response.body(BodyExtractors.toMono(byte[].class)) //
-            .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
-            .flatMap(content -> contentOrError(content, mediaType, status)) //
-            .doOnNext(content -> ClientLogger.logResponse(logId, response.statusCode(), content)) //
-            .flatMap(content -> doDecode(response, responseType, content));
+                       .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
+                       .flatMap(content -> contentOrError(content, mediaType, status)) //
+                       .doOnNext(content -> ClientLogger.logResponse(logId, response.statusCode(), content)) //
+                       .flatMap(content -> doDecode(response, responseType, content));
     }
 
     // region ElasticsearchException helper
@@ -868,6 +954,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         ElasticsearchException exception = getElasticsearchException(content, mediaType, status);
 
         if (exception != null) {
+            if (status == RestStatus.NOT_FOUND) {
+                log.warn(exception.getMessage(), exception);
+                return Mono.empty();
+            }
             StringBuilder sb = new StringBuilder();
             buildExceptionMessages(sb, exception);
             return Mono.error(new ElasticsearchStatusException(sb.toString(), status, exception));
@@ -917,26 +1007,51 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     @Override
     public Mono<SearchResponse> searchForPage(SearchRequest request) {
         long startTime = System.currentTimeMillis();
-        return sendRequest(request, requestCreator.search(), SearchResponse.class, HttpHeaders.EMPTY)
+        // if (version.after(Version.V_7_0_0)) {
+        request.source().trackTotalHits(true);
+        // }
+        return this
+            .sendRequest(request, this::buildSearchRequest, SearchResponse.class, HttpHeaders.EMPTY)
             .singleOrEmpty()
-            .doOnNext(res -> {
-                log.trace("execute search {} {}ms : {}", request.indices(), System.currentTimeMillis() - startTime, request.source());
-            })
-            .doOnError(err -> {
-                log.warn("execute search {} error : {}", request.indices(), request.source(), err);
-            });
+            .doOnSuccess(res -> log
+                .trace("execute search {} {}ms : {}", request.indices(), System.currentTimeMillis() - startTime, request.source()))
+            .doOnError(err -> log.warn("execute search {} error : {}", request.indices(), request.source(), err));
     }
 
     @SneakyThrows
     protected Request convertMultiSearchRequest(MultiSearchRequest searchRequest) {
-        return RequestConverters.multiSearch(searchRequest);
+        Request request = RequestConverters.multiSearch(searchRequest);
+        if (log.isTraceEnabled()) {
+            log.trace("execute elasticsearch multi search: {}", requestBodyToString(request));
+        }
+        return request;
     }
 
     @Override
     @SneakyThrows
     public Mono<MultiSearchResponse> multiSearch(MultiSearchRequest request) {
-
-        return sendRequest(request, this::convertMultiSearchRequest, MultiSearchResponse.class, HttpHeaders.EMPTY)
+        Function3<ClientResponse, Class<MultiSearchResponse>, String, Mono<MultiSearchResponse>> decoder;
+        if (version.before(Version.V_7_0_0)) {
+            //适配6.x响应格式
+            decoder = (clientResponse, multiSearchResponseClass, s) -> {
+                JSONObject data = JSON.parseObject(s);
+                int took = data.getJSONArray("responses")
+                               .stream()
+                               .map(JSONObject.class::cast)
+                               .map(json -> json.getIntValue("took"))
+                               .reduce(Math::addExact)
+                               .orElse(0);
+                data.put("took", took);
+                return DefaultReactiveElasticsearchClient.doDecode(clientResponse, multiSearchResponseClass, data.toJSONString());
+            };
+        } else {
+            decoder = DefaultReactiveElasticsearchClient::doDecode;
+        }
+        return sendRequest(request,
+                           this::convertMultiSearchRequest,
+                           MultiSearchResponse.class,
+                           HttpHeaders.EMPTY,
+                           decoder)
             .singleOrEmpty();
     }
 
@@ -962,7 +1077,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
     }
 
     Request convertGetIndexTemplateRequest(GetIndexTemplatesRequest getIndexTemplatesRequest) {
-        Request request= new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names()));
+        Request request = new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names()));
         Params params = new Params(request);
         params.putParam("include_type_name", "true");
         return request;
@@ -1013,7 +1128,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
      *
      * @author Christoph Strobl
      */
-    class ClientStatus implements ReactiveElasticsearchClient.Status {
+    @Generated
+    static class ClientStatus implements ReactiveElasticsearchClient.Status {
 
         private final Collection<ElasticsearchHost> connectedHosts;
 
@@ -1031,6 +1147,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
         }
     }
 
+    @Generated
     static class Params {
         private final Request request;