|
@@ -26,6 +26,7 @@ import reactor.core.Disposable;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
import reactor.core.scheduler.Scheduler;
|
|
|
+import reactor.core.scheduler.Schedulers;
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
import java.time.Duration;
|
|
@@ -47,21 +48,28 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
|
|
|
@Override
|
|
|
public Mono<TaskExecutor> createTask(ExecutionContext context) {
|
|
|
- return Mono.just(new DeviceAlarmTaskExecutor(context));
|
|
|
+ return Mono.just(new DeviceAlarmTaskExecutor(context, eventBus, scheduler));
|
|
|
}
|
|
|
|
|
|
- class DeviceAlarmTaskExecutor extends AbstractTaskExecutor {
|
|
|
+ static class DeviceAlarmTaskExecutor extends AbstractTaskExecutor {
|
|
|
|
|
|
List<String> default_columns = Arrays.asList(
|
|
|
- "timestamp", "deviceId", "this.headers.deviceName deviceName"
|
|
|
+ "timestamp", "deviceId", "this.headers headers", "this.headers.deviceName deviceName"
|
|
|
);
|
|
|
+ private final EventBus eventBus;
|
|
|
+
|
|
|
+ private final Scheduler scheduler;
|
|
|
|
|
|
private DeviceAlarmRule rule;
|
|
|
|
|
|
private ReactorQL ql;
|
|
|
|
|
|
- public DeviceAlarmTaskExecutor(ExecutionContext context) {
|
|
|
+ DeviceAlarmTaskExecutor(ExecutionContext context,
|
|
|
+ EventBus eventBus,
|
|
|
+ Scheduler scheduler) {
|
|
|
super(context);
|
|
|
+ this.eventBus = eventBus;
|
|
|
+ this.scheduler = scheduler;
|
|
|
rule = createRule();
|
|
|
ql = createQL(rule);
|
|
|
}
|
|
@@ -77,7 +85,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
return doSubscribe(eventBus)
|
|
|
.filter(ignore -> state == Task.State.running)
|
|
|
.flatMap(result -> {
|
|
|
- RuleData data = RuleData.create(result);
|
|
|
+ RuleData data = context.newRuleData(result);
|
|
|
//输出到下一节点
|
|
|
return context
|
|
|
.getOutput()
|
|
@@ -99,9 +107,11 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
}
|
|
|
|
|
|
private DeviceAlarmRule createRule() {
|
|
|
- DeviceAlarmRule rule = ValueObject.of(context.getJob().getConfiguration())
|
|
|
+ DeviceAlarmRule rule = ValueObject
|
|
|
+ .of(context.getJob().getConfiguration())
|
|
|
.get("rule")
|
|
|
- .map(val -> FastBeanCopier.copy(val, new DeviceAlarmRule())).orElseThrow(() -> new IllegalArgumentException("告警配置错误"));
|
|
|
+ .map(val -> FastBeanCopier.copy(val, new DeviceAlarmRule()))
|
|
|
+ .orElseThrow(() -> new IllegalArgumentException("告警配置错误"));
|
|
|
rule.validate();
|
|
|
return rule;
|
|
|
}
|
|
@@ -128,7 +138,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + i);
|
|
|
columns.addAll(trigger.toColumns());
|
|
|
trigger.createExpression()
|
|
|
- .ifPresent(expr -> wheres.add("(" + expr + ")"));
|
|
|
+ .ifPresent(expr -> wheres.add("(" + expr + ")"));
|
|
|
}
|
|
|
String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
|
|
|
|
|
@@ -140,6 +150,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
List<String> newColumns = new ArrayList<>(Arrays.asList(
|
|
|
"this.deviceName deviceName",
|
|
|
"this.deviceId deviceId",
|
|
|
+ "this.headers headers",
|
|
|
"this.timestamp timestamp"));
|
|
|
for (DeviceAlarmRule.Property property : rule.getProperties()) {
|
|
|
if (StringUtils.isEmpty(property.getProperty())) {
|
|
@@ -155,7 +166,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
|
|
|
}
|
|
|
}
|
|
|
- if (newColumns.size() > 3) {
|
|
|
+ if (newColumns.size() > 4) {
|
|
|
sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
|
|
|
}
|
|
|
}
|
|
@@ -170,19 +181,36 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
List<Object> binds = new ArrayList<>();
|
|
|
|
|
|
for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
|
|
|
- String topic = trigger.getType().getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId());
|
|
|
- topics.add(topic);
|
|
|
binds.addAll(trigger.toFilterBinds());
|
|
|
+ //since 1.11 定时触发的不从eventBus订阅
|
|
|
+ if (trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ String topic = trigger
|
|
|
+ .getType()
|
|
|
+ .getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId());
|
|
|
+ topics.add(topic);
|
|
|
}
|
|
|
- Subscription subscription = Subscription.of(
|
|
|
- "device_alarm:" + rule.getId(),
|
|
|
- topics.toArray(new String[0]),
|
|
|
- Subscription.Feature.local
|
|
|
+
|
|
|
+ List<Flux<? extends Map<String, Object>>> inputs = new ArrayList<>();
|
|
|
+
|
|
|
+ //从上游获取输入进行处理(通常是定时触发发送指令后得到的回复)
|
|
|
+ inputs.add(
|
|
|
+ context
|
|
|
+ .getInput()
|
|
|
+ .accept()
|
|
|
+ .flatMap(RuleData::dataToMap)
|
|
|
);
|
|
|
-// List<Subscription> subscriptions = topics.stream().map(Subscription::new).collect(Collectors.toList());
|
|
|
|
|
|
- ReactorQLContext context = ReactorQLContext
|
|
|
- .ofDatasource(ignore ->
|
|
|
+ //从事件总线订阅数据进行处理
|
|
|
+ if (!topics.isEmpty()) {
|
|
|
+ Subscription subscription = Subscription.of(
|
|
|
+ "device_alarm:" + rule.getId(),
|
|
|
+ topics.toArray(new String[0]),
|
|
|
+ Subscription.Feature.local
|
|
|
+ );
|
|
|
+ inputs.add(
|
|
|
eventBus
|
|
|
.subscribe(subscription, DeviceMessage.class)
|
|
|
.map(Jsonable::toJson)
|
|
@@ -198,6 +226,9 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
json.put("alarmName", rule.getName());
|
|
|
})
|
|
|
);
|
|
|
+ }
|
|
|
+ ReactorQLContext context = ReactorQLContext
|
|
|
+ .ofDatasource(ignore -> Flux.merge(inputs));
|
|
|
|
|
|
binds.forEach(context::bind);
|
|
|
|
|
@@ -205,36 +236,36 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
.start(context)
|
|
|
.map(ReactorQLRecord::asMap);
|
|
|
|
|
|
- DeviceAlarmRule.ShakeLimit shakeLimit;
|
|
|
- if ((shakeLimit = rule.getShakeLimit()) != null
|
|
|
- && shakeLimit.isEnabled()
|
|
|
- && shakeLimit.getTime() > 0) {
|
|
|
- int thresholdNumber = shakeLimit.getThreshold();
|
|
|
- Duration windowTime = Duration.ofSeconds(shakeLimit.getTime());
|
|
|
+ ShakeLimit shakeLimit;
|
|
|
+ if ((shakeLimit = rule.getShakeLimit()) != null) {
|
|
|
|
|
|
- resultFlux = resultFlux
|
|
|
- .as(flux ->
|
|
|
+ resultFlux = shakeLimit.transfer(
|
|
|
+ resultFlux,
|
|
|
+ (duration, flux) ->
|
|
|
StringUtils.hasText(rule.getDeviceId())
|
|
|
- ? flux.window(windowTime, scheduler)//规则已经指定了固定的设备,直接开启时间窗口就行
|
|
|
- : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
|
|
|
+ //规则已经指定了固定的设备,直接开启时间窗口就行
|
|
|
+ ? flux.window(duration, scheduler)
|
|
|
+ //规则配置在设备产品上,则按设备ID分组后再开窗口
|
|
|
+ : flux
|
|
|
.groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
|
|
|
- .flatMap(group -> group.window(windowTime, scheduler), Integer.MAX_VALUE))
|
|
|
- //处理每一组数据
|
|
|
- .flatMap(group -> group
|
|
|
- .index((index, data) -> Tuples.of(index + 1, data)) //给数据打上索引,索引号就是告警次数
|
|
|
- .filter(tp -> tp.getT1() >= thresholdNumber)//超过阈值告警
|
|
|
- .as(flux -> shakeLimit.isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
|
|
|
- .map(tp2 -> {
|
|
|
- tp2.getT2().put("totalAlarms", tp2.getT1());
|
|
|
- return tp2.getT2();
|
|
|
- }));
|
|
|
+ .flatMap(group -> group.window(duration, scheduler), Integer.MAX_VALUE),
|
|
|
+ (alarm, total) -> alarm.put("totalAlarms", total)
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
return resultFlux
|
|
|
.flatMap(map -> {
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ Map<String, Object> headers = (Map<String, Object>) map.remove("headers");
|
|
|
map.put("productId", rule.getProductId());
|
|
|
map.put("alarmId", rule.getId());
|
|
|
map.put("alarmName", rule.getName());
|
|
|
+ if (null != rule.getLevel()) {
|
|
|
+ map.put("alarmLevel", rule.getLevel());
|
|
|
+ }
|
|
|
+ if (null != rule.getType()) {
|
|
|
+ map.put("alarmType", rule.getType());
|
|
|
+ }
|
|
|
if (StringUtils.hasText(rule.getDeviceName())) {
|
|
|
map.putIfAbsent("deviceName", rule.getDeviceName());
|
|
|
}
|
|
@@ -244,26 +275,30 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
if (StringUtils.hasText(rule.getDeviceId())) {
|
|
|
map.putIfAbsent("deviceId", rule.getDeviceId());
|
|
|
}
|
|
|
- if (!map.containsKey("deviceName")) {
|
|
|
+ if (!map.containsKey("deviceName") && map.get("deviceId") != null) {
|
|
|
map.putIfAbsent("deviceName", map.get("deviceId"));
|
|
|
}
|
|
|
if (!map.containsKey("productName")) {
|
|
|
- map.putIfAbsent("productName", map.get("productId"));
|
|
|
+ map.putIfAbsent("productName", rule.getProductId());
|
|
|
}
|
|
|
- //生成告警记录时生成ID,方便下游做处理。
|
|
|
- map.putIfAbsent("id", IDGenerator.MD5.generate());
|
|
|
if (log.isDebugEnabled()) {
|
|
|
log.debug("发生设备告警:{}", map);
|
|
|
}
|
|
|
+
|
|
|
+ //生成告警记录时生成ID,方便下游做处理。
|
|
|
+ map.putIfAbsent("id", IDGenerator.MD5.generate());
|
|
|
// 推送告警信息到消息网关中
|
|
|
// /rule-engine/device/alarm/{productId}/{deviceId}/{ruleId}
|
|
|
return eventBus
|
|
|
.publish(String.format(
|
|
|
"/rule-engine/device/alarm/%s/%s/%s",
|
|
|
- rule.getProductId(), map.get("deviceId"), rule.getId()
|
|
|
- ), map)
|
|
|
- .then(Mono.just(map));
|
|
|
+ rule.getProductId(),
|
|
|
+ map.get("deviceId"),
|
|
|
+ rule.getId()), map)
|
|
|
+ .thenReturn(map);
|
|
|
+
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
}
|