Commit ec85cc9b authored by Carit Zhu's avatar Carit Zhu 🎱

Add unknown data flag check for MqttListener and add some logger print.

parent 1f0a9cd3
...@@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME ...@@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-all.zip
package com.example.tdl.callabletask; package com.example.tdl.callabletask;
import com.example.tdl.config.mqtt.MqttTemlateAsync; import com.example.tdl.config.mqtt.MqttTemlateAsync;
import com.example.tdl.domain.vo.CheckInVo;
import com.example.tdl.domain.vo.ConfigCMDVo;
import com.example.tdl.domain.vo.ConfigVo;
import com.example.tdl.domain.vo.RespVo; import com.example.tdl.domain.vo.RespVo;
import com.example.tdl.entity.Config;
import com.example.tdl.service.ConfigService; import com.example.tdl.service.ConfigService;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException; import com.google.gson.JsonSyntaxException;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
...@@ -115,7 +104,7 @@ public class GWConfigWorker implements Callable<String>, MqttCallback { ...@@ -115,7 +104,7 @@ public class GWConfigWorker implements Callable<String>, MqttCallback {
new IMqttActionListener() {//监听 new IMqttActionListener() {//监听
@Override @Override
public void onSuccess(IMqttToken iMqttToken) { public void onSuccess(IMqttToken iMqttToken) {
System.out.println("connection successfull "); logger.info("connection successfull ");
try { try {
if(NeedResp){ if(NeedResp){
mqttAsyncClient.subscribe(RespTopic, qos);//订阅主题 mqttAsyncClient.subscribe(RespTopic, qos);//订阅主题
...@@ -130,23 +119,23 @@ public class GWConfigWorker implements Callable<String>, MqttCallback { ...@@ -130,23 +119,23 @@ public class GWConfigWorker implements Callable<String>, MqttCallback {
// } // }
} catch (MqttException e){ } catch (MqttException e){
e.printStackTrace(); e.printStackTrace();
System.out.println("Error: " + e.getMessage()); logger.error("GWConfigWorker call connect MqttException: " + e.getMessage());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
System.out.println("Error: " + e.getMessage()); System.out.println("GWConfigWorker call connect Exception: " + e.getMessage());
} }
} }
@Override @Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) { public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
System.out.println("connection fail with client id"); logger.error("connection fail with client id");
} }
}); });
if(NeedResp){ if (NeedResp) {
this.mqttAsyncClient.setCallback(this); this.mqttAsyncClient.setCallback(this);
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
logger.info(e.toString()); logger.error("GWConfigWorker call Exception: " + e.getMessage());
} }
return ret_data; return ret_data;
} }
...@@ -207,19 +196,18 @@ public class GWConfigWorker implements Callable<String>, MqttCallback { ...@@ -207,19 +196,18 @@ public class GWConfigWorker implements Callable<String>, MqttCallback {
if(NeedResp){ if(NeedResp){
mqttAsyncClient.unsubscribe(RespTopic);//不监听回复主题 mqttAsyncClient.unsubscribe(RespTopic);//不监听回复主题
} }
mqttAsyncClient.disconnect();
} catch (JsonSyntaxException e){ } catch (JsonSyntaxException e){
mqttAsyncClient.disconnect();
e.printStackTrace(); e.printStackTrace();
System.out.println("mqttAsyncClient: message " + Message + " received syntax error."); logger.error("GWConfigWorker: message " + Message + " received syntax error.");
} catch (IllegalStateException e){ } catch (IllegalStateException e){
mqttAsyncClient.disconnect();
e.printStackTrace(); e.printStackTrace();
System.out.println("mqttAsyncClient: " + e); logger.error("GWConfigWorker IllegalStateException: " + e.getMessage());
} catch (Exception e){ } catch (Exception e){
mqttAsyncClient.disconnect();
e.printStackTrace(); e.printStackTrace();
logger.info(e.toString()); logger.error("GWConfigWorker message arrived exception: " + e.getMessage());
} finally {
mqttAsyncClient.disconnect();
logger.info("(" + topic + ")Finish push config message, then disconnect.");
} }
} }
...@@ -244,11 +232,9 @@ public class GWConfigWorker implements Callable<String>, MqttCallback { ...@@ -244,11 +232,9 @@ public class GWConfigWorker implements Callable<String>, MqttCallback {
return true; return true;
} }
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); logger.error("GCConfigClose exception: " + e.getMessage());
return false; return false;
} }
return true; return true;
} }
} }
...@@ -76,7 +76,7 @@ public class MqttListener implements MqttCallback { ...@@ -76,7 +76,7 @@ public class MqttListener implements MqttCallback {
new IMqttActionListener() { new IMqttActionListener() {
@Override @Override
public void onSuccess(IMqttToken iMqttToken) { public void onSuccess(IMqttToken iMqttToken) {
System.out.println("connection successfull "); logger.info("connection successfull ");
try { try {
mqttAsyncClient.subscribe(listener_topic, mqttconfig.getQos()); mqttAsyncClient.subscribe(listener_topic, mqttconfig.getQos());
} catch (MqttException e) { } catch (MqttException e) {
...@@ -163,28 +163,31 @@ public class MqttListener implements MqttCallback { ...@@ -163,28 +163,31 @@ public class MqttListener implements MqttCallback {
Future<String> result = null; Future<String> result = null;
GWConfigWorker gcconfig = null; GWConfigWorker gcconfig = null;
logger.info(topic+"-----"+ flag); logger.info(topic+"-----"+ flag);
if(flag == 0){ if (flag == 0) {
ConfigCMDVo configCMDVo = new ConfigCMDVo(); ConfigCMDVo configCMDVo = new ConfigCMDVo();
configCMDVo.setAction("sleep"); configCMDVo.setAction("sleep");
configCMDVo.setT(System.currentTimeMillis()/1000l); configCMDVo.setT(System.currentTimeMillis()/1000l);
ConfigData = new Gson().toJson(configCMDVo); ConfigData = new Gson().toJson(configCMDVo);
gcconfig = new GWConfigWorker(Type,SN,ConfigData, configService,false); gcconfig = new GWConfigWorker(Type,SN,ConfigData, configService,false);
}else if(flag == 2){ } else if(flag == 2) {
ConfigCMDVo configCMDVo = new ConfigCMDVo(); ConfigCMDVo configCMDVo = new ConfigCMDVo();
configCMDVo.setAction("end"); configCMDVo.setAction("end");
configCMDVo.setT(System.currentTimeMillis()/1000l); configCMDVo.setT(System.currentTimeMillis()/1000l);
ConfigData = new Gson().toJson(configCMDVo); ConfigData = new Gson().toJson(configCMDVo);
gcconfig = new GWConfigWorker(Type,SN,ConfigData,configService, true); gcconfig = new GWConfigWorker(Type,SN,ConfigData,configService, true);
}else if(flag == 1){ } else if(flag == 1) {
String message= configService.getConfig(SN,Type).getMessage(); String message= configService.getConfig(SN,Type).getMessage();
ConfigCMDVo configCMDVo = gson.fromJson(message,ConfigCMDVo.class); ConfigCMDVo configCMDVo = gson.fromJson(message,ConfigCMDVo.class);
ConfigCMDVo config = new ConfigCMDVo(configCMDVo.getAction(),configCMDVo.getGpsPeriod(),configCMDVo.getGprsPeriod(),configCMDVo.getDevList(),configCMDVo.getMode(),System.currentTimeMillis()/1000l); ConfigCMDVo config = new ConfigCMDVo(configCMDVo.getAction(),configCMDVo.getGpsPeriod(),configCMDVo.getGprsPeriod(),configCMDVo.getDevList(),configCMDVo.getMode(),System.currentTimeMillis()/1000l);
gcconfig = new GWConfigWorker(Type,SN,gson.toJson(config),configService,true); gcconfig = new GWConfigWorker(Type,SN,gson.toJson(config),configService,true);
}else if(flag == 3){ } else if(flag == 3) {
String message= configService.getConfig(SN,Type).getMessage(); String message= configService.getConfig(SN,Type).getMessage();
ConfigCMDVo configCMDVo = gson.fromJson(message,ConfigCMDVo.class); ConfigCMDVo configCMDVo = gson.fromJson(message,ConfigCMDVo.class);
configCMDVo.setT(System.currentTimeMillis()/1000l); configCMDVo.setT(System.currentTimeMillis()/1000l);
gcconfig = new GWConfigWorker(Type,SN,gson.toJson(configCMDVo),configService,true); gcconfig = new GWConfigWorker(Type,SN,gson.toJson(configCMDVo),configService,true);
} else {
logger.error(topic + ": received unknown flag!");
return;
} }
gwconfigworkerMap.put(Type+"_"+SN+"_"+System.currentTimeMillis(),gcconfig); gwconfigworkerMap.put(Type+"_"+SN+"_"+System.currentTimeMillis(),gcconfig);
gcconfig.SetMqttConfig(mqttconfig.getUrl(),mqttconfig.getPort(), gcconfig.SetMqttConfig(mqttconfig.getUrl(),mqttconfig.getPort(),
...@@ -193,13 +196,13 @@ public class MqttListener implements MqttCallback { ...@@ -193,13 +196,13 @@ public class MqttListener implements MqttCallback {
result = configthreadPool.submit(gcconfig); result = configthreadPool.submit(gcconfig);
} catch (JsonSyntaxException e) { } catch (JsonSyntaxException e) {
e.printStackTrace(); e.printStackTrace();
System.out.println("mqttAsyncClient: message " + Message + " received syntax error."); logger.error("MqttListener: message " + Message + " received syntax error.");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
e.printStackTrace(); e.printStackTrace();
System.out.println("mqttAsyncClient: " + e); logger.error("MqttListener IllegalStateException: " + e.getMessage());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
System.out.println(e.toString()); logger.error("MqttListener message arrived exception: " + e.getMessage());
} }
} }
} }
......
...@@ -21,10 +21,12 @@ spring.messages.cache-seconds= 3600 ...@@ -21,10 +21,12 @@ spring.messages.cache-seconds= 3600
spring.messages.encoding=UTF-8 spring.messages.encoding=UTF-8
tdl.redis.host=172.16.1.14 #tdl.redis.host=172.16.1.14
#tdl.redis.password=
tdl.redis.host=47.97.184.225
tdl.redis.password=Witium37774020
tdl.redis.index=0 tdl.redis.index=0
tdl.redis.port=6379 tdl.redis.port=6379
tdl.redis.password=
tdl.redis.poolmaxactive=8 tdl.redis.poolmaxactive=8
tdl.redis.poolmaxwait=-1 tdl.redis.poolmaxwait=-1
tdl.redis.poolmaxidle=8 tdl.redis.poolmaxidle=8
...@@ -37,10 +39,11 @@ logging.config=classpath:logback.xml ...@@ -37,10 +39,11 @@ logging.config=classpath:logback.xml
# Mqtt # Mqtt
tdl.mqtt.url = ssl://172.16.1.24 #tdl.mqtt.url = ssl://172.16.1.24
tdl.mqtt.url = ssl://47.96.128.181
tdl.mqtt.port = 8883 tdl.mqtt.port = 8883
tdl.mqtt.username = ugen tdl.mqtt.username = logistics
tdl.mqtt.password = ugen tdl.mqtt.password = logistics37774020
tdl.mqtt.qos = 1 tdl.mqtt.qos = 1
tdl.mqtt.timeout = 20 tdl.mqtt.timeout = 20
# SSL Keys # SSL Keys
...@@ -51,15 +54,15 @@ tdl.mqtt.clientpwd = ...@@ -51,15 +54,15 @@ tdl.mqtt.clientpwd =
# InfluxDB # InfluxDB
spring.influxdb.username=witcloud #spring.influxdb.username=witcloud
spring.influxdb.password=Witcloud37774020 #spring.influxdb.password=Witcloud37774020
spring.influxdb.url=https://ts-bp1q738i505oj79t7.influxdata.rds.aliyuncs.com:3242 #spring.influxdb.url=https://ts-bp1q738i505oj79t7.influxdata.rds.aliyuncs.com:3242
spring.influxdb.database=logistics #spring.influxdb.database=logistics
#spring.influxdb.url=http://47.97.184.225:8086 spring.influxdb.url=http://172.16.1.14:8086
#spring.influxdb.username=admin spring.influxdb.username=admin
#spring.influxdb.password=37774020 spring.influxdb.password=37774020
#spring.influxdb.database=original spring.influxdb.database=TDLData2
spring.influxdb.retentionPolicy = tdl_policy spring.influxdb.retentionPolicy = tdl_policy
spring.influxdb.connent-timeout=10 spring.influxdb.connent-timeout=10
spring.influxdb.read-timeout=30 spring.influxdb.read-timeout=30
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment