|
@@ -10,6 +10,7 @@ import org.jetlinks.community.timeseries.TimeSeriesData;
|
|
|
import org.jetlinks.community.timeseries.TimeSeriesService;
|
|
|
import org.jetlinks.community.timeseries.query.AggregationData;
|
|
|
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
|
|
|
+import org.jetlinks.core.metadata.types.DateTimeType;
|
|
|
import org.reactivestreams.Publisher;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
@@ -27,9 +28,11 @@ public class ElasticSearchTimeSeriesService implements TimeSeriesService {
|
|
|
|
|
|
private AggregationService aggregationService;
|
|
|
|
|
|
+ static DateTimeType timeType=new DateTimeType();
|
|
|
+
|
|
|
@Override
|
|
|
public Flux<TimeSeriesData> query(QueryParam queryParam) {
|
|
|
- return elasticSearchService.query(index, queryParam, map -> TimeSeriesData.of((Long) map.get("timestamp"), map));
|
|
|
+ return elasticSearchService.query(index, queryParam, map -> TimeSeriesData.of(timeType.convert(map.get("timestamp")), map));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -39,6 +42,16 @@ public class ElasticSearchTimeSeriesService implements TimeSeriesService {
|
|
|
.map(Long::intValue);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Mono<PagerResult<TimeSeriesData>> queryPager(QueryParam queryParam) {
|
|
|
+ return elasticSearchService.queryPager(index, queryParam, map -> TimeSeriesData.of(timeType.convert(map.get("timestamp")), map));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> Mono<PagerResult<T>> queryPager(QueryParam queryParam, Function<TimeSeriesData, T> mapper) {
|
|
|
+ return elasticSearchService.queryPager(index, queryParam, map -> mapper.apply(TimeSeriesData.of(timeType.convert(map.get("timestamp")), map)));
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public Flux<AggregationData> aggregation(AggregationQueryParam queryParam) {
|
|
|
return aggregationService
|
|
@@ -51,17 +64,6 @@ public class ElasticSearchTimeSeriesService implements TimeSeriesService {
|
|
|
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Mono<PagerResult<TimeSeriesData>> queryPager(QueryParam queryParam) {
|
|
|
- return elasticSearchService.queryPager(index, queryParam, map -> TimeSeriesData.of((Long) map.get("timestamp"), map));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public <T> Mono<PagerResult<T>> queryPager(QueryParam queryParam, Function<TimeSeriesData, T> mapper) {
|
|
|
- return elasticSearchService.queryPager(index, queryParam, map -> mapper.apply(TimeSeriesData.of((Long) map.get("timestamp"), map)));
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
@Override
|
|
|
public Mono<Void> save(Publisher<TimeSeriesData> data) {
|
|
|
return Flux.from(data)
|