123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- package com.liyan;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.liyan.portal.LadarMessage;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.property.*;
import reactor.core.publisher.Flux;

import javax.annotation.Nonnull;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
- public class LadarDeviceMessageCodec implements DeviceMessageCodec {
- private final ObjectMapper mapper = ObjectMappers.JSON_MAPPER;
- private final HashMap<String, String> propMessageIdMapping = new HashMap<>();
- @Override
- public Transport getSupportTransport() {
- return DefaultTransport.MQTT;
- }
- @Nonnull
- @Override
- public Flux<? extends Message> decode(@Nonnull MessageDecodeContext context) {
- FromDeviceMessageContext ctx = (FromDeviceMessageContext) context;
- MqttMessage message = (MqttMessage) ctx.getMessage();
- String topic = message.getTopic();
- Pattern pattern = Pattern.compile("(?<=Radar60SP\\/)(.*?)(?=\\/sys)");
- Matcher matcher = pattern.matcher(topic);
- if (!matcher.find()) return Flux.empty();
- String deviceId = matcher.group(1);
- LadarMessage ladarMessage = JSONObject.parseObject(message.payloadAsString(), LadarMessage.class);
- // 雷达消息类型为获取属性响应
- if ("get".equals(ladarMessage.getOpt())) {
- ReadPropertyMessageReply readPropertyMessageReply = ReadPropertyMessageReply.create()
- .success(ladarMessage.getParams());
- String prop = ladarMessage.getParams().keySet().stream().findFirst().orElse(null);
- String propKeyId = deviceId + "@get@" + prop;
- if (!propMessageIdMapping.containsKey(propKeyId)) return Flux.empty();
- readPropertyMessageReply.setMessageId(propMessageIdMapping.get(propKeyId));
- propMessageIdMapping.remove(propKeyId);
- readPropertyMessageReply.setDeviceId(deviceId);
- return Flux.just(readPropertyMessageReply);
- }
- // 雷达消息类型为设置属性响应类型
- if ("set".equals(ladarMessage.getOpt())) {
- WritePropertyMessageReply reply = WritePropertyMessageReply.create()
- .success(ladarMessage.getParams());
- String prop = ladarMessage.getParams().keySet().stream().findFirst().orElse(null);
- String propKeyId = deviceId + "@set@" + prop;
- if (!propMessageIdMapping.containsKey(propKeyId)) return Flux.empty();
- reply.setMessageId(propMessageIdMapping.get(propKeyId));
- propMessageIdMapping.remove(propKeyId);
- reply.setDeviceId(deviceId);
- return Flux.just(reply);
- }
- // 雷达上报属性
- ReportPropertyMessage report = new ReportPropertyMessage();
- report.properties(ladarMessage.getParams());
- report.setDeviceId(deviceId);
- report.setTimestamp(System.currentTimeMillis());
- return Flux.just(report);
- }
- @Nonnull
- @Override
- public Flux<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
- MessageEncodeContext ctx = context;
- Message message = ctx.getMessage();
- // 消息类型为读取属性类型
- if (message instanceof ReadPropertyMessage) {
- ReadPropertyMessage readPropertyMessage = (ReadPropertyMessage) message;
- // 获取待查询的属性类型
- List<String> props = readPropertyMessage.getProperties();
- if (props.isEmpty()) return Flux.empty();
- Stream<SimpleMqttMessage> simpleMqttMessageStream = props.parallelStream().map(prop -> {
- String propKeyId = readPropertyMessage.getDeviceId() + "@get@" + prop;
- // System.out.println("[LADARENCODE]--------" + propKeyId);
- // System.out.println("[LADAEENCODE]--------messageId" + readPropertyMessage.getMessageId());
- propMessageIdMapping.put(propKeyId, readPropertyMessage.getMessageId());
- LadarMessage ladarMessage = new LadarMessage();
- ladarMessage.setVersion("1.0");
- ladarMessage.setMethod("get");
- ladarMessage.getParams().put(prop, "?");
- SimpleMqttMessage mqttMessage = null;
- try {
- mqttMessage = SimpleMqttMessage
- .builder()
- .clientId(readPropertyMessage.getDeviceId())
- .topic("/Radar60SP/" + readPropertyMessage.getDeviceId() + "/sys/property/set")
- .payloadType(MessagePayloadType.JSON)
- .payload(Unpooled.wrappedBuffer(mapper.writeValueAsBytes(ladarMessage)))
- .build();
- } catch (JsonProcessingException e) {
- }
- return mqttMessage;
- }).filter(it -> !(it == null));
- return Flux.fromStream(simpleMqttMessageStream);
- }
- // 消息类型为设置属性类型
- if (message instanceof WritePropertyMessage) {
- WritePropertyMessage writePropertyMessage = (WritePropertyMessage) message;
- Map<String, Object> properties = writePropertyMessage.getProperties();
- if (properties.isEmpty()) return Flux.empty();
- List<SimpleMqttMessage> mqttMessages = new ArrayList<>();
- properties.forEach((key, value) -> {
- String keyId = writePropertyMessage.getDeviceId() + "@set@" + key;
- propMessageIdMapping.put(keyId, writePropertyMessage.getMessageId());
- LadarMessage ladarMessage = new LadarMessage();
- ladarMessage.setVersion("1.0");
- ladarMessage.setMethod("set");
- ladarMessage.getParams().put(key, value);
- SimpleMqttMessage mqttMessage = null;
- try {
- mqttMessage = SimpleMqttMessage
- .builder()
- .clientId(writePropertyMessage.getDeviceId())
- .topic("/Radar60SP/" + writePropertyMessage.getDeviceId() + "/sys/property/set")
- .payloadType(MessagePayloadType.JSON)
- .payload(Unpooled.wrappedBuffer(mapper.writeValueAsBytes(ladarMessage)))
- .build();
- } catch (JsonProcessingException e) {
- }
- if (mqttMessage != null) mqttMessages.add(mqttMessage);
- });
- return Flux.fromIterable(mqttMessages);
- }
- return Flux.empty();
- }
- }
|