package com.fastbee.mqtt.service.impl; import com.fastbee.common.utils.gateway.mq.TopicsUtils; import com.fastbee.mqtt.model.ClientMessage; import com.fastbee.mqtt.service.IMessageStore; import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * Retain will Qos12消息存储接口 -TODO 后续Redis处理 * * @author gsb * @date 2022/10/14 14:35 */ @Service public class MessageStoreImpl implements IMessageStore { /** * 存储消息,保留消息,遗留消息 */ private final Map willOrRetainMap = new ConcurrentHashMap<>(); /** * Qos2 Pub消息 */ private final Map publishMap = new ConcurrentHashMap<>(); /** * Qos2 REL IN消息 */ private final Set outRelSet = new HashSet<>(); /** * Qos2 REL out */ private final Set inRelSet = new HashSet<>(); /** * 存储控制包 * * @param topic: 控制包所属主题 * @param clientMessage: 需要存储的消息 */ @Override public void storeMessage(String topic, ClientMessage clientMessage) { willOrRetainMap.put(topic, clientMessage); } /** * 清除topic下的所有消息 * * @param topic: 主题 */ @Override public void cleanTopic(String topic) { willOrRetainMap.remove(topic); } /** * 根据clientId清除消息 * * @param clientId: 客户端唯一标识 */ @Override public void removeMessage(String clientId) { for (Map.Entry entry : willOrRetainMap.entrySet()) { if (entry.getValue().getClientId().equals(clientId)) { willOrRetainMap.remove(entry.getKey()); } } } /** * 匹配主题过滤器,匹配消息 * * @param topicFilter: 主题过滤器 */ @Override public List searchMessages(String topicFilter) { List messageList = new ArrayList<>(); for (String topic : willOrRetainMap.keySet()) { if (TopicsUtils.matchTopic(topic, topicFilter)) { messageList.add(willOrRetainMap.get(topic)); } } return messageList; } /** * 保存 clientMessage * * @param messageId 消息id */ @Override public void savePubMsg(Integer messageId, ClientMessage clientMessage){ publishMap.put(messageId,clientMessage); } /** * 移除 * * @param messageId 消息id */ @Override public void removePubMsg(int messageId){ publishMap.remove(messageId); } /** * 保存 REL IN * * @param messageId 消息id */ @Override public void saveRelInMsg(int messageId){ inRelSet.add(messageId); } /** * 保存 REL OUT * * @param messageId 消息id */ @Override public void saveRelOutMsg(int messageId){ outRelSet.add(messageId); } /** * 移除 * * @param messageId 消息id */ @Override public void removeRelInMsg(int messageId){ inRelSet.remove(messageId); } /** * 移除 * * @param messageId 消息id */ @Override public void removeRelOutMsg(int messageId){ outRelSet.remove(messageId); } /** * 判断Rel out是否包含消息id */ @Override public boolean outRelContains(int messageId){ return outRelSet.contains(messageId); } }