Commit 3b1e70dd authored by Ian Craggs's avatar Ian Craggs

Consolidate mqttsas and mqttsas2 into one which handles both Python2 and Python3

parent 09fe0744
"""
*******************************************************************
Copyright (c) 2013, 2016 IBM Corp.
Copyright (c) 2013, 2018 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
......@@ -15,11 +15,15 @@
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
from __future__ import print_function
# Trace MQTT traffic
import MQTTV311 as MQTTV3
import socket, sys, select, traceback, datetime, os, socketserver
import socket, sys, select, traceback, datetime, os
try:
import socketserver
import MQTTV311 as MQTTV3 # Trace MQTT traffic - Python 3 version
except:
import SocketServer as socketserver
import MQTTV3112 as MQTTV3 # Trace MQTT traffic - Python 2 version
logging = True
myWindow = None
......@@ -29,6 +33,7 @@ def timestamp():
now = datetime.datetime.now()
return now.strftime('%Y%m%d %H%M%S')+str(float("."+str(now.microsecond)))[1:]
suspended = []
class MyHandler(socketserver.StreamRequestHandler):
......@@ -43,10 +48,13 @@ class MyHandler(socketserver.StreamRequestHandler):
clients = self.request
brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
brokers.connect((brokerhost, brokerport))
while inbuf != None:
terminated = False
while inbuf != None and not terminated:
(i, o, e) = select.select([clients, brokers], [], [])
for s in i:
if s == clients:
if s in suspended:
print("suspended")
if s == clients and s not in suspended:
inbuf = MQTTV3.getPacket(clients) # get one packet
if inbuf == None:
break
......@@ -58,13 +66,19 @@ class MyHandler(socketserver.StreamRequestHandler):
print("Terminating client", self.ids[id(clients)])
brokers.close()
clients.close()
terminated = True
break
elif packet.fh.MessageType == MQTTV3.PUBLISH and \
packet.topicName == "MQTTSAS topic" and \
packet.data == b"TERMINATE_SERVER":
print("Suspending client ", self.ids[id(clients)])
suspended.append(clients)
elif packet.fh.MessageType == MQTTV3.CONNECT:
self.ids[id(clients)] = packet.ClientIdentifier
self.versions[id(clients)] = 3
print(timestamp() , "C to S", self.ids[id(clients)], repr(packet))
print([hex(b) for b in inbuf])
print(inbuf)
#print([hex(b) for b in inbuf])
#print(inbuf)
except:
traceback.print_exc()
brokers.send(inbuf) # pass it on
......@@ -91,24 +105,27 @@ class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
def run():
global brokerhost, brokerport
myhost = 'localhost'
myhost = '127.0.0.1'
if len(sys.argv) > 1:
brokerhost = sys.argv[1]
else:
brokerhost = 'localhost'
brokerhost = '127.0.0.1'
if len(sys.argv) > 2:
brokerport = int(sys.argv[2])
else:
brokerport = 1883
if len(sys.argv) > 3:
myport = int(sys.argv[3])
else:
if brokerhost == myhost:
myport = brokerport + 1
else:
myport = 1883
print("Listening on port", str(myport)+", broker on port", brokerport)
s = ThreadingTCPServer(("", myport), MyHandler)
s = ThreadingTCPServer(("127.0.0.1", myport), MyHandler)
s.serve_forever()
if __name__ == "__main__":
......
"""
*******************************************************************
Copyright (c) 2013, 2017 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
from __future__ import print_function
# Trace MQTT traffic
import MQTTV3112 as MQTTV3
import socket, sys, select, traceback, datetime, os
import SocketServer as socketserver
logging = True
myWindow = None
def timestamp():
now = datetime.datetime.now()
return now.strftime('%Y%m%d %H%M%S')+str(float("."+str(now.microsecond)))[1:]
class MyHandler(socketserver.StreamRequestHandler):
def handle(self):
if not hasattr(self, "ids"):
self.ids = {}
if not hasattr(self, "versions"):
self.versions = {}
inbuf = True
i = o = e = None
try:
clients = self.request
brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
brokers.connect((brokerhost, brokerport))
terminated = False
while inbuf != None and not terminated:
(i, o, e) = select.select([clients, brokers], [], [])
for s in i:
if s == clients:
inbuf = MQTTV3.getPacket(clients) # get one packet
if inbuf == None:
break
try:
packet = MQTTV3.unpackPacket(inbuf)
if packet.fh.MessageType == MQTTV3.PUBLISH and \
packet.topicName == "MQTTSAS topic" and \
packet.data == b"TERMINATE":
print("Terminating client", self.ids[id(clients)])
brokers.close()
clients.close()
terminated = True
break
elif packet.fh.MessageType == MQTTV3.CONNECT:
self.ids[id(clients)] = packet.ClientIdentifier
self.versions[id(clients)] = 3
print(timestamp() , "C to S", self.ids[id(clients)], repr(packet))
#print([hex(b) for b in inbuf])
#print(inbuf)
except:
traceback.print_exc()
brokers.send(inbuf) # pass it on
elif s == brokers:
inbuf = MQTTV3.getPacket(brokers) # get one packet
if inbuf == None:
break
try:
print(timestamp(), "S to C", self.ids[id(clients)], repr(MQTTV3.unpackPacket(inbuf)))
except:
traceback.print_exc()
clients.send(inbuf)
print(timestamp()+" client "+self.ids[id(clients)]+" connection closing")
except:
print(repr((i, o, e)), repr(inbuf))
traceback.print_exc()
if id(clients) in self.ids.keys():
del self.ids[id(clients)]
elif id(clients) in self.versions.keys():
del self.versions[id(clients)]
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
def run():
global brokerhost, brokerport
myhost = '127.0.0.1'
if len(sys.argv) > 1:
brokerhost = sys.argv[1]
else:
brokerhost = '127.0.0.1'
if len(sys.argv) > 2:
brokerport = int(sys.argv[2])
else:
brokerport = 1883
if len(sys.argv) > 3:
myport = int(sys.argv[3])
else:
if brokerhost == myhost:
myport = brokerport + 1
else:
myport = 1883
print("Listening on port", str(myport)+", broker on port", brokerport)
s = ThreadingTCPServer(("127.0.0.1", myport), MyHandler)
s.serve_forever()
if __name__ == "__main__":
run()
......@@ -8,7 +8,7 @@ cd build.paho
echo "travis build dir $TRAVIS_BUILD_DIR pwd $PWD"
cmake -DCMAKE_BUILD_TYPE=Debug -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=TRUE ..
make
python ../test/mqttsas2.py &
python ../test/mqttsas.py &
ctest -VV --timeout 600
cpack --verbose
kill %1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment