Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
T
TDLCloud
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
WitCloud
TDLCloud
Commits
71ccf766
Commit
71ccf766
authored
Mar 13, 2018
by
wangjunqiangs
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add MQTT to Config GW
parent
1caf261c
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
721 additions
and
15 deletions
+721
-15
build.gradle
build.gradle
+1
-0
ca.crt
ca.crt
+22
-0
GWConfigWorker.java
...ain/java/com/example/tdl/callabletask/GWConfigWorker.java
+168
-1
ThreadPoolConfiguration.java
.../java/com/example/tdl/config/ThreadPoolConfiguration.java
+68
-1
MqttConfig.java
src/main/java/com/example/tdl/config/mqtt/MqttConfig.java
+89
-0
MqttTemlateAsync.java
...in/java/com/example/tdl/config/mqtt/MqttTemlateAsync.java
+152
-1
MqttListener.java
src/main/java/com/example/tdl/mqtt/MqttListener.java
+143
-1
application.properties
src/main/resources/application.properties
+14
-11
witcd.crt
witcd.crt
+36
-0
witcd.pem
witcd.pem
+28
-0
No files found.
build.gradle
View file @
71ccf766
...
...
@@ -44,6 +44,7 @@ dependencies {
compile
(
'com.alibaba:fastjson:1.1.37'
)
compile
(
'org.apache.commons:commons-compress:1.9'
)
compile
(
'org.springframework.boot:spring-boot-starter-data-redis'
)
compile
(
'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
)
}
jar
{
String
someString
=
''
...
...
ca.crt
0 → 100644
View file @
71ccf766
-----BEGIN CERTIFICATE-----
MIIDpzCCAo+gAwIBAgIJAPSf0joIQuzvMA0GCSqGSIb3DQEBDQUAMGoxFzAVBgNV
BAMMDkFuIE1RVFQgYnJva2VyMRYwFAYDVQQKDA1Pd25UcmFja3Mub3JnMRQwEgYD
VQQLDAtnZW5lcmF0ZS1DQTEhMB8GCSqGSIb3DQEJARYSbm9ib2R5QGV4YW1wbGUu
bmV0MB4XDTE3MTIxODA1NTQyN1oXDTMyMTIxNDA1NTQyN1owajEXMBUGA1UEAwwO
QW4gTVFUVCBicm9rZXIxFjAUBgNVBAoMDU93blRyYWNrcy5vcmcxFDASBgNVBAsM
C2dlbmVyYXRlLUNBMSEwHwYJKoZIhvcNAQkBFhJub2JvZHlAZXhhbXBsZS5uZXQw
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQD7xqyFWDSb+6nGAE5O++oR
/fRsMoWRI13qUqy1RHQ/P2y+D2jj41m0XKEvoqPwbMKwiOPQXz5CJ9fSc6j2YUuE
r2FVEb78MprvusFpYWHYk9ammBK4AQ2sW+Y6PgsXBmq9uejhlayF9frBZCqvz/Fl
It60ZEKxv5/hHi5T9cIrphbSsJAzXPXL0qhqeaHLcgBzJVZ7/pMxAQYE0XzgHMma
qbUFV4UK8HSilkv5q4OaEkd6vyLPcMRsqx77oksJjXcn2cdJGcbLKKM2y96p/s2I
jxkeSx+Z7mRLMB5mis+ucVWrtLNEjFO1Vry0nJDtX8m6/mubqqo5PZ+NL0puQ8Y1
AgMBAAGjUDBOMB0GA1UdDgQWBBSJeQbZY2YHxfORj42yZLfU6Ehn0TAfBgNVHSME
GDAWgBSJeQbZY2YHxfORj42yZLfU6Ehn0TAMBgNVHRMEBTADAQH/MA0GCSqGSIb3
DQEBDQUAA4IBAQDVQUa2E4DUz3sV4l+PoDd+/pHFpDTfhc0jgwaDZ3Bveu0Bhhhq
5wKQN3CXHZDPkhKeKAeqbLDcqyUccU7pMstpWS/fNNbKkYrMLvRG2aBg51zBa6Nl
h5lHLYwgwL9MBJMKaWyZMXjmOleuw6Sktj2GeJcUoQwjp07CfwoO+sTlZoRSzyfq
B8qzC9oXVAvftwHxr6OI7WzuSm0R3F35SiO+CEdnVdRnyF3GudERCaCkhUdra5ea
uPlJ7jaVG659m0/S14w5qkIserNrIZp2xTNlFET9+AZJwUXO9LvlVy0tdDNqKc5P
Xr1Tewwl0zBr5V19x7MF9kTF24xJMLJj+bEO
-----END CERTIFICATE-----
src/main/java/com/example/tdl/callabletask/GWConfigWorker.java
View file @
71ccf766
package
com
.
example
.
tdl
.
callabletask
;
import
com.example.tdl.config.mqtt.MqttTemlateAsync
;
import
com.google.gson.Gson
;
import
com.google.gson.JsonObject
;
import
com.google.gson.JsonParser
;
import
com.google.gson.JsonSyntaxException
;
import
org.eclipse.paho.client.mqttv3.*
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.concurrent.Callable
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
/**
* @Author: lelouch
* @Description:
* @Date: Created in 9:34 2018/3/13
* @Modified By:
**/
public
class
GWConfigWorker
{
public
class
GWConfigWorker
implements
Callable
<
String
>,
MqttCallback
{
private
static
final
int
INIT_DELAY_DEFAULT
=
1000
;
// unit:ms
private
static
final
int
SCH_PERIOD_DEFAULT
=
10
*
INIT_DELAY_DEFAULT
;
// unit:ms
private
static
final
String
PUB_TOPIC
=
"/Config"
;
private
static
final
String
SUB_TOPIC
=
"/Config/Resp"
;
private
String
SN
;
private
String
Type
;
private
Boolean
NeedResp
=
false
;
private
String
Device
;
private
String
ConfigCMD
;
private
String
ConfigTopic
;
private
String
RespTopic
;
private
String
ret_data
=
null
;
private
MqttTemlateAsync
mqttAsyncClient
;
private
Gson
gson
=
new
Gson
();
private
ScheduledExecutorService
mReconnectScheduler
;
private
long
mInitDelay
=
INIT_DELAY_DEFAULT
;
private
long
mSchedulePeriod
=
SCH_PERIOD_DEFAULT
;
private
String
url
;
private
String
port
;
private
String
username
;
private
String
password
;
private
int
qos
;
private
String
cacrt
;
private
String
clientkey
;
private
String
clientcrt
;
private
String
clientpwd
;
public
void
SetMqttConfig
(
String
url
,
String
port
,
String
user
,
String
password
,
int
qos
,
String
mcacrt
,
String
mclientkey
,
String
mclientcrt
,
String
mclientpwd
)
{
this
.
url
=
url
;
this
.
port
=
port
;
this
.
username
=
user
;
this
.
password
=
password
;
this
.
qos
=
qos
;
this
.
cacrt
=
mcacrt
;
this
.
clientkey
=
mclientkey
;
this
.
clientcrt
=
mclientcrt
;
this
.
clientpwd
=
mclientpwd
;
}
public
GWConfigWorker
(
String
type
,
String
sn
,
String
CMD
,
Boolean
flag
)
throws
Exception
{
this
.
SN
=
sn
;
this
.
Type
=
type
;
this
.
Device
=
Type
+
"_"
+
SN
;
this
.
ConfigCMD
=
CMD
;
this
.
NeedResp
=
false
;
this
.
ConfigTopic
=
"GW/"
+
Type
+
"/"
+
SN
+
PUB_TOPIC
;
this
.
RespTopic
=
"GW/"
+
Type
+
"/"
+
SN
+
SUB_TOPIC
;
}
@Override
public
String
call
()
throws
Exception
{
try
{
mqttAsyncClient
=
new
MqttTemlateAsync
(
this
.
url
+
":"
+
this
.
port
,
Device
+
"_"
+
String
.
valueOf
(
System
.
currentTimeMillis
())
+
"_config"
);
mqttAsyncClient
.
connect
(
MqttTemlateAsync
.
setSSLOptions
(
this
.
username
,
this
.
password
,
this
.
cacrt
,
this
.
clientcrt
,
this
.
clientkey
,
this
.
clientpwd
),
null
,
new
IMqttActionListener
()
{
@Override
public
void
onSuccess
(
IMqttToken
iMqttToken
)
{
System
.
out
.
println
(
"connection successfull "
);
try
{
if
(
NeedResp
)
mqttAsyncClient
.
subscribe
(
RespTopic
,
qos
);
MqttMessage
msg
=
new
MqttMessage
();
msg
.
setQos
(
1
);
msg
.
setRetained
(
true
);
msg
.
setPayload
(
ConfigCMD
.
getBytes
());
mqttAsyncClient
.
publish
(
ConfigTopic
,
msg
);
}
catch
(
MqttException
e
)
{
System
.
out
.
println
(
"Error: "
+
e
.
getMessage
());
}
}
@Override
public
void
onFailure
(
IMqttToken
iMqttToken
,
Throwable
throwable
)
{
System
.
out
.
println
(
"connection fail with client id"
);
}
});
this
.
mqttAsyncClient
.
setCallback
(
this
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
ret_data
;
}
private
void
clientReconnect
()
{
if
(
mReconnectScheduler
!=
null
)
{
// mReconnectScheduler is running.
return
;
}
mReconnectScheduler
=
Executors
.
newSingleThreadScheduledExecutor
();
mReconnectScheduler
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
public
void
run
()
{
// TODO Auto-generated method stub
if
(
mqttAsyncClient
!=
null
&&
!
mqttAsyncClient
.
isConnected
())
{
try
{
mqttAsyncClient
.
reconnect
();
}
catch
(
MqttSecurityException
e
)
{
// TODO: handle exception
e
.
printStackTrace
();
}
catch
(
MqttException
e
)
{
// TODO: handle exception
e
.
printStackTrace
();
}
}
else
{
mReconnectScheduler
.
shutdown
();
mReconnectScheduler
=
null
;
}
}
},
mInitDelay
,
mSchedulePeriod
,
TimeUnit
.
MILLISECONDS
);
}
@Override
public
void
connectionLost
(
Throwable
cause
)
{
System
.
out
.
println
(
cause
);
clientReconnect
();
}
@Override
public
void
messageArrived
(
String
topic
,
MqttMessage
mqttMessage
)
throws
Exception
{
String
Message
=
mqttMessage
.
toString
();
try
{
this
.
ret_data
=
Message
;
mqttAsyncClient
.
unsubscribe
(
RespTopic
);
mqttAsyncClient
.
disconnect
();
}
catch
(
JsonSyntaxException
e
)
{
System
.
out
.
println
(
"mqttAsyncClient: message "
+
Message
+
" received syntax error."
);
}
catch
(
IllegalStateException
e
)
{
System
.
out
.
println
(
"mqttAsyncClient: "
+
e
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
@Override
public
void
deliveryComplete
(
IMqttDeliveryToken
iMqttDeliveryToken
)
{
}
}
src/main/java/com/example/tdl/config/ThreadPoolConfiguration.java
View file @
71ccf766
package
com
.
example
.
tdl
.
config
;
import
org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.context.annotation.AdviceMode
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.core.Ordered
;
import
org.springframework.scheduling.TaskScheduler
;
import
org.springframework.scheduling.annotation.AsyncConfigurer
;
import
org.springframework.scheduling.annotation.EnableAsync
;
import
org.springframework.scheduling.annotation.EnableScheduling
;
import
org.springframework.scheduling.annotation.SchedulingConfigurer
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
;
import
org.springframework.scheduling.config.ScheduledTaskRegistrar
;
import
java.util.concurrent.Executor
;
/**
* @Author: lelouch
* @Description:
* @Date: Created in 9:38 2018/3/13
* @Modified By:
**/
public
class
ThreadPoolConfiguration
{
@Configuration
@EnableScheduling
@EnableAsync
(
mode
=
AdviceMode
.
PROXY
,
proxyTargetClass
=
false
,
order
=
Ordered
.
HIGHEST_PRECEDENCE
)
public
class
ThreadPoolConfiguration
implements
AsyncConfigurer
,
SchedulingConfigurer
{
@Value
(
"${threadpool.corepoolsize}"
)
int
corePoolSize
;
@Value
(
"${threadpool.maxpoolsize}"
)
int
maxPoolSize
;
@Bean
public
ThreadPoolTaskExecutor
taskExecutor
()
{
ThreadPoolTaskExecutor
executor
=
new
ThreadPoolTaskExecutor
();
executor
.
setCorePoolSize
(
corePoolSize
);
executor
.
setMaxPoolSize
(
maxPoolSize
);
executor
.
setAwaitTerminationSeconds
(
60
);
executor
.
setWaitForTasksToCompleteOnShutdown
(
true
);
return
executor
;
}
@Bean
public
ThreadPoolTaskScheduler
taskScheduler
()
{
ThreadPoolTaskScheduler
scheduler
=
new
ThreadPoolTaskScheduler
();
scheduler
.
setPoolSize
(
corePoolSize
);
scheduler
.
setThreadNamePrefix
(
"scheduledTask-"
);
scheduler
.
setAwaitTerminationSeconds
(
60
);
scheduler
.
setWaitForTasksToCompleteOnShutdown
(
true
);
return
scheduler
;
}
@Override
public
Executor
getAsyncExecutor
()
{
Executor
executor
=
this
.
taskScheduler
();
return
executor
;
}
@Override
public
void
configureTasks
(
ScheduledTaskRegistrar
taskRegistrar
)
{
TaskScheduler
scheduler
=
this
.
taskScheduler
();
taskRegistrar
.
setTaskScheduler
(
scheduler
);
}
@Override
public
AsyncUncaughtExceptionHandler
getAsyncUncaughtExceptionHandler
()
{
// TODO Auto-generated method stub
return
null
;
}
}
src/main/java/com/example/tdl/config/mqtt/MqttConfig.java
View file @
71ccf766
package
com
.
example
.
tdl
.
config
.
mqtt
;
import
org.springframework.boot.context.properties.ConfigurationProperties
;
import
org.springframework.context.annotation.Configuration
;
/**
* @Author: lelouch
* @Description:
* @Date: Created in 9:25 2018/3/13
* @Modified By:
**/
@Configuration
@ConfigurationProperties
(
prefix
=
"tdl.mqtt"
)
public
class
MqttConfig
{
private
String
url
;
private
String
port
;
private
String
username
;
private
String
password
;
private
int
qos
;
private
String
cacrt
;
private
String
clientkey
;
private
String
clientcrt
;
private
String
clientpwd
;
public
String
getUrl
()
{
return
url
;
}
public
void
setUrl
(
String
url
)
{
this
.
url
=
url
;
}
public
String
getPort
()
{
return
port
;
}
public
void
setPort
(
String
port
)
{
this
.
port
=
port
;
}
public
String
getUsername
()
{
return
username
;
}
public
void
setUsername
(
String
username
)
{
this
.
username
=
username
;
}
public
String
getPassword
()
{
return
password
;
}
public
void
setPassword
(
String
password
)
{
this
.
password
=
password
;
}
public
int
getQos
()
{
return
qos
;
}
public
void
setQos
(
int
qos
)
{
this
.
qos
=
qos
;
}
public
String
getCacrt
()
{
return
cacrt
;
}
public
void
setCacrt
(
String
cacrt
)
{
this
.
cacrt
=
cacrt
;
}
public
String
getClientkey
()
{
return
clientkey
;
}
public
void
setClientkey
(
String
clientkey
)
{
this
.
clientkey
=
clientkey
;
}
public
String
getClientcrt
()
{
return
clientcrt
;
}
public
void
setClientcrt
(
String
clientcrt
)
{
this
.
clientcrt
=
clientcrt
;
}
public
String
getClientpwd
()
{
return
clientpwd
;
}
public
void
setClientpwd
(
String
clientpwd
)
{
this
.
clientpwd
=
clientpwd
;
}
}
src/main/java/com/example/tdl/config/mqtt/MqttTemlateAsync.java
View file @
71ccf766
package
com
.
example
.
tdl
.
config
.
mqtt
;
import
org.apache.commons.codec.binary.Base64
;
import
org.eclipse.paho.client.mqttv3.MqttAsyncClient
;
import
org.eclipse.paho.client.mqttv3.MqttConnectOptions
;
import
org.eclipse.paho.client.mqttv3.MqttException
;
import
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
;
import
javax.net.ssl.KeyManagerFactory
;
import
javax.net.ssl.SSLContext
;
import
javax.net.ssl.SSLSocketFactory
;
import
javax.net.ssl.TrustManagerFactory
;
import
java.io.*
;
import
java.security.KeyFactory
;
import
java.security.KeyStore
;
import
java.security.PrivateKey
;
import
java.security.SecureRandom
;
import
java.security.cert.CertificateFactory
;
import
java.security.cert.X509Certificate
;
import
java.security.interfaces.RSAPrivateKey
;
import
java.security.spec.EncodedKeySpec
;
import
java.security.spec.PKCS8EncodedKeySpec
;
import
java.util.Properties
;
/**
* @Author: lelouch
* @Description:
* @Date: Created in 9:29 2018/3/13
* @Modified By:
**/
public
class
MqttTemlateAsync
{
public
class
MqttTemlateAsync
extends
MqttAsyncClient
{
public
MqttTemlateAsync
(
String
serverURL
,
String
clientId
)
throws
MqttException
{
super
(
serverURL
,
clientId
,
new
MemoryPersistence
());
// set default MqttClientPersistence as MemoryPersistence
}
private
static
String
getPem
(
String
path
)
throws
Exception
{
FileInputStream
fin
=
new
FileInputStream
(
path
);
BufferedReader
br
=
new
BufferedReader
(
new
InputStreamReader
(
fin
));
String
readLine
=
null
;
StringBuilder
sb
=
new
StringBuilder
();
while
((
readLine
=
br
.
readLine
())!=
null
){
if
(
readLine
.
charAt
(
0
)==
'-'
){
continue
;
}
else
{
sb
.
append
(
readLine
);
sb
.
append
(
'\r'
);
}
}
fin
.
close
();
return
sb
.
toString
();
}
public
static
PrivateKey
getPrivateKey
(
String
path
)
throws
Exception
{
byte
[]
buffer
=
new
Base64
().
decode
(
getPem
(
path
));
EncodedKeySpec
keySpec
=
new
PKCS8EncodedKeySpec
(
buffer
);
KeyFactory
keyFactory
=
KeyFactory
.
getInstance
(
"RSA"
);
return
(
RSAPrivateKey
)
keyFactory
.
generatePrivate
(
keySpec
);
}
private
static
SSLSocketFactory
getSSLSocketFactory
(
String
caPath
,
String
crtPath
,
String
keyPath
,
String
password
)
throws
Exception
{
// CA certificate is used to authenticate server
CertificateFactory
certFactory
=
CertificateFactory
.
getInstance
(
"X.509"
);
FileInputStream
caFileInput
=
new
FileInputStream
(
caPath
);
X509Certificate
caCert
=
(
X509Certificate
)
certFactory
.
generateCertificate
(
caFileInput
);
KeyStore
caKeyStore
=
KeyStore
.
getInstance
(
"JKS"
);
caKeyStore
.
load
(
null
,
null
);
caKeyStore
.
setCertificateEntry
(
"ca-certificate"
,
caCert
);
TrustManagerFactory
tmf
=
TrustManagerFactory
.
getInstance
(
"PKIX"
);
tmf
.
init
(
caKeyStore
);
caFileInput
.
close
();
CertificateFactory
clientFactory
=
CertificateFactory
.
getInstance
(
"X.509"
);
FileInputStream
clientCrtInput
=
new
FileInputStream
(
crtPath
);
X509Certificate
clientCert
=
(
X509Certificate
)
clientFactory
.
generateCertificate
(
clientCrtInput
);
clientCrtInput
.
close
();
// client key and certificates are sent to server so it can authenticate
// us
KeyStore
clientKeyStore
=
KeyStore
.
getInstance
(
KeyStore
.
getDefaultType
());
clientKeyStore
.
load
(
null
,
null
);
clientKeyStore
.
setCertificateEntry
(
"certificate"
,
clientCert
);
password
=
(
password
!=
null
)
?
password
:
""
;
clientKeyStore
.
setKeyEntry
(
"private-key"
,
getPrivateKey
(
keyPath
),
password
.
toCharArray
(),
new
java
.
security
.
cert
.
Certificate
[]{
clientCert
});
KeyManagerFactory
kmf
=
KeyManagerFactory
.
getInstance
(
KeyManagerFactory
.
getDefaultAlgorithm
());
kmf
.
init
(
clientKeyStore
,
password
.
toCharArray
());
// finally, create SSL socket factory
SSLContext
context
=
SSLContext
.
getInstance
(
"TLSv1.2"
);
context
.
init
(
kmf
.
getKeyManagers
(),
tmf
.
getTrustManagers
(),
new
SecureRandom
());
return
context
.
getSocketFactory
();
}
@SuppressWarnings
(
"unused"
)
private
static
Properties
getSSLProperties
()
{
Properties
props
=
null
;
InputStream
caInput
=
null
;
try
{
props
=
new
Properties
();
caInput
=
MqttTemlateAsync
.
class
.
getClassLoader
().
getResourceAsStream
(
"mqttssl.properties"
);
props
.
load
(
caInput
);
}
catch
(
FileNotFoundException
e
)
{
System
.
out
.
println
(
"getSSLProperties: mqttssl.properties file not found!"
);
}
catch
(
IOException
e
)
{
System
.
out
.
println
(
"getSSLProperties: other IOException!"
);
}
finally
{
try
{
if
(
null
!=
caInput
)
{
caInput
.
close
();
}
}
catch
(
IOException
e
)
{
System
.
out
.
println
(
"getSSLProperties: mqttssl.properties file stream close exception!"
);
}
}
return
props
;
}
public
static
MqttConnectOptions
defaultOptions
(){
String
str
=
"witium"
;
char
[]
bm
;
MqttConnectOptions
mqttConnectOptions
=
new
MqttConnectOptions
();
mqttConnectOptions
.
setCleanSession
(
true
);
mqttConnectOptions
.
setUserName
(
"test"
);
bm
=
str
.
toCharArray
();
mqttConnectOptions
.
setPassword
(
bm
);
return
mqttConnectOptions
;
}
/* For raw tcp only */
public
static
MqttConnectOptions
setOptions
(
String
username
,
String
pwd
){
MqttConnectOptions
mqttConnectOptions
=
new
MqttConnectOptions
();
mqttConnectOptions
.
setCleanSession
(
true
);
mqttConnectOptions
.
setUserName
(
username
);
mqttConnectOptions
.
setPassword
(
pwd
.
toCharArray
());
mqttConnectOptions
.
setAutomaticReconnect
(
true
);
// enable auto reconnect.
return
mqttConnectOptions
;
}
public
static
MqttConnectOptions
setSSLOptions
(
String
username
,
String
pwd
,
String
cacrt
,
String
clientcrt
,
String
clientkey
,
String
clientpwd
){
MqttConnectOptions
mqttConnectOptions
=
new
MqttConnectOptions
();
SSLSocketFactory
sslFactory
=
null
;
try
{
sslFactory
=
getSSLSocketFactory
(
cacrt
,
clientcrt
,
clientkey
,
clientpwd
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
mqttConnectOptions
.
setSocketFactory
(
sslFactory
);
mqttConnectOptions
.
setCleanSession
(
true
);
mqttConnectOptions
.
setUserName
(
username
);
mqttConnectOptions
.
setPassword
(
pwd
.
toCharArray
());
mqttConnectOptions
.
setAutomaticReconnect
(
true
);
// enable auto reconnect.
return
mqttConnectOptions
;
}
}
src/main/java/com/example/tdl/mqtt/MqttListener.java
View file @
71ccf766
package
com
.
example
.
tdl
.
mqtt
;
import
com.example.tdl.callabletask.GWConfigWorker
;
import
com.example.tdl.config.mqtt.MqttConfig
;
import
com.example.tdl.config.mqtt.MqttTemlateAsync
;
import
com.google.gson.JsonObject
;
import
com.google.gson.JsonParser
;
import
com.google.gson.JsonSyntaxException
;
import
org.eclipse.paho.client.mqttv3.*
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.PostConstruct
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
/**
* @Author: lelouch
* @Description:
* @Date: Created in 9:41 2018/3/13
* @Modified By:
**/
public
class
MqttListener
{
@Component
public
class
MqttListener
implements
MqttCallback
{
private
static
final
int
INIT_DELAY_DEFAULT
=
1000
;
// unit:ms
private
static
final
int
SCH_PERIOD_DEFAULT
=
10
*
INIT_DELAY_DEFAULT
;
// unit:ms
private
static
final
String
CHECKIN_TOPIC
=
"GW/+/+/CheckIn"
;
private
MqttTemlateAsync
mqttAsyncClient
;
@Autowired
private
MqttConfig
mqttconfig
;
@Autowired
ThreadPoolTaskExecutor
configthreadPool
;
Map
<
String
,
Future
<
String
>>
futureMap
=
new
HashMap
<
String
,
Future
<
String
>>();
private
ScheduledExecutorService
mReconnectScheduler
;
private
long
mInitDelay
=
INIT_DELAY_DEFAULT
;
private
long
mSchedulePeriod
=
SCH_PERIOD_DEFAULT
;
private
String
listener_topic
=
CHECKIN_TOPIC
;
@PostConstruct
public
void
initialize
()
throws
MqttException
{
try
{
mqttAsyncClient
=
new
MqttTemlateAsync
(
mqttconfig
.
getUrl
()
+
":"
+
mqttconfig
.
getPort
(),
"TDL_CheckIn"
+
"_"
+
String
.
valueOf
(
System
.
currentTimeMillis
())
+
"_sub"
);
mqttAsyncClient
.
connect
(
MqttTemlateAsync
.
setSSLOptions
(
mqttconfig
.
getUsername
(),
mqttconfig
.
getPassword
(),
mqttconfig
.
getCacrt
(),
mqttconfig
.
getClientcrt
(),
mqttconfig
.
getClientkey
(),
mqttconfig
.
getClientpwd
()),
null
,
new
IMqttActionListener
()
{
@Override
public
void
onSuccess
(
IMqttToken
iMqttToken
)
{
System
.
out
.
println
(
"connection successfull "
);
try
{
mqttAsyncClient
.
subscribe
(
listener_topic
,
mqttconfig
.
getQos
());
}
catch
(
MqttException
e
)
{
System
.
out
.
println
(
"Error: "
+
e
.
getMessage
());
}
}
@Override
public
void
onFailure
(
IMqttToken
iMqttToken
,
Throwable
throwable
)
{
System
.
out
.
println
(
"connection fail with client id"
);
}
});
this
.
mqttAsyncClient
.
setCallback
(
this
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
private
void
clientReconnect
()
{
if
(
mReconnectScheduler
!=
null
)
{
// mReconnectScheduler is running.
return
;
}
mReconnectScheduler
=
Executors
.
newSingleThreadScheduledExecutor
();
mReconnectScheduler
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
public
void
run
()
{
// TODO Auto-generated method stub
if
(
mqttAsyncClient
!=
null
&&
!
mqttAsyncClient
.
isConnected
())
{
try
{
mqttAsyncClient
.
reconnect
();
}
catch
(
MqttSecurityException
e
)
{
// TODO: handle exception
e
.
printStackTrace
();
}
catch
(
MqttException
e
)
{
// TODO: handle exception
e
.
printStackTrace
();
}
}
else
{
mReconnectScheduler
.
shutdown
();
mReconnectScheduler
=
null
;
}
}
},
mInitDelay
,
mSchedulePeriod
,
TimeUnit
.
MILLISECONDS
);
}
@Override
public
void
connectionLost
(
Throwable
cause
)
{
System
.
out
.
println
(
cause
);
clientReconnect
();
}
@Override
public
void
messageArrived
(
String
topic
,
MqttMessage
mqttMessage
)
throws
Exception
{
String
Message
=
mqttMessage
.
toString
();
Long
timestamp
=
System
.
currentTimeMillis
();
try
{
String
[]
tmparray
=
topic
.
split
(
"/"
);
String
Type
=
tmparray
[
1
];
String
SN
=
tmparray
[
2
];
String
ConfigData
=
"{\"action\":\"config\"}"
;
Future
<
String
>
result
=
null
;
GWConfigWorker
gcconfig
=
new
GWConfigWorker
(
Type
,
SN
,
ConfigData
,
true
);
gcconfig
.
SetMqttConfig
(
mqttconfig
.
getUrl
(),
mqttconfig
.
getPort
(),
mqttconfig
.
getUsername
(),
mqttconfig
.
getPassword
(),
mqttconfig
.
getQos
(),
mqttconfig
.
getCacrt
(),
mqttconfig
.
getClientkey
(),
mqttconfig
.
getClientcrt
(),
mqttconfig
.
getClientpwd
());
result
=
configthreadPool
.
submit
(
gcconfig
);
while
(
result
.
isDone
())
{
String
respcmd
=
result
.
get
();
}
// futureMap.put(timestamp.toString(),result);
}
catch
(
JsonSyntaxException
e
)
{
System
.
out
.
println
(
"mqttAsyncClient: message "
+
Message
+
" received syntax error."
);
}
catch
(
IllegalStateException
e
)
{
System
.
out
.
println
(
"mqttAsyncClient: "
+
e
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
@Override
public
void
deliveryComplete
(
IMqttDeliveryToken
iMqttDeliveryToken
)
{
}
}
src/main/resources/application.properties
View file @
71ccf766
...
...
@@ -3,6 +3,9 @@ spring.datasource.username=root
spring.datasource.password
=
37774020
spring.datasource.driver-class-name
=
com.mysql.jdbc.Driver
threadpool.corepoolsize
=
30
threadpool.maxpoolsize
=
50
server.port
=
8092
management.security.enabled
=
false
...
...
@@ -35,16 +38,16 @@ spring.redis.timeout=0
# Logging
logging.config
=
classpath:logback.xml
tdl.mqtt.url
=
tcp://192.168.1.87
tdl.mqtt.
port
=
1883
tdl.mqtt.
username
=
test
tdl.mqtt.
password
=
witium
tdl.mqtt.
qos
=
0
tdl.mqtt.
timeout
=
20;
tdl.mqtt.
clientPubId
=
witiumcloud
tdl.mqtt.useCredential
=
true
tdl.mqtt.client
SubId
=
witiumcloud
tdl.mqtt.client
SubTopic
=
witium/#
# Mqtt
tdl.mqtt.
url
=
ssl://192.168.1.11
tdl.mqtt.
port
=
8883
tdl.mqtt.
username
=
ugen
tdl.mqtt.
password
=
ugen
tdl.mqtt.
qos
=
1
tdl.mqtt.
timeout
=
20
# SSL Keys
tdl.mqtt.cacrt
=
ca.crt
tdl.mqtt.client
key
=
witcd.pem
tdl.mqtt.client
crt
=
witcd.crt
witcd.crt
0 → 100644
View file @
71ccf766
-----BEGIN CERTIFICATE-----
MIIGSTCCBTGgAwIBAgIJAPP+hRKC88uwMA0GCSqGSIb3DQEBDQUAMGoxFzAVBgNV
BAMMDkFuIE1RVFQgYnJva2VyMRYwFAYDVQQKDA1Pd25UcmFja3Mub3JnMRQwEgYD
VQQLDAtnZW5lcmF0ZS1DQTEhMB8GCSqGSIb3DQEJARYSbm9ib2R5QGV4YW1wbGUu
bmV0MB4XDTE3MTIxODA1NTQyN1oXDTMyMTIxNDA1NTQyN1owajEXMBUGA1UEAwwO
d2l0aXVtLWNlbnRvczcxFjAUBgNVBAoMDU93blRyYWNrcy5vcmcxFDASBgNVBAsM
C2dlbmVyYXRlLUNBMSEwHwYJKoZIhvcNAQkBFhJub2JvZHlAZXhhbXBsZS5uZXQw
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDAc33VO+dY4YB1UGwOGHk4
lNQYYmU8OrSgz/Dc4Nf1NBAFCHWmew2b0/73iIVzaVfI08KGVHlVYf+BsKFXiwAM
JFeKfrkBSWdt6JcdxIxY/njVwaqzdwtFELFhGLHHsFVmMul54lkrlIAQM5uYMAEK
E3ddvKhHYdK+ngfQ+RFt3fxgiUlyUyQ3jAqf8O+w3MpPD7W1jSyyhiFyARK26WKI
CC24Ap0Fic+OW1MBvuU9NRwzqejdYSnFCyWew4a6X5f9nYDTBCY+C0/fP3AQlSpy
LPpYJQg9cAPMtegJshmPlKPKW3+P4GjM/ue0HdEjeiBU4HQz5uZ7vCWV3a3vIAL7
AgMBAAGjggLwMIIC7DAMBgNVHRMBAf8EAjAAMBEGCWCGSAGG+EIBAQQEAwIGQDAL
BgNVHQ8EBAMCBeAwIQYJYIZIAYb4QgENBBQWEkJyb2tlciBDZXJ0aWZpY2F0ZTAd
BgNVHQ4EFgQUy9xYX8wztckkL1Fw8szen8pAVfswgZwGA1UdIwSBlDCBkYAUiXkG
2WNmB8XzkY+NsmS31OhIZ9GhbqRsMGoxFzAVBgNVBAMMDkFuIE1RVFQgYnJva2Vy
MRYwFAYDVQQKDA1Pd25UcmFja3Mub3JnMRQwEgYDVQQLDAtnZW5lcmF0ZS1DQTEh
MB8GCSqGSIb3DQEJARYSbm9ib2R5QGV4YW1wbGUubmV0ggkA9J/SOghC7O8wggFQ
BgNVHREEggFHMIIBQ4cErBIAAYcQ/oAAAAAAAAAAQpv//q1FrocErBEAAYcQ/oAA
AAAAAAAAQg7//kQFD4cEwKgBC4cQ/oAAAAAAAAD6562tA4vHX4cQ/oAAAAAAAADE
65D//hNZoIcQ/oAAAAAAAACgNSb//roFHocQ/oAAAAAAAACkPLD//ldR+IcQ/oAA
AAAAAADAuP///tEsvocQ/oAAAAAAAAD097v//uTa/4cQ/oAAAAAAAAAIIFL//jGX
+IcQ/oAAAAAAAABkZo7//m6XZocQ/oAAAAAAAAAI02f//uilgYcQ/oAAAAAAAADM
OWz//pABv4cQ/oAAAAAAAABUINf//j3Wk4cQ/oAAAAAAAAC0Yf7//i6taYcQ/oAA
AAAAAACsmJv//pfBxocEfwAAAYcQAAAAAAAAAAAAAAAAAAAAAYIJbG9jYWxob3N0
MIGGBgNVHSAEfzB9MHsGAysFCDB0MBwGCCsGAQUFBwIBFhBodHRwOi8vbG9jYWxo
b3N0MFQGCCsGAQUFBwICMEgwEBYJT3duVHJhY2tzMAMCAQEaNFRoaXMgQ0EgaXMg
Zm9yIGEgbG9jYWwgTVFUVCBicm9rZXIgaW5zdGFsbGF0aW9uIG9ubHkwDQYJKoZI
hvcNAQENBQADggEBAJAn2SYRfnyJnCiO6gqhQrvBrdhUun85DG7prfa+8pI4+XXi
OGPjwonzKgpnkPWG1JSQJbgeVEnlFoGhDgUrw8NT1aDBrs0KAOiuQqpuJ8aYmIpn
j64tvM5vS8NB4ZfJLk5BCGIQdbtfqBDhAFbNZxVRA7XLDzCfEp7idSL3Wv7fe4q3
lZotb9t0g9tRM2hy+1MBcWwUe4j/H9+JDkn4ijVE8amJraAUPXC+6Gwc+HSoRc8X
BLIt+EwSbZ02UD2EnWaxXh0VdhNe1iNmIgDm62sulcNv0g6RERb2hZarvhbOvw8f
45HTKazDxHgnaw365fk2FxlwpoDUmN+k82/0xr8=
-----END CERTIFICATE-----
witcd.pem
0 → 100644
View file @
71ccf766
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDAc33VO+dY4YB1
UGwOGHk4lNQYYmU8OrSgz/Dc4Nf1NBAFCHWmew2b0/73iIVzaVfI08KGVHlVYf+B
sKFXiwAMJFeKfrkBSWdt6JcdxIxY/njVwaqzdwtFELFhGLHHsFVmMul54lkrlIAQ
M5uYMAEKE3ddvKhHYdK+ngfQ+RFt3fxgiUlyUyQ3jAqf8O+w3MpPD7W1jSyyhiFy
ARK26WKICC24Ap0Fic+OW1MBvuU9NRwzqejdYSnFCyWew4a6X5f9nYDTBCY+C0/f
P3AQlSpyLPpYJQg9cAPMtegJshmPlKPKW3+P4GjM/ue0HdEjeiBU4HQz5uZ7vCWV
3a3vIAL7AgMBAAECggEBAI/+2gwOQk+Sipc8kV0KmsjnMrr8BFtIYZZhgAsEgRGK
3uGlLsJeGBScTo3PrGpqS7CMIWR5GD/2j0LucgoaivKKzqVlo+9+iNEH0uoHAiwk
Wy01NxAI5nnKv62y2HJdUciiF2eHIMkm15tHwgfhAyYS4qFUY2TloNXXUGnfslbN
EbkoB1LBnsue6YlI34q8jAv2/zZxz68yfA8Rdy63gQa9HVrCz85nf+MoGFnFeeGo
sYV7bfXYuTszYJ/6B748Eub5bTHaUpZwrbLpdTGXyUr6cDh2EagtGhnHiFEpBF+I
rzIJTeGUjByingsWm+aU+e1aheDLFzB/a2u4R8ljJAECgYEA3+mGR4uJ/ekUhUnS
Uer2YGBBZ0v/wepj6Zy5H0HGh2/Zl6RVcE8KKiU9d1iCzTz76RDtqlzNYScSAuGg
1SlSD0mjwRBZ7NHGPa6wMz7fBh8zLrzKSDAjaHtl1xYj8h2NXfSfGmjQFrvp9xb5
tMVmSArVgk5v6NP6V5pVIVfTcHsCgYEA3AfIIwcWErZOpcQd3DbP2JZPct2nh7hF
vknG99zjSl3AbhEAq7xJZup8Co2+Ydioh313ICc04IHXU+EE1TuUzj9Cy6rRy2jJ
RzZ5RL0YYSlhBbVCA53K5JRma6lvmEV1mi+8xX1r7HzNDEY+vc4GT27rSnaNWzDR
IgTJNE+9b4ECgYAnvrrwHiu7X1EeK7FiixN6nfteC5Z1ZFah1AjkLHfB+K+mBCyK
R+1k98FGWva02ZtZjoCxWdG26gl/a29yBuijAA5KKc3chIvszQJHxmQuIQGjzS0W
zTyrwiOcnzh4NJiwUcigFnTBZE+uZWyjktqJ9Spa+/NPX7kzLZjo1z8X2wKBgDJf
pbY/PRJzonXY/G0aL2OKpg3KKTytN2plQaQlcfPomtGYGus2ZCenr1ZLNTb5fzvl
wYwe7cAoeb++WHjo9xxw5z3xka5cAxjGo+TU1ewIqSnBYaeqTHUgdIJZfC8EV/RA
0Zr1J+wwdlFFE8AoltFAt6WIlrw15OtA5T+QsfIBAoGBAKhkaUbYDCU53KhtetAY
aSdHjuM77tHMDs9eDmwZRUzENBzf30AZTJUrQ8CbYTGCveSm++sRD4Yz8RzOJyGc
CkV/NjysEj8AYwHeDQdYtbA0eZRiB/FjNvcN8dGeGxX/phPP8Z3Zkq2TtWO5inyR
WojHNWuI139lTdbVE/r5BkGT
-----END PRIVATE KEY-----
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment