package com.liyan;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.liyan.portal.LadarMessage;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.property.*;
import reactor.core.publisher.Flux;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/**
 * MQTT codec for "Radar60SP" devices.
 *
 * <p>{@link #encode} turns platform read/write property requests into one MQTT message per
 * property and remembers the platform message id under a {@code deviceId@method@property} key.
 * {@link #decode} parses device payloads into {@link LadarMessage}; "get"/"set" responses are
 * matched against the remembered message id, anything else is forwarded as a property report.
 *
 * <p>The pending-request table is shared between {@code encode} and {@code decode}, so it is a
 * synchronized map and compound check-then-remove operations lock on it.
 */
public class LadarDeviceMessageCodec implements DeviceMessageCodec {

    private static final Logger LOG = Logger.getLogger(LadarDeviceMessageCodec.class.getName());

    /** Extracts the device id: the text between {@code Radar60SP/} and {@code /sys} in the topic. */
    private static final Pattern DEVICE_ID_PATTERN = Pattern.compile("(?<=Radar60SP\\/)(.*?)(?=\\/sys)");

    private final ObjectMapper mapper = ObjectMappers.JSON_MAPPER;

    /**
     * Pending requests: {@code deviceId@method@property -> platform message id}.
     * A synchronized {@link HashMap} (rather than a concurrent map) is used so that a
     * {@code null} message id is still accepted, as it was before.
     */
    private final Map<String, String> propMessageIdMapping = Collections.synchronizedMap(new HashMap<>());

    @Override
    public Transport getSupportTransport() {
        return DefaultTransport.MQTT;
    }

    /**
     * Decodes a message received from the device.
     *
     * @param context decode context; must be a {@link FromDeviceMessageContext} carrying an
     *                {@link MqttMessage}
     * @return a read/write property reply when the payload answers a pending request, a
     *         {@link ReportPropertyMessage} for any other payload, or an empty flux when the
     *         topic does not match, the payload is empty, or no pending request is found
     */
    @Nonnull
    @Override
    public Flux<? extends Message> decode(@Nonnull MessageDecodeContext context) {
        FromDeviceMessageContext ctx = (FromDeviceMessageContext) context;
        MqttMessage message = (MqttMessage) ctx.getMessage();

        String topic = message.getTopic();
        if (topic == null) {
            return Flux.empty();
        }
        Matcher matcher = DEVICE_ID_PATTERN.matcher(topic);
        if (!matcher.find()) {
            return Flux.empty();
        }
        String deviceId = matcher.group(1);

        LadarMessage ladarMessage = JSONObject.parseObject(message.payloadAsString(), LadarMessage.class);
        if (ladarMessage == null) {
            // Nothing to decode (the parser can yield null, e.g. for an empty payload).
            return Flux.empty();
        }
        Map<String, Object> params = ladarMessage.getParams();

        // NOTE(review): responses are recognised via getOpt() while requests are built with
        // setMethod() in encode — confirm both map to the field the device echoes back.

        // Device answered a "get property" request.
        if ("get".equals(ladarMessage.getOpt())) {
            String prop = firstPropertyName(params);
            if (prop == null) {
                return Flux.empty();
            }
            ReadPropertyMessageReply reply = ReadPropertyMessageReply.create().success(params);
            if (!claimMessageId(pendingKey(deviceId, "get", prop), reply::setMessageId)) {
                return Flux.empty();
            }
            reply.setDeviceId(deviceId);
            return Flux.just(reply);
        }

        // Device answered a "set property" request.
        if ("set".equals(ladarMessage.getOpt())) {
            String prop = firstPropertyName(params);
            if (prop == null) {
                return Flux.empty();
            }
            WritePropertyMessageReply reply = WritePropertyMessageReply.create().success(params);
            if (!claimMessageId(pendingKey(deviceId, "set", prop), reply::setMessageId)) {
                return Flux.empty();
            }
            reply.setDeviceId(deviceId);
            return Flux.just(reply);
        }

        // Anything else is treated as a property report from the device.
        ReportPropertyMessage report = new ReportPropertyMessage();
        report.properties(params);
        report.setDeviceId(deviceId);
        report.setTimestamp(System.currentTimeMillis());
        return Flux.just(report);
    }

    /**
     * Encodes a platform message for the device.
     *
     * @param context encode context holding the message to send
     * @return one MQTT message per requested property for {@link ReadPropertyMessage} and
     *         {@link WritePropertyMessage}; an empty flux for other message types or when
     *         no properties are given
     */
    @Nonnull
    @Override
    public Flux<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
        Message message = context.getMessage();

        // Read-property request: one "get" message per property, value placeholder "?".
        if (message instanceof ReadPropertyMessage) {
            ReadPropertyMessage read = (ReadPropertyMessage) message;
            List<String> props = read.getProperties();
            if (props == null || props.isEmpty()) {
                return Flux.empty();
            }
            List<SimpleMqttMessage> out = new ArrayList<>(props.size());
            for (String prop : props) {
                SimpleMqttMessage mqtt = buildRequest(read.getDeviceId(), read.getMessageId(), "get", prop, "?");
                if (mqtt != null) {
                    out.add(mqtt);
                }
            }
            return Flux.fromIterable(out);
        }

        // Write-property request: one "set" message per property.
        if (message instanceof WritePropertyMessage) {
            WritePropertyMessage write = (WritePropertyMessage) message;
            Map<String, Object> properties = write.getProperties();
            if (properties == null || properties.isEmpty()) {
                return Flux.empty();
            }
            List<SimpleMqttMessage> out = new ArrayList<>(properties.size());
            for (Map.Entry<String, Object> entry : properties.entrySet()) {
                SimpleMqttMessage mqtt = buildRequest(
                        write.getDeviceId(), write.getMessageId(), "set", entry.getKey(), entry.getValue());
                if (mqtt != null) {
                    out.add(mqtt);
                }
            }
            return Flux.fromIterable(out);
        }

        return Flux.empty();
    }

    /** Builds the pending-request key {@code deviceId@method@property}. */
    private static String pendingKey(String deviceId, String method, String property) {
        return deviceId + "@" + method + "@" + property;
    }

    /** Returns the first key of {@code params}, or {@code null} when the map is null or empty. */
    private static String firstPropertyName(Map<String, Object> params) {
        if (params == null || params.isEmpty()) {
            return null;
        }
        return params.keySet().iterator().next();
    }

    /**
     * Atomically removes the pending entry for {@code key} and hands its message id to
     * {@code sink}.
     *
     * @return {@code false} when no request is pending for {@code key}
     */
    private boolean claimMessageId(String key, Consumer<String> sink) {
        final String messageId;
        // Lock on the synchronized map itself so containsKey + remove act as one step.
        synchronized (propMessageIdMapping) {
            if (!propMessageIdMapping.containsKey(key)) {
                return false;
            }
            messageId = propMessageIdMapping.remove(key);
        }
        sink.accept(messageId);
        return true;
    }

    /**
     * Serializes a single-property request and, on success, records {@code messageId} so the
     * device's response can be matched in {@link #decode}.
     *
     * @return the MQTT message, or {@code null} when the payload could not be serialized
     *         (the failure is logged and that property is skipped)
     */
    private SimpleMqttMessage buildRequest(String deviceId, String messageId, String method,
                                           String property, Object value) {
        LadarMessage ladarMessage = new LadarMessage();
        ladarMessage.setVersion("1.0");
        ladarMessage.setMethod(method);
        ladarMessage.getParams().put(property, value);

        final SimpleMqttMessage mqttMessage;
        try {
            // NOTE(review): "get" requests are also published to .../sys/property/set,
            // exactly as before — confirm against the device protocol.
            mqttMessage = SimpleMqttMessage
                    .builder()
                    .clientId(deviceId)
                    .topic("/Radar60SP/" + deviceId + "/sys/property/set")
                    .payloadType(MessagePayloadType.JSON)
                    .payload(Unpooled.wrappedBuffer(mapper.writeValueAsBytes(ladarMessage)))
                    .build();
        } catch (JsonProcessingException e) {
            LOG.log(Level.WARNING,
                    "Failed to serialize '" + method + "' request for property '" + property
                            + "' of device " + deviceId + "; message skipped", e);
            return null;
        }

        // Register only after the message was built, so a failed build leaves no stale entry.
        propMessageIdMapping.put(pendingKey(deviceId, method, property), messageId);
        return mqttMessage;
    }
}