|
@@ -1,13 +1,9 @@
|
|
|
package org.jetlinks.community.elastic.search.service;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.Getter;
|
|
|
-import lombok.SneakyThrows;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.elasticsearch.ElasticsearchException;
|
|
|
-import org.elasticsearch.ElasticsearchParseException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
@@ -19,26 +15,31 @@ import org.elasticsearch.client.core.CountRequest;
|
|
|
import org.elasticsearch.client.core.CountResponse;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
import org.hswebframework.ezorm.core.param.QueryParam;
|
|
|
+import org.hswebframework.utils.time.DateFormatter;
|
|
|
+import org.hswebframework.utils.time.DefaultDateFormatter;
|
|
|
import org.hswebframework.web.api.crud.entity.PagerResult;
|
|
|
import org.jetlinks.core.utils.FluxUtils;
|
|
|
import org.jetlinks.community.elastic.search.ElasticRestClient;
|
|
|
-import org.jetlinks.community.elastic.search.index.ElasticIndex;
|
|
|
-import org.jetlinks.community.elastic.search.index.mapping.IndexMappingMetadata;
|
|
|
-import org.jetlinks.community.elastic.search.parser.QueryParamTranslateService;
|
|
|
+import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
|
|
|
+import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
|
|
|
+import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
|
|
|
import org.reactivestreams.Publisher;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
-import org.springframework.util.StringUtils;
|
|
|
-import reactor.core.publisher.*;
|
|
|
-import reactor.core.scheduler.Schedulers;
|
|
|
+import reactor.core.publisher.BufferOverflowStrategy;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.FluxSink;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
+import reactor.util.function.Tuples;
|
|
|
|
|
|
import javax.annotation.PreDestroy;
|
|
|
import java.time.Duration;
|
|
|
import java.util.*;
|
|
|
import java.util.function.Function;
|
|
|
+import java.util.regex.Pattern;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
- * @author bsetfeng
|
|
|
+ * @author zhouhao
|
|
|
* @since 1.0
|
|
|
**/
|
|
|
@Service
|
|
@@ -47,26 +48,35 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
|
|
|
|
private final ElasticRestClient restClient;
|
|
|
|
|
|
- private final IndexOperationService indexOperationService;
|
|
|
-
|
|
|
- private final QueryParamTranslateService translateService;
|
|
|
+ private final ElasticSearchIndexManager indexManager;
|
|
|
|
|
|
FluxSink<Buffer> sink;
|
|
|
|
|
|
+ static {
|
|
|
+ DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
|
|
|
+ }
|
|
|
+
|
|
|
public DefaultElasticSearchService(ElasticRestClient restClient,
|
|
|
- QueryParamTranslateService translateService,
|
|
|
- IndexOperationService indexOperationService) {
|
|
|
+ ElasticSearchIndexManager indexManager) {
|
|
|
this.restClient = restClient;
|
|
|
- this.translateService = translateService;
|
|
|
- this.indexOperationService = indexOperationService;
|
|
|
init();
|
|
|
+ this.indexManager = indexManager;
|
|
|
}
|
|
|
|
|
|
|
|
|
+ public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
|
|
|
+ return doSearch(createSearchRequest(queryParam, index))
|
|
|
+ .flatMapIterable(response -> translate(mapper, response))
|
|
|
+ .onErrorResume(err -> {
|
|
|
+ log.error("query elastic error", err);
|
|
|
+ return Mono.empty();
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
- public <T> Mono<PagerResult<T>> queryPager(ElasticIndex index, QueryParam queryParam, Class<T> type) {
|
|
|
- return query(searchRequestStructure(queryParam, index))
|
|
|
- .map(response -> translatePageResult(type, queryParam, response))
|
|
|
+ public <T> Mono<PagerResult<T>> queryPager(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
|
|
|
+ return doSearch(createSearchRequest(queryParam, index))
|
|
|
+ .map(response -> translatePageResult(mapper, queryParam, response))
|
|
|
.switchIfEmpty(Mono.just(PagerResult.empty()))
|
|
|
.onErrorReturn(err -> {
|
|
|
log.error("query elastic error", err);
|
|
@@ -75,18 +85,10 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public <T> Flux<T> query(ElasticIndex index, QueryParam queryParam, Class<T> type) {
|
|
|
- return query(searchRequestStructure(queryParam, index))
|
|
|
- .flatMapIterable(response -> translate(type, response))
|
|
|
- .onErrorResume(err -> {
|
|
|
- log.error("query elastic error", err);
|
|
|
- return Flux.empty();
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Mono<Long> count(ElasticIndex index, QueryParam queryParam) {
|
|
|
- return countQuery(countRequestStructure(queryParam, index))
|
|
|
+ public Mono<Long> count(String index, QueryParam queryParam) {
|
|
|
+ QueryParam param=queryParam.clone();
|
|
|
+ param.setPaging(false);
|
|
|
+ return doCount(createCountRequest(param, index))
|
|
|
.map(CountResponse::getCount)
|
|
|
.defaultIfEmpty(0L)
|
|
|
.onErrorReturn(err -> {
|
|
@@ -95,16 +97,15 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
|
}, 0L);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
- public <T> Mono<Void> commit(ElasticIndex index, T payload) {
|
|
|
+ public <T> Mono<Void> commit(String index, T payload) {
|
|
|
return Mono.fromRunnable(() -> {
|
|
|
sink.next(new Buffer(index, payload));
|
|
|
});
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public <T> Mono<Void> commit(ElasticIndex index, Collection<T> payload) {
|
|
|
+ public <T> Mono<Void> commit(String index, Collection<T> payload) {
|
|
|
return Mono.fromRunnable(() -> {
|
|
|
for (T t : payload) {
|
|
|
sink.next(new Buffer(index, t));
|
|
@@ -113,7 +114,7 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public <T> Mono<Void> commit(ElasticIndex index, Publisher<T> data) {
|
|
|
+ public <T> Mono<Void> commit(String index, Publisher<T> data) {
|
|
|
return Flux.from(data)
|
|
|
.flatMap(d -> commit(index, d))
|
|
|
.then();
|
|
@@ -148,34 +149,51 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
|
@AllArgsConstructor
|
|
|
@Getter
|
|
|
static class Buffer {
|
|
|
- ElasticIndex index;
|
|
|
+ String index;
|
|
|
Object payload;
|
|
|
}
|
|
|
|
|
|
|
|
|
+ private Mono<String> getIndexForSave(String index) {
|
|
|
+ return indexManager
|
|
|
+ .getIndexStrategy(index)
|
|
|
+ .map(strategy -> strategy.getIndexForSave(index));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<String> getIndexForSearch(String index) {
|
|
|
+ return indexManager
|
|
|
+ .getIndexStrategy(index)
|
|
|
+ .map(strategy -> strategy.getIndexForSearch(index));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
|
|
|
return Flux.fromIterable(buffers)
|
|
|
.collect(Collectors.groupingBy(Buffer::getIndex))
|
|
|
- .map(Map::entrySet)
|
|
|
- .flatMapIterable(Function.identity())
|
|
|
+ .flatMapMany(map -> Flux
|
|
|
+ .fromIterable(map.entrySet())
|
|
|
+ .flatMap(e ->
|
|
|
+ this.getIndexForSave(e.getKey())
|
|
|
+ .zipWith(indexManager.getIndexMetadata(e.getKey()), (index, metadata) -> Tuples.of(index, metadata, e.getValue()))))
|
|
|
.map(entry -> {
|
|
|
- ElasticIndex index = entry.getKey();
|
|
|
- BulkRequest bulkRequest = new BulkRequest(index.getStandardIndex(), index.getStandardType());
|
|
|
- for (Buffer buffer : entry.getValue()) {
|
|
|
+ String index = entry.getT1();
|
|
|
+ BulkRequest bulkRequest = new BulkRequest(index, "_doc");
|
|
|
+ for (Buffer buffer : entry.getT3()) {
|
|
|
IndexRequest request = new IndexRequest();
|
|
|
Object o = JSON.toJSON(buffer.getPayload());
|
|
|
if (o instanceof Map) {
|
|
|
- request.source((Map) o);
|
|
|
+ request.source(ElasticSearchConverter.convertDataToElastic((Map<String, Object>) o, entry.getT2().getProperties()));
|
|
|
} else {
|
|
|
request.source(o.toString(), XContentType.JSON);
|
|
|
}
|
|
|
bulkRequest.add(request);
|
|
|
}
|
|
|
- entry.getValue().clear();
|
|
|
+ entry.getT3().clear();
|
|
|
return bulkRequest;
|
|
|
})
|
|
|
.flatMap(bulkRequest ->
|
|
|
- Mono.<Integer>create(sink ->
|
|
|
+ Mono.create(sink ->
|
|
|
restClient.getWriteClient()
|
|
|
.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
|
|
|
@Override
|
|
@@ -184,7 +202,7 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
|
sink.error(new RuntimeException("保存ElasticSearch数据失败:" + responses.buildFailureMessage()));
|
|
|
return;
|
|
|
}
|
|
|
- sink.success(buffers.size());
|
|
|
+ sink.success(1);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -192,97 +210,68 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
|
sink.error(e);
|
|
|
}
|
|
|
})))
|
|
|
- .collect(Collectors.summingInt(Integer::intValue));
|
|
|
+ .then(Mono.just(buffers.size()));
|
|
|
}
|
|
|
|
|
|
- private <T> PagerResult<T> translatePageResult(Class<T> clazz, QueryParam param, SearchResponse response) {
|
|
|
+ private <T> PagerResult<T> translatePageResult(Function<Map<String, Object>, T> mapper, QueryParam param, SearchResponse response) {
|
|
|
long total = response.getHits().getTotalHits();
|
|
|
- return PagerResult.of((int) total, translate(clazz, response), param);
|
|
|
+ return PagerResult.of((int) total, translate(mapper, response), param);
|
|
|
}
|
|
|
|
|
|
- private <T> List<T> translate(Class<T> clazz, SearchResponse response) {
|
|
|
- // TODO: 2020/1/18 临时代码
|
|
|
+ private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
|
|
|
return Arrays.stream(response.getHits().getHits())
|
|
|
.map(hit -> {
|
|
|
Map<String, Object> hitMap = hit.getSourceAsMap();
|
|
|
hitMap.put("id", hit.getId());
|
|
|
- return JSON.toJavaObject(new JSONObject(hitMap), clazz);
|
|
|
+ return mapper.apply(hitMap);
|
|
|
})
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
- private Mono<SearchResponse> query(Mono<SearchRequest> requestMono) {
|
|
|
- return requestMono.<SearchResponse>flatMap((request) ->
|
|
|
- Mono.create(sink -> restClient
|
|
|
- .getQueryClient()
|
|
|
- .searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink))))
|
|
|
+ private Mono<SearchResponse> doSearch(Mono<SearchRequest> requestMono) {
|
|
|
+ return requestMono.flatMap((request) ->
|
|
|
+ ReactorActionListener
|
|
|
+ .<SearchResponse>mono(listener ->
|
|
|
+ restClient
|
|
|
+ .getQueryClient()
|
|
|
+ .searchAsync(request, RequestOptions.DEFAULT, listener)))
|
|
|
.onErrorResume(err -> {
|
|
|
log.error("query elastic error", err);
|
|
|
return Mono.empty();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private Mono<CountResponse> countQuery(Mono<CountRequest> requestMono) {
|
|
|
- return requestMono.<CountResponse>flatMap((request) ->
|
|
|
- Mono.create(sink -> restClient
|
|
|
- .getQueryClient()
|
|
|
- .countAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink))))
|
|
|
+ private Mono<CountResponse> doCount(Mono<CountRequest> requestMono) {
|
|
|
+ return requestMono.flatMap((request) ->
|
|
|
+ ReactorActionListener
|
|
|
+ .<CountResponse>mono(listener ->
|
|
|
+ restClient
|
|
|
+ .getQueryClient()
|
|
|
+ .countAsync(request, RequestOptions.DEFAULT, listener)))
|
|
|
.onErrorResume(err -> {
|
|
|
log.error("query elastic error", err);
|
|
|
return Mono.empty();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private <T> ActionListener<T> translatorActionListener(MonoSink<T> sink) {
|
|
|
- return new ActionListener<T>() {
|
|
|
- @Override
|
|
|
- public void onResponse(T response) {
|
|
|
- sink.success(response);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- if (e instanceof ElasticsearchException) {
|
|
|
- if (((ElasticsearchException) e).status().getStatus() == 404) {
|
|
|
- sink.success();
|
|
|
- return;
|
|
|
- } else if (((ElasticsearchException) e).status().getStatus() == 400) {
|
|
|
- sink.error(new ElasticsearchParseException("查询参数格式错误", e));
|
|
|
- }
|
|
|
- }
|
|
|
- sink.error(e);
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- private Mono<SearchRequest> searchRequestStructure(QueryParam queryParam, ElasticIndex provider) {
|
|
|
- return indexOperationService.getIndexMappingMetadata(provider.getStandardIndex())
|
|
|
- .switchIfEmpty(Mono.just(IndexMappingMetadata.getInstance(provider.getStandardIndex())))
|
|
|
- .map(metadata -> {
|
|
|
- SearchRequest request = new SearchRequest(provider.getStandardIndex())
|
|
|
- .source(translateService.translate(queryParam, metadata));
|
|
|
- if (StringUtils.hasText(provider.getStandardType())) {
|
|
|
- request.types(provider.getStandardType());
|
|
|
- }
|
|
|
- return request;
|
|
|
- })
|
|
|
- .doOnNext(searchRequest -> log.debug("查询index:{},es查询参数:{}", provider.getStandardIndex(), searchRequest.source().toString()))
|
|
|
- .onErrorResume(err -> {
|
|
|
- log.error("query index error", err);
|
|
|
- return Mono.empty();
|
|
|
- });
|
|
|
+ private Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String index) {
|
|
|
+ return indexManager
|
|
|
+ .getIndexMetadata(index)
|
|
|
+ .map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata))
|
|
|
+ .switchIfEmpty(Mono.fromSupplier(() -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, null)))
|
|
|
+ .flatMap(builder -> this.getIndexForSearch(index)
|
|
|
+ .map(idx -> new SearchRequest(idx).source(builder).types("_doc")));
|
|
|
}
|
|
|
|
|
|
- private Mono<CountRequest> countRequestStructure(QueryParam queryParam, ElasticIndex provider) {
|
|
|
+ private Mono<CountRequest> createCountRequest(QueryParam queryParam, String index) {
|
|
|
QueryParam tempQueryParam = queryParam.clone();
|
|
|
tempQueryParam.setPaging(false);
|
|
|
tempQueryParam.setSorts(Collections.emptyList());
|
|
|
- return indexOperationService.getIndexMappingMetadata(provider.getStandardIndex())
|
|
|
- .map(metadata -> new CountRequest(provider.getStandardIndex())
|
|
|
- .source(translateService.translate(tempQueryParam, metadata)))
|
|
|
- .onErrorResume(err -> {
|
|
|
- log.error("query index error", err);
|
|
|
- return Mono.empty();
|
|
|
- });
|
|
|
+ return indexManager
|
|
|
+ .getIndexMetadata(index)
|
|
|
+ .map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata))
|
|
|
+ .switchIfEmpty(Mono.fromSupplier(() -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, null)))
|
|
|
+ .flatMap(builder -> this.getIndexForSearch(index)
|
|
|
+ .map(idx -> new CountRequest(idx).source(builder)));
|
|
|
}
|
|
|
}
|