package com.fastbee.mq.mqttClient;
|
|
|
import lombok.Data;
|
import lombok.NoArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.*;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
|
/**
|
* mqtt客户端回调
|
*/
|
@Slf4j
|
@Component
|
@Data
|
@NoArgsConstructor
|
public class PubMqttCallBack implements MqttCallbackExtended {
|
|
|
/**
|
* mqtt客户端
|
*/
|
private MqttAsyncClient client;
|
/**
|
* 创建客户端参数
|
*/
|
private MqttConnectOptions options;
|
|
@Resource
|
private MqttService mqttService;
|
|
private Boolean enabled;
|
|
|
public PubMqttCallBack(MqttAsyncClient client, MqttConnectOptions options,Boolean enabled) {
|
this.client = client;
|
this.options = options;
|
this.enabled = enabled;
|
}
|
|
/**
|
* mqtt客户端连接
|
*
|
* @param cause 错误
|
*/
|
@Override
|
public void connectionLost(Throwable cause) {
|
|
// 连接丢失后,一般在这里面进行重连
|
log.debug("=>mqtt 连接丢失", cause);
|
int count = 1;
|
// int sleepTime = 0;
|
boolean willConnect = true;
|
while (willConnect) {
|
try {
|
Thread.sleep(1000);
|
log.debug("=>连接[{}]断开,尝试重连第{}次", this.client.getServerURI(), count++);
|
this.client.connect(this.options);
|
log.debug("=>重连成功");
|
willConnect = false;
|
} catch (Exception e) {
|
log.error("=>重连异常", e);
|
}
|
}
|
}
|
|
/**
|
* 客户端订阅主题回调消息
|
*
|
* @param topic 主题
|
* @param message 消息
|
*/
|
@Override
|
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
// subscribe后得到的消息会执行到这里面
|
try {
|
mqttService.subscribeCallback(topic, message);
|
} catch (Exception e) {
|
log.warn("mqtt 订阅消息异常", e);
|
}
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
|
}
|
|
|
/**
|
* 监听mqtt连接消息
|
*/
|
@Override
|
public void connectComplete(boolean reconnect, String serverURI) {
|
log.info("MQTT内部客户端已经连接!");
|
System.out.print("" +
|
" * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * \n" +
|
" * _⚲_⚲_ ______ _ ____ * \n" +
|
" * | / \\ | | ____| | | | _ \\ * \n" +
|
" * | | | ● | | | | |__ __ _ ___| |_ | |_) | ___ ___ * \n" +
|
" * | \\ / | | __/ _` / __| __| | _ < / _ \\/ _ \\ * \n" +
|
" * \\ / | | | (_| \\__ \\ |_ | |_) | __/ __/ * \n" +
|
" * V |_| \\__,_|___/\\__| |____/ \\___|\\___| * \n" +
|
" * * \n"+
|
" * * * * * * * * * * * * FastBee物联网平台[✔启动成功] * * * * * * * * * * * * \n");
|
|
//连接后订阅, enable为false表示使用emq
|
if (!enabled) {
|
try {
|
mqttService.subscribe(client);
|
} catch (MqttException e) {
|
log.error("=>订阅主题失败 error={}", e.getMessage());
|
}
|
}
|
}
|
}
|