package com.fastbee.mq.mqttClient; import com.fastbee.common.constant.FastBeeConstant; import com.fastbee.common.core.redis.RedisCache; import com.fastbee.common.enums.FunctionReplyStatus; import com.fastbee.common.exception.ServiceException; import com.fastbee.iot.domain.FunctionLog; import com.fastbee.iot.service.IFunctionLogService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 发布服务mqtt客户端 */ @Component @Slf4j public class PubMqttClient { @Resource private MqttClientConfig mqttConfig; @Resource(name = "pubMqttCallBack") private PubMqttCallBack mqttCallBack; /** * 连接配置 */ private MqttConnectOptions options; /** * MQTT异步客户端 */ private MqttAsyncClient client; /** * 是否连接标记 */ private boolean isConnected = false; @Resource private RedisCache redisCache; @Resource private IFunctionLogService functionLogService; /** * 启动MQTT客户端 */ public synchronized void initialize() { try { setOptions(); createClient(); while (!client.isConnected()) { IMqttToken token = client.connect(options); if(token != null && token.isComplete()) { log.debug("=>内部MQTT客户端启动成功"); this.isConnected = true; break; } log.debug("=>内部mqtt客户端连接中..."); Thread.sleep(20000); } } catch (MqttException ex) { log.error("=>MQTT客户端初始化异常", ex); } catch (Exception e) { log.error("=>连接MQTT服务器异常", e); this.isConnected = false; } } public boolean isConnected() { return this.isConnected; } private void createClient() { try { if (client == null) { /*host为主机名,clientId是连接MQTT的客户端ID*/ client = new MqttAsyncClient(mqttConfig.getHostUrl(), getClientId(), new MemoryPersistence()); //设置回调函数 client.setCallback(mqttCallBack); mqttCallBack.setClient(client); mqttCallBack.setOptions(this.options); mqttCallBack.setEnabled(mqttConfig.getEnabled()); } } catch (Exception e) { log.error("=>mqtt客户端创建错误"); } } /** * 设置连接属性 */ private void setOptions() { if (options != null) { options = null; } options = new MqttConnectOptions(); options.setConnectionTimeout(mqttConfig.getTimeout()); options.setKeepAliveInterval(mqttConfig.getKeepalive()); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); //设置自动重新连接 options.setAutomaticReconnect(true); /*设置为false,断开连接,不清除session,重连后还是原来的session 保留订阅的主题,能接收离线期间的消息*/ options.setCleanSession(true); } /** * 断开与mqtt的连接 */ public synchronized void disconnect() { //判断客户端是否null 是否连接 if (client != null && client.isConnected()) { try { IMqttToken token = client.disconnect(); token.waitForCompletion(); } catch (MqttException e) { log.error("=>断开mqtt连接发生错误 message={}", e.getMessage()); throw new ServiceException("断开mqtt连接发生错误" + e.getMessage()); } } client = null; } /** * 重新连接MQTT */ public synchronized void refresh() { disconnect(); initialize(); } /** * 拼接客户端id */ public final String getClientId() { return FastBeeConstant.SERVER.WM_PREFIX + System.currentTimeMillis(); } /** * 发布qos=1,非持久化 */ public void publish(String topic, byte[] pushMessage, FunctionLog log) { try { redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TOTAL, -1L); redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TODAY, 60 * 60 * 24); publish(pushMessage, topic, false, 0); if (null != log) { //存储服务下发成功 log.setResultMsg(FunctionReplyStatus.NORELY.getMessage()); log.setResultCode(FunctionReplyStatus.NORELY.getCode()); functionLogService.insertFunctionLog(log); } } catch (Exception e) { if (null != log) { //服务下发失败存储 log.setResultMsg(FunctionReplyStatus.FAIl.getMessage() + "原因: " + e.getMessage()); log.setResultCode(FunctionReplyStatus.FAIl.getCode()); functionLogService.insertFunctionLog(log); } } } /** * 发布主题 * * @param message payload消息体 * @param topic 主题 * @param retained 是否保留消息 * @param qos 消息质量 * Qos1:消息发送一次,不确保 * Qos2:至少分发一次,服务器确保接收消息进行确认 * Qos3:只分发一次,确保消息送达和只传递一次 */ public void publish(byte[] message, String topic, boolean retained, int qos) { //设置mqtt消息 MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message); IMqttDeliveryToken token; try { token = client.publish(topic, mqttMessage); token.waitForCompletion(); } catch (MqttPersistenceException e) { log.error("=>发布主题时发生错误 topic={},message={}", topic, e.getMessage()); throw new ServiceException(e.getMessage()); } catch (MqttException ex) { throw new ServiceException(ex.getMessage()); } } /** * 发布 * * @param qos 连接方式 * @param retained 是否保留 * @param topic 主题 * @param pushMessage 消息体 */ public void publish(int qos, boolean retained, String topic, String pushMessage) { redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TOTAL, -1L); redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TODAY, 60 * 60 * 24); log.info("发布主题[{}],发布消息[{}]" + topic,pushMessage); MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); try { IMqttDeliveryToken token = client.publish(topic, message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { log.error("=>发布主题时发生错误 topic={},message={}", topic, e.getMessage()); } } }