|
@@ -0,0 +1,148 @@
|
|
|
+package com.liyan;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.liyan.portal.LadarMessage;
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
+import org.jetlinks.core.message.Message;
|
|
|
+import org.jetlinks.core.message.codec.*;
|
|
|
+import org.jetlinks.core.message.property.*;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+
|
|
|
+import javax.annotation.Nonnull;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.regex.Matcher;
|
|
|
+import java.util.regex.Pattern;
|
|
|
+import java.util.stream.Stream;
|
|
|
+
|
|
|
+;
|
|
|
+
|
|
|
+public class LadarDeviceMessageCodec implements DeviceMessageCodec {
|
|
|
+
|
|
|
+ private final ObjectMapper mapper = ObjectMappers.JSON_MAPPER;
|
|
|
+
|
|
|
+ private final HashMap<String, String> propMessageIdMapping = new HashMap<>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Transport getSupportTransport() {
|
|
|
+ return DefaultTransport.MQTT;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Nonnull
|
|
|
+ @Override
|
|
|
+ public Flux<? extends Message> decode(@Nonnull MessageDecodeContext context) {
|
|
|
+ FromDeviceMessageContext ctx = (FromDeviceMessageContext) context;
|
|
|
+ MqttMessage message = (MqttMessage) ctx.getMessage();
|
|
|
+ String topic = message.getTopic();
|
|
|
+ Pattern pattern = Pattern.compile("(?<=Radar60FL\\/)(.*?)(?=\\/sys)");
|
|
|
+ Matcher matcher = pattern.matcher(topic);
|
|
|
+ if (!matcher.find()) return Flux.empty();
|
|
|
+ String deviceId = matcher.group(1);
|
|
|
+ LadarMessage ladarMessage = JSONObject.parseObject(message.payloadAsString(), LadarMessage.class);
|
|
|
+ // 雷达消息类型为获取属性响应
|
|
|
+ if ("get".equals(ladarMessage.getOpt())) {
|
|
|
+ ReadPropertyMessageReply readPropertyMessageReply = ReadPropertyMessageReply.create()
|
|
|
+ .success(ladarMessage.getParams());
|
|
|
+ String prop = ladarMessage.getParams().keySet().stream().findFirst().orElse(null);
|
|
|
+ String propKeyId = deviceId + "@get@" + prop;
|
|
|
+ if (!propMessageIdMapping.containsKey(propKeyId)) return Flux.empty();
|
|
|
+ readPropertyMessageReply.setMessageId(propMessageIdMapping.get(propKeyId));
|
|
|
+ propMessageIdMapping.remove(propKeyId);
|
|
|
+ readPropertyMessageReply.setDeviceId(deviceId);
|
|
|
+ return Flux.just(readPropertyMessageReply);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 雷达消息类型为设置属性响应类型
|
|
|
+ if ("set".equals(ladarMessage.getOpt())) {
|
|
|
+ WritePropertyMessageReply reply = WritePropertyMessageReply.create()
|
|
|
+ .success(ladarMessage.getParams());
|
|
|
+ String prop = ladarMessage.getParams().keySet().stream().findFirst().orElse(null);
|
|
|
+ String propKeyId = deviceId + "@set@" + prop;
|
|
|
+ if (!propMessageIdMapping.containsKey(propKeyId)) return Flux.empty();
|
|
|
+ reply.setMessageId(propMessageIdMapping.get(propKeyId));
|
|
|
+ propMessageIdMapping.remove(propKeyId);
|
|
|
+ reply.setDeviceId(deviceId);
|
|
|
+ return Flux.just(reply);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 雷达上报属性
|
|
|
+ ReportPropertyMessage report = new ReportPropertyMessage();
|
|
|
+ report.properties(ladarMessage.getParams());
|
|
|
+ report.setDeviceId(deviceId);
|
|
|
+ report.setTimestamp(System.currentTimeMillis());
|
|
|
+ return Flux.just(report);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Nonnull
|
|
|
+ @Override
|
|
|
+ public Flux<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
|
|
|
+ MessageEncodeContext ctx = context;
|
|
|
+ Message message = ctx.getMessage();
|
|
|
+ // 消息类型为读取属性类型
|
|
|
+ if (message instanceof ReadPropertyMessage) {
|
|
|
+ ReadPropertyMessage readPropertyMessage = (ReadPropertyMessage) message;
|
|
|
+ // 获取待查询的属性类型
|
|
|
+ List<String> props = readPropertyMessage.getProperties();
|
|
|
+ if (props.isEmpty()) return Flux.empty();
|
|
|
+ Stream<SimpleMqttMessage> simpleMqttMessageStream = props.parallelStream().map(prop -> {
|
|
|
+ String propKeyId = readPropertyMessage.getDeviceId() + "@get@" + prop;
|
|
|
+ propMessageIdMapping.put(propKeyId, readPropertyMessage.getMessageId());
|
|
|
+ LadarMessage ladarMessage = new LadarMessage();
|
|
|
+ ladarMessage.setVersion("1.0");
|
|
|
+ ladarMessage.setMethod("get");
|
|
|
+ ladarMessage.getParams().put(prop, "?");
|
|
|
+ SimpleMqttMessage mqttMessage = null;
|
|
|
+ try {
|
|
|
+ mqttMessage = SimpleMqttMessage
|
|
|
+ .builder()
|
|
|
+ .clientId(readPropertyMessage.getDeviceId())
|
|
|
+ .topic("/Radar60FL/" + readPropertyMessage.getDeviceId() + "/sys/property/set")
|
|
|
+ .payloadType(MessagePayloadType.JSON)
|
|
|
+ .payload(Unpooled.wrappedBuffer(mapper.writeValueAsBytes(ladarMessage)))
|
|
|
+ .build();
|
|
|
+ } catch (JsonProcessingException e) {
|
|
|
+
|
|
|
+ }
|
|
|
+ return mqttMessage;
|
|
|
+ }).filter(it -> !(it == null));
|
|
|
+ return Flux.fromStream(simpleMqttMessageStream);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 消息类型为设置属性类型
|
|
|
+ if (message instanceof WritePropertyMessage) {
|
|
|
+ WritePropertyMessage writePropertyMessage = (WritePropertyMessage) message;
|
|
|
+ Map<String, Object> properties = writePropertyMessage.getProperties();
|
|
|
+ if (properties.isEmpty()) return Flux.empty();
|
|
|
+ List<SimpleMqttMessage> mqttMessages = new ArrayList<>();
|
|
|
+ properties.forEach((key, value) -> {
|
|
|
+ String keyId = writePropertyMessage.getDeviceId() + "@set@" + key;
|
|
|
+ propMessageIdMapping.put(keyId, writePropertyMessage.getMessageId());
|
|
|
+ LadarMessage ladarMessage = new LadarMessage();
|
|
|
+ ladarMessage.setVersion("1.0");
|
|
|
+ ladarMessage.setMethod("set");
|
|
|
+ ladarMessage.getParams().put(key, value);
|
|
|
+ SimpleMqttMessage mqttMessage = null;
|
|
|
+ try {
|
|
|
+ mqttMessage = SimpleMqttMessage
|
|
|
+ .builder()
|
|
|
+ .clientId(writePropertyMessage.getDeviceId())
|
|
|
+ .topic("/Radar60FL/" + writePropertyMessage.getDeviceId() + "/sys/property/set")
|
|
|
+ .payloadType(MessagePayloadType.JSON)
|
|
|
+ .payload(Unpooled.wrappedBuffer(mapper.writeValueAsBytes(ladarMessage)))
|
|
|
+ .build();
|
|
|
+ } catch (JsonProcessingException e) {
|
|
|
+
|
|
|
+ }
|
|
|
+ if (mqttMessage != null) mqttMessages.add(mqttMessage);
|
|
|
+ });
|
|
|
+ return Flux.fromIterable(mqttMessages);
|
|
|
+ }
|
|
|
+
|
|
|
+ return Flux.empty();
|
|
|
+ }
|
|
|
+
|
|
|
+}
|