Explorar o código

增加通知中心相关功能

zhou-hao %!s(int64=5) %!d(string=hai) anos
pai
achega
f2db1dc088
Modificáronse 15 ficheiros con 779 adicións e 7 borrados
  1. 1 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/MessageGateway.java
  2. 14 7
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultMessageGateway.java
  3. 60 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/Notification.java
  4. 63 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotificationEntity.java
  5. 69 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifySubscriberEntity.java
  6. 20 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotificationState.java
  7. 19 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/SubscribeState.java
  8. 41 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java
  9. 60 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotificationService.java
  10. 155 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java
  11. 18 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/Notify.java
  12. 8 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/Subscriber.java
  13. 16 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProvider.java
  14. 74 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java
  15. 161 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/web/NotificationController.java

+ 1 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/MessageGateway.java

@@ -103,4 +103,5 @@ public interface MessageGateway {
      */
     void shutdown();
 
+    String nextSubscriberId(String prefix);
 }

+ 14 - 7
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultMessageGateway.java

@@ -13,6 +13,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 
 @Slf4j
@@ -21,19 +22,19 @@ public class DefaultMessageGateway implements MessageGateway {
     @Getter
     private final String id;
     @Getter
-    private String name;
+    private final String name;
 
-    private TopicPart root = new TopicPart(null, "/");
+    private final TopicPart root = new TopicPart(null, "/");
 
-    private Map<String, ConnectionSession> sessions = new ConcurrentHashMap<>();
+    private final Map<String, ConnectionSession> sessions = new ConcurrentHashMap<>();
 
-    private ClientSessionManager sessionManager;
+    private final ClientSessionManager sessionManager;
 
-    private Map<String, Connector> connectors = new ConcurrentHashMap<>();
+    private final Map<String, Connector> connectors = new ConcurrentHashMap<>();
 
-    private AtomicBoolean started = new AtomicBoolean();
+    private final AtomicBoolean started = new AtomicBoolean();
 
-    private LocalMessageConnector localGatewayConnector;
+    private final LocalMessageConnector localGatewayConnector;
 
     public DefaultMessageGateway(String id, ClientSessionManager sessionManager) {
         this(id, id, sessionManager);
@@ -47,6 +48,12 @@ public class DefaultMessageGateway implements MessageGateway {
         this.registerMessageConnector(localGatewayConnector);
     }
 
+    private final AtomicLong nextSubCounter = new AtomicLong();
+
+    @Override
+    public String nextSubscriberId(String prefix) {
+        return prefix + ":" + nextSubCounter.getAndIncrement();
+    }
     @Override
     public Flux<ClientSession> publish(TopicMessage message, boolean shareCluster) {
         return publishLocal(message, session -> true);

+ 60 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/Notification.java

@@ -0,0 +1,60 @@
+package org.jetlinks.community.notify.manager.entity;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.community.notify.manager.subscriber.Notify;
+
+import java.io.Serializable;
+
+@Getter
+@Setter
+public class Notification implements Serializable {
+    private static final long serialVersionUID = -1L;
+
+    private String id;
+
+    private String subscribeId;
+
+    private String subscriberType;
+
+    private String subscriber;
+
+    private String topicProvider;
+
+    private String topicName;
+
+    private String message;
+
+    private String dataId;
+
+    private long notifyTime;
+
+    public static Notification from(NotifySubscriberEntity entity) {
+        Notification notification = new Notification();
+
+        notification.subscribeId = entity.getId();
+        notification.subscriberType = entity.getSubscriberType();
+        notification.subscriber = entity.getSubscriber();
+        notification.topicName = entity.getTopicName();
+        notification.setTopicProvider(entity.getTopicProvider());
+
+        return notification;
+    }
+
+    public Notification copyWithMessage(Notify message) {
+        Notification target = FastBeanCopier.copy(this, new Notification());
+        target.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
+        target.setMessage(message.getMessage());
+        target.setDataId(message.getDataId());
+        target.setNotifyTime(message.getNotifyTime());
+
+        return target;
+    }
+
+    public String createTopic() {
+        //      /notifications/{订阅者类型:user}/{订阅者ID:userId}/{主题类型}/{订阅ID}
+        return "/notifications/" + getSubscriberType() + "/" + getSubscriber() + "/" + getTopicProvider() + "/" + getSubscribeId();
+    }
+}

+ 63 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotificationEntity.java

@@ -0,0 +1,63 @@
+package org.jetlinks.community.notify.manager.entity;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
+import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
+import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
+import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.notify.manager.enums.NotificationState;
+
+import javax.persistence.Column;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import java.sql.JDBCType;
+
+@Getter
+@Setter
+@Table(name = "notify_notifications",
+    indexes = @Index(
+        name = "idx_ntfc_subscribe", columnList = "subscriber_type,subscriber"
+    ))
+public class NotificationEntity extends GenericEntity<String> {
+    private static final long serialVersionUID = -1L;
+
+    @Column(length = 64, nullable = false, updatable = false)
+    private String subscribeId;
+
+    @Column(length = 32, nullable = false, updatable = false)
+    private String subscriberType;
+
+    @Column(length = 64, nullable = false, updatable = false)
+    private String subscriber;
+
+    @Column(length = 32, nullable = false, updatable = false)
+    private String topicProvider;
+
+    @Column(length = 64, nullable = false, updatable = false)
+    private String topicName;
+
+    @Column
+    @ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class)
+    private String message;
+
+    @Column(length = 64)
+    private String dataId;
+
+    @Column(nullable = false)
+    private Long notifyTime;
+
+    @Column(length = 32)
+    @EnumCodec
+    @DefaultValue("unread")
+    @ColumnType(javaType = String.class)
+    private NotificationState state;
+
+    @Column(length = 1024)
+    private String description;
+
+    public static NotificationEntity from(Notification notification) {
+        return FastBeanCopier.copy(notification, new NotificationEntity());
+    }
+}

+ 69 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifySubscriberEntity.java

@@ -0,0 +1,69 @@
+package org.jetlinks.community.notify.manager.entity;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.ezorm.rdb.mapping.annotation.*;
+import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.hswebframework.web.crud.annotation.EnableEntityEvent;
+import org.jetlinks.community.notify.manager.enums.SubscribeState;
+
+import javax.persistence.Column;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import java.util.Map;
+
+/**
+ * 通知订阅者
+ *
+ * @author zhouhao
+ * @since 1.3
+ */
+@Table(name = "notify_subscribers",
+    indexes = @Index(name = "idx_nfy_subs_subscriber", columnList = "subscriber")
+)
+@Getter
+@Setter
+@EnableEntityEvent
+public class NotifySubscriberEntity extends GenericEntity<String> {
+
+    private static final long serialVersionUID = -1L;
+
+    @Comment("订阅者类型,如:user")
+    @Column(length = 32, nullable = false, updatable = false)
+    private String subscriberType;
+
+    @Comment("订阅者ID")
+    @Column(length = 32, nullable = false, updatable = false)
+    private String subscriber;
+
+    @Comment("主题提供商标识,如:device_alarm")
+    @Column(length = 32, nullable = false, updatable = false)
+    private String topicProvider;
+
+    @Comment("订阅名称")
+    @Column(length = 64, nullable = false)
+    private String subscribeName;
+
+    @Comment("主题名称,如:设备告警")
+    @Column(length = 64, nullable = false)
+    private String topicName;
+
+    @Comment("主题订阅配置")
+    @Column(length = 3000)
+    @JsonCodec
+    @ColumnType
+    private Map<String, Object> topicConfig;
+
+    @Column
+    @Comment("描述")
+    private String description;
+
+    @Comment("状态:enabled,disabled")
+    @Column(length = 32)
+    @EnumCodec
+    @ColumnType(javaType = String.class)
+    @DefaultValue("enabled")
+    private SubscribeState state;
+
+
+}

+ 20 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotificationState.java

@@ -0,0 +1,20 @@
+package org.jetlinks.community.notify.manager.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.hswebframework.web.dict.EnumDict;
+
+@Getter
+@AllArgsConstructor
+public enum NotificationState implements EnumDict<String> {
+
+    unread("未读"),
+    read("已读");
+
+    private final String text;
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+}

+ 19 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/SubscribeState.java

@@ -0,0 +1,19 @@
+package org.jetlinks.community.notify.manager.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.hswebframework.web.dict.EnumDict;
+
+@Getter
+@AllArgsConstructor
+public enum SubscribeState implements EnumDict<String> {
+    enabled("订阅中"),
+    disabled("已停止");
+
+    private final String text;
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+}

+ 41 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java

@@ -0,0 +1,41 @@
+package org.jetlinks.community.notify.manager.message;
+
+import lombok.AllArgsConstructor;
+import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+@Component
+@AllArgsConstructor
+public class NotificationsPublishProvider implements SubscriptionProvider {
+
+    private final MessageGateway messageGateway;
+
+    @Override
+    public String id() {
+        return "notifications-publisher";
+    }
+
+    @Override
+    public String name() {
+        return "通知推送器";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{"/notifications"};
+    }
+
+    @Override
+    public Flux<Message> subscribe(SubscribeRequest request) {
+
+        return messageGateway
+            .subscribe(
+                "/notifications/user/" + request.getAuthentication().getUser().getId() + "/*/*"
+                , messageGateway.nextSubscriberId("notifications-publisher"))
+            .map(msg -> Message.success(request.getId(), msg.getTopic(), msg.getMessage().payloadAsJson()));
+    }
+}

+ 60 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotificationService.java

@@ -0,0 +1,60 @@
+package org.jetlinks.community.notify.manager.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.jetlinks.core.utils.FluxUtils;
+import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.notify.manager.entity.Notification;
+import org.jetlinks.community.notify.manager.entity.NotificationEntity;
+import org.jetlinks.community.notify.manager.enums.NotificationState;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.util.stream.Collectors;
+
+@Service
+@Slf4j
+public class NotificationService extends GenericReactiveCrudService<NotificationEntity, String> {
+
+
+    private final EmitterProcessor<NotificationEntity> processor = EmitterProcessor.create();
+
+    private final FluxSink<NotificationEntity> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
+
+    @PostConstruct
+    public void init() {
+
+        FluxUtils
+            .bufferRate(processor, 1000, 200, Duration.ofSeconds(3))
+            .flatMap(buffer -> this.save(Flux.fromIterable(buffer)))
+            .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
+            .subscribe()
+        ;
+
+    }
+
+    @Subscribe("/notifications/**")
+    public Mono<Void> subscribeNotifications(Notification notification) {
+        return Mono.fromRunnable(() -> sink.next(NotificationEntity.from(notification)));
+    }
+
+    public Flux<NotificationEntity> findAndMarkRead(QueryParamEntity query) {
+        return query(query)
+            .collectList()
+            .flatMapMany(list -> createUpdate()
+                .set(NotificationEntity::getState, NotificationState.read)
+                .where()
+                .in(NotificationEntity::getId, list.stream().map(NotificationEntity::getId).collect(Collectors.toList()))
+                .and(NotificationEntity::getState, NotificationState.unread)
+                .execute()
+                .thenMany(Flux.fromIterable(list)
+                    .doOnNext(e -> e.setState(NotificationState.read))));
+    }
+
+}

+ 155 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java

@@ -0,0 +1,155 @@
+package org.jetlinks.community.notify.manager.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.crud.events.EntityCreatedEvent;
+import org.hswebframework.web.crud.events.EntityDeletedEvent;
+import org.hswebframework.web.crud.events.EntityModifyEvent;
+import org.hswebframework.web.crud.events.EntitySavedEvent;
+import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.community.notify.manager.entity.Notification;
+import org.jetlinks.community.notify.manager.entity.NotifySubscriberEntity;
+import org.jetlinks.community.notify.manager.enums.SubscribeState;
+import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+@Slf4j
+public class NotifySubscriberService extends GenericReactiveCrudService<NotifySubscriberEntity, String> implements CommandLineRunner {
+
+    private final MessageGateway gateway;
+
+    private final ClusterManager clusterManager;
+
+    private final Map<String, SubscriberProvider> providers = new ConcurrentHashMap<>();
+
+    private final Map<String, Disposable> subscribers = new ConcurrentHashMap<>();
+
+    public NotifySubscriberService(MessageGateway gateway,
+                                   ClusterManager clusterManager,
+                                   List<SubscriberProvider> providers) {
+        this.gateway = gateway;
+        this.clusterManager = clusterManager;
+        for (SubscriberProvider provider : providers) {
+            this.providers.put(provider.getId(), provider);
+        }
+    }
+
+    public Optional<SubscriberProvider> getProvider(String provider) {
+        return Optional.ofNullable(provider).map(providers::get);
+    }
+
+    private void doStart() {
+        clusterManager.<NotifySubscriberEntity>getTopic("notification-changed")
+            .subscribe()
+            .subscribe(this::handleSubscribe);
+    }
+
+    protected void doNotifyChange(NotifySubscriberEntity entity) {
+        clusterManager.<NotifySubscriberEntity>getTopic("notification-changed")
+            .publish(Mono.just(entity))
+            .retry(3)
+            .subscribe();
+    }
+
+    @EventListener
+    public void handleEvent(EntityCreatedEvent<NotifySubscriberEntity> entity) {
+        entity.getEntity().forEach(this::doNotifyChange);
+    }
+
+    @EventListener
+    public void handleEvent(EntitySavedEvent<NotifySubscriberEntity> entity) {
+        entity.getEntity().forEach(this::doNotifyChange);
+    }
+
+    @EventListener
+    public void handleEvent(EntityDeletedEvent<NotifySubscriberEntity> entity) {
+        entity.getEntity().forEach(e -> {
+            e.setState(SubscribeState.disabled);
+            doNotifyChange(e);
+        });
+    }
+
+    @EventListener
+    public void handleEvent(EntityModifyEvent<NotifySubscriberEntity> entity) {
+        entity.getAfter().forEach(this::doNotifyChange);
+    }
+
+    private void handleSubscribe(NotifySubscriberEntity entity) {
+
+        //取消订阅
+        if (entity.getState() == SubscribeState.disabled) {
+            Optional.ofNullable(subscribers.remove(entity.getId()))
+                .ifPresent(Disposable::dispose);
+            log.debug("unsubscribe:{}({}),{}", entity.getTopicProvider(), entity.getTopicName(), entity.getId());
+            return;
+        }
+
+        //模版
+        Notification template = Notification.from(entity);
+        //转发通知
+        String dispatch = template.createTopic();
+
+        Disposable old = subscribers
+            .put(entity.getId(), Mono.justOrEmpty(getProvider(entity.getTopicProvider()))
+                .flatMap(provider -> provider.createSubscriber(entity.getTopicConfig()))
+                .flatMap(subscriber ->
+                    subscriber
+                        .subscribe()
+                        .map(template::copyWithMessage)
+                        .flatMap(notification -> gateway.publish(dispatch, notification))
+                        .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
+                        .then())
+                .subscribe()
+            );
+        log.debug("subscribe :{}({})", template.getTopicProvider(), template.getTopicName());
+
+        if (null != old) {
+            log.debug("close old subscriber:{}({})", template.getTopicProvider(), template.getTopicName());
+            old.dispose();
+        }
+    }
+
+    public Mono<Void> doSubscribe(NotifySubscriberEntity entity) {
+        return Mono.justOrEmpty(getProvider(entity.getTopicProvider()))
+            .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("不支持的主题:" + entity.getTopicProvider())))
+            .map(provider -> {
+                entity.setTopicName(provider.getName());
+                return entity;
+            })
+            .flatMap(subEntity -> {
+                if (StringUtils.isEmpty(entity.getId())) {
+                    entity.setId(null);
+                    return save(Mono.just(entity));
+                } else {
+                    return createUpdate().set(entity)
+                        .where(NotifySubscriberEntity::getId, entity.getId())
+                        .and(NotifySubscriberEntity::getSubscriberType, entity.getSubscriberType())
+                        .and(NotifySubscriberEntity::getSubscriber, entity.getSubscriber())
+                        .execute();
+                }
+            }).then();
+
+    }
+
+    @Override
+    public void run(String... args) {
+        doStart();
+        createQuery()
+            .where(NotifySubscriberEntity::getState, SubscribeState.enabled)
+            .fetch()
+            .doOnNext(this::handleSubscribe)
+            .subscribe();
+    }
+}

+ 18 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/Notify.java

@@ -0,0 +1,18 @@
+package org.jetlinks.community.notify.manager.subscriber;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@AllArgsConstructor(staticName = "of")
+@Getter
+@Setter
+@NoArgsConstructor
+public class Notify {
+    private String message;
+
+    private String dataId;
+
+    private long notifyTime;
+}

+ 8 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/Subscriber.java

@@ -0,0 +1,8 @@
+package org.jetlinks.community.notify.manager.subscriber;
+
+import reactor.core.publisher.Flux;
+
+public interface Subscriber {
+
+    Flux<Notify> subscribe();
+}

+ 16 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProvider.java

@@ -0,0 +1,16 @@
+package org.jetlinks.community.notify.manager.subscriber;
+
+import org.jetlinks.core.metadata.ConfigMetadata;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+
+public interface SubscriberProvider {
+    String getId();
+
+    String getName();
+
+    Mono<Subscriber> createSubscriber(Map<String, Object> config);
+
+    ConfigMetadata getConfigMetadata();
+}

+ 74 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java

@@ -0,0 +1,74 @@
+package org.jetlinks.community.notify.manager.subscriber.providers;
+
+import com.alibaba.fastjson.JSONObject;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.jetlinks.core.metadata.DefaultConfigMetadata;
+import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.pro.ValueObject;
+import org.jetlinks.pro.gateway.MessageGateway;
+import org.jetlinks.pro.gateway.Subscription;
+import org.jetlinks.pro.notify.manager.subscriber.Notify;
+import org.jetlinks.pro.notify.manager.subscriber.Subscriber;
+import org.jetlinks.pro.notify.manager.subscriber.SubscriberProvider;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+
+@Component
+public class DeviceAlarmProvider implements SubscriberProvider {
+
+    private final MessageGateway messageGateway;
+
+    public DeviceAlarmProvider(MessageGateway messageGateway) {
+        this.messageGateway = messageGateway;
+    }
+
+    @Override
+    public String getId() {
+        return "device_alarm";
+    }
+
+    @Override
+    public String getName() {
+        return "设备告警";
+    }
+
+    @Override
+    public ConfigMetadata getConfigMetadata() {
+        return new DefaultConfigMetadata()
+            .add("productId", "产品ID", "产品ID,支持通配符:*", StringType.GLOBAL)
+            .add("deviceId", "设备ID", "设备ID,支持通配符:*", StringType.GLOBAL)
+            .add("productId", "告警ID", "告警ID,支持通配符:*", StringType.GLOBAL)
+            ;
+    }
+
+    @Override
+    public Mono<Subscriber> createSubscriber(Map<String, Object> config) {
+        ValueObject configs = ValueObject.of(config);
+
+        String productId = configs.getString("productId").orElse("*");
+        String deviceId = configs.getString("deviceId").orElse("*");
+        String alarmId = configs.getString("alarmId").orElse("*");
+
+        // TODO: 2020/7/16 如果用户存在租户,并且设置了通配符,应该从资产总获取所有信息进行分别订阅。
+        Flux<Notify> flux = messageGateway
+            .subscribe(Subscription.asList(
+                String.format("/rule-engine/device/alarm/%s/%s/%s", productId, deviceId, alarmId)),
+                messageGateway.nextSubscriberId("device-alarm-notifications"),
+                false)
+            .map(msg -> {
+                JSONObject json = msg.getMessage().payloadAsJson();
+
+                return Notify.of(
+                    String.format("设备[%s]发生告警:[%s]!", json.getString("deviceName"), json.getString("alarmName")),
+                    json.getString("alarmId"),
+                    System.currentTimeMillis()
+                );
+
+            });
+
+        return Mono.just(() -> flux);
+    }
+}

+ 161 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/web/NotificationController.java

@@ -0,0 +1,161 @@
+package org.jetlinks.community.notify.manager.web;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.web.api.crud.entity.PagerResult;
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.hswebframework.web.authorization.Authentication;
+import org.hswebframework.web.authorization.annotation.Authorize;
+import org.hswebframework.web.authorization.exception.UnAuthorizedException;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.jetlinks.community.notify.manager.entity.NotificationEntity;
+import org.jetlinks.community.notify.manager.entity.NotifySubscriberEntity;
+import org.jetlinks.community.notify.manager.enums.SubscribeState;
+import org.jetlinks.community.notify.manager.service.NotificationService;
+import org.jetlinks.community.notify.manager.service.NotifySubscriberService;
+import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+
+@RestController
+@RequestMapping("/notifications")
+public class NotificationController {
+
+    private final NotificationService notificationService;
+
+    private final NotifySubscriberService subscriberService;
+
+    private final List<SubscriberProvider> providers;
+
+    public NotificationController(NotificationService notificationService,
+                                  NotifySubscriberService subscriberService,
+                                  List<SubscriberProvider> providers) {
+        this.notificationService = notificationService;
+        this.subscriberService = subscriberService;
+        this.providers = providers;
+    }
+
+    @GetMapping("/subscriptions/_query")
+    @Authorize(ignore = true)
+    public Mono<PagerResult<NotifySubscriberEntity>> querySubscription(QueryParamEntity query) {
+        return Authentication
+            .currentReactive()
+            .switchIfEmpty(Mono.error(UnAuthorizedException::new))
+            .flatMap(auth -> query.toNestQuery(q -> q
+                .where(NotifySubscriberEntity::getSubscriberType, "user")
+                .and(NotifySubscriberEntity::getSubscriber, auth.getUser().getId()))
+                .execute(subscriberService::queryPager));
+
+    }
+
+    @PutMapping("/subscription/{id}/_{state}")
+    @Authorize(ignore = true)
+    public Mono<Void> changeSubscribeState(@PathVariable String id, @PathVariable SubscribeState state) {
+        return Authentication
+            .currentReactive()
+            .switchIfEmpty(Mono.error(UnAuthorizedException::new))
+            .flatMap(auth -> subscriberService
+                .createUpdate()
+                .set(NotifySubscriberEntity::getState, state)
+                .where(NotifySubscriberEntity::getId, id)
+                .and(NotifySubscriberEntity::getSubscriber, auth.getUser().getId())
+                .and(NotifySubscriberEntity::getSubscriberType, "user")
+                .execute()
+                .then()
+            );
+    }
+
+    @DeleteMapping("/subscription/{id}")
+    @Authorize(ignore = true)
+    public Mono<Void> deleteSubscription(@PathVariable String id) {
+        return Authentication
+            .currentReactive()
+            .switchIfEmpty(Mono.error(UnAuthorizedException::new))
+            .flatMap(auth -> subscriberService
+                .createDelete()
+                .where(NotifySubscriberEntity::getId, id)
+                .and(NotifySubscriberEntity::getSubscriber, auth.getUser().getId())
+                .and(NotifySubscriberEntity::getSubscriberType, "user")
+                .execute()
+                .then()
+            );
+    }
+
+
+    @PatchMapping("/subscribe")
+    @Authorize(ignore = true)
+    public Mono<NotifySubscriberEntity> doSubscribe(@RequestBody Mono<NotifySubscriberEntity> subscribe) {
+        return Authentication
+            .currentReactive()
+            .switchIfEmpty(Mono.error(UnAuthorizedException::new))
+            .flatMap(auth -> subscribe
+                .doOnNext(e -> {
+                    e.setSubscriberType("user");
+                    e.setSubscriber(auth.getUser().getId());
+                })
+                .flatMap(e -> subscriberService
+                    .doSubscribe(e)
+                    .thenReturn(e)));
+    }
+
+    @GetMapping("/providers")
+    @Authorize(merge = false)
+    public Flux<SubscriberProviderInfo> getProviders() {
+        return Flux
+            .fromIterable(providers)
+            .map(SubscriberProviderInfo::of);
+    }
+
+
+    @GetMapping("/_query")
+    @Authorize(ignore = true)
+    public Mono<PagerResult<NotificationEntity>> queryMyNotifications(QueryParamEntity query) {
+        return Authentication
+            .currentReactive()
+            .switchIfEmpty(Mono.error(UnAuthorizedException::new))
+            .flatMap(auth -> query.toNestQuery(q -> q
+                .where(NotificationEntity::getSubscriberType, "user")
+                .and(NotificationEntity::getSubscriber, auth.getUser().getId()))
+                .execute(notificationService::queryPager)
+                .defaultIfEmpty(PagerResult.empty()));
+
+    }
+
+    @GetMapping("/{id}/read")
+    @Authorize(ignore = true)
+    public Mono<NotificationEntity> readNotification(@PathVariable String id) {
+        return Authentication
+            .currentReactive()
+            .switchIfEmpty(Mono.error(UnAuthorizedException::new))
+            .flatMap(auth -> QueryParamEntity.newQuery()
+                .where(NotificationEntity::getSubscriberType, "user")
+                .and(NotificationEntity::getSubscriber, auth.getUser().getId())
+                .and(NotificationEntity::getId, id)
+                .execute(notificationService::findAndMarkRead)
+                .singleOrEmpty()
+            );
+    }
+
+
+    @Getter
+    @Setter
+    public static class SubscriberProviderInfo {
+        private String id;
+
+        private String name;
+
+        private ConfigMetadata metadata;
+
+        public static SubscriberProviderInfo of(SubscriberProvider provider) {
+            SubscriberProviderInfo info = new SubscriberProviderInfo();
+            info.id = provider.getId();
+            info.name = provider.getName();
+            info.setMetadata(provider.getConfigMetadata());
+            return info;
+        }
+    }
+
+}