Commit 7b679b6f authored by zhuangzhuang's avatar zhuangzhuang

5.9---清除config,放置emq卡死

parent 9a8a367e
...@@ -109,7 +109,7 @@ public class GWConfigWorker implements Callable<String>, MqttCallback { ...@@ -109,7 +109,7 @@ public class GWConfigWorker implements Callable<String>, MqttCallback {
public String call() throws Exception { public String call() throws Exception {
try { try {
mqttAsyncClient = new MqttTemlateAsync(this.url + ":" + this.port, mqttAsyncClient = new MqttTemlateAsync(this.url + ":" + this.port,
Device + "_" + String.valueOf(System.currentTimeMillis()) + "_config"); Device + "_" + String.valueOf(System.currentTimeMillis()) + "_TDLconfig");
mqttAsyncClient.connect(MqttTemlateAsync.setSSLOptions(this.username, this.password, mqttAsyncClient.connect(MqttTemlateAsync.setSSLOptions(this.username, this.password,
this.cacrt, this.clientcrt, this.clientkey, this.clientpwd), null, this.cacrt, this.clientcrt, this.clientkey, this.clientpwd), null,
new IMqttActionListener() {//监听 new IMqttActionListener() {//监听
...@@ -141,7 +141,9 @@ public class GWConfigWorker implements Callable<String>, MqttCallback { ...@@ -141,7 +141,9 @@ public class GWConfigWorker implements Callable<String>, MqttCallback {
System.out.println("connection fail with client id"); System.out.println("connection fail with client id");
} }
}); });
if(NeedResp){
this.mqttAsyncClient.setCallback(this); this.mqttAsyncClient.setCallback(this);
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
logger.info(e.toString()); logger.info(e.toString());
...@@ -195,6 +197,7 @@ public class GWConfigWorker implements Callable<String>, MqttCallback { ...@@ -195,6 +197,7 @@ public class GWConfigWorker implements Callable<String>, MqttCallback {
if(respVo.getResponse().equals("config")){ if(respVo.getResponse().equals("config")){
//配置回复 //配置回复
int i= configService.bindSuccess(SN,Type,respVo.getStatus()); int i= configService.bindSuccess(SN,Type,respVo.getStatus());
}else if(respVo.getResponse().equals("end")){ }else if(respVo.getResponse().equals("end")){
//解绑回复 //解绑回复
if(respVo.getStatus() ==0 || respVo.getStatus() == 1){ if(respVo.getStatus() ==0 || respVo.getStatus() == 1){
...@@ -236,7 +239,9 @@ public class GWConfigWorker implements Callable<String>, MqttCallback { ...@@ -236,7 +239,9 @@ public class GWConfigWorker implements Callable<String>, MqttCallback {
public void GCConfigClose(){ public void GCConfigClose(){
try { try {
if(mqttAsyncClient.isConnected()){
mqttAsyncClient.disconnect(); mqttAsyncClient.disconnect();
}
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
} }
......
...@@ -52,6 +52,8 @@ public class MqttListener implements MqttCallback { ...@@ -52,6 +52,8 @@ public class MqttListener implements MqttCallback {
Map<String,Future<String>> futureMap = new HashMap<String, Future<String>>(); Map<String,Future<String>> futureMap = new HashMap<String, Future<String>>();
Map<String,GWConfigWorker> gwconfigworkerMap = new HashMap<String, GWConfigWorker>();
private ScheduledExecutorService mReconnectScheduler; private ScheduledExecutorService mReconnectScheduler;
private long mInitDelay = INIT_DELAY_DEFAULT; private long mInitDelay = INIT_DELAY_DEFAULT;
private long mSchedulePeriod = SCH_PERIOD_DEFAULT; private long mSchedulePeriod = SCH_PERIOD_DEFAULT;
...@@ -124,14 +126,16 @@ public class MqttListener implements MqttCallback { ...@@ -124,14 +126,16 @@ public class MqttListener implements MqttCallback {
private void clearOvertimeConfig(Long ts){ private void clearOvertimeConfig(Long ts){
try { try {
for(String key : futureMap.keySet()){ for(String key : gwconfigworkerMap.keySet()){
String[] tmparray = key.split("_"); String[] tmparray = key.split("_");
Long tskey = Long.valueOf(tmparray[2]); Long tskey = Long.valueOf(tmparray[2]);
if(ts - tskey > 60*1000){ if(ts - tskey > 60*1000){
Future<String> result = futureMap.get(key); GWConfigWorker tmpgwconfigworker = gwconfigworkerMap.get(key);
result.cancel(true); tmpgwconfigworker.GCConfigClose();
if(result.isCancelled()) // Future<String> result = futureMap.get(key);
futureMap.remove(key); // result.cancel(true);
// if(result.isCancelled())
// futureMap.remove(key);
} }
} }
}catch (Exception e){ }catch (Exception e){
...@@ -174,7 +178,8 @@ public class MqttListener implements MqttCallback { ...@@ -174,7 +178,8 @@ public class MqttListener implements MqttCallback {
mqttconfig.getUsername(), mqttconfig.getPassword(),mqttconfig.getQos(), mqttconfig.getUsername(), mqttconfig.getPassword(),mqttconfig.getQos(),
mqttconfig.getCacrt(),mqttconfig.getClientkey(), mqttconfig.getClientcrt(),mqttconfig.getClientpwd()); mqttconfig.getCacrt(),mqttconfig.getClientkey(), mqttconfig.getClientcrt(),mqttconfig.getClientpwd());
result = configthreadPool.submit(gcconfig); result = configthreadPool.submit(gcconfig);
futureMap.put(Type+"_"+SN+"_"+timestamp,result); //futureMap.put(Type+"_"+SN+"_"+timestamp,result);
gwconfigworkerMap.put(Type+"_"+SN+"_"+timestamp,gcconfig);
// while (result.isDone()) // while (result.isDone())
// { // {
// String respcmd = result.get(); // String respcmd = result.get();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment