|
@@ -22,12 +22,16 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
import org.elasticsearch.search.sort.SortBuilders;
|
|
|
import org.elasticsearch.search.sort.SortOrder;
|
|
|
import org.hswebframework.ezorm.core.param.QueryParam;
|
|
|
+import org.hswebframework.ezorm.core.param.Term;
|
|
|
import org.hswebframework.ezorm.core.param.TermType;
|
|
|
+import org.jetlinks.community.Interval;
|
|
|
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
|
|
|
import org.jetlinks.community.elastic.search.service.AggregationService;
|
|
|
import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService;
|
|
|
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
|
|
|
import org.jetlinks.community.timeseries.query.*;
|
|
|
+import org.jetlinks.core.metadata.types.DateTimeType;
|
|
|
+import org.jetlinks.reactor.ql.utils.CastUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
@@ -35,13 +39,14 @@ import org.springframework.util.StringUtils;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
+import java.time.Duration;
|
|
|
import java.time.ZoneId;
|
|
|
import java.util.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
- * @author bsetfeng
|
|
|
- * @since 1.0
|
|
|
+ * @author zhouhao
|
|
|
+ * @since 1.5
|
|
|
**/
|
|
|
@Service
|
|
|
@Slf4j
|
|
@@ -79,13 +84,17 @@ public class ReactiveAggregationService implements AggregationService {
|
|
|
}
|
|
|
builder.format(format);
|
|
|
}
|
|
|
+ builder.timeZone(ZoneId.systemDefault());
|
|
|
builder.order(BucketOrder.key(false));
|
|
|
if (timeGroup.getInterval() != null) {
|
|
|
if (restClient.serverVersion().after(Version.V_7_2_0)) {
|
|
|
- if (timeGroup.getInterval().isFixed()) {
|
|
|
+ Interval interval = timeGroup.getInterval();
|
|
|
+ if (interval.isFixed()) {
|
|
|
builder.fixedInterval(new DateHistogramInterval(timeGroup.getInterval().toString()));
|
|
|
- } else {
|
|
|
+ } else if (interval.isCalendar()) {
|
|
|
builder.calendarInterval(new DateHistogramInterval(timeGroup.getInterval().toString()));
|
|
|
+ } else {
|
|
|
+ builder.dateHistogramInterval(new DateHistogramInterval(timeGroup.getInterval().toString()));
|
|
|
}
|
|
|
} else {
|
|
|
builder.dateHistogramInterval(new DateHistogramInterval(timeGroup.getInterval().toString()));
|
|
@@ -95,7 +104,6 @@ public class ReactiveAggregationService implements AggregationService {
|
|
|
builder.extendedBounds(getExtendedBounds(param));
|
|
|
// builder.missing("");
|
|
|
|
|
|
- builder.timeZone(ZoneId.systemDefault());
|
|
|
return builder;
|
|
|
} else {
|
|
|
TermsAggregationBuilder builder = AggregationBuilders
|
|
@@ -148,7 +156,7 @@ public class ReactiveAggregationService implements AggregationService {
|
|
|
}
|
|
|
if (aggColumn instanceof LimitAggregationColumn) {
|
|
|
topHitsBuilder.size(((LimitAggregationColumn) aggColumn).getLimit());
|
|
|
- }else {
|
|
|
+ } else {
|
|
|
topHitsBuilder.size(1);
|
|
|
}
|
|
|
}
|
|
@@ -250,7 +258,15 @@ public class ReactiveAggregationService implements AggregationService {
|
|
|
protected static QueryParam prepareQueryParam(AggregationQueryParam param) {
|
|
|
QueryParam queryParam = param.getQueryParam().clone();
|
|
|
queryParam.setPaging(false);
|
|
|
- queryParam.and(param.getTimeProperty(), TermType.btw, Arrays.asList(calculateStartWithTime(param), param.getEndWithTime()));
|
|
|
+ boolean hasTimestamp = false;
|
|
|
+ for (Term term : queryParam.getTerms()) {
|
|
|
+ if (param.getTimeProperty().equals(term.getColumn())) {
|
|
|
+ hasTimestamp = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!hasTimestamp) {
|
|
|
+ queryParam.and(param.getTimeProperty(), TermType.btw, Arrays.asList(calculateStartWithTime(param), param.getEndWithTime()));
|
|
|
+ }
|
|
|
if (queryParam.getSorts().isEmpty()) {
|
|
|
queryParam.orderBy(param.getTimeProperty()).desc();
|
|
|
}
|
|
@@ -258,17 +274,36 @@ public class ReactiveAggregationService implements AggregationService {
|
|
|
}
|
|
|
|
|
|
protected static ExtendedBounds getExtendedBounds(AggregationQueryParam param) {
|
|
|
+
|
|
|
return new ExtendedBounds(calculateStartWithTime(param), param.getEndWithTime());
|
|
|
}
|
|
|
|
|
|
+ //聚合查询默认的时间间隔
|
|
|
+ static long thirtyDayMillis = Duration.ofDays(Integer.getInteger("elasticsearch.agg.default-range-day", 90)).toMillis();
|
|
|
+
|
|
|
private static long calculateStartWithTime(AggregationQueryParam param) {
|
|
|
long startWithParam = param.getStartWithTime();
|
|
|
+ if (startWithParam == 0) {
|
|
|
+ //从查询条件中提取时间参数来获取时间区间
|
|
|
+ List<Term> terms = param.getQueryParam().getTerms();
|
|
|
+ for (Term term : terms) {
|
|
|
+ if ("timestamp".equals(term.getColumn())) {
|
|
|
+ Object value = term.getValue();
|
|
|
+ String termType = term.getTermType();
|
|
|
+ if (TermType.btw.equals(termType)) {
|
|
|
+ if (String.valueOf(value).contains(",")) {
|
|
|
+ value = Arrays.asList(String.valueOf(value).split(","));
|
|
|
+ }
|
|
|
+ return DateTimeType.GLOBAL.convert(CastUtils.castArray(value).get(0)).getTime();
|
|
|
+ }
|
|
|
+ if (TermType.gt.equals(termType) || TermType.gte.equals(termType)) {
|
|
|
|
|
|
-// if (param.getGroupByTime() != nullcalculateStartWithTime(param) && param.getGroupByTime().getInterval() != null) {
|
|
|
-// long timeInterval = param.getGroupByTime().getInterval().toMillis() * param.getLimit();
|
|
|
-// long tempStartWithParam = param.getEndWithTime() - timeInterval;
|
|
|
-// startWithParam = Math.max(tempStartWithParam, startWithParam);
|
|
|
-// }
|
|
|
+ return DateTimeType.GLOBAL.convert(value).getTime();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return param.getEndWithTime() - thirtyDayMillis;
|
|
|
+ }
|
|
|
return startWithParam;
|
|
|
}
|
|
|
|