|
@@ -93,6 +93,7 @@ public class DefaultMessageGateway implements MessageGateway {
|
|
|
return Flux.defer(() -> root.find(message.getTopic())
|
|
|
.flatMapIterable(TopicPart::getSessionId)
|
|
|
.flatMap(id -> Mono.justOrEmpty(sessions.get(id)))
|
|
|
+ .distinct(ConnectionSession::getId)
|
|
|
.filter(connectionSession -> connectionSession.isAlive() && filter.test(connectionSession))
|
|
|
.flatMap(session ->
|
|
|
session.connection
|