|
@@ -4,7 +4,16 @@ import com.alibaba.fastjson.JSON;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.collections.MapUtils;
|
|
import org.apache.commons.collections.MapUtils;
|
|
import org.hswebframework.web.id.IDGenerator;
|
|
import org.hswebframework.web.id.IDGenerator;
|
|
|
|
+import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
|
|
import org.jetlinks.community.device.entity.DevicePropertiesEntity;
|
|
import org.jetlinks.community.device.entity.DevicePropertiesEntity;
|
|
|
|
+import org.jetlinks.community.device.enums.DeviceLogType;
|
|
|
|
+import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
|
|
|
|
+import org.jetlinks.community.device.message.DeviceMessageUtils;
|
|
|
|
+import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
|
|
|
|
+import org.jetlinks.community.gateway.TopicMessage;
|
|
|
|
+import org.jetlinks.community.gateway.annotation.Subscribe;
|
|
|
|
+import org.jetlinks.community.timeseries.TimeSeriesData;
|
|
|
|
+import org.jetlinks.community.timeseries.TimeSeriesManager;
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
import org.jetlinks.core.message.DeviceMessage;
|
|
import org.jetlinks.core.message.DeviceMessage;
|
|
import org.jetlinks.core.message.DeviceOfflineMessage;
|
|
import org.jetlinks.core.message.DeviceOfflineMessage;
|
|
@@ -17,29 +26,14 @@ import org.jetlinks.core.message.property.WritePropertyMessageReply;
|
|
import org.jetlinks.core.metadata.DataType;
|
|
import org.jetlinks.core.metadata.DataType;
|
|
import org.jetlinks.core.metadata.EventMetadata;
|
|
import org.jetlinks.core.metadata.EventMetadata;
|
|
import org.jetlinks.core.metadata.PropertyMetadata;
|
|
import org.jetlinks.core.metadata.PropertyMetadata;
|
|
-import org.jetlinks.core.metadata.types.DateTimeType;
|
|
|
|
-import org.jetlinks.core.metadata.types.NumberType;
|
|
|
|
-import org.jetlinks.core.metadata.types.ObjectType;
|
|
|
|
import org.jetlinks.core.metadata.types.UnknownType;
|
|
import org.jetlinks.core.metadata.types.UnknownType;
|
|
-import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
|
|
|
|
-import org.jetlinks.community.device.enums.DeviceLogType;
|
|
|
|
-import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
|
|
|
|
-import org.jetlinks.community.device.message.DeviceMessageUtils;
|
|
|
|
-import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
|
|
|
|
-import org.jetlinks.community.gateway.*;
|
|
|
|
-import org.jetlinks.community.timeseries.TimeSeriesData;
|
|
|
|
-import org.jetlinks.community.timeseries.TimeSeriesManager;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
-import javax.annotation.Nonnull;
|
|
|
|
-import java.math.BigDecimal;
|
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
-import static java.util.Optional.ofNullable;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* 用于将设备消息写入到时序数据库
|
|
* 用于将设备消息写入到时序数据库
|
|
*
|
|
*
|
|
@@ -47,11 +41,7 @@ import static java.util.Optional.ofNullable;
|
|
* @since 1.0
|
|
* @since 1.0
|
|
*/
|
|
*/
|
|
@Slf4j
|
|
@Slf4j
|
|
-public class TimeSeriesMessageWriterConnector
|
|
|
|
- implements MessageConnector,
|
|
|
|
- MessageConnection,
|
|
|
|
- MessageSubscriber {
|
|
|
|
-
|
|
|
|
|
|
+public class TimeSeriesMessageWriterConnector{
|
|
public TimeSeriesManager timeSeriesManager;
|
|
public TimeSeriesManager timeSeriesManager;
|
|
|
|
|
|
public DeviceRegistry registry;
|
|
public DeviceRegistry registry;
|
|
@@ -61,41 +51,8 @@ public class TimeSeriesMessageWriterConnector
|
|
this.registry = registry;
|
|
this.registry = registry;
|
|
}
|
|
}
|
|
|
|
|
|
- @Nonnull
|
|
|
|
- @Override
|
|
|
|
- public String getId() {
|
|
|
|
- return "device-message-ts-writer";
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public String getName() {
|
|
|
|
- return "写入设备消息到时序数据库";
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void onDisconnect(Runnable disconnectListener) {
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void disconnect() {
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public boolean isAlive() {
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Nonnull
|
|
|
|
- @Override
|
|
|
|
- public Flux<MessageConnection> onConnection() {
|
|
|
|
- return Flux.just(this);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Nonnull
|
|
|
|
- @Override
|
|
|
|
- public Mono<Void> publish(@Nonnull TopicMessage message) {
|
|
|
|
|
|
+ @Subscribe(topics = "/device/**",id = "device-message-ts-writer")
|
|
|
|
+ public Mono<Void> writeDeviceMessageToTs(TopicMessage message){
|
|
return Mono
|
|
return Mono
|
|
.justOrEmpty(DeviceMessageUtils.convert(message))
|
|
.justOrEmpty(DeviceMessageUtils.convert(message))
|
|
.flatMap(this::doIndex);
|
|
.flatMap(this::doIndex);
|
|
@@ -116,7 +73,6 @@ public class TimeSeriesMessageWriterConnector
|
|
Mono<Void> thenJob = null;
|
|
Mono<Void> thenJob = null;
|
|
if (message instanceof EventMessage) {
|
|
if (message instanceof EventMessage) {
|
|
operationLog.setContent(JSON.toJSONString(((EventMessage) message).getData()));
|
|
operationLog.setContent(JSON.toJSONString(((EventMessage) message).getData()));
|
|
- //上报属性
|
|
|
|
thenJob = doIndexEventMessage(headers, ((EventMessage) message));
|
|
thenJob = doIndexEventMessage(headers, ((EventMessage) message));
|
|
} else if (message instanceof DeviceOfflineMessage) {
|
|
} else if (message instanceof DeviceOfflineMessage) {
|
|
operationLog.setContent("设备离线");
|
|
operationLog.setContent("设备离线");
|
|
@@ -216,26 +172,4 @@ public class TimeSeriesMessageWriterConnector
|
|
}))
|
|
}))
|
|
.flatMap(data -> timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(productId, message.getEvent())).save(data));
|
|
.flatMap(data -> timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(productId, message.getEvent())).save(data));
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * @return 订阅信息
|
|
|
|
- * @see org.jetlinks.community.device.message.DeviceMessageConnector
|
|
|
|
- */
|
|
|
|
- @Nonnull
|
|
|
|
- @Override
|
|
|
|
- public Flux<Subscription> onSubscribe() {
|
|
|
|
- //订阅设备相关所有消息
|
|
|
|
- return Flux.just(new Subscription("/device/**"));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Nonnull
|
|
|
|
- @Override
|
|
|
|
- public Flux<Subscription> onUnSubscribe() {
|
|
|
|
- return Flux.empty();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public boolean isShareCluster() {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
}
|
|
}
|