Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
T
TDLCloud
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
WitCloud
TDLCloud
Commits
58563571
Commit
58563571
authored
Sep 04, 2019
by
zhuangzhuang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1.增加mqtt的监听
parent
085162c5
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
305 additions
and
3 deletions
+305
-3
MqttListener.java
src/main/java/com/example/tdl/mqtt/MqttListener.java
+302
-0
application.properties
src/main/resources/application.properties
+3
-3
No files found.
src/main/java/com/example/tdl/mqtt/MqttListener.java
View file @
58563571
package
com
.
example
.
tdl
.
mqtt
;
import
com.example.tdl.callabletask.GWConfigWorker
;
import
com.example.tdl.config.mqtt.MqttConfig
;
import
com.example.tdl.config.mqtt.MqttTemlateAsync
;
import
com.example.tdl.domain.vo.CheckInVo
;
import
com.example.tdl.domain.vo.ConfigCMDVo
;
import
com.example.tdl.domain.vo.ConfigVo
;
import
com.example.tdl.service.ConfigService
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
com.google.gson.JsonParser
;
import
com.google.gson.JsonSyntaxException
;
import
org.apache.commons.lang3.StringUtils
;
import
org.eclipse.paho.client.mqttv3.*
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.PostConstruct
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
/**
* @Author: lelouch
* @Description:
* @Date: Created in 9:41 2018/3/13
* @Modified By:
**/
@Component
public
class
MqttListener
implements
MqttCallback
{
private
Logger
logger
=
LoggerFactory
.
getLogger
(
MqttListener
.
class
);
private
static
final
int
INIT_DELAY_DEFAULT
=
1000
;
// unit:ms
private
static
final
int
SCH_PERIOD_DEFAULT
=
10
*
INIT_DELAY_DEFAULT
;
// unit:ms
private
static
final
String
CHECKIN_TOPIC
=
"TDL/+/+/CheckIn"
;
private
MqttTemlateAsync
mqttAsyncClient
;
private
Gson
gson
=
new
Gson
();
@Autowired
private
MqttConfig
mqttconfig
;
@Autowired
ThreadPoolTaskExecutor
configthreadPool
;
@Autowired
private
ConfigService
configService
;
Map
<
String
,
Future
<
String
>>
futureMap
=
new
HashMap
<
String
,
Future
<
String
>>();
Map
<
String
,
GWConfigWorker
>
gwconfigworkerMap
=
new
HashMap
<
String
,
GWConfigWorker
>();
private
ScheduledExecutorService
mReconnectScheduler
;
private
long
mInitDelay
=
INIT_DELAY_DEFAULT
;
private
long
mSchedulePeriod
=
SCH_PERIOD_DEFAULT
;
private
String
listener_topic
=
CHECKIN_TOPIC
;
@PostConstruct
public
void
initialize
()
throws
MqttException
{
try
{
mqttAsyncClient
=
new
MqttTemlateAsync
(
mqttconfig
.
getUrl
()
+
":"
+
mqttconfig
.
getPort
(),
"TDL_CheckIn"
+
"_"
+
String
.
valueOf
(
System
.
currentTimeMillis
())
+
"_sub"
);
mqttAsyncClient
.
connect
(
MqttTemlateAsync
.
setSSLOptions
(
mqttconfig
.
getUsername
(),
mqttconfig
.
getPassword
(),
mqttconfig
.
getCacrt
(),
mqttconfig
.
getClientcrt
(),
mqttconfig
.
getClientkey
(),
mqttconfig
.
getClientpwd
()),
null
,
new
IMqttActionListener
()
{
@Override
public
void
onSuccess
(
IMqttToken
iMqttToken
)
{
System
.
out
.
println
(
"connection successfull "
);
try
{
mqttAsyncClient
.
subscribe
(
listener_topic
,
mqttconfig
.
getQos
());
}
catch
(
MqttException
e
)
{
System
.
out
.
println
(
"Error: "
+
e
.
getMessage
());
}
}
@Override
public
void
onFailure
(
IMqttToken
iMqttToken
,
Throwable
throwable
)
{
System
.
out
.
println
(
"connection fail with client id"
);
}
});
this
.
mqttAsyncClient
.
setCallback
(
this
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
private
void
clientReconnect
()
{
if
(
mReconnectScheduler
!=
null
)
{
// mReconnectScheduler is running.
return
;
}
mReconnectScheduler
=
Executors
.
newSingleThreadScheduledExecutor
();
mReconnectScheduler
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
public
void
run
()
{
// TODO Auto-generated method stub
if
(
mqttAsyncClient
!=
null
&&
!
mqttAsyncClient
.
isConnected
())
{
try
{
mqttAsyncClient
.
reconnect
();
}
catch
(
MqttSecurityException
e
)
{
// TODO: handle exception
e
.
printStackTrace
();
}
catch
(
MqttException
e
)
{
// TODO: handle exception
e
.
printStackTrace
();
}
}
else
{
mReconnectScheduler
.
shutdown
();
mReconnectScheduler
=
null
;
}
}
},
mInitDelay
,
mSchedulePeriod
,
TimeUnit
.
MILLISECONDS
);
}
@Override
public
void
connectionLost
(
Throwable
cause
)
{
System
.
out
.
println
(
cause
);
// clientReconnect();
}
private
void
clearOvertimeConfig
(
Long
ts
){
try
{
for
(
String
key
:
gwconfigworkerMap
.
keySet
()){
String
[]
tmparray
=
key
.
split
(
"_"
);
Long
tskey
=
Long
.
valueOf
(
tmparray
[
2
]);
if
(
ts
-
tskey
>
60
*
1000
){
GWConfigWorker
tmpgwconfigworker
=
gwconfigworkerMap
.
get
(
key
);
Boolean
boo
=
tmpgwconfigworker
.
GCConfigClose
();
if
(
boo
){
gwconfigworkerMap
.
remove
(
key
);
}
}
}
}
catch
(
Exception
e
){
System
.
out
.
println
(
e
.
getMessage
());
}
}
@Override
public
void
messageArrived
(
String
topic
,
MqttMessage
mqttMessage
)
throws
Exception
{
String
Message
=
mqttMessage
.
toString
();
clearOvertimeConfig
(
System
.
currentTimeMillis
());
String
[]
tmparray
=
topic
.
split
(
"/"
);
String
Type
=
tmparray
[
1
];
String
SN
=
tmparray
[
2
];
if
(
SN
.
matches
(
"^[0-9]*$"
)){
try
{
Integer
flag
=
parseData
(
Message
,
SN
,
Type
);
String
ConfigData
;
Future
<
String
>
result
=
null
;
GWConfigWorker
gcconfig
=
null
;
logger
.
info
(
topic
+
"-----"
+
flag
);
if
(
flag
==
0
){
ConfigCMDVo
configCMDVo
=
new
ConfigCMDVo
();
configCMDVo
.
setAction
(
"sleep"
);
configCMDVo
.
setT
(
System
.
currentTimeMillis
()/
1000
l
);
ConfigData
=
new
Gson
().
toJson
(
configCMDVo
);
gcconfig
=
new
GWConfigWorker
(
Type
,
SN
,
ConfigData
,
configService
,
false
);
}
else
if
(
flag
==
2
){
ConfigCMDVo
configCMDVo
=
new
ConfigCMDVo
();
configCMDVo
.
setAction
(
"end"
);
configCMDVo
.
setT
(
System
.
currentTimeMillis
()/
1000
l
);
ConfigData
=
new
Gson
().
toJson
(
configCMDVo
);
gcconfig
=
new
GWConfigWorker
(
Type
,
SN
,
ConfigData
,
configService
,
true
);
}
else
if
(
flag
==
1
){
String
message
=
configService
.
getConfig
(
SN
,
Type
).
getMessage
();
ConfigCMDVo
configCMDVo
=
gson
.
fromJson
(
message
,
ConfigCMDVo
.
class
);
ConfigCMDVo
config
=
new
ConfigCMDVo
(
configCMDVo
.
getAction
(),
configCMDVo
.
getGpsPeriod
(),
configCMDVo
.
getGprsPeriod
(),
configCMDVo
.
getDevList
(),
configCMDVo
.
getMode
(),
System
.
currentTimeMillis
()/
1000
l
);
gcconfig
=
new
GWConfigWorker
(
Type
,
SN
,
gson
.
toJson
(
config
),
configService
,
true
);
}
else
if
(
flag
==
3
){
String
message
=
configService
.
getConfig
(
SN
,
Type
).
getMessage
();
ConfigCMDVo
configCMDVo
=
gson
.
fromJson
(
message
,
ConfigCMDVo
.
class
);
configCMDVo
.
setT
(
System
.
currentTimeMillis
()/
1000
l
);
gcconfig
=
new
GWConfigWorker
(
Type
,
SN
,
gson
.
toJson
(
configCMDVo
),
configService
,
true
);
}
gwconfigworkerMap
.
put
(
Type
+
"_"
+
SN
+
"_"
+
System
.
currentTimeMillis
(),
gcconfig
);
gcconfig
.
SetMqttConfig
(
mqttconfig
.
getUrl
(),
mqttconfig
.
getPort
(),
mqttconfig
.
getUsername
(),
mqttconfig
.
getPassword
(),
mqttconfig
.
getQos
(),
mqttconfig
.
getCacrt
(),
mqttconfig
.
getClientkey
(),
mqttconfig
.
getClientcrt
(),
mqttconfig
.
getClientpwd
());
result
=
configthreadPool
.
submit
(
gcconfig
);
}
catch
(
JsonSyntaxException
e
)
{
e
.
printStackTrace
();
System
.
out
.
println
(
"mqttAsyncClient: message "
+
Message
+
" received syntax error."
);
}
catch
(
IllegalStateException
e
)
{
e
.
printStackTrace
();
System
.
out
.
println
(
"mqttAsyncClient: "
+
e
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
System
.
out
.
println
(
e
.
toString
());
}
}
}
@Override
public
void
deliveryComplete
(
IMqttDeliveryToken
iMqttDeliveryToken
)
{
}
//checkIn数据解析
public
Integer
parseData
(
String
message
,
String
SN
,
String
Type
){
ConfigVo
configVo
=
configService
.
getConfig
(
SN
,
Type
);
//网关不存在
if
(
configVo
==
null
){
return
10
;
//无返回值
}
CheckInVo
checkInVo
=
new
Gson
().
fromJson
(
message
,
CheckInVo
.
class
);
if
(
configVo
.
getUntie
()
&&
StringUtils
.
isEmpty
(
configVo
.
getMessage
())
){
return
2
;
//解绑
}
if
(
StringUtils
.
isEmpty
(
configVo
.
getMessage
())
&&
configVo
.
getUntie
()
==
false
){
return
10
;
//无返回值
}
JsonObject
returnData
=
new
JsonParser
().
parse
(
message
).
getAsJsonObject
();
ConfigCMDVo
configCMDVo
=
new
Gson
().
fromJson
(
configVo
.
getMessage
(),
ConfigCMDVo
.
class
);
if
(
configCMDVo
==
null
){
return
10
;
//无返回值
}
if
(
returnData
.
has
(
"tempL"
)
&&
returnData
.
has
(
"tempH"
)
&&
returnData
.
has
(
"humiL"
)
&&
returnData
.
has
(
"humiH"
)
&&
returnData
.
has
(
"pressL"
)
&&
returnData
.
has
(
"pressH"
)
&&
returnData
.
has
(
"sugEnergy"
)
&&
returnData
.
has
(
"accThreshold"
)
&&
returnData
.
has
(
"timeThreshold"
)
&&
returnData
.
has
(
"tiltThreshold"
)){
if
(
checkInVo
.
getDevList
().
size
()==
0
&&
configCMDVo
!=
null
){
return
3
;
//重发
}
if
(
configCMDVo
.
getDevList
().
size
()
==
0
){
return
10
;
//无返回值
}
if
(
compare
(
configCMDVo
.
getDevList
(),
checkInVo
.
getDevList
())
&&
checkInVo
.
getGprsPeriod
().
equals
(
configCMDVo
.
getGprsPeriod
())
&&
checkInVo
.
getGpsPeriod
().
equals
(
configCMDVo
.
getGpsPeriod
())
&&
checkInVo
.
getMode
().
equals
(
configCMDVo
.
getMode
())
&&
compare
(
configCMDVo
.
getTempL
(),
checkInVo
.
getTempL
())
&&
compare
(
configCMDVo
.
getTempH
(),
checkInVo
.
getTempH
())
&&
compare
(
configCMDVo
.
getHumiL
(),
checkInVo
.
getHumiL
())
&&
compare
(
configCMDVo
.
getHumiH
(),
checkInVo
.
getHumiH
())
&&
compare
(
configCMDVo
.
getPressH
(),
checkInVo
.
getPressH
())
&&
compare
(
configCMDVo
.
getPressL
(),
checkInVo
.
getPressL
())
&&
compare
(
configCMDVo
.
getSugEnergy
(),
checkInVo
.
getSugEnergy
())
&&
compare
(
configCMDVo
.
getAccThreshold
(),
checkInVo
.
getAccThreshold
())
&&
compare
(
configCMDVo
.
getTimeThreshold
(),
checkInVo
.
getTimeThreshold
())
&&
compare
(
configCMDVo
.
getTiltThreshold
(),
checkInVo
.
getTiltThreshold
()))
{
//一致
return
0
;
//休眠
}
else
{
//不一致
return
3
;
//重发
}
}
else
{
if
(
checkInVo
.
getDevList
().
size
()==
0
&&
configCMDVo
!=
null
){
return
1
;
//重发
}
//判断两者的devList是否一致
if
(
compare
(
configCMDVo
.
getDevList
(),
checkInVo
.
getDevList
())
&&
checkInVo
.
getGprsPeriod
()==(
configCMDVo
.
getGprsPeriod
())
&&
checkInVo
.
getGpsPeriod
()==(
configCMDVo
.
getGpsPeriod
())
&&
checkInVo
.
getMode
()==(
configCMDVo
.
getMode
()))
{
//一致
return
0
;
//休眠
}
else
{
//不一致
return
1
;
//重发
}
}
}
//判断两个list是否一致
public
synchronized
<
T
extends
Comparable
<
T
>>
boolean
compare
(
List
<
T
>
a
,
List
<
T
>
b
)
{
if
(
a
.
size
()
!=
b
.
size
()){
return
false
;
}
for
(
int
i
=
0
;
i
<
a
.
size
();
i
++)
{
if
(!
a
.
get
(
i
).
equals
(
b
.
get
(
i
)))
return
false
;
}
return
true
;
}
}
\ No newline at end of file
src/main/resources/application.properties
View file @
58563571
...
...
@@ -41,10 +41,10 @@ logging.config=classpath:logback.xml
# Mqtt
tdl.mqtt.url
=
ssl://
172.16.1.24
tdl.mqtt.url
=
ssl://
47.96.128.181
tdl.mqtt.port
=
8883
tdl.mqtt.username
=
ugen
tdl.mqtt.password
=
ugen
tdl.mqtt.username
=
logistics
tdl.mqtt.password
=
logistics37774020
tdl.mqtt.qos
=
1
tdl.mqtt.timeout
=
20
# SSL Keys
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment