|
@@ -3,7 +3,6 @@ package org.jetlinks.community.elastic.search.service.reactive;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.alibaba.fastjson.serializer.SerializerFeature;
|
|
|
-import io.vavr.Function3;
|
|
|
import lombok.Generated;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -19,6 +18,9 @@ import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionRequest;
|
|
|
import org.elasticsearch.action.DocWriteRequest;
|
|
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
|
|
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|
|
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
|
|
|
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
|
@@ -31,6 +33,9 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
|
|
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
|
|
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
|
|
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
|
|
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
|
|
|
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
|
|
|
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
|
|
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
|
@@ -38,10 +43,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
import org.elasticsearch.action.delete.DeleteRequest;
|
|
|
import org.elasticsearch.action.delete.DeleteResponse;
|
|
|
-import org.elasticsearch.action.get.GetRequest;
|
|
|
-import org.elasticsearch.action.get.GetResponse;
|
|
|
-import org.elasticsearch.action.get.MultiGetRequest;
|
|
|
-import org.elasticsearch.action.get.MultiGetResponse;
|
|
|
+import org.elasticsearch.action.get.*;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
import org.elasticsearch.action.index.IndexResponse;
|
|
|
import org.elasticsearch.action.main.MainRequest;
|
|
@@ -53,7 +55,12 @@ import org.elasticsearch.action.support.WriteRequest;
|
|
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
import org.elasticsearch.action.update.UpdateRequest;
|
|
|
import org.elasticsearch.action.update.UpdateResponse;
|
|
|
+import org.elasticsearch.client.GetAliasesResponse;
|
|
|
import org.elasticsearch.client.Request;
|
|
|
+import org.elasticsearch.client.indices.GetFieldMappingsRequest;
|
|
|
+import org.elasticsearch.client.indices.GetFieldMappingsResponse;
|
|
|
+import org.elasticsearch.client.indices.GetIndexResponse;
|
|
|
+import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
|
|
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
|
|
import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.common.Strings;
|
|
@@ -65,29 +72,33 @@ import org.elasticsearch.index.VersionType;
|
|
|
import org.elasticsearch.index.get.GetResult;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
|
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
|
|
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
|
|
import org.elasticsearch.index.seqno.SequenceNumbers;
|
|
|
import org.elasticsearch.rest.BytesRestResponse;
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
+import org.elasticsearch.script.mustache.SearchTemplateRequest;
|
|
|
+import org.elasticsearch.script.mustache.SearchTemplateResponse;
|
|
|
import org.elasticsearch.search.SearchHit;
|
|
|
import org.elasticsearch.search.SearchHits;
|
|
|
import org.elasticsearch.search.aggregations.Aggregation;
|
|
|
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
|
|
import org.elasticsearch.search.internal.SearchContext;
|
|
|
+import org.elasticsearch.search.suggest.Suggest;
|
|
|
import org.elasticsearch.tasks.TaskId;
|
|
|
import org.reactivestreams.Publisher;
|
|
|
import org.springframework.data.elasticsearch.client.ClientLogger;
|
|
|
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
|
|
|
import org.springframework.data.elasticsearch.client.NoReachableHostException;
|
|
|
import org.springframework.data.elasticsearch.client.reactive.HostProvider;
|
|
|
-import org.springframework.data.elasticsearch.client.reactive.RequestBodyEncodingException;
|
|
|
+import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
|
|
|
import org.springframework.data.elasticsearch.client.reactive.RequestCreator;
|
|
|
import org.springframework.data.elasticsearch.client.util.NamedXContents;
|
|
|
import org.springframework.data.elasticsearch.client.util.RequestConverters;
|
|
|
import org.springframework.data.elasticsearch.client.util.ScrollState;
|
|
|
+import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
|
|
|
import org.springframework.data.util.Lazy;
|
|
|
import org.springframework.http.HttpHeaders;
|
|
|
import org.springframework.http.HttpMethod;
|
|
|
-import org.springframework.http.HttpStatus;
|
|
|
import org.springframework.http.MediaType;
|
|
|
import org.springframework.lang.Nullable;
|
|
|
import org.springframework.util.Assert;
|
|
@@ -101,6 +112,7 @@ import reactor.core.publisher.EmitterProcessor;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.FluxSink;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
+import reactor.function.Function3;
|
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.IOException;
|
|
@@ -118,7 +130,7 @@ import static org.springframework.data.elasticsearch.client.util.RequestConverte
|
|
|
|
|
|
@Slf4j
|
|
|
@Generated
|
|
|
-public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient {
|
|
|
+public class DefaultReactiveElasticsearchClient implements org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient, ReactiveElasticsearchClient.Cluster {
|
|
|
private final HostProvider<?> hostProvider;
|
|
|
private final RequestCreator requestCreator;
|
|
|
private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;
|
|
@@ -158,13 +170,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
@Override
|
|
|
public Mono<Boolean> ping(HttpHeaders headers) {
|
|
|
|
|
|
- return sendRequest(new MainRequest(), requestCreator.ping(), RawActionResponse.class, headers) //
|
|
|
- .map(response -> response
|
|
|
- .statusCode()
|
|
|
- .is2xxSuccessful()) //
|
|
|
- .onErrorResume(NoReachableHostException.class, error -> Mono
|
|
|
- .just(false))
|
|
|
- .next();
|
|
|
+ return sendRequest(new MainRequest(), requestCreator.ping(), RawActionResponse.class, headers)
|
|
|
+ .flatMap(response -> response.releaseBody().thenReturn(response.statusCode().is2xxSuccessful()))
|
|
|
+ .onErrorResume(NoReachableHostException.class, error -> Mono.just(false))
|
|
|
+ .next();
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -196,13 +205,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#multiGet(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.MultiGetRequest)
|
|
|
*/
|
|
|
@Override
|
|
|
- public Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
|
|
|
+ public Flux<MultiGetItemResponse> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
|
|
|
|
|
|
return sendRequest(multiGetRequest, requestCreator.multiGet(), MultiGetResponse.class, headers)
|
|
|
.map(MultiGetResponse::getResponses) //
|
|
|
- .flatMap(Flux::fromArray) //
|
|
|
- .filter(it -> !it.isFailed() && it.getResponse().isExists()) //
|
|
|
- .map(it -> DefaultReactiveElasticsearchClient.getResponseToGetResult(it.getResponse()));
|
|
|
+ .flatMap(Flux::fromArray); //
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -212,11 +219,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
@Override
|
|
|
public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
|
|
|
|
|
|
- return sendRequest(getRequest, requestCreator.exists(), RawActionResponse.class, headers) //
|
|
|
- .map(response -> response
|
|
|
- .statusCode()
|
|
|
- .is2xxSuccessful()) //
|
|
|
- .next();
|
|
|
+ return sendRequest(getRequest, requestCreator.exists(), RawActionResponse.class, headers)
|
|
|
+ .flatMap(response -> response.releaseBody().thenReturn(response
|
|
|
+ .statusCode()
|
|
|
+ .is2xxSuccessful()))
|
|
|
+ .next();
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -233,7 +240,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices()
|
|
|
*/
|
|
|
@Override
|
|
|
- public org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices indices() {
|
|
|
+ public Indices indices() {
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Cluster cluster() {
|
|
|
return this;
|
|
|
}
|
|
|
|
|
@@ -272,6 +284,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
.next();
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Flux<SearchHit> searchTemplate(HttpHeaders headers, SearchTemplateRequest searchTemplateRequest) {
|
|
|
+ return sendRequest(searchTemplateRequest, requestCreator.searchTemplate(), SearchTemplateResponse.class, headers)
|
|
|
+ .map(response -> response.getResponse().getHits()).flatMap(Flux::fromIterable);
|
|
|
+ }
|
|
|
+
|
|
|
protected Request buildSearchRequest(SearchRequest request) {
|
|
|
//兼容6.x版本es
|
|
|
if (version.before(Version.V_7_0_0) && request
|
|
@@ -298,6 +316,17 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
.flatMap(Flux::fromIterable);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Mono<SearchResponse> searchForResponse(HttpHeaders headers, SearchRequest searchRequest) {
|
|
|
+ return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers).next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Flux<Suggest> suggest(HttpHeaders headers, SearchRequest searchRequest) {
|
|
|
+ return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
|
|
|
+ .map(SearchResponse::getSuggest);
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* (non-Javadoc)
|
|
|
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#aggregate(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
|
|
@@ -412,6 +441,13 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
.publishNext();
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Mono<ByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
|
|
|
+ return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers)
|
|
|
+ .next()
|
|
|
+ .map(ByQueryResponse::of);
|
|
|
+ }
|
|
|
+
|
|
|
static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) {
|
|
|
XContentType requestContentType = indexRequest.getContentType();
|
|
|
if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) {
|
|
@@ -581,10 +617,25 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
@Override
|
|
|
public Mono<Boolean> existsIndex(HttpHeaders headers, GetIndexRequest request) {
|
|
|
return sendRequest(request, requestCreator.indexExists()
|
|
|
- , RawActionResponse.class, headers) //
|
|
|
- .map(response -> response.statusCode().is2xxSuccessful())
|
|
|
- .onErrorReturn(false)
|
|
|
- .next();
|
|
|
+ , RawActionResponse.class, headers)
|
|
|
+ .flatMap(response -> response
|
|
|
+ .releaseBody()
|
|
|
+ .thenReturn(response
|
|
|
+ .statusCode()
|
|
|
+ .is2xxSuccessful()))
|
|
|
+ .onErrorReturn(false)
|
|
|
+ .next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> existsIndex(HttpHeaders headers, org.elasticsearch.client.indices.GetIndexRequest getIndexRequest) {
|
|
|
+ return sendRequest(getIndexRequest, requestCreator.indexExistsRequest(), RawActionResponse.class, headers)
|
|
|
+ .flatMap(response -> response
|
|
|
+ .releaseBody()
|
|
|
+ .thenReturn(response
|
|
|
+ .statusCode()
|
|
|
+ .is2xxSuccessful()))
|
|
|
+ .next();
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -592,10 +643,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#deleteIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest)
|
|
|
*/
|
|
|
@Override
|
|
|
- public Mono<Void> deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
|
|
|
+ public Mono<Boolean> deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
|
|
|
|
|
|
- return sendRequest(request, requestCreator.indexDelete(), AcknowledgedResponse.class, headers) //
|
|
|
- .then();
|
|
|
+ return sendRequest(request, requestCreator.indexDelete(), AcknowledgedResponse.class, headers)
|
|
|
+ .map(AcknowledgedResponse::isAcknowledged)
|
|
|
+ .next();
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -603,13 +655,21 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#createIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.create.CreateIndexRequest)
|
|
|
*/
|
|
|
@Override
|
|
|
- public Mono<Void> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) {
|
|
|
+ public Mono<Boolean> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) {
|
|
|
|
|
|
return sendRequest(createIndexRequest, requestCreator.indexCreate().andThen(request -> {
|
|
|
request.addParameter("include_type_name", "true");
|
|
|
return request;
|
|
|
- }), AcknowledgedResponse.class, headers) //
|
|
|
- .then();
|
|
|
+ }), AcknowledgedResponse.class, headers)
|
|
|
+ .map(AcknowledgedResponse::isAcknowledged)
|
|
|
+ .next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> createIndex(HttpHeaders headers, org.elasticsearch.client.indices.CreateIndexRequest createIndexRequest) {
|
|
|
+ return sendRequest(createIndexRequest, requestCreator.createIndexRequest(), AcknowledgedResponse.class, headers)
|
|
|
+ .map(AcknowledgedResponse::isAcknowledged)
|
|
|
+ .next();
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -619,16 +679,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
@Override
|
|
|
public Mono<Void> openIndex(HttpHeaders headers, OpenIndexRequest request) {
|
|
|
|
|
|
- return sendRequest(
|
|
|
- request,
|
|
|
- requestCreator
|
|
|
- .indexOpen()
|
|
|
- .andThen(r -> {
|
|
|
- r.addParameter("include_type_name", "true");
|
|
|
- return r;
|
|
|
- }),
|
|
|
- AcknowledgedResponse.class, headers) //
|
|
|
- .then();
|
|
|
+ return sendRequest(request, requestCreator.indexOpen(), AcknowledgedResponse.class, headers)
|
|
|
+ .then();
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -658,12 +710,23 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#updateMapping(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest)
|
|
|
*/
|
|
|
@Override
|
|
|
- public Mono<Void> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
|
|
|
+ public Mono<Boolean> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
|
|
|
|
|
|
- return sendRequest(putMappingRequest
|
|
|
- , requestCreator.putMapping()
|
|
|
- , AcknowledgedResponse.class, headers) //
|
|
|
- .then();
|
|
|
+ return putMapping(headers, putMappingRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> putMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
|
|
|
+ return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers)
|
|
|
+ .map(AcknowledgedResponse::isAcknowledged)
|
|
|
+ .next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> putMapping(HttpHeaders headers, org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) {
|
|
|
+ return sendRequest(putMappingRequest, requestCreator.putMappingRequest(), AcknowledgedResponse.class, headers)
|
|
|
+ .map(AcknowledgedResponse::isAcknowledged)
|
|
|
+ .next();
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -677,29 +740,49 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
.then();
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Mono<GetSettingsResponse> getSettings(HttpHeaders headers, GetSettingsRequest getSettingsRequest) {
|
|
|
+ return sendRequest(getSettingsRequest, requestCreator.getSettings(), GetSettingsResponse.class, headers).next();
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* (non-Javadoc)
|
|
|
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback)
|
|
|
*/
|
|
|
@Override
|
|
|
- public Mono<ClientResponse> execute(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback callback) {
|
|
|
+ public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {
|
|
|
|
|
|
- return this.hostProvider.getActive(HostProvider.Verification.LAZY) //
|
|
|
- .flatMap(callback::doWithClient) //
|
|
|
- .onErrorResume(throwable -> {
|
|
|
+ return this.hostProvider
|
|
|
+ .getActive(HostProvider.Verification.LAZY) //
|
|
|
+ .flatMap(callback::doWithClient) //
|
|
|
+ .onErrorResume(throwable -> {
|
|
|
+
|
|
|
+ if (isCausedByConnectionException(throwable)) {
|
|
|
+ return hostProvider.getActive(HostProvider.Verification.ACTIVE) //
|
|
|
+ .flatMap(callback::doWithClient);
|
|
|
+ }
|
|
|
+
|
|
|
+ return Mono.error(throwable);
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
- if (throwable instanceof ConnectException) {
|
|
|
+ private boolean isCausedByConnectionException(Throwable throwable) {
|
|
|
|
|
|
- return hostProvider.getActive(HostProvider.Verification.ACTIVE) //
|
|
|
- .flatMap(callback::doWithClient);
|
|
|
- }
|
|
|
+ Throwable t = throwable;
|
|
|
+ do {
|
|
|
|
|
|
- return Mono.error(throwable);
|
|
|
- });
|
|
|
+ if (t instanceof ConnectException) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ t = t.getCause();
|
|
|
+ } while (t != null);
|
|
|
+
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Mono<org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Status> status() {
|
|
|
+ public Mono<Status> status() {
|
|
|
|
|
|
return hostProvider.clusterInfo() //
|
|
|
.map(it -> new ClientStatus(it.getNodes()));
|
|
@@ -716,6 +799,22 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
|
|
|
// -->
|
|
|
|
|
|
+ private <REQ, RESP> Flux<RESP> sendRequest(REQ request, Function<REQ, Request> converter, Class<RESP> responseType,
|
|
|
+ HttpHeaders headers) {
|
|
|
+ return sendRequest(converter.apply(request), responseType, headers);
|
|
|
+ }
|
|
|
+
|
|
|
+ private <Resp> Flux<Resp> sendRequest(Request request, Class<Resp> responseType, HttpHeaders headers) {
|
|
|
+
|
|
|
+ String logId = ClientLogger.newLogId();
|
|
|
+
|
|
|
+ return Flux
|
|
|
+ .from(execute(webClient -> sendRequest(webClient, logId, request, headers).exchangeToMono(clientResponse -> {
|
|
|
+ Publisher<? extends Resp> publisher = readResponseBody(logId, request, clientResponse, responseType);
|
|
|
+ return Mono.from(publisher);
|
|
|
+ })));
|
|
|
+ }
|
|
|
+
|
|
|
private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request,
|
|
|
Function<Req, Request> converter,
|
|
|
Class<Resp> responseType,
|
|
@@ -738,54 +837,53 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
|
|
|
String logId = ClientLogger.newLogId();
|
|
|
|
|
|
- return execute(webClient -> sendRequest(webClient, logId, request, headers))
|
|
|
- .flatMapMany(response -> readResponseBody(logId, request, response, responseType, decoder));
|
|
|
+ return execute(webClient -> Mono.just(this.sendRequest(webClient, logId, request, headers)))
|
|
|
+ .flatMapMany(spec -> {
|
|
|
+ return spec.exchangeToFlux(response -> {
|
|
|
+ return Flux.from(
|
|
|
+ this.readResponseBody(logId, request, response, responseType, decoder)
|
|
|
+ );
|
|
|
+ });
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
- private Mono<ClientResponse> sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
|
|
|
-
|
|
|
- WebClient.RequestBodySpec requestBodySpec = webClient
|
|
|
- .method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
|
|
|
- .uri(builder -> {
|
|
|
-
|
|
|
- builder = builder.path(request.getEndpoint());
|
|
|
-
|
|
|
- if (!ObjectUtils.isEmpty(request.getParameters())) {
|
|
|
- for (Map.Entry<String, String> entry : request
|
|
|
- .getParameters()
|
|
|
- .entrySet()) {
|
|
|
- builder = builder.queryParam(entry.getKey(), entry
|
|
|
- .getValue());
|
|
|
- }
|
|
|
- }
|
|
|
- return builder.build();
|
|
|
- }) //
|
|
|
- .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) //
|
|
|
- .headers(theHeaders -> {
|
|
|
-
|
|
|
- // add all the headers explicitly set
|
|
|
- theHeaders.addAll(headers);
|
|
|
-
|
|
|
- // and now those that might be set on the request.
|
|
|
- if (request.getOptions() != null) {
|
|
|
-
|
|
|
- if (!ObjectUtils.isEmpty(request
|
|
|
- .getOptions()
|
|
|
- .getHeaders())) {
|
|
|
- request
|
|
|
- .getOptions()
|
|
|
- .getHeaders()
|
|
|
- .forEach(it -> theHeaders.add(it.getName(), it
|
|
|
- .getValue()));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // plus the ones from the supplier
|
|
|
- HttpHeaders suppliedHeaders = headersSupplier.get();
|
|
|
- if (suppliedHeaders != null && suppliedHeaders != HttpHeaders.EMPTY) {
|
|
|
- theHeaders.addAll(suppliedHeaders);
|
|
|
- }
|
|
|
- });
|
|
|
+ private WebClient.RequestBodySpec sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
|
|
|
+
|
|
|
+ WebClient.RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request
|
|
|
+ .getMethod()
|
|
|
+ .toUpperCase())) //
|
|
|
+ .uri(builder -> {
|
|
|
+
|
|
|
+ builder = builder.path(request.getEndpoint());
|
|
|
+
|
|
|
+ if (!ObjectUtils.isEmpty(request.getParameters())) {
|
|
|
+ for (Map.Entry<String, String> entry : request
|
|
|
+ .getParameters()
|
|
|
+ .entrySet()) {
|
|
|
+ builder = builder.queryParam(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return builder.build();
|
|
|
+ }) //
|
|
|
+ .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) //
|
|
|
+ .headers(theHeaders -> {
|
|
|
+
|
|
|
+ // add all the headers explicitly set
|
|
|
+ theHeaders.addAll(headers);
|
|
|
+
|
|
|
+ // and now those that might be set on the request.
|
|
|
+ if (request.getOptions() != null) {
|
|
|
+
|
|
|
+ if (!ObjectUtils.isEmpty(request
|
|
|
+ .getOptions()
|
|
|
+ .getHeaders())) {
|
|
|
+ request
|
|
|
+ .getOptions()
|
|
|
+ .getHeaders()
|
|
|
+ .forEach(it -> theHeaders.add(it.getName(), it.getValue()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
if (request.getEntity() != null) {
|
|
|
|
|
@@ -796,19 +894,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
.toUpperCase(), request.getEndpoint(), request.getParameters(),
|
|
|
body::get);
|
|
|
|
|
|
- requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
|
|
|
- requestBodySpec.body(Mono.fromSupplier(body), String.class);
|
|
|
+ requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()))
|
|
|
+ .body(Mono.fromSupplier(body), String.class);
|
|
|
} else {
|
|
|
ClientLogger.logRequest(logId, request
|
|
|
.getMethod()
|
|
|
.toUpperCase(), request.getEndpoint(), request.getParameters());
|
|
|
}
|
|
|
|
|
|
- return requestBodySpec //
|
|
|
- .exchange() //
|
|
|
- .onErrorReturn(ConnectException.class, ClientResponse
|
|
|
- .create(HttpStatus.SERVICE_UNAVAILABLE)
|
|
|
- .build());
|
|
|
+ return requestBodySpec;
|
|
|
}
|
|
|
|
|
|
private Lazy<String> bodyExtractor(Request request) {
|
|
@@ -821,6 +915,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
return EntityUtils.toString(request.getEntity());
|
|
|
}
|
|
|
|
|
|
+ private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response,
|
|
|
+ Class<T> responseType) {
|
|
|
+ return readResponseBody(logId, request, response, responseType, DefaultReactiveElasticsearchClient::doDecode);
|
|
|
+ }
|
|
|
+
|
|
|
private <T> Publisher<? extends T> readResponseBody(String logId,
|
|
|
Request request,
|
|
|
ClientResponse response,
|
|
@@ -1014,7 +1113,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
.sendRequest(request, this::buildSearchRequest, SearchResponse.class, HttpHeaders.EMPTY)
|
|
|
.singleOrEmpty()
|
|
|
.doOnSuccess(res -> log
|
|
|
- .trace("execute search {} {} : {}", request.indices(),res.getTook(), request.source()))
|
|
|
+ .trace("execute search {} {} : {}", request.indices(), res.getTook(), request.source()))
|
|
|
.doOnError(err -> log.warn("execute search {} error : {}", request.indices(), request.source(), err));
|
|
|
}
|
|
|
|
|
@@ -1076,6 +1175,70 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
.singleOrEmpty();
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Mono<GetMappingsResponse> getMapping(HttpHeaders headers, GetMappingsRequest getMappingsRequest) {
|
|
|
+ return sendRequest(getMappingsRequest, requestCreator.getMapping(),
|
|
|
+ GetMappingsResponse.class, headers).next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<org.elasticsearch.client.indices.GetMappingsResponse> getMapping(HttpHeaders headers, org.elasticsearch.client.indices.GetMappingsRequest getMappingsRequest) {
|
|
|
+ return sendRequest(getMappingsRequest, requestCreator.getMappingRequest(), org.elasticsearch.client.indices.GetMappingsResponse.class, headers) //
|
|
|
+ .next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<GetFieldMappingsResponse> getFieldMapping(HttpHeaders headers,
|
|
|
+ GetFieldMappingsRequest getFieldMappingsRequest) {
|
|
|
+ return sendRequest(getFieldMappingsRequest, requestCreator.getFieldMapping(), GetFieldMappingsResponse.class,
|
|
|
+ headers).next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> updateAliases(HttpHeaders headers, IndicesAliasesRequest indicesAliasesRequest) {
|
|
|
+ return sendRequest(indicesAliasesRequest, requestCreator.updateAlias(), AcknowledgedResponse.class, headers)
|
|
|
+ .map(AcknowledgedResponse::isAcknowledged).next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<GetAliasesResponse> getAliases(HttpHeaders headers, GetAliasesRequest getAliasesRequest) {
|
|
|
+ return sendRequest(getAliasesRequest, requestCreator.getAlias(), GetAliasesResponse.class, headers).next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> putTemplate(HttpHeaders headers, org.elasticsearch.client.indices.PutIndexTemplateRequest putIndexTemplateRequest) {
|
|
|
+ return sendRequest(putIndexTemplateRequest, requestCreator.putTemplate(), AcknowledgedResponse.class, headers)
|
|
|
+ .map(AcknowledgedResponse::isAcknowledged).next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<org.elasticsearch.client.indices.GetIndexTemplatesResponse> getTemplate(HttpHeaders headers,
|
|
|
+ org.elasticsearch.client.indices.GetIndexTemplatesRequest getIndexTemplatesRequest) {
|
|
|
+ return (sendRequest(getIndexTemplatesRequest, requestCreator.getTemplates(), org.elasticsearch.client.indices.GetIndexTemplatesResponse.class,
|
|
|
+ headers)).next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> existsTemplate(HttpHeaders headers, IndexTemplatesExistRequest indexTemplatesExistRequest) {
|
|
|
+ return sendRequest(indexTemplatesExistRequest, requestCreator.templatesExist(),
|
|
|
+ RawActionResponse.class, headers)
|
|
|
+ .flatMap(response -> response
|
|
|
+ .releaseBody()
|
|
|
+ .thenReturn(response.statusCode().is2xxSuccessful()))
|
|
|
+ .next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> deleteTemplate(HttpHeaders headers, DeleteIndexTemplateRequest deleteIndexTemplateRequest) {
|
|
|
+ return sendRequest(deleteIndexTemplateRequest, requestCreator.deleteTemplate(), AcknowledgedResponse.class, headers)
|
|
|
+ .map(AcknowledgedResponse::isAcknowledged).next();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Mono<GetIndexResponse> getIndex(HttpHeaders headers, org.elasticsearch.client.indices.GetIndexRequest getIndexRequest) {
|
|
|
+ return sendRequest(getIndexRequest, requestCreator.getIndex(), GetIndexResponse.class, headers).next();
|
|
|
+ }
|
|
|
+
|
|
|
Request convertGetIndexTemplateRequest(GetIndexTemplatesRequest getIndexTemplatesRequest) {
|
|
|
Request request = new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names()));
|
|
|
Params params = new Params(request);
|
|
@@ -1119,17 +1282,23 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|
|
return version;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Mono<ClusterHealthResponse> health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest) {
|
|
|
+ return sendRequest(clusterHealthRequest, requestCreator.clusterHealth(), ClusterHealthResponse.class, headers)
|
|
|
+ .next();
|
|
|
+ }
|
|
|
+
|
|
|
// endregion
|
|
|
|
|
|
// region internal classes
|
|
|
|
|
|
/**
|
|
|
- * Reactive client {@link ReactiveElasticsearchClient.Status} implementation.
|
|
|
+ * Reactive client {@link Status} implementation.
|
|
|
*
|
|
|
* @author Christoph Strobl
|
|
|
*/
|
|
|
@Generated
|
|
|
- static class ClientStatus implements ReactiveElasticsearchClient.Status {
|
|
|
+ static class ClientStatus implements Status {
|
|
|
|
|
|
private final Collection<ElasticsearchHost> connectedHosts;
|
|
|
|