|
@@ -27,12 +27,12 @@ import org.hswebframework.utils.time.DateFormatter;
|
|
|
import org.hswebframework.utils.time.DefaultDateFormatter;
|
|
|
import org.hswebframework.web.api.crud.entity.PagerResult;
|
|
|
import org.hswebframework.web.bean.FastBeanCopier;
|
|
|
-import org.jetlinks.core.utils.FluxUtils;
|
|
|
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
|
|
|
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
|
|
|
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
|
|
|
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
|
|
|
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
|
|
|
+import org.jetlinks.core.utils.FluxUtils;
|
|
|
import org.reactivestreams.Publisher;
|
|
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
|
import org.springframework.context.annotation.DependsOn;
|
|
@@ -57,6 +57,8 @@ import java.util.regex.Pattern;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
+ * 响应式ES数据库操作类
|
|
|
+ *
|
|
|
* @author zhouhao
|
|
|
* @since 1.0
|
|
|
**/
|
|
@@ -66,20 +68,23 @@ import java.util.stream.Collectors;
|
|
|
@ConfigurationProperties(prefix = "elasticsearch")
|
|
|
public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
|
|
|
- private final ReactiveElasticsearchClient restClient;
|
|
|
-
|
|
|
- private final ElasticSearchIndexManager indexManager;
|
|
|
-
|
|
|
- private FluxSink<Buffer> sink;
|
|
|
-
|
|
|
public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
|
|
|
true, true, false, false
|
|
|
);
|
|
|
+ //使用对象池处理Buffer,减少GC消耗
|
|
|
+ static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
|
|
|
|
|
|
static {
|
|
|
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
|
|
|
}
|
|
|
|
|
|
+ private final ReactiveElasticsearchClient restClient;
|
|
|
+ private final ElasticSearchIndexManager indexManager;
|
|
|
+ private FluxSink<Buffer> sink;
|
|
|
+ @Getter
|
|
|
+ @Setter
|
|
|
+ private BufferConfig buffer = new BufferConfig();
|
|
|
+
|
|
|
public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
|
|
|
ElasticSearchIndexManager indexManager) {
|
|
|
this.restClient = restClient;
|
|
@@ -119,12 +124,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
|
|
|
return this
|
|
|
.doQuery(new String[]{index}, queryParam)
|
|
|
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
|
|
|
return this
|
|
|
.doQuery(index, queryParam)
|
|
@@ -134,16 +141,16 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
@Override
|
|
|
public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
|
|
|
return this.doQuery(index, queryParam)
|
|
|
- .flatMap(tp2 -> this
|
|
|
- .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
|
|
|
- .collectList()
|
|
|
- .filter(CollectionUtils::isNotEmpty)
|
|
|
- .map(list -> PagerResult.of((int) tp2
|
|
|
- .getT2()
|
|
|
- .getHits()
|
|
|
- .getTotalHits().value, list, queryParam))
|
|
|
- )
|
|
|
- .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
|
|
|
+ .flatMap(tp2 -> this
|
|
|
+ .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
|
|
|
+ .collectList()
|
|
|
+ .filter(CollectionUtils::isNotEmpty)
|
|
|
+ .map(list -> PagerResult.of((int) tp2
|
|
|
+ .getT2()
|
|
|
+ .getHits()
|
|
|
+ .getTotalHits().value, list, queryParam))
|
|
|
+ )
|
|
|
+ .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
|
|
|
}
|
|
|
|
|
|
private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList,
|
|
@@ -162,8 +169,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
}
|
|
|
return mapper
|
|
|
.apply(Optional
|
|
|
- .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
|
|
|
- .convertFromElastic(hitMap));
|
|
|
+ .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
|
|
|
+ .convertFromElastic(hitMap));
|
|
|
});
|
|
|
|
|
|
}
|
|
@@ -184,7 +191,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
public Mono<Long> count(String[] index, QueryParam queryParam) {
|
|
|
QueryParam param = queryParam.clone();
|
|
@@ -223,8 +229,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
@Override
|
|
|
public <T> Mono<Void> commit(String index, Publisher<T> data) {
|
|
|
return Flux.from(data)
|
|
|
- .flatMap(d -> commit(index, d))
|
|
|
- .then();
|
|
|
+ .flatMap(d -> commit(index, d))
|
|
|
+ .then();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -235,10 +241,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
@Override
|
|
|
public <T> Mono<Void> save(String index, Publisher<T> data) {
|
|
|
return Flux.from(data)
|
|
|
- .map(v -> Buffer.of(index, v))
|
|
|
- .collectList()
|
|
|
- .flatMap(this::doSave)
|
|
|
- .then();
|
|
|
+ .map(v -> Buffer.of(index, v))
|
|
|
+ .collectList()
|
|
|
+ .flatMap(this::doSave)
|
|
|
+ .then();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -251,32 +257,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
sink.complete();
|
|
|
}
|
|
|
|
|
|
- @Getter
|
|
|
- @Setter
|
|
|
- private BufferConfig buffer = new BufferConfig();
|
|
|
-
|
|
|
- @Getter
|
|
|
- @Setter
|
|
|
- public static class BufferConfig {
|
|
|
- //最小间隔
|
|
|
- private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
|
|
|
- //缓冲最大数量
|
|
|
- private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
|
|
|
- //缓冲超时时间
|
|
|
- private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
|
|
|
- //背压堆积数量限制.
|
|
|
- private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
|
|
|
- .getRuntime()
|
|
|
- .availableProcessors());
|
|
|
- //最大缓冲字节
|
|
|
- private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
|
|
|
-
|
|
|
- //最大重试次数
|
|
|
- private int maxRetry = 3;
|
|
|
- //重试间隔
|
|
|
- private Duration minBackoff = Duration.ofSeconds(3);
|
|
|
- }
|
|
|
-
|
|
|
//@PostConstruct
|
|
|
public void init() {
|
|
|
int flushRate = buffer.rate;
|
|
@@ -288,10 +268,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
|
|
|
FluxUtils
|
|
|
.bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
|
|
|
- flushRate,
|
|
|
- bufferSize,
|
|
|
- bufferTimeout,
|
|
|
- (b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
|
|
|
+ flushRate,
|
|
|
+ bufferSize,
|
|
|
+ bufferTimeout,
|
|
|
+ (b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
|
|
|
.doOnNext(buf -> bufferedBytes.set(0))
|
|
|
.onBackpressureBuffer(bufferBackpressure, drop -> {
|
|
|
// TODO: 2020/11/25 将丢弃的数据存储到本地磁盘
|
|
@@ -319,56 +299,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
})
|
|
|
.onErrorResume((err) -> Mono
|
|
|
.fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" +
|
|
|
- org.hswebframework.utils.StringUtils.throwable2String(err))))
|
|
|
+ org.hswebframework.utils.StringUtils.throwable2String(err))))
|
|
|
.subscribe();
|
|
|
}
|
|
|
|
|
|
- //使用对象池处理Buffer,减少GC消耗
|
|
|
- static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
|
|
|
-
|
|
|
- @Getter
|
|
|
- static class Buffer {
|
|
|
- String index;
|
|
|
- String id;
|
|
|
- String payload;
|
|
|
- final ObjectPool.Handle<Buffer> handle;
|
|
|
-
|
|
|
- public Buffer(ObjectPool.Handle<Buffer> handle) {
|
|
|
- this.handle = handle;
|
|
|
- }
|
|
|
-
|
|
|
- public static Buffer of(String index, Object payload) {
|
|
|
- Buffer buffer;
|
|
|
- try {
|
|
|
- buffer = pool.get();
|
|
|
- } catch (Exception e) {
|
|
|
- buffer = new Buffer(null);
|
|
|
- }
|
|
|
- buffer.index = index;
|
|
|
- Map<String, Object> data = payload instanceof Map
|
|
|
- ? ((Map) payload) :
|
|
|
- FastBeanCopier.copy(payload, HashMap::new);
|
|
|
- Object id = data.get("id");
|
|
|
- buffer.id = id == null ? null : String.valueOf(id);
|
|
|
- buffer.payload = JSON.toJSONString(data);
|
|
|
- return buffer;
|
|
|
- }
|
|
|
-
|
|
|
- void release() {
|
|
|
- this.index = null;
|
|
|
- this.id = null;
|
|
|
- this.payload = null;
|
|
|
- if (null != handle) {
|
|
|
- handle.recycle(this);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- int numberOfBytes() {
|
|
|
- return payload == null ? 0 : payload.length() * 2;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
private Mono<String> getIndexForSave(String index) {
|
|
|
return indexManager
|
|
|
.getIndexStrategy(index)
|
|
@@ -385,48 +319,48 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
|
|
|
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
|
|
|
return Flux.fromIterable(buffers)
|
|
|
- .groupBy(Buffer::getIndex,Integer.MAX_VALUE)
|
|
|
- .flatMap(group -> {
|
|
|
- String index = group.key();
|
|
|
- return this
|
|
|
- .getIndexForSave(index)
|
|
|
- .flatMapMany(realIndex -> group
|
|
|
- .map(buffer -> {
|
|
|
- try {
|
|
|
- IndexRequest request;
|
|
|
- if (buffer.id != null) {
|
|
|
- request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
|
|
|
- } else {
|
|
|
- request = new IndexRequest(realIndex).type("_doc");
|
|
|
- }
|
|
|
- request.source(buffer.payload, XContentType.JSON);
|
|
|
- return request;
|
|
|
- } finally {
|
|
|
- buffer.release();
|
|
|
- }
|
|
|
- }));
|
|
|
- })
|
|
|
- .collectList()
|
|
|
- .filter(CollectionUtils::isNotEmpty)
|
|
|
- .flatMap(lst -> {
|
|
|
- BulkRequest request = new BulkRequest();
|
|
|
- request.timeout(TimeValue.timeValueSeconds(9));
|
|
|
- lst.forEach(request::add);
|
|
|
- return restClient
|
|
|
- .bulk(request)
|
|
|
- .as(save -> {
|
|
|
- if (buffer.maxRetry > 0) {
|
|
|
- return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
|
|
|
- }
|
|
|
- return save;
|
|
|
- });
|
|
|
- })
|
|
|
- .doOnNext(response -> {
|
|
|
- if (response.hasFailures()) {
|
|
|
- System.err.println(response.buildFailureMessage());
|
|
|
- }
|
|
|
- })
|
|
|
- .thenReturn(buffers.size());
|
|
|
+ .groupBy(Buffer::getIndex, Integer.MAX_VALUE)
|
|
|
+ .flatMap(group -> {
|
|
|
+ String index = group.key();
|
|
|
+ return this
|
|
|
+ .getIndexForSave(index)
|
|
|
+ .flatMapMany(realIndex -> group
|
|
|
+ .map(buffer -> {
|
|
|
+ try {
|
|
|
+ IndexRequest request;
|
|
|
+ if (buffer.id != null) {
|
|
|
+ request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
|
|
|
+ } else {
|
|
|
+ request = new IndexRequest(realIndex).type("_doc");
|
|
|
+ }
|
|
|
+ request.source(buffer.payload, XContentType.JSON);
|
|
|
+ return request;
|
|
|
+ } finally {
|
|
|
+ buffer.release();
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ })
|
|
|
+ .collectList()
|
|
|
+ .filter(CollectionUtils::isNotEmpty)
|
|
|
+ .flatMap(lst -> {
|
|
|
+ BulkRequest request = new BulkRequest();
|
|
|
+ request.timeout(TimeValue.timeValueSeconds(9));
|
|
|
+ lst.forEach(request::add);
|
|
|
+ return restClient
|
|
|
+ .bulk(request)
|
|
|
+ .as(save -> {
|
|
|
+ if (buffer.maxRetry > 0) {
|
|
|
+ return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
|
|
|
+ }
|
|
|
+ return save;
|
|
|
+ });
|
|
|
+ })
|
|
|
+ .doOnNext(response -> {
|
|
|
+ if (response.hasFailures()) {
|
|
|
+ System.err.println(response.buildFailureMessage());
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .thenReturn(buffers.size());
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@@ -442,14 +376,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
|
|
|
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
|
|
|
return Arrays.stream(response.getHits().getHits())
|
|
|
- .map(hit -> {
|
|
|
- Map<String, Object> hitMap = hit.getSourceAsMap();
|
|
|
- if (StringUtils.isEmpty(hitMap.get("id"))) {
|
|
|
- hitMap.put("id", hit.getId());
|
|
|
- }
|
|
|
- return mapper.apply(hitMap);
|
|
|
- })
|
|
|
- .collect(Collectors.toList());
|
|
|
+ .map(hit -> {
|
|
|
+ Map<String, Object> hitMap = hit.getSourceAsMap();
|
|
|
+ if (StringUtils.isEmpty(hitMap.get("id"))) {
|
|
|
+ hitMap.put("id", hit.getId());
|
|
|
+ }
|
|
|
+ return mapper.apply(hitMap);
|
|
|
+ })
|
|
|
+ .collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
private Flux<SearchHit> doSearch(SearchRequest request) {
|
|
@@ -482,12 +416,12 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
|
|
|
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
|
|
|
return Flux.fromIterable(indexes)
|
|
|
- .flatMap(index -> getIndexForSearch(index.getIndex()))
|
|
|
- .collectList()
|
|
|
- .map(indexList ->
|
|
|
- new SearchRequest(indexList.toArray(new String[0]))
|
|
|
- .source(builder)
|
|
|
- .indicesOptions(indexOptions));
|
|
|
+ .flatMap(index -> getIndexForSearch(index.getIndex()))
|
|
|
+ .collectList()
|
|
|
+ .map(indexList ->
|
|
|
+ new SearchRequest(indexList.toArray(new String[0]))
|
|
|
+ .source(builder)
|
|
|
+ .indicesOptions(indexOptions));
|
|
|
}
|
|
|
|
|
|
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
|
|
@@ -504,9 +438,9 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
|
|
|
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
|
|
|
return Flux.fromIterable(indexes)
|
|
|
- .flatMap(index -> getIndexForSearch(index.getIndex()))
|
|
|
- .collectList()
|
|
|
- .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
|
|
|
+ .flatMap(index -> getIndexForSearch(index.getIndex()))
|
|
|
+ .collectList()
|
|
|
+ .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
|
|
|
}
|
|
|
|
|
|
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
|
|
@@ -516,4 +450,68 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
.filter(CollectionUtils::isNotEmpty)
|
|
|
.flatMap(list -> createCountRequest(queryParam, list));
|
|
|
}
|
|
|
+
|
|
|
+ @Getter
|
|
|
+ @Setter
|
|
|
+ public static class BufferConfig {
|
|
|
+ //最小间隔
|
|
|
+ private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
|
|
|
+ //缓冲最大数量
|
|
|
+ private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
|
|
|
+ //缓冲超时时间
|
|
|
+ private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
|
|
|
+ //背压堆积数量限制.
|
|
|
+ private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
|
|
|
+ .getRuntime()
|
|
|
+ .availableProcessors());
|
|
|
+ //最大缓冲字节
|
|
|
+ private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
|
|
|
+
|
|
|
+ //最大重试次数
|
|
|
+ private int maxRetry = 3;
|
|
|
+ //重试间隔
|
|
|
+ private Duration minBackoff = Duration.ofSeconds(3);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Getter
|
|
|
+ static class Buffer {
|
|
|
+ final ObjectPool.Handle<Buffer> handle;
|
|
|
+ String index;
|
|
|
+ String id;
|
|
|
+ String payload;
|
|
|
+
|
|
|
+ public Buffer(ObjectPool.Handle<Buffer> handle) {
|
|
|
+ this.handle = handle;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Buffer of(String index, Object payload) {
|
|
|
+ Buffer buffer;
|
|
|
+ try {
|
|
|
+ buffer = pool.get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ buffer = new Buffer(null);
|
|
|
+ }
|
|
|
+ buffer.index = index;
|
|
|
+ Map<String, Object> data = payload instanceof Map
|
|
|
+ ? ((Map) payload) :
|
|
|
+ FastBeanCopier.copy(payload, HashMap::new);
|
|
|
+ Object id = data.get("id");
|
|
|
+ buffer.id = id == null ? null : String.valueOf(id);
|
|
|
+ buffer.payload = JSON.toJSONString(data);
|
|
|
+ return buffer;
|
|
|
+ }
|
|
|
+
|
|
|
+ void release() {
|
|
|
+ this.index = null;
|
|
|
+ this.id = null;
|
|
|
+ this.payload = null;
|
|
|
+ if (null != handle) {
|
|
|
+ handle.recycle(this);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int numberOfBytes() {
|
|
|
+ return payload == null ? 0 : payload.length() * 2;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|