LadarDeviceMessageCodec.java 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package com.liyan;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.liyan.portal.LadarMessage;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.property.*;
import reactor.core.publisher.Flux;
  20. public class LadarDeviceMessageCodec implements DeviceMessageCodec {
  21. private final ObjectMapper mapper = ObjectMappers.JSON_MAPPER;
  22. private final HashMap<String, String> propMessageIdMapping = new HashMap<>();
  23. @Override
  24. public Transport getSupportTransport() {
  25. return DefaultTransport.MQTT;
  26. }
  27. @Nonnull
  28. @Override
  29. public Flux<? extends Message> decode(@Nonnull MessageDecodeContext context) {
  30. FromDeviceMessageContext ctx = (FromDeviceMessageContext) context;
  31. MqttMessage message = (MqttMessage) ctx.getMessage();
  32. String topic = message.getTopic();
  33. Pattern pattern = Pattern.compile("(?<=Radar60SP\\/)(.*?)(?=\\/sys)");
  34. Matcher matcher = pattern.matcher(topic);
  35. if (!matcher.find()) return Flux.empty();
  36. String deviceId = matcher.group(1);
  37. LadarMessage ladarMessage = JSONObject.parseObject(message.payloadAsString(), LadarMessage.class);
  38. // 雷达消息类型为获取属性响应
  39. if ("get".equals(ladarMessage.getOpt())) {
  40. ReadPropertyMessageReply readPropertyMessageReply = ReadPropertyMessageReply.create()
  41. .success(ladarMessage.getParams());
  42. String prop = ladarMessage.getParams().keySet().stream().findFirst().orElse(null);
  43. String propKeyId = deviceId + "@get@" + prop;
  44. if (!propMessageIdMapping.containsKey(propKeyId)) return Flux.empty();
  45. readPropertyMessageReply.setMessageId(propMessageIdMapping.get(propKeyId));
  46. propMessageIdMapping.remove(propKeyId);
  47. readPropertyMessageReply.setDeviceId(deviceId);
  48. return Flux.just(readPropertyMessageReply);
  49. }
  50. // 雷达消息类型为设置属性响应类型
  51. if ("set".equals(ladarMessage.getOpt())) {
  52. WritePropertyMessageReply reply = WritePropertyMessageReply.create()
  53. .success(ladarMessage.getParams());
  54. String prop = ladarMessage.getParams().keySet().stream().findFirst().orElse(null);
  55. String propKeyId = deviceId + "@set@" + prop;
  56. if (!propMessageIdMapping.containsKey(propKeyId)) return Flux.empty();
  57. reply.setMessageId(propMessageIdMapping.get(propKeyId));
  58. propMessageIdMapping.remove(propKeyId);
  59. reply.setDeviceId(deviceId);
  60. return Flux.just(reply);
  61. }
  62. // 雷达上报属性
  63. ReportPropertyMessage report = new ReportPropertyMessage();
  64. report.properties(ladarMessage.getParams());
  65. report.setDeviceId(deviceId);
  66. report.setTimestamp(System.currentTimeMillis());
  67. return Flux.just(report);
  68. }
  69. @Nonnull
  70. @Override
  71. public Flux<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
  72. MessageEncodeContext ctx = context;
  73. Message message = ctx.getMessage();
  74. // 消息类型为读取属性类型
  75. if (message instanceof ReadPropertyMessage) {
  76. ReadPropertyMessage readPropertyMessage = (ReadPropertyMessage) message;
  77. // 获取待查询的属性类型
  78. List<String> props = readPropertyMessage.getProperties();
  79. if (props.isEmpty()) return Flux.empty();
  80. Stream<SimpleMqttMessage> simpleMqttMessageStream = props.parallelStream().map(prop -> {
  81. String propKeyId = readPropertyMessage.getDeviceId() + "@get@" + prop;
  82. // System.out.println("[LADARENCODE]--------" + propKeyId);
  83. // System.out.println("[LADAEENCODE]--------messageId" + readPropertyMessage.getMessageId());
  84. propMessageIdMapping.put(propKeyId, readPropertyMessage.getMessageId());
  85. LadarMessage ladarMessage = new LadarMessage();
  86. ladarMessage.setVersion("1.0");
  87. ladarMessage.setMethod("get");
  88. ladarMessage.getParams().put(prop, "?");
  89. SimpleMqttMessage mqttMessage = null;
  90. try {
  91. mqttMessage = SimpleMqttMessage
  92. .builder()
  93. .clientId(readPropertyMessage.getDeviceId())
  94. .topic("/Radar60SP/" + readPropertyMessage.getDeviceId() + "/sys/property/set")
  95. .payloadType(MessagePayloadType.JSON)
  96. .payload(Unpooled.wrappedBuffer(mapper.writeValueAsBytes(ladarMessage)))
  97. .build();
  98. } catch (JsonProcessingException e) {
  99. }
  100. return mqttMessage;
  101. }).filter(it -> !(it == null));
  102. return Flux.fromStream(simpleMqttMessageStream);
  103. }
  104. // 消息类型为设置属性类型
  105. if (message instanceof WritePropertyMessage) {
  106. WritePropertyMessage writePropertyMessage = (WritePropertyMessage) message;
  107. Map<String, Object> properties = writePropertyMessage.getProperties();
  108. if (properties.isEmpty()) return Flux.empty();
  109. List<SimpleMqttMessage> mqttMessages = new ArrayList<>();
  110. properties.forEach((key, value) -> {
  111. String keyId = writePropertyMessage.getDeviceId() + "@set@" + key;
  112. propMessageIdMapping.put(keyId, writePropertyMessage.getMessageId());
  113. LadarMessage ladarMessage = new LadarMessage();
  114. ladarMessage.setVersion("1.0");
  115. ladarMessage.setMethod("set");
  116. ladarMessage.getParams().put(key, value);
  117. SimpleMqttMessage mqttMessage = null;
  118. try {
  119. mqttMessage = SimpleMqttMessage
  120. .builder()
  121. .clientId(writePropertyMessage.getDeviceId())
  122. .topic("/Radar60SP/" + writePropertyMessage.getDeviceId() + "/sys/property/set")
  123. .payloadType(MessagePayloadType.JSON)
  124. .payload(Unpooled.wrappedBuffer(mapper.writeValueAsBytes(ladarMessage)))
  125. .build();
  126. } catch (JsonProcessingException e) {
  127. }
  128. if (mqttMessage != null) mqttMessages.add(mqttMessage);
  129. });
  130. return Flux.fromIterable(mqttMessages);
  131. }
  132. return Flux.empty();
  133. }
  134. }