Commit b0729f01 authored by zhuangzhuang's avatar zhuangzhuang

注释mqtt部分代码

parent bed86874
Pipeline #182 canceled with stage
in 0 seconds
......@@ -9,8 +9,8 @@ import org.springframework.context.annotation.Configuration;
* @Date: Created in 9:25 2018/3/13
* @Modified By:
**/
@Configuration
@ConfigurationProperties(prefix = "tdl.mqtt")
//@Configuration
//@ConfigurationProperties(prefix = "tdl.mqtt")
public class MqttConfig {
private String url;
......
......@@ -36,268 +36,282 @@ import java.util.concurrent.TimeUnit;
* @Modified By:
**/
@Component
//@Component
public class MqttListener implements MqttCallback {
private Logger logger = LoggerFactory.getLogger(MqttListener.class);
private static final int INIT_DELAY_DEFAULT = 1000; // unit:ms
private static final int SCH_PERIOD_DEFAULT = 10 * INIT_DELAY_DEFAULT; // unit:ms
private static final String CHECKIN_TOPIC = "TDL/+/+/CheckIn";
private MqttTemlateAsync mqttAsyncClient;
private Gson gson = new Gson();
@Autowired
private MqttConfig mqttconfig;
@Autowired
ThreadPoolTaskExecutor configthreadPool;
@Autowired
private ConfigService configService;
Map<String,Future<String>> futureMap = new HashMap<String, Future<String>>();
Map<String,GWConfigWorker> gwconfigworkerMap = new HashMap<String, GWConfigWorker>();
private ScheduledExecutorService mReconnectScheduler;
private long mInitDelay = INIT_DELAY_DEFAULT;
private long mSchedulePeriod = SCH_PERIOD_DEFAULT;
private String listener_topic = CHECKIN_TOPIC;
@PostConstruct
public void initialize() throws MqttException {
try {
mqttAsyncClient = new MqttTemlateAsync(mqttconfig.getUrl() + ":" + mqttconfig.getPort(),
"TDL_CheckIn" + "_" + String.valueOf(System.currentTimeMillis()) + "_sub");
mqttAsyncClient.connect(MqttTemlateAsync.setSSLOptions(mqttconfig.getUsername(), mqttconfig.getPassword(),
mqttconfig.getCacrt(), mqttconfig.getClientcrt(),mqttconfig.getClientkey(), mqttconfig.getClientpwd()), null,
new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
System.out.println("connection successfull ");
try {
mqttAsyncClient.subscribe(listener_topic, mqttconfig.getQos());
} catch (MqttException e) {
System.out.println("Error: " + e.getMessage());
}
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
System.out.println("connection fail with client id");
}
});
this.mqttAsyncClient.setCallback(this);
} catch (Exception e) {
e.printStackTrace();
}
}
private void clientReconnect() {
if (mReconnectScheduler != null) {
// mReconnectScheduler is running.
return;
}
mReconnectScheduler = Executors.newSingleThreadScheduledExecutor();
mReconnectScheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
if (mqttAsyncClient != null && !mqttAsyncClient.isConnected()) {
try {
mqttAsyncClient.reconnect();
} catch (MqttSecurityException e) {
// TODO: handle exception
e.printStackTrace();
} catch (MqttException e) {
// TODO: handle exception
e.printStackTrace();
}
} else {
mReconnectScheduler.shutdown();
mReconnectScheduler = null;
}
}
}, mInitDelay, mSchedulePeriod, TimeUnit.MILLISECONDS);
}
@Override
public void connectionLost(Throwable cause) {
System.out.println(cause);
// clientReconnect();
}
private void clearOvertimeConfig(Long ts){
try {
for(String key : gwconfigworkerMap.keySet()){
String[] tmparray = key.split("_");
Long tskey = Long.valueOf(tmparray[2]);
if(ts - tskey > 60*1000){
GWConfigWorker tmpgwconfigworker = gwconfigworkerMap.get(key);
Boolean boo = tmpgwconfigworker.GCConfigClose();
if(boo){
gwconfigworkerMap.remove(key);
}
}
}
}catch (Exception e){
System.out.println(e.getMessage());
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String Message = mqttMessage.toString();
clearOvertimeConfig(System.currentTimeMillis());
String[] tmparray = topic.split("/");
String Type = tmparray[1];
String SN = tmparray[2];
if(SN.matches("^[0-9]*$")){
try {
Integer flag = parseData(Message,SN,Type);
String ConfigData;
Future<String> result = null;
GWConfigWorker gcconfig = null;
logger.info(topic+"-----"+ flag);
if(flag == 0){
ConfigCMDVo configCMDVo = new ConfigCMDVo();
configCMDVo.setAction("sleep");
configCMDVo.setT(System.currentTimeMillis()/1000l);
ConfigData = new Gson().toJson(configCMDVo);
gcconfig = new GWConfigWorker(Type,SN,ConfigData, configService,false);
}else if(flag == 2){
ConfigCMDVo configCMDVo = new ConfigCMDVo();
configCMDVo.setAction("end");
configCMDVo.setT(System.currentTimeMillis()/1000l);
ConfigData = new Gson().toJson(configCMDVo);
gcconfig = new GWConfigWorker(Type,SN,ConfigData,configService, true);
}else if(flag == 1){
String message= configService.getConfig(SN,Type).getMessage();
ConfigCMDVo configCMDVo = gson.fromJson(message,ConfigCMDVo.class);
ConfigCMDVo config = new ConfigCMDVo(configCMDVo.getAction(),configCMDVo.getGpsPeriod(),configCMDVo.getGprsPeriod(),configCMDVo.getDevList(),configCMDVo.getMode(),System.currentTimeMillis()/1000l);
gcconfig = new GWConfigWorker(Type,SN,gson.toJson(config),configService,true);
}else if(flag == 3){
String message= configService.getConfig(SN,Type).getMessage();
ConfigCMDVo configCMDVo = gson.fromJson(message,ConfigCMDVo.class);
configCMDVo.setT(System.currentTimeMillis()/1000l);
gcconfig = new GWConfigWorker(Type,SN,gson.toJson(configCMDVo),configService,true);
}
gwconfigworkerMap.put(Type+"_"+SN+"_"+System.currentTimeMillis(),gcconfig);
gcconfig.SetMqttConfig(mqttconfig.getUrl(),mqttconfig.getPort(),
mqttconfig.getUsername(), mqttconfig.getPassword(),mqttconfig.getQos(),
mqttconfig.getCacrt(),mqttconfig.getClientkey(), mqttconfig.getClientcrt(),mqttconfig.getClientpwd());
result = configthreadPool.submit(gcconfig);
} catch (JsonSyntaxException e) {
e.printStackTrace();
System.out.println("mqttAsyncClient: message " + Message + " received syntax error.");
} catch (IllegalStateException e) {
e.printStackTrace();
System.out.println("mqttAsyncClient: " + e);
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.toString());
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
//checkIn数据解析
public Integer parseData(String message,String SN,String Type){
ConfigVo configVo = configService.getConfig(SN,Type);
//网关不存在
if(configVo == null){
return 10;//无返回值
}
CheckInVo checkInVo = new Gson().fromJson(message,CheckInVo.class);
if(configVo.getUntie() && StringUtils.isEmpty(configVo.getMessage()) ){
return 2;//解绑
}
if(StringUtils.isEmpty(configVo.getMessage()) && configVo.getUntie() == false){
return 10;//无返回值
}
JsonObject returnData = new JsonParser().parse(message).getAsJsonObject();
ConfigCMDVo configCMDVo = new Gson().fromJson(configVo.getMessage(),ConfigCMDVo.class);
if(configCMDVo == null){
return 10;//无返回值
}
if(returnData.has("tempL")
&&returnData.has("tempH")
&&returnData.has("humiL")
&&returnData.has("humiH")
&&returnData.has("pressL")
&&returnData.has("pressH")
&&returnData.has("sugEnergy")
&&returnData.has("accThreshold")
&&returnData.has("timeThreshold")
&&returnData.has("tiltThreshold")){
if(checkInVo.getDevList().size()==0 && configCMDVo !=null ){
return 3;//重发
}
if(configCMDVo.getDevList().size() == 0){
return 10;//无返回值
}
if (compare(configCMDVo.getDevList(),checkInVo.getDevList())
&& checkInVo.getGprsPeriod().equals(configCMDVo.getGprsPeriod())
&& checkInVo.getGpsPeriod().equals(configCMDVo.getGpsPeriod())
&& checkInVo.getMode().equals(configCMDVo.getMode())
&& compare(configCMDVo.getTempL(),checkInVo.getTempL())
&& compare(configCMDVo.getTempH(),checkInVo.getTempH())
&& compare(configCMDVo.getHumiL(),checkInVo.getHumiL())
&& compare(configCMDVo.getHumiH(),checkInVo.getHumiH())
&& compare(configCMDVo.getPressH(),checkInVo.getPressH())
&& compare(configCMDVo.getPressL(),checkInVo.getPressL())
&& compare(configCMDVo.getSugEnergy(),checkInVo.getSugEnergy())
&& compare(configCMDVo.getAccThreshold(),checkInVo.getAccThreshold())
&& compare(configCMDVo.getTimeThreshold(),checkInVo.getTimeThreshold())
&& compare(configCMDVo.getTiltThreshold(),checkInVo.getTiltThreshold())) {
//一致
return 0;//休眠
} else {
//不一致
return 3;//重发
}
}else{
if(checkInVo.getDevList().size()==0 && configCMDVo !=null ){
return 1;//重发
}
//判断两者的devList是否一致
if (compare(configCMDVo.getDevList(),checkInVo.getDevList())
&& checkInVo.getGprsPeriod()==(configCMDVo.getGprsPeriod())
&& checkInVo.getGpsPeriod()==(configCMDVo.getGpsPeriod())
&& checkInVo.getMode()==(configCMDVo.getMode())) {
//一致
return 0;//休眠
} else {
//不一致
return 1;//重发
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
}
//判断两个list是否一致
public synchronized <T extends Comparable<T>> boolean compare(List<T> a, List<T> b) {
if (a.size() != b.size()){
return false;
}
for (int i = 0; i < a.size(); i++) {
if (!a.get(i).equals(b.get(i)))
return false;
}
return true;
}
// private Logger logger = LoggerFactory.getLogger(MqttListener.class);
// private static final int INIT_DELAY_DEFAULT = 1000; // unit:ms
// private static final int SCH_PERIOD_DEFAULT = 10 * INIT_DELAY_DEFAULT; // unit:ms
// private static final String CHECKIN_TOPIC = "TDL/+/+/CheckIn";
//
// private MqttTemlateAsync mqttAsyncClient;
//
// private Gson gson = new Gson();
//
// @Autowired
// private MqttConfig mqttconfig;
//
// @Autowired
// ThreadPoolTaskExecutor configthreadPool;
//
// @Autowired
// private ConfigService configService;
//
// Map<String,Future<String>> futureMap = new HashMap<String, Future<String>>();
//
// Map<String,GWConfigWorker> gwconfigworkerMap = new HashMap<String, GWConfigWorker>();
//
// private ScheduledExecutorService mReconnectScheduler;
// private long mInitDelay = INIT_DELAY_DEFAULT;
// private long mSchedulePeriod = SCH_PERIOD_DEFAULT;
// private String listener_topic = CHECKIN_TOPIC;
//
// @PostConstruct
// public void initialize() throws MqttException {
// try {
// mqttAsyncClient = new MqttTemlateAsync(mqttconfig.getUrl() + ":" + mqttconfig.getPort(),
// "TDL_CheckIn" + "_" + String.valueOf(System.currentTimeMillis()) + "_sub");
// mqttAsyncClient.connect(MqttTemlateAsync.setSSLOptions(mqttconfig.getUsername(), mqttconfig.getPassword(),
// mqttconfig.getCacrt(), mqttconfig.getClientcrt(),mqttconfig.getClientkey(), mqttconfig.getClientpwd()), null,
// new IMqttActionListener() {
// @Override
// public void onSuccess(IMqttToken iMqttToken) {
// System.out.println("connection successfull ");
// try {
// mqttAsyncClient.subscribe(listener_topic, mqttconfig.getQos());
// } catch (MqttException e) {
// System.out.println("Error: " + e.getMessage());
// }
// }
//
// @Override
// public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
// System.out.println("connection fail with client id");
// }
// });
// this.mqttAsyncClient.setCallback(this);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//
// private void clientReconnect() {
// if (mReconnectScheduler != null) {
// // mReconnectScheduler is running.
// return;
// }
//
// mReconnectScheduler = Executors.newSingleThreadScheduledExecutor();
// mReconnectScheduler.scheduleAtFixedRate(new Runnable() {
// @Override
// public void run() {
// // TODO Auto-generated method stub
// if (mqttAsyncClient != null && !mqttAsyncClient.isConnected()) {
// try {
// mqttAsyncClient.reconnect();
// } catch (MqttSecurityException e) {
// // TODO: handle exception
// e.printStackTrace();
// } catch (MqttException e) {
// // TODO: handle exception
// e.printStackTrace();
// }
// } else {
// mReconnectScheduler.shutdown();
// mReconnectScheduler = null;
// }
// }
// }, mInitDelay, mSchedulePeriod, TimeUnit.MILLISECONDS);
// }
//
// @Override
// public void connectionLost(Throwable cause) {
// System.out.println(cause);
// // clientReconnect();
// }
//
//
// private void clearOvertimeConfig(Long ts){
// try {
// for(String key : gwconfigworkerMap.keySet()){
// String[] tmparray = key.split("_");
// Long tskey = Long.valueOf(tmparray[2]);
// if(ts - tskey > 60*1000){
// GWConfigWorker tmpgwconfigworker = gwconfigworkerMap.get(key);
// Boolean boo = tmpgwconfigworker.GCConfigClose();
// if(boo){
// gwconfigworkerMap.remove(key);
// }
// }
// }
// }catch (Exception e){
// System.out.println(e.getMessage());
// }
// }
//
// @Override
// public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// String Message = mqttMessage.toString();
// clearOvertimeConfig(System.currentTimeMillis());
// String[] tmparray = topic.split("/");
// String Type = tmparray[1];
// String SN = tmparray[2];
// if(SN.matches("^[0-9]*$")){
// try {
// Integer flag = parseData(Message,SN,Type);
// String ConfigData;
// Future<String> result = null;
// GWConfigWorker gcconfig = null;
// logger.info(topic+"-----"+ flag);
// if(flag == 0){
// ConfigCMDVo configCMDVo = new ConfigCMDVo();
// configCMDVo.setAction("sleep");
// configCMDVo.setT(System.currentTimeMillis()/1000l);
// ConfigData = new Gson().toJson(configCMDVo);
// gcconfig = new GWConfigWorker(Type,SN,ConfigData, configService,false);
// }else if(flag == 2){
// ConfigCMDVo configCMDVo = new ConfigCMDVo();
// configCMDVo.setAction("end");
// configCMDVo.setT(System.currentTimeMillis()/1000l);
// ConfigData = new Gson().toJson(configCMDVo);
// gcconfig = new GWConfigWorker(Type,SN,ConfigData,configService, true);
// }else if(flag == 1){
// String message= configService.getConfig(SN,Type).getMessage();
// ConfigCMDVo configCMDVo = gson.fromJson(message,ConfigCMDVo.class);
// ConfigCMDVo config = new ConfigCMDVo(configCMDVo.getAction(),configCMDVo.getGpsPeriod(),configCMDVo.getGprsPeriod(),configCMDVo.getDevList(),configCMDVo.getMode(),System.currentTimeMillis()/1000l);
// gcconfig = new GWConfigWorker(Type,SN,gson.toJson(config),configService,true);
// }else if(flag == 3){
// String message= configService.getConfig(SN,Type).getMessage();
// ConfigCMDVo configCMDVo = gson.fromJson(message,ConfigCMDVo.class);
// configCMDVo.setT(System.currentTimeMillis()/1000l);
// gcconfig = new GWConfigWorker(Type,SN,gson.toJson(configCMDVo),configService,true);
// }
// gwconfigworkerMap.put(Type+"_"+SN+"_"+System.currentTimeMillis(),gcconfig);
// gcconfig.SetMqttConfig(mqttconfig.getUrl(),mqttconfig.getPort(),
// mqttconfig.getUsername(), mqttconfig.getPassword(),mqttconfig.getQos(),
// mqttconfig.getCacrt(),mqttconfig.getClientkey(), mqttconfig.getClientcrt(),mqttconfig.getClientpwd());
// result = configthreadPool.submit(gcconfig);
// } catch (JsonSyntaxException e) {
// e.printStackTrace();
// System.out.println("mqttAsyncClient: message " + Message + " received syntax error.");
// } catch (IllegalStateException e) {
// e.printStackTrace();
// System.out.println("mqttAsyncClient: " + e);
// } catch (Exception e) {
// e.printStackTrace();
// System.out.println(e.toString());
// }
// }
// }
//
//
//
// @Override
// public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//
// }
//
// //checkIn数据解析
// public Integer parseData(String message,String SN,String Type){
// ConfigVo configVo = configService.getConfig(SN,Type);
// //网关不存在
// if(configVo == null){
// return 10;//无返回值
// }
// CheckInVo checkInVo = new Gson().fromJson(message,CheckInVo.class);
// if(configVo.getUntie() && StringUtils.isEmpty(configVo.getMessage()) ){
// return 2;//解绑
// }
// if(StringUtils.isEmpty(configVo.getMessage()) && configVo.getUntie() == false){
// return 10;//无返回值
// }
// JsonObject returnData = new JsonParser().parse(message).getAsJsonObject();
// ConfigCMDVo configCMDVo = new Gson().fromJson(configVo.getMessage(),ConfigCMDVo.class);
// if(configCMDVo == null){
// return 10;//无返回值
// }
// if(returnData.has("tempL")
// &&returnData.has("tempH")
// &&returnData.has("humiL")
// &&returnData.has("humiH")
// &&returnData.has("pressL")
// &&returnData.has("pressH")
// &&returnData.has("sugEnergy")
// &&returnData.has("accThreshold")
// &&returnData.has("timeThreshold")
// &&returnData.has("tiltThreshold")){
// if(checkInVo.getDevList().size()==0 && configCMDVo !=null ){
// return 3;//重发
// }
// if(configCMDVo.getDevList().size() == 0){
// return 10;//无返回值
// }
// if (compare(configCMDVo.getDevList(),checkInVo.getDevList())
// && checkInVo.getGprsPeriod().equals(configCMDVo.getGprsPeriod())
// && checkInVo.getGpsPeriod().equals(configCMDVo.getGpsPeriod())
// && checkInVo.getMode().equals(configCMDVo.getMode())
// && compare(configCMDVo.getTempL(),checkInVo.getTempL())
// && compare(configCMDVo.getTempH(),checkInVo.getTempH())
// && compare(configCMDVo.getHumiL(),checkInVo.getHumiL())
// && compare(configCMDVo.getHumiH(),checkInVo.getHumiH())
// && compare(configCMDVo.getPressH(),checkInVo.getPressH())
// && compare(configCMDVo.getPressL(),checkInVo.getPressL())
// && compare(configCMDVo.getSugEnergy(),checkInVo.getSugEnergy())
// && compare(configCMDVo.getAccThreshold(),checkInVo.getAccThreshold())
// && compare(configCMDVo.getTimeThreshold(),checkInVo.getTimeThreshold())
// && compare(configCMDVo.getTiltThreshold(),checkInVo.getTiltThreshold())) {
// //一致
// return 0;//休眠
// } else {
// //不一致
// return 3;//重发
// }
// }else{
// if(checkInVo.getDevList().size()==0 && configCMDVo !=null ){
// return 1;//重发
// }
// //判断两者的devList是否一致
// if (compare(configCMDVo.getDevList(),checkInVo.getDevList())
// && checkInVo.getGprsPeriod()==(configCMDVo.getGprsPeriod())
// && checkInVo.getGpsPeriod()==(configCMDVo.getGpsPeriod())
// && checkInVo.getMode()==(configCMDVo.getMode())) {
// //一致
// return 0;//休眠
// } else {
// //不一致
// return 1;//重发
// }
//
// }
// }
//
//
//
// //判断两个list是否一致
// public synchronized <T extends Comparable<T>> boolean compare(List<T> a, List<T> b) {
// if (a.size() != b.size()){
// return false;
// }
// for (int i = 0; i < a.size(); i++) {
// if (!a.get(i).equals(b.get(i)))
// return false;
// }
// return true;
// }
}
\ No newline at end of file
......@@ -27,10 +27,10 @@ server.port=8092
# InfluxDB
spring.influxdb.url=http://witcloud-influxdb:8086
tdl.mqtt.url = ssl://47.96.128.181
tdl.mqtt.port = 8883
tdl.mqtt.username = logistics
tdl.mqtt.password = logistics37774020
#tdl.mqtt.url = ssl://47.96.128.181
#tdl.mqtt.port = 8883
#tdl.mqtt.username = logistics
#tdl.mqtt.password = logistics37774020
mqtt.userName = wtlogistics
mqtt.password = wtlogistics
\ No newline at end of file
......@@ -41,17 +41,17 @@ logging.config=classpath:logback.xml
# Mqtt
tdl.mqtt.url = ssl://172.16.1.24
tdl.mqtt.port = 8883
tdl.mqtt.username = ugen
tdl.mqtt.password = ugen
tdl.mqtt.qos = 1
tdl.mqtt.timeout = 20
# SSL Keys
tdl.mqtt.cacrt = ca.crt
tdl.mqtt.clientkey = witcd.pem
tdl.mqtt.clientcrt = witcd.crt
tdl.mqtt.clientpwd =
#tdl.mqtt.url = ssl://172.16.1.24
#tdl.mqtt.port = 8883
#tdl.mqtt.username = ugen
#tdl.mqtt.password = ugen
#tdl.mqtt.qos = 1
#tdl.mqtt.timeout = 20
## SSL Keys
#tdl.mqtt.cacrt = ca.crt
#tdl.mqtt.clientkey = witcd.pem
#tdl.mqtt.clientcrt = witcd.crt
#tdl.mqtt.clientpwd =
# InfluxDB
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment