Commit d796e99a authored by wangjunqiangs's avatar wangjunqiangs

添加简单的超时清理,防止config线程过多,卡死emq(未测试)

parent 6e15f3cc
...@@ -50,7 +50,7 @@ public class MqttListener implements MqttCallback { ...@@ -50,7 +50,7 @@ public class MqttListener implements MqttCallback {
@Autowired @Autowired
private ConfigService configService; private ConfigService configService;
Map<String,Future<String>> futureMap = new HashMap<String,Future<String>>(); Map<String,Future<String>> futureMap = new HashMap<String, Future<String>>();
private ScheduledExecutorService mReconnectScheduler; private ScheduledExecutorService mReconnectScheduler;
private long mInitDelay = INIT_DELAY_DEFAULT; private long mInitDelay = INIT_DELAY_DEFAULT;
...@@ -121,6 +121,23 @@ public class MqttListener implements MqttCallback { ...@@ -121,6 +121,23 @@ public class MqttListener implements MqttCallback {
// clientReconnect(); // clientReconnect();
} }
private void clearOvertimeConfig(Long ts){
try {
for(String key : futureMap.keySet()){
String[] tmparray = key.split("_");
Long tskey = Long.valueOf(tmparray[2]);
if(ts - tskey > 60*1000){
Future<String> result = futureMap.get(key);
result.cancel(true);
if(result.isCancelled())
futureMap.remove(key);
}
}
}catch (Exception e){
System.out.println(e.getMessage());
}
}
@Override @Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String Message = mqttMessage.toString(); String Message = mqttMessage.toString();
...@@ -128,6 +145,7 @@ public class MqttListener implements MqttCallback { ...@@ -128,6 +145,7 @@ public class MqttListener implements MqttCallback {
String[] tmparray = topic.split("/"); String[] tmparray = topic.split("/");
String Type = tmparray[1]; String Type = tmparray[1];
String SN = tmparray[2]; String SN = tmparray[2];
clearOvertimeConfig(timestamp);
if(SN.matches("^[0-9]*$")){ if(SN.matches("^[0-9]*$")){
try { try {
Integer flag = parseData(Message,SN,Type); Integer flag = parseData(Message,SN,Type);
...@@ -155,11 +173,13 @@ public class MqttListener implements MqttCallback { ...@@ -155,11 +173,13 @@ public class MqttListener implements MqttCallback {
mqttconfig.getUsername(), mqttconfig.getPassword(),mqttconfig.getQos(), mqttconfig.getUsername(), mqttconfig.getPassword(),mqttconfig.getQos(),
mqttconfig.getCacrt(),mqttconfig.getClientkey(), mqttconfig.getClientcrt(),mqttconfig.getClientpwd()); mqttconfig.getCacrt(),mqttconfig.getClientkey(), mqttconfig.getClientcrt(),mqttconfig.getClientpwd());
result = configthreadPool.submit(gcconfig); result = configthreadPool.submit(gcconfig);
futureMap.put(Type+"_"+SN+"_"+timestamp,result);
// while (result.isDone()) // while (result.isDone())
// { // {
// String respcmd = result.get(); // String respcmd = result.get();
// } // }
// futureMap.put(timestamp.toString(),result); //
} catch (JsonSyntaxException e) { } catch (JsonSyntaxException e) {
e.printStackTrace(); e.printStackTrace();
System.out.println("mqttAsyncClient: message " + Message + " received syntax error."); System.out.println("mqttAsyncClient: message " + Message + " received syntax error.");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment