Commit aeb4355b authored by Carit Zhu's avatar Carit Zhu 🎱

Version 1.0.4:

1. 修改MqttListener支持断线重连后重新订阅checkIn主题.
parent 28873a7d
Pipeline #1581 passed with stage
in 0 seconds
...@@ -15,7 +15,7 @@ apply plugin: 'idea' ...@@ -15,7 +15,7 @@ apply plugin: 'idea'
apply plugin: 'org.springframework.boot' apply plugin: 'org.springframework.boot'
group = 'com.example' group = 'com.example'
version = '1.0.3-release' version = '1.0.4-release'
sourceCompatibility = 1.8 sourceCompatibility = 1.8
repositories { repositories {
......
...@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit; ...@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
**/ **/
@Component @Component
public class MqttListener implements MqttCallback { public class MqttListener implements MqttCallbackExtended {
private Logger logger = LoggerFactory.getLogger(MqttListener.class); private Logger logger = LoggerFactory.getLogger(MqttListener.class);
private static final int INIT_DELAY_DEFAULT = 1000; // unit:ms private static final int INIT_DELAY_DEFAULT = 1000; // unit:ms
...@@ -69,14 +69,15 @@ public class MqttListener implements MqttCallback { ...@@ -69,14 +69,15 @@ public class MqttListener implements MqttCallback {
@PostConstruct @PostConstruct
public void initialize() throws MqttException { public void initialize() throws MqttException {
try { try {
String clientId = "TDL_CheckIn" + "_" + String.valueOf(System.currentTimeMillis()) + "_sub";
mqttAsyncClient = new MqttTemlateAsync(mqttconfig.getUrl() + ":" + mqttconfig.getPort(), mqttAsyncClient = new MqttTemlateAsync(mqttconfig.getUrl() + ":" + mqttconfig.getPort(),
"TDL_CheckIn" + "_" + String.valueOf(System.currentTimeMillis()) + "_sub"); clientId);
mqttAsyncClient.connect(MqttTemlateAsync.setSSLOptions(mqttconfig.getUsername(), mqttconfig.getPassword(), mqttAsyncClient.connect(MqttTemlateAsync.setSSLOptions(mqttconfig.getUsername(), mqttconfig.getPassword(),
mqttconfig.getCacrt(), mqttconfig.getClientcrt(),mqttconfig.getClientkey(), mqttconfig.getClientpwd()), null, mqttconfig.getCacrt(), mqttconfig.getClientcrt(),mqttconfig.getClientkey(), mqttconfig.getClientpwd()), null,
new IMqttActionListener() { new IMqttActionListener() {
@Override @Override
public void onSuccess(IMqttToken iMqttToken) { public void onSuccess(IMqttToken iMqttToken) {
logger.info("connection successfull "); logger.info("connect successfully!");
try { try {
mqttAsyncClient.subscribe(listener_topic, mqttconfig.getQos()); mqttAsyncClient.subscribe(listener_topic, mqttconfig.getQos());
} catch (MqttException e) { } catch (MqttException e) {
...@@ -86,7 +87,13 @@ public class MqttListener implements MqttCallback { ...@@ -86,7 +87,13 @@ public class MqttListener implements MqttCallback {
@Override @Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) { public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
logger.error("connection fail with client id"); logger.error("connection fail(" + clientId + "), reason is " + throwable.getMessage());
try {
logger.info("Start to reconnect(" + clientId + ")");
mqttAsyncClient.reconnect();
} catch (MqttException e) {
logger.error("MqttAsyncService reconnect MqttException: " + e.getMessage());
}
} }
}); });
this.mqttAsyncClient.setCallback(this); this.mqttAsyncClient.setCallback(this);
...@@ -95,38 +102,24 @@ public class MqttListener implements MqttCallback { ...@@ -95,38 +102,24 @@ public class MqttListener implements MqttCallback {
} }
} }
private void clientReconnect() { private void resubscribeTopics() {
if (mReconnectScheduler != null) { try {
// mReconnectScheduler is running. mqttAsyncClient.subscribe(listener_topic, mqttconfig.getQos());
return; } catch (MqttException e) {
logger.error("connection MqttException: " + e.getMessage());
} }
}
mReconnectScheduler = Executors.newSingleThreadScheduledExecutor(); @Override
mReconnectScheduler.scheduleAtFixedRate(new Runnable() { public void connectComplete(boolean reconnect, String serverURI) {
@Override logger.info((reconnect ? "reconnect" : "connect") + " complete!");
public void run() { /* connect/reconnected complete, then subscribe topics stored */
// TODO Auto-generated method stub resubscribeTopics();
if (mqttAsyncClient != null && !mqttAsyncClient.isConnected()) {
try {
mqttAsyncClient.reconnect();
} catch (MqttSecurityException e) {
// TODO: handle exception
e.printStackTrace();
} catch (MqttException e) {
// TODO: handle exception
e.printStackTrace();
}
} else {
mReconnectScheduler.shutdown();
mReconnectScheduler = null;
}
}
}, mInitDelay, mSchedulePeriod, TimeUnit.MILLISECONDS);
} }
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
System.out.println(cause); logger.info("connection lost, reason is " + cause.getMessage());
// clientReconnect(); // clientReconnect();
} }
...@@ -288,8 +281,6 @@ public class MqttListener implements MqttCallback { ...@@ -288,8 +281,6 @@ public class MqttListener implements MqttCallback {
} }
} }
//判断两个list是否一致 //判断两个list是否一致
public synchronized <T extends Comparable<T>> boolean compare(List<T> a, List<T> b) { public synchronized <T extends Comparable<T>> boolean compare(List<T> a, List<T> b) {
if (a.size() != b.size()){ if (a.size() != b.size()){
...@@ -301,6 +292,4 @@ public class MqttListener implements MqttCallback { ...@@ -301,6 +292,4 @@ public class MqttListener implements MqttCallback {
} }
return true; return true;
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment