Commit 2a8dad13 authored by Ian Craggs's avatar Ian Craggs

Add offline buffering test, and reconnect function

parent 2eeb5537
......@@ -96,7 +96,7 @@ SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS = test3
SYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_CS}}
TEST_FILES_A = test4 test_mqtt4async
TEST_FILES_A = test4 test9 test_mqtt4async
ASYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_A}}
TEST_FILES_AS = test5
......
......@@ -82,6 +82,10 @@
</target>
<target name="test" >
<exec executable="python3" dir="test" spawn="true">
<arg value="mqttsas.py" />
<arg value="${test.hostname}" />
</exec>
<if>
<os family="windows"/>
<then>
......@@ -89,7 +93,7 @@
<foreach target="runAtest" param="aTest" list="test1,test4"/>
</then>
<else>
<foreach target="runAtest" param="aTest" list="test1,test2,test4"/>
<foreach target="runAtest" param="aTest" list="test9,test1,test2,test4"/>
</else>
</if>
<foreach target="runSSLtest" param="aTest" list="test3,test5"/>
......
......@@ -858,7 +858,9 @@ int MQTTAsync_reconnect(MQTTAsync handle)
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m->automaticReconnect && m->shouldBeConnected)
if (m->automaticReconnect)
{
if (m->shouldBeConnected)
{
m->reconnectNow = 1;
if (m->retrying == 0)
......@@ -868,6 +870,20 @@ int MQTTAsync_reconnect(MQTTAsync handle)
}
rc = MQTTASYNC_SUCCESS;
}
}
else
{
/* to reconnect, put the connect command to the head of the command queue */
MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
/* make sure that the version attempts are restarted */
if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
conn->command.details.conn.MQTTVersion = 0;
MQTTAsync_addCommand(conn, sizeof(m->connect));
rc = MQTTASYNC_SUCCESS;
}
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
......@@ -1367,11 +1383,14 @@ void MQTTAsync_checkTimeouts()
{
if (m->reconnectNow || MQTTAsync_elapsed(m->lastConnectionFailedTime) > (m->currentInterval * 1000))
{
/* put the connect command to the head of the command queue, using the next serverURI */
/* to reconnect put the connect command to the head of the command queue */
MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
/* make sure that the version attempts are restarted */
if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
conn->command.details.conn.MQTTVersion = 0;
Log(TRACE_MIN, -1, "Automatically attempting to reconnect");
MQTTAsync_addCommand(conn, sizeof(m->connect));
m->reconnectNow = 0;
......
......@@ -465,7 +465,7 @@ DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_
DLLExport int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* co);
DLLExport int MQTTAsync_reconnect(MQTTAsync handle);
/**
......@@ -755,8 +755,8 @@ typedef struct
} MQTTAsync_connectOptions;
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 4, 60, 1, 10, NULL, NULL, NULL, 30, 0, NULL, NULL, NULL, NULL, 0, NULL, 0, \
0, 1, 60}
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 4, 60, 1, 10, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, 0, 0, 1, 60}
/**
* This function attempts to connect a previously-created client (see
......
"""
*******************************************************************
Copyright (c) 2013, 2014 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
"""
Assertions are used to validate incoming data, but are omitted from outgoing packets. This is
so that the tests that use this package can send invalid data for error testing.
"""
import logging
logger = logging.getLogger("mqttsas")
# Low-level protocol interface
class MQTTException(Exception):
pass
# Message types
CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, \
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, \
PINGREQ, PINGRESP, DISCONNECT = range(1, 15)
packetNames = [ "reserved", \
"Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \
"Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \
"Pingreq", "Pingresp", "Disconnect"]
classNames = [ "reserved", \
"Connects", "Connacks", "Publishes", "Pubacks", "Pubrecs", "Pubrels", \
"Pubcomps", "Subscribes", "Subacks", "Unsubscribes", "Unsubacks", \
"Pingreqs", "Pingresps", "Disconnects"]
def MessageType(byte):
if byte != None:
rc = byte[0] >> 4
else:
rc = None
return rc
def getPacket(aSocket):
"receive the next packet"
buf = aSocket.recv(1) # get the first byte fixed header
if buf == b"":
return None
if str(aSocket).find("[closed]") != -1:
closed = True
else:
closed = False
if closed:
return None
# now get the remaining length
multiplier = 1
remlength = 0
while 1:
next = aSocket.recv(1)
while len(next) == 0:
next = aSocket.recv(1)
buf += next
digit = buf[-1]
remlength += (digit & 127) * multiplier
if digit & 128 == 0:
break
multiplier *= 128
# receive the remaining length if there is any
rest = bytes([])
if remlength > 0:
while len(rest) < remlength:
rest += aSocket.recv(remlength-len(rest))
assert len(rest) == remlength
return buf + rest
class FixedHeaders:
def __init__(self, aMessageType):
self.MessageType = aMessageType
self.DUP = False
self.QoS = 0
self.RETAIN = False
self.remainingLength = 0
def __eq__(self, fh):
return self.MessageType == fh.MessageType and \
self.DUP == fh.DUP and \
self.QoS == fh.QoS and \
self.RETAIN == fh.RETAIN # and \
# self.remainingLength == fh.remainingLength
def __repr__(self):
"return printable representation of our data"
return classNames[self.MessageType]+'(DUP='+repr(self.DUP)+ \
", QoS="+repr(self.QoS)+", Retain="+repr(self.RETAIN)
def pack(self, length):
"pack data into string buffer ready for transmission down socket"
buffer = bytes([(self.MessageType << 4) | (self.DUP << 3) |\
(self.QoS << 1) | self.RETAIN])
self.remainingLength = length
buffer += self.encode(length)
return buffer
def encode(self, x):
assert 0 <= x <= 268435455
buffer = b''
while 1:
digit = x % 128
x //= 128
if x > 0:
digit |= 0x80
buffer += bytes([digit])
if x == 0:
break
return buffer
def unpack(self, buffer):
"unpack data from string buffer into separate fields"
b0 = buffer[0]
self.MessageType = b0 >> 4
self.DUP = ((b0 >> 3) & 0x01) == 1
self.QoS = (b0 >> 1) & 0x03
self.RETAIN = (b0 & 0x01) == 1
(self.remainingLength, bytes) = self.decode(buffer[1:])
return bytes + 1 # length of fixed header
def decode(self, buffer):
multiplier = 1
value = 0
bytes = 0
while 1:
bytes += 1
digit = buffer[0]
buffer = buffer[1:]
value += (digit & 127) * multiplier
if digit & 128 == 0:
break
multiplier *= 128
return (value, bytes)
def writeInt16(length):
return bytes([length // 256, length % 256])
def readInt16(buf):
return buf[0]*256 + buf[1]
def writeUTF(data):
# data could be a string, or bytes. If string, encode into bytes with utf-8
return writeInt16(len(data)) + (data if type(data) == type(b"") else bytes(data, "utf-8"))
def readUTF(buffer, maxlen):
if maxlen >= 2:
length = readInt16(buffer)
else:
raise MQTTException("Not enough data to read string length")
maxlen -= 2
if length > maxlen:
raise MQTTException("Length delimited string too long")
buf = buffer[2:2+length].decode("utf-8")
logger.info("[MQTT-4.7.3-2] topic names and filters not include null")
zz = buf.find("\x00") # look for null in the UTF string
if zz != -1:
raise MQTTException("[MQTT-1.5.3-2] Null found in UTF data "+buf)
for c in range (0xD800, 0xDFFF):
zz = buf.find(chr(c)) # look for D800-DFFF in the UTF string
if zz != -1:
raise MQTTException("[MQTT-1.5.3-1] D800-DFFF found in UTF data "+buf)
if buf.find("\uFEFF") != -1:
logger.info("[MQTT-1.5.3-3] U+FEFF in UTF string")
return buf
def writeBytes(buffer):
return writeInt16(len(buffer)) + buffer
def readBytes(buffer):
length = readInt16(buffer)
return buffer[2:2+length]
class Packets:
def pack(self):
buffer = self.fh.pack(0)
return buffer
def __repr__(self):
return repr(self.fh)
def __eq__(self, packet):
return self.fh == packet.fh if packet else False
class Connects(Packets):
def __init__(self, buffer = None):
self.fh = FixedHeaders(CONNECT)
# variable header
self.ProtocolName = "MQTT"
self.ProtocolVersion = 4
self.CleanSession = True
self.WillFlag = False
self.WillQoS = 0
self.WillRETAIN = 0
self.KeepAliveTimer = 30
self.usernameFlag = False
self.passwordFlag = False
# Payload
self.ClientIdentifier = "" # UTF-8
self.WillTopic = None # UTF-8
self.WillMessage = None # binary
self.username = None # UTF-8
self.password = None # binary
if buffer != None:
self.unpack(buffer)
def pack(self):
connectFlags = bytes([(self.CleanSession << 1) | (self.WillFlag << 2) | \
(self.WillQoS << 3) | (self.WillRETAIN << 5) | \
(self.usernameFlag << 6) | (self.passwordFlag << 7)])
buffer = writeUTF(self.ProtocolName) + bytes([self.ProtocolVersion]) + \
connectFlags + writeInt16(self.KeepAliveTimer)
buffer += writeUTF(self.ClientIdentifier)
if self.WillFlag:
buffer += writeUTF(self.WillTopic)
buffer += writeBytes(self.WillMessage)
if self.usernameFlag:
buffer += writeUTF(self.username)
if self.passwordFlag:
buffer += writeBytes(self.password)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == CONNECT
try:
fhlen = self.fh.unpack(buffer)
packlen = fhlen + self.fh.remainingLength
assert len(buffer) >= packlen, "buffer length %d packet length %d" % (len(buffer), packlen)
curlen = fhlen # points to after header + remaining length
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS was not 0, was %d" % self.fh.QoS
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
self.ProtocolName = readUTF(buffer[curlen:], packlen - curlen)
curlen += len(self.ProtocolName) + 2
assert self.ProtocolName == "MQTT", "Wrong protocol name %s" % self.ProtocolName
self.ProtocolVersion = buffer[curlen]
curlen += 1
connectFlags = buffer[curlen]
assert (connectFlags & 0x01) == 0, "[MQTT-3.1.2-3] reserved connect flag must be 0"
self.CleanSession = ((connectFlags >> 1) & 0x01) == 1
self.WillFlag = ((connectFlags >> 2) & 0x01) == 1
self.WillQoS = (connectFlags >> 3) & 0x03
self.WillRETAIN = (connectFlags >> 5) & 0x01
self.passwordFlag = ((connectFlags >> 6) & 0x01) == 1
self.usernameFlag = ((connectFlags >> 7) & 0x01) == 1
curlen +=1
if self.WillFlag:
assert self.WillQoS in [0, 1, 2], "[MQTT-3.1.2-14] will qos must not be 3"
else:
assert self.WillQoS == 0, "[MQTT-3.1.2-13] will qos must be 0, if will flag is false"
assert self.WillRETAIN == False, "[MQTT-3.1.2-14] will retain must be false, if will flag is false"
self.KeepAliveTimer = readInt16(buffer[curlen:])
curlen += 2
logger.info("[MQTT-3.1.3-3] Clientid must be present, and first field")
logger.info("[MQTT-3.1.3-4] Clientid must be Unicode, and between 0 and 65535 bytes long")
self.ClientIdentifier = readUTF(buffer[curlen:], packlen - curlen)
curlen += len(self.ClientIdentifier) + 2
if self.WillFlag:
self.WillTopic = readUTF(buffer[curlen:], packlen - curlen)
curlen += len(self.WillTopic) + 2
self.WillMessage = readBytes(buffer[curlen:])
curlen += len(self.WillMessage) + 2
logger.info("[[MQTT-3.1.2-9] will topic and will message fields must be present")
else:
self.WillTopic = self.WillMessage = None
if self.usernameFlag:
assert len(buffer) > curlen+2, "Buffer too short to read username length"
self.username = readUTF(buffer[curlen:], packlen - curlen)
curlen += len(self.username) + 2
logger.info("[MQTT-3.1.2-19] username must be in payload if user name flag is 1")
else:
logger.info("[MQTT-3.1.2-18] username must not be in payload if user name flag is 0")
assert self.passwordFlag == False, "[MQTT-3.1.2-22] password flag must be 0 if username flag is 0"
if self.passwordFlag:
assert len(buffer) > curlen+2, "Buffer too short to read password length"
self.password = readBytes(buffer[curlen:])
curlen += len(self.password) + 2
logger.info("[MQTT-3.1.2-21] password must be in payload if password flag is 0")
else:
logger.info("[MQTT-3.1.2-20] password must not be in payload if password flag is 0")
if self.WillFlag and self.usernameFlag and self.passwordFlag:
logger.info("[MQTT-3.1.3-1] clientid, will topic, will message, username and password all present")
assert curlen == packlen, "Packet is wrong length curlen %d != packlen %d"
except:
logger.exception("[MQTT-3.1.4-1] server must validate connect packet and close connection without connack if it does not conform")
raise
def __repr__(self):
buf = repr(self.fh)+", ProtocolName="+str(self.ProtocolName)+", ProtocolVersion=" +\
repr(self.ProtocolVersion)+", CleanSession="+repr(self.CleanSession) +\
", WillFlag="+repr(self.WillFlag)+", KeepAliveTimer=" +\
repr(self.KeepAliveTimer)+", ClientId="+str(self.ClientIdentifier) +\
", usernameFlag="+repr(self.usernameFlag)+", passwordFlag="+repr(self.passwordFlag)
if self.WillFlag:
buf += ", WillQoS=" + repr(self.WillQoS) +\
", WillRETAIN=" + repr(self.WillRETAIN) +\
", WillTopic='"+ self.WillTopic +\
"', WillMessage='"+str(self.WillMessage)+"'"
if self.username:
buf += ", username="+self.username
if self.password:
buf += ", password="+str(self.password)
return buf+")"
def __eq__(self, packet):
rc = Packets.__eq__(self, packet) and \
self.ProtocolName == packet.ProtocolName and \
self.ProtocolVersion == packet.ProtocolVersion and \
self.CleanSession == packet.CleanSession and \
self.WillFlag == packet.WillFlag and \
self.KeepAliveTimer == packet.KeepAliveTimer and \
self.ClientIdentifier == packet.ClientIdentifier and \
self.WillFlag == packet.WillFlag
if rc and self.WillFlag:
rc = self.WillQoS == packet.WillQoS and \
self.WillRETAIN == packet.WillRETAIN and \
self.WillTopic == packet.WillTopic and \
self.WillMessage == packet.WillMessage
return rc
class Connacks(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, ReturnCode=0):
self.fh = FixedHeaders(CONNACK)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
self.flags = 0
self.returnCode = ReturnCode
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = bytes([self.flags, self.returnCode])
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 4
assert MessageType(buffer) == CONNACK
self.fh.unpack(buffer)
assert self.fh.remainingLength == 2, "Connack packet is wrong length %d" % self.fh.remainingLength
assert buffer[2] in [0, 1], "Connect Acknowledge Flags"
self.returnCode = buffer[3]
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
def __repr__(self):
return repr(self.fh)+", Session present="+str((self.flags & 0x01) == 1)+", ReturnCode="+repr(self.returnCode)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.returnCode == packet.returnCode
class Disconnects(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
self.fh = FixedHeaders(DISCONNECT)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
if buffer != None:
self.unpack(buffer)
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == DISCONNECT
self.fh.unpack(buffer)
assert self.fh.remainingLength == 0, "Disconnect packet is wrong length %d" % self.fh.remainingLength
logger.info("[MQTT-3.14.1-1] disconnect reserved bits must be 0")
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
def __repr__(self):
return repr(self.fh)+")"
class Publishes(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, TopicName="", Payload=b""):
self.fh = FixedHeaders(PUBLISH)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.topicName = TopicName
self.messageIdentifier = MsgId
# payload
self.data = Payload
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeUTF(self.topicName)
if self.fh.QoS != 0:
buffer += writeInt16(self.messageIdentifier)
buffer += self.data
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBLISH
fhlen = self.fh.unpack(buffer)
assert self.fh.QoS in [0, 1, 2], "QoS in Publish must be 0, 1, or 2"
packlen = fhlen + self.fh.remainingLength
assert len(buffer) >= packlen
curlen = fhlen
try:
self.topicName = readUTF(buffer[fhlen:], packlen - curlen)
except UnicodeDecodeError:
logger.info("[MQTT-3.3.2-1] topic name in publish must be utf-8")
raise
curlen += len(self.topicName) + 2
if self.fh.QoS != 0:
self.messageIdentifier = readInt16(buffer[curlen:])
logger.info("[MQTT-2.3.1-1] packet indentifier must be in publish if QoS is 1 or 2")
curlen += 2
assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
else:
logger.info("[MQTT-2.3.1-5] no packet indentifier in publish if QoS is 0")
self.messageIdentifier = 0
self.data = buffer[curlen:fhlen + self.fh.remainingLength]
if self.fh.QoS == 0:
assert self.fh.DUP == False, "[MQTT-2.1.2-4]"
return fhlen + self.fh.remainingLength
def __repr__(self):
rc = repr(self.fh)
if self.fh.QoS != 0:
rc += ", MsgId="+repr(self.messageIdentifier)
rc += ", TopicName="+repr(self.topicName)+", Payload="+repr(self.data)+")"
return rc
def __eq__(self, packet):
rc = Packets.__eq__(self, packet) and \
self.topicName == packet.topicName and \
self.data == packet.data
if rc and self.fh.QoS != 0:
rc = self.messageIdentifier == packet.messageIdentifier
return rc
class Pubacks(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
self.fh = FixedHeaders(PUBACK)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBACK
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 2, "Puback packet is wrong length %d" % self.fh.remainingLength
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
return fhlen + 2
def __repr__(self):
return repr(self.fh)+", MsgId "+repr(self.messageIdentifier)
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Pubrecs(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
self.fh = FixedHeaders(PUBREC)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBREC
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 2, "Pubrec packet is wrong length %d" % self.fh.remainingLength
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
return fhlen + 2
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Pubrels(Packets):
def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0):
self.fh = FixedHeaders(PUBREL)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBREL
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 2, "Pubrel packet is wrong length %d" % self.fh.remainingLength
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in PUBREL"
assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS should be 1 in PUBREL"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN should be False in PUBREL"
logger.info("[MQTT-3.6.1-1] bits in fixed header for pubrel are ok")
return fhlen + 2
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Pubcomps(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
self.fh = FixedHeaders(PUBCOMP)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBCOMP
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
assert self.fh.remainingLength == 2, "Pubcomp packet is wrong length %d" % self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in Pubcomp"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in Pubcomp"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in Pubcomp"
return fhlen + 2
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Subscribes(Packets):
def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]):
self.fh = FixedHeaders(SUBSCRIBE)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
# payload - list of topic, qos pairs
self.data = Data[:]
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
for d in self.data:
buffer += writeUTF(d[0]) + bytes([d[1]])
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == SUBSCRIBE
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
logger.info("[MQTT-2.3.1-1] packet indentifier must be in subscribe")
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
leftlen = self.fh.remainingLength - 2
self.data = []
while leftlen > 0:
topic = readUTF(buffer[-leftlen:], leftlen)
leftlen -= len(topic) + 2
qos = buffer[-leftlen]
assert qos in [0, 1, 2], "[MQTT-3-8.3-2] reserved bits must be zero"
leftlen -= 1
self.data.append((topic, qos))
assert len(self.data) > 0, "[MQTT-3.8.3-1] at least one topic, qos pair must be in subscribe"
assert leftlen == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP must be false in subscribe"
assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS must be 1 in subscribe"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN must be false in subscribe"
return fhlen + self.fh.remainingLength
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
", Data="+repr(self.data)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier and \
self.data == packet.data
class Subacks(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, Data=[]):
self.fh = FixedHeaders(SUBACK)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
# payload - list of qos
self.data = Data[:]
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
for d in self.data:
buffer += bytes([d])
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == SUBACK
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
leftlen = self.fh.remainingLength - 2
self.data = []
while leftlen > 0:
qos = buffer[-leftlen]
assert qos in [0, 1, 2, 0x80], "[MQTT-3.9.3-2] return code in QoS must be 0, 1, 2 or 0x80"
leftlen -= 1
self.data.append(qos)
assert leftlen == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be false in suback"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in suback"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in suback"
return fhlen + self.fh.remainingLength
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
", Data="+repr(self.data)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier and \
self.data == packet.data
class Unsubscribes(Packets):
def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]):
self.fh = FixedHeaders(UNSUBSCRIBE)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
# payload - list of topics
self.data = Data[:]
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
for d in self.data:
buffer += writeUTF(d)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == UNSUBSCRIBE
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
logger.info("[MQTT-2.3.1-1] packet indentifier must be in unsubscribe")
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
leftlen = self.fh.remainingLength - 2
self.data = []
while leftlen > 0:
topic = readUTF(buffer[-leftlen:], leftlen)
leftlen -= len(topic) + 2
self.data.append(topic)
assert leftlen == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 1, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
logger.info("[MQTT-3-10.1-1] fixed header bits are 0,0,1,0")
return fhlen + self.fh.remainingLength
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
", Data="+repr(self.data)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier and \
self.data == packet.data
class Unsubacks(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
self.fh = FixedHeaders(UNSUBACK)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == UNSUBACK
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
return fhlen + self.fh.remainingLength
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Pingreqs(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
self.fh = FixedHeaders(PINGREQ)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
if buffer != None:
self.unpack(buffer)
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PINGREQ
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
return fhlen
def __repr__(self):
return repr(self.fh)+")"
class Pingresps(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
self.fh = FixedHeaders(PINGRESP)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
if buffer != None:
self.unpack(buffer)
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PINGRESP
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
return fhlen
def __repr__(self):
return repr(self.fh)+")"
classes = [None, Connects, Connacks, Publishes, Pubacks, Pubrecs,
Pubrels, Pubcomps, Subscribes, Subacks, Unsubscribes,
Unsubacks, Pingreqs, Pingresps, Disconnects]
def unpackPacket(buffer):
if MessageType(buffer) != None:
packet = classes[MessageType(buffer)]()
packet.unpack(buffer)
else:
packet = None
return packet
if __name__ == "__main__":
fh = FixedHeaders(CONNECT)
tests = [0, 56, 127, 128, 8888, 16383, 16384, 65535, 2097151, 2097152,
20555666, 268435454, 268435455]
for x in tests:
try:
assert x == fh.decode(fh.encode(x))[0]
except AssertionError:
print("Test failed for x =", x, fh.decode(fh.encode(x)))
try:
fh.decode(fh.encode(268435456))
print("Error")
except AssertionError:
pass
for packet in classes[1:]:
before = str(packet())
after = str(unpackPacket(packet().pack()))
try:
assert before == after
except:
print("before:", before, "\nafter:", after)
print("End")
"""
*******************************************************************
Copyright (c) 2013, 2016 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
# Trace MQTT traffic
import MQTTV311 as MQTTV3
import socket, sys, select, traceback, datetime, os, socketserver
logging = True
myWindow = None
def timestamp():
now = datetime.datetime.now()
return now.strftime('%Y%m%d %H%M%S')+str(float("."+str(now.microsecond)))[1:]
class MyHandler(socketserver.StreamRequestHandler):
def handle(self):
if not hasattr(self, "ids"):
self.ids = {}
if not hasattr(self, "versions"):
self.versions = {}
inbuf = True
i = o = e = None
try:
clients = self.request
brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
brokers.connect((brokerhost, brokerport))
while inbuf != None:
(i, o, e) = select.select([clients, brokers], [], [])
for s in i:
if s == clients:
inbuf = MQTTV3.getPacket(clients) # get one packet
if inbuf == None:
break
try:
packet = MQTTV3.unpackPacket(inbuf)
if packet.fh.MessageType == MQTTV3.PUBLISH and \
packet.topicName == "MQTTSAS topic" and \
packet.data == b"TERMINATE":
print("Terminating client", self.ids[id(clients)])
brokers.close()
clients.close()
break
elif packet.fh.MessageType == MQTTV3.CONNECT:
self.ids[id(clients)] = packet.ClientIdentifier
self.versions[id(clients)] = 3
print(timestamp() , "C to S", self.ids[id(clients)], repr(packet))
print([hex(b) for b in inbuf])
print(inbuf)
except:
traceback.print_exc()
brokers.send(inbuf) # pass it on
elif s == brokers:
inbuf = MQTTV3.getPacket(brokers) # get one packet
if inbuf == None:
break
try:
print(timestamp(), "S to C", self.ids[id(clients)], repr(MQTTV3.unpackPacket(inbuf)))
except:
traceback.print_exc()
clients.send(inbuf)
print(timestamp()+" client "+self.ids[id(clients)]+" connection closing")
except:
print(repr((i, o, e)), repr(inbuf))
traceback.print_exc()
if id(clients) in self.ids.keys():
del self.ids[id(clients)]
elif id(clients) in self.versions.keys():
del self.versions[id(clients)]
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
def run():
global brokerhost, brokerport
myhost = 'localhost'
if len(sys.argv) > 1:
brokerhost = sys.argv[1]
else:
brokerhost = 'localhost'
if len(sys.argv) > 2:
brokerport = int(sys.argv[2])
else:
brokerport = 1883
if brokerhost == myhost:
myport = brokerport + 1
else:
myport = 1883
print("Listening on port", str(myport)+", broker on port", brokerport)
s = ThreadingTCPServer(("", myport), MyHandler)
s.serve_forever()
if __name__ == "__main__":
run()
/*******************************************************************************
* Copyright (c) 2009, 2015 IBM Corp.
* Copyright (c) 2009, 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -12,6 +12,7 @@
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - fix thread id display
*******************************************************************************/
/**
......@@ -360,7 +361,7 @@ thread_return_type WINAPI test1_sendAndReceive(void* n)
rc = MQTTClient_subscribe(c, test_topic, subsqos);
assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
MyLog(LOGA_INFO, "Thread %d, %d messages at QoS %d", Thread_getid(), iterations, qos);
MyLog(LOGA_INFO, "Thread %u, %d messages at QoS %d", Thread_getid(), iterations, qos);
test1_pubmsg.payload = test1_pubmsg_check.payload;
test1_pubmsg.payloadlen = test1_pubmsg_check.payloadlen;
test1_pubmsg.retained = 0;
......
......@@ -21,11 +21,6 @@
*
*/
/*
#if !defined(_RTSHEADER)
#include <rts.h>
#endif
*/
#include "MQTTAsync.h"
#include <string.h>
......@@ -33,10 +28,10 @@
#include "Thread.h"
#if !defined(_WINDOWS)
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
......@@ -47,134 +42,59 @@
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define snprintf _snprintf
#endif
char unique[50]; // unique suffix/prefix to add to clientid/topic etc
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
void usage()
{
printf("Options:\n");
printf("\t--test_no <test_no> - Run test number <test_no>\n");
printf("\t--server <hostname> - Connect to <hostname> for tests\n");
printf("\t--client_key <key_file> - Use <key_file> as the client certificate for SSL authentication\n");
printf("\t--client_key_pass <password> - Use <password> to access the private key in the client certificate\n");
printf("\t--server_key <key_file> - Use <key_file> as the trusted certificate for server\n");
printf("\t--verbose - Enable verbose output \n");
printf("\t--help - This help output\n");
printf("help!!\n");
exit(-1);
}
struct Options
{
char connection[100]; /**< connection to system under test. */
char mutual_auth_connection[100]; /**< connection to system under test. */
char nocert_mutual_auth_connection[100];
char server_auth_connection[100];
char anon_connection[100];
char* client_key_file;
char* client_key_pass;
char* server_key_file;
char* client_private_key_file;
char* connection; /**< connection to system under test. */
char* proxy_connection; /**< connection to proxy */
int verbose;
int test_no;
int size;
} options =
{
"ssl://m2m.eclipse.org:18883",
"ssl://m2m.eclipse.org:18884",
"ssl://m2m.eclipse.org:18887",
"ssl://m2m.eclipse.org:18885",
"ssl://m2m.eclipse.org:18886",
"../../../test/ssl/client.pem",
NULL,
"../../../test/ssl/test-root-ca.crt",
NULL,
"iot.eclipse.org:1883",
"localhost:1883",
0,
0,
5000000
};
typedef struct
{
MQTTAsync client;
char clientid[24];
char topic[100];
int maxmsgs;
int rcvdmsgs[3];
int sentmsgs[3];
int testFinished;
int subscribed;
} AsyncTestClient;
#define AsyncTestClient_initializer {NULL, "\0", "\0", 0, {0, 0, 0}, {0, 0, 0}, 0, 0}
void getopts(int argc, char** argv)
{
int count = 1;
while (count < argc)
{
if (strcmp(argv[count], "--help") == 0)
{
usage();
}
else if (strcmp(argv[count], "--test_no") == 0)
if (strcmp(argv[count], "--test_no") == 0)
{
if (++count < argc)
options.test_no = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--client_key") == 0)
{
if (++count < argc)
options.client_key_file = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--client_key_pass") == 0)
else if (strcmp(argv[count], "--connection") == 0)
{
if (++count < argc)
options.client_key_pass = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--server_key") == 0)
{
if (++count < argc)
options.server_key_file = argv[count];
options.connection = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--verbose") == 0)
{
options.verbose = 1;
printf("\nSetting verbose on\n");
}
else if (strcmp(argv[count], "--hostname") == 0)
{
if (++count < argc)
{
sprintf(options.connection, "ssl://%s:18883", argv[count]);
printf("Setting connection to %s\n", options.connection);
sprintf(options.mutual_auth_connection, "ssl://%s:18884", argv[count]);
printf("Setting mutual_auth_connection to %s\n", options.mutual_auth_connection);
sprintf(options.nocert_mutual_auth_connection, "ssl://%s:18887", argv[count]);
printf("Setting nocert_mutual_auth_connection to %s\n",
options.nocert_mutual_auth_connection);
sprintf(options.server_auth_connection, "ssl://%s:18885", argv[count]);
printf("Setting server_auth_connection to %s\n", options.server_auth_connection);
sprintf(options.anon_connection, "ssl://%s:18886", argv[count]);
printf("Setting anon_connection to %s\n", options.anon_connection);
}
else
usage();
}
count++;
}
}
#if 0
#include <logaX.h> /* For general log messages */
#define MyLog logaLine
......@@ -328,8 +248,17 @@ void myassert(char* filename, int lineno, char* description, int value,
Tests: offline buffering - sending messages while disconnected
1. send some messages while disconnected, check that they are sent
2. check max-buffered
3. check auto-reconnect parms alter behaviour as expected
2. repeat test 1 using serverURIs
3. repeat test 1 using auto reconnect
4. repeast test 2 using auto reconnect
5. check max-buffered
6. check auto-reconnect parms alter behaviour as expected
Tests: automatic reconnect
- check that connected() is called
- check that reconnect() causes reconnect attempt
- check that reconnect() fails if no connect has been previously attempted
*********************************************************************/
......@@ -347,6 +276,28 @@ void myassert(char* filename, int lineno, char* description, int value,
*********************************************************************/
int test1_will_message_received = 0;
int test1_messages_received = 0;
int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
int rc;
MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
if (memcmp(message->payload, "will message", message->payloadlen) == 0)
test1_will_message_received = 1;
else
test1_messages_received++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
int test1Finished = 0;
int test1OnFailureCalled = 0;
......@@ -359,7 +310,7 @@ void test1cOnFailure(void* context, MQTTAsync_failureData* response)
test1Finished = 1;
}
void test1cOnFailure(void* context, MQTTAsync_failureData* response)
void test1dOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
......@@ -369,12 +320,59 @@ void test1cOnFailure(void* context, MQTTAsync_failureData* response)
void test1cOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
MQTTAsync c = (MQTTAsync)context;
int rc;
/* send a message to the proxy to break the connection */
pubmsg.payload = "TERMINATE";
pubmsg.payloadlen = strlen(pubmsg.payload);
pubmsg.qos = 0;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
int test1dReady = 0;
char willTopic[100];
char test_topic[50];
void test1donSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
test1dReady = 1;
}
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context);
void test1dOnConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
int qoss[2] = {2, 2};
char* topics[2] = {willTopic, test_topic};
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
opts.onSuccess = test1donSubscribe;
opts.context = c;
rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test1Finished = 1;
}
int test1c_connected = 0;
void test1cConnected(void* context, char* cause)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
test1c_connected = 1;
}
......@@ -385,9 +383,18 @@ int test1(struct Options options)
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
int rc = 0;
char* test_topic = "C client offline buffering test";
int count = 0;
char clientidc[50];
char clientidd[50];
int i = 0;
MQTTAsync_token *tokens;
sprintf(willTopic, "paho-test9-1-%s", unique);
sprintf(clientidc, "paho-test9-1-c-%s", unique);
sprintf(clientidd, "paho-test9-1-d-%s", unique);
sprintf(test_topic, "paho-test9-1-test topic %s", unique);
test1Finished = 0;
failures = 0;
......@@ -395,8 +402,9 @@ int test1(struct Options options)
fprintf(xml, "<testcase classname=\"test1\" name=\"%s\"", testname);
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "paho-test9-test1-c", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
createOptions.sendWhileDisconnected = 1;
rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL, &createOptions);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
......@@ -404,8 +412,7 @@ int test1(struct Options options)
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, "paho-test9-test1-d", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
......@@ -415,8 +422,11 @@ int test1(struct Options options)
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
//opts.username = "testuser";
//opts.password = "testpassword";
rc = MQTTAsync_setCallbacks(d, d, NULL, test1_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
......@@ -437,15 +447,19 @@ int test1(struct Options options)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
/* let client c go: connect, publish some messages, and send disconnect command to proxy */
rc = MQTTAsync_setConnected(c, c, test1cConnected);
assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* let client c go: connect, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will->topicName = willTopic;
opts.onSuccess = test1cOnConnect;
opts.onFailure = test1cOnFailure;
opts.context = c;
opts.cleansession = 0;
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
......@@ -456,11 +470,56 @@ int test1(struct Options options)
goto exit;
}
/* wait for will message */
while (!test1_will_message_received && ++count < 10000)
MySleep(100);
MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
test1c_connected = 0;
/* send some messages. Then reconnect (check connected callback), and check that those messages are received */
for (i = 0; i < 3; ++i)
{
char buf[50];
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
sprintf(buf, "QoS %d message", i);
pubmsg.payload = buf;
pubmsg.payloadlen = strlen(pubmsg.payload) + 1;
pubmsg.qos = i;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
}
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
while (tokens[i] != -1)
++i;
assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
rc = MQTTAsync_reconnect(c);
assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
/* wait for client to be reconnected */
while (!test1c_connected == 0 && ++count < 10000)
MySleep(100);
/* wait for success or failure callback */
while (!test1Finished && ++count < 10000)
while (test1_messages_received < 3 && ++count < 10000)
MySleep(100);
exit: MQTTAsync_destroy(&c);
rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
exit:
MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
......@@ -473,12 +532,16 @@ void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
printf("%s\n", message);
}
int main(int argc, char** argv)
{
int* numtests = &tests;
int rc = 0;
int (*tests[])() = { NULL, test1, };
sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
xml = fopen("TEST-test9.xml", "w");
fprintf(xml, "<testsuite name=\"test9\" tests=\"%lu\">\n", ARRAY_SIZE(tests) - 1);
......@@ -490,7 +553,7 @@ int main(int argc, char** argv)
for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
{
failures = 0;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_PROTOCOL);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment