diff --git a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java index c733431515901e17816151100a86095e92c1141c..0f2bc8c93b390712323bc01c18e7aceb5bb9d7c7 100644 --- a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java +++ b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java @@ -35,7 +35,12 @@ public interface IntegrateCluster extends IntegrateGetter { */ String getLocalNode(); - + /** + * acquire local node id + * + * @return String + */ + String getLocalNodeConsistentId(); /** * 訂閱 diff --git a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRecProtocol.java b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRecProtocol.java index cd9464bce52a1d2cc1efca6dbf794b2390ebe118..cfd5e8fc312fc7494e2156e99772eb3562748cd6 100644 --- a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRecProtocol.java +++ b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRecProtocol.java @@ -22,6 +22,8 @@ public class PublishRecProtocol implements Protocol { ReceiveContext receiveContext = contextView.get(ReceiveContext.class); LogManager logManager = receiveContext.getLogManager(); logManager.printWarn(mqttChannel, LogEvent.PUBLISH_REC, LogStatus.SUCCESS, JacksonUtil.bean2Json(message)); + //MQTT-QoS=2时订阅时收到重试消息问题:发布收到报文收到后执行取消重试功能 + contextView.get(ReceiveContext.class).getRetryManager().cancelRetry(mqttChannel, message.getMessageId()); receiveContext.getMetricManager().getMetricRegistry().getMetricCounter(CounterType.PUBLISH_EVENT).increment(); mqttChannel.write(MqttMessageUtils.buildPublishRel(message.getMessageId())); } diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java index be667838b9df8cdca71d70159c574c2df2bdc029..93b5e862ead24d5187211fbb70f4e921b3a4b146 100644 --- a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java @@ -63,6 +63,10 @@ public class IgniteIntegrateCluster implements IntegrateCluster, Serializable { return igniteIntegrate.getIgnite().cluster().localNode().addresses().stream().findFirst().orElse(ServerUtils.serverIp); } + @Override + public String getLocalNodeConsistentId() { + return igniteIntegrate.getIgnite().cluster().localNode().consistentId().toString(); + } @Override public void listenTopic(String topic) { diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java index 8592d085f064a23fae6e8c391d70438cb270748e..b1c16bd6e3e0adc8cad2f0040431a2878b4d949b 100644 --- a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java @@ -6,6 +6,7 @@ import io.github.quickmsg.common.integrate.Integrate; import io.github.quickmsg.common.integrate.SubscribeTopic; import io.github.quickmsg.common.integrate.topic.IntegrateTopics; import io.github.quickmsg.common.metric.CounterType; +import io.github.quickmsg.common.utils.IPUtils; import io.github.quickmsg.common.utils.TopicRegexUtils; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -17,6 +18,7 @@ import org.apache.ignite.configuration.CollectionConfiguration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -65,7 +67,7 @@ public class IgniteIntegrateTopics implements IntegrateTopics { integrate.getCluster().listenTopic(topic); mqttChannel.getTopics().add(subscribeTopic); if (isWildcard(topic)) { - shareCache.add(topic); + shareCache.add(String.format("%s%s", integrate.getCluster().getLocalNodeConsistentId(), topic)); } } } @@ -94,7 +96,7 @@ public class IgniteIntegrateTopics implements IntegrateTopics { private void clearCache(String topic) { integrate.getCluster().stopListenTopic(topic); if (isWildcard(topic)) { - shareCache.remove(topic); + shareCache.remove(String.format("%s%s", integrate.getCluster().getLocalNodeConsistentId(), topic)); } } @@ -122,7 +124,16 @@ public class IgniteIntegrateTopics implements IntegrateTopics { @Override public Set getWildcardTopics(String topic) { - return shareCache.stream().filter(tp->topic.matches(TopicRegexUtils.regexTopic(tp))).collect(Collectors.toSet()); + Set nodeIds = integrate.getCluster().getClusterNode(); + return shareCache.stream().map(tp -> { + for (String id : nodeIds) { + if(tp.contains(id)){ + tp = tp.replaceFirst(id, ""); + break; + } + } + return tp; + }).filter(tp->topic.matches(TopicRegexUtils.regexTopic(tp))).collect(Collectors.toSet()); } @Override