|
@@ -18,10 +18,10 @@ import java.util.function.Consumer;
|
|
|
public class RedissionMessageSubscribe<M extends Message> implements MessageSubscribe<M> {
|
|
|
private MessageSubject iam;
|
|
|
private RedissonClient redisson;
|
|
|
-
|
|
|
- private boolean running = false;
|
|
|
-
|
|
|
- private List<Consumer<M>> consumers = new ArrayList<>();
|
|
|
+ private boolean running = false;
|
|
|
+ private int listenerId = 0;
|
|
|
+ private List<Consumer<M>> consumers = new ArrayList<>();
|
|
|
+ private RTopic<M> topic;
|
|
|
|
|
|
public RedissionMessageSubscribe(MessageSubject iam, RedissonClient redisson) {
|
|
|
this.iam = iam;
|
|
@@ -47,6 +47,16 @@ public class RedissionMessageSubscribe<M extends Message> implements MessageSubs
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void cancel() {
|
|
|
+ running = false;
|
|
|
+ if (listenerId != 0 && topic != null) {
|
|
|
+ topic.removeListener(listenerId);
|
|
|
+ topic = null;
|
|
|
+ }
|
|
|
+ consumers.clear();
|
|
|
+ }
|
|
|
+
|
|
|
private static SerializationCodec codec = new SerializationCodec();
|
|
|
|
|
|
private void doRun() {
|
|
@@ -59,7 +69,11 @@ public class RedissionMessageSubscribe<M extends Message> implements MessageSubs
|
|
|
try {
|
|
|
countDownLatch.trySetCount(1);
|
|
|
countDownLatch.await();
|
|
|
- consumers.forEach(cons -> cons.accept(queue.peek()));
|
|
|
+ consumers.forEach(cons -> {
|
|
|
+ M message = queue.poll();
|
|
|
+ if (null != message)
|
|
|
+ cons.accept(message);
|
|
|
+ });
|
|
|
} catch (InterruptedException e) {
|
|
|
try {
|
|
|
Thread.sleep(1000);
|
|
@@ -74,8 +88,8 @@ public class RedissionMessageSubscribe<M extends Message> implements MessageSubs
|
|
|
return;
|
|
|
}
|
|
|
if (iam instanceof TopicMessageSubject) {
|
|
|
- RTopic<M> topic = redisson.getTopic("topic_" + ((TopicMessageSubject) iam).getTopic(), codec);
|
|
|
- topic.addListener((channel, msg) -> consumers.forEach(cons -> cons.accept(msg)));
|
|
|
+ topic = redisson.getTopic("topic_" + ((TopicMessageSubject) iam).getTopic(), codec);
|
|
|
+ listenerId = topic.addListener((channel, msg) -> consumers.forEach(cons -> cons.accept(msg)));
|
|
|
}
|
|
|
running = true;
|
|
|
}
|