Commit a8926720 authored by Ian Craggs's avatar Ian Craggs

Add full test1 tests for V5

parent 1b61a3bf
...@@ -801,7 +801,7 @@ static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason ...@@ -801,7 +801,7 @@ static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason
client->connected = 0; client->connected = 0;
client->connect_state = 0; client->connect_state = 0;
if (client->cleansession) if (client->MQTTVersion < MQTTVERSION_5 && client->cleansession)
MQTTClient_cleanSession(client); MQTTClient_cleanSession(client);
FUNC_EXIT; FUNC_EXIT;
} }
...@@ -1551,7 +1551,13 @@ exit: ...@@ -1551,7 +1551,13 @@ exit:
int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos) int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
{ {
MQTTResponse response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL); MQTTClients* m = handle;
MQTTResponse response = {MQTTCLIENT_SUCCESS, NULL};
if (m->c->MQTTVersion >= MQTTVERSION_5)
response.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
else
response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
return response.reasonCode; return response.reasonCode;
} }
...@@ -1575,7 +1581,13 @@ MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos ...@@ -1575,7 +1581,13 @@ MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos
int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos) int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
{ {
MQTTResponse response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL); MQTTClients* m = handle;
MQTTResponse response = {MQTTCLIENT_SUCCESS, NULL};
if (m->c->MQTTVersion >= MQTTVERSION_5)
response.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
else
response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);
return response.reasonCode; return response.reasonCode;
} }
...@@ -1698,8 +1710,6 @@ MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int p ...@@ -1698,8 +1710,6 @@ MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int p
rc = MQTTCLIENT_DISCONNECTED; rc = MQTTCLIENT_DISCONNECTED;
else if (!UTF8_validateString(topicName)) else if (!UTF8_validateString(topicName))
rc = MQTTCLIENT_BAD_UTF8_STRING; rc = MQTTCLIENT_BAD_UTF8_STRING;
else if (m->c->MQTTVersion >= MQTTVERSION_5 && properties == NULL)
rc = MQTTCLIENT_NULL_PARAMETER;
if (rc != MQTTCLIENT_SUCCESS) if (rc != MQTTCLIENT_SUCCESS)
goto exit; goto exit;
...@@ -1738,7 +1748,15 @@ MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int p ...@@ -1738,7 +1748,15 @@ MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int p
p->msgId = msgid; p->msgId = msgid;
p->MQTTVersion = m->c->MQTTVersion; p->MQTTVersion = m->c->MQTTVersion;
if (m->c->MQTTVersion >= MQTTVERSION_5) if (m->c->MQTTVersion >= MQTTVERSION_5)
p->properties = *properties; {
if (properties)
p->properties = *properties;
else
{
MQTTProperties props = MQTTProperties_initializer;
p->properties = props;
}
}
rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg); rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
...@@ -1781,7 +1799,13 @@ exit: ...@@ -1781,7 +1799,13 @@ exit:
int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, void* payload, int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, void* payload,
int qos, int retained, MQTTClient_deliveryToken* deliveryToken) int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
{ {
MQTTResponse rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken); MQTTClients* m = handle;
MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL};
if (m->c->MQTTVersion >= MQTTVERSION_5)
rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
else
rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
return rc.reasonCode; return rc.reasonCode;
} }
...@@ -1820,13 +1844,16 @@ exit: ...@@ -1820,13 +1844,16 @@ exit:
int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message, int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
MQTTClient_deliveryToken* deliveryToken) MQTTClient_deliveryToken* deliveryToken)
{ {
MQTTClients* m = handle;
MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL}; MQTTResponse rc = {MQTTCLIENT_SUCCESS, NULL};
if (strncmp(message->struct_id, "MQTM", 4) != 0 || if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
(message->struct_version != 0 && message->struct_version != 1)) (message->struct_version != 0 && message->struct_version != 1))
return MQTTCLIENT_BAD_STRUCTURE; rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
else if (m->c->MQTTVersion >= MQTTVERSION_5)
rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken); rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
else
rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
return rc.reasonCode; return rc.reasonCode;
} }
......
...@@ -712,6 +712,7 @@ typedef struct ...@@ -712,6 +712,7 @@ typedef struct
* MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if that fails, fall back to 3.1 * MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if that fails, fall back to 3.1
* MQTTVERSION_3_1 (3) = only try version 3.1 * MQTTVERSION_3_1 (3) = only try version 3.1
* MQTTVERSION_3_1_1 (4) = only try version 3.1.1 * MQTTVERSION_3_1_1 (4) = only try version 3.1.1
* MQTTVERSION_5 (5) = only try version 5.0
*/ */
int MQTTVersion; int MQTTVersion;
/** /**
...@@ -726,8 +727,9 @@ typedef struct ...@@ -726,8 +727,9 @@ typedef struct
/** /**
* Optional binary password. Only checked and used if the password option is NULL * Optional binary password. Only checked and used if the password option is NULL
*/ */
struct { struct
int len; /**< binary password length */ {
int len; /**< binary password length */
const void* data; /**< binary password data */ const void* data; /**< binary password data */
} binarypwd; } binarypwd;
} MQTTClient_connectOptions; } MQTTClient_connectOptions;
......
...@@ -110,8 +110,44 @@ ADD_TEST( ...@@ -110,8 +110,44 @@ ADD_TEST(
COMMAND "test15" "--test_no" "1" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY} COMMAND "test15" "--test_no" "1" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
) )
ADD_TEST(
NAME test15-2-multithread-callbacks
COMMAND "test15" "--test_no" "2" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
ADD_TEST(
NAME test15-3-connack-return-codes
COMMAND "test15" "--test_no" "3" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
ADD_TEST(
NAME test15-4-client-persistence
COMMAND "test15" "--test_no" "4" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
ADD_TEST(
NAME test15-5-disconnect-with-quiesce
COMMAND "test15" "--test_no" "5" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
ADD_TEST(
NAME test15-6-connlost-will-message
COMMAND "test15" "--test_no" "6" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
ADD_TEST(
NAME test15-7-connlost-binary-will-message
COMMAND "test15" "--test_no" "7" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
SET_TESTS_PROPERTIES( SET_TESTS_PROPERTIES(
test15-1-single-thread-client test15-1-single-thread-client
test15-2-multithread-callbacks
test15-3-connack-return-codes
test15-4-client-persistence
test15-5-disconnect-with-quiesce
test15-6-connlost-will-message
test15-7-connlost-binary-will-message
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
......
This diff is collapsed.
This diff is collapsed.
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
Contributors: Contributors:
Ian Craggs - initial implementation and/or documentation Ian Craggs - initial implementation and/or documentation
Ian Craggs - add MQTTV5 support
******************************************************************* *******************************************************************
""" """
from __future__ import print_function from __future__ import print_function
...@@ -20,11 +21,15 @@ from __future__ import print_function ...@@ -20,11 +21,15 @@ from __future__ import print_function
import socket, sys, select, traceback, datetime, os import socket, sys, select, traceback, datetime, os
try: try:
import socketserver import socketserver
import MQTTV311 as MQTTV3 # Trace MQTT traffic - Python 3 version import MQTTV311 # Trace MQTT traffic - Python 3 version
import MQTTV5
except: except:
traceback.print_exc()
import SocketServer as socketserver import SocketServer as socketserver
import MQTTV3112 as MQTTV3 # Trace MQTT traffic - Python 2 version import MQTTV3112 as MQTTV311 # Trace MQTT traffic - Python 2 version
import MQTTV5
MQTT = MQTTV311
logging = True logging = True
myWindow = None myWindow = None
...@@ -38,6 +43,7 @@ suspended = [] ...@@ -38,6 +43,7 @@ suspended = []
class MyHandler(socketserver.StreamRequestHandler): class MyHandler(socketserver.StreamRequestHandler):
def handle(self): def handle(self):
global MQTT
if not hasattr(self, "ids"): if not hasattr(self, "ids"):
self.ids = {} self.ids = {}
if not hasattr(self, "versions"): if not hasattr(self, "versions"):
...@@ -55,12 +61,30 @@ class MyHandler(socketserver.StreamRequestHandler): ...@@ -55,12 +61,30 @@ class MyHandler(socketserver.StreamRequestHandler):
if s in suspended: if s in suspended:
print("suspended") print("suspended")
if s == clients and s not in suspended: if s == clients and s not in suspended:
inbuf = MQTTV3.getPacket(clients) # get one packet inbuf = MQTT.getPacket(clients) # get one packet
if inbuf == None: if inbuf == None:
break break
try: try:
packet = MQTTV3.unpackPacket(inbuf) # if connect, this could be MQTTV3 or MQTTV5
if packet.fh.MessageType == MQTTV3.PUBLISH and \ if inbuf[0] >> 4 == 1: # connect packet
protocol_string = b'MQTT'
pos = inbuf.find(protocol_string)
if pos != -1:
version = inbuf[pos + len(protocol_string)]
if version == 5:
MQTT = MQTTV5
else:
MQTT = MQTTV311
packet = MQTT.unpackPacket(inbuf)
if hasattr(packet.fh, "MessageType"):
packet_type = packet.fh.MessageType
publish_type = MQTT.PUBLISH
connect_type = MQTT.CONNECT
else:
packet_type = packet.fh.PacketType
publish_type = MQTT.PacketTypes.PUBLISH
connect_type = MQTT.PacketTypes.CONNECT
if packet_type == publish_type and \
packet.topicName == "MQTTSAS topic" and \ packet.topicName == "MQTTSAS topic" and \
packet.data == b"TERMINATE": packet.data == b"TERMINATE":
print("Terminating client", self.ids[id(clients)]) print("Terminating client", self.ids[id(clients)])
...@@ -68,26 +92,26 @@ class MyHandler(socketserver.StreamRequestHandler): ...@@ -68,26 +92,26 @@ class MyHandler(socketserver.StreamRequestHandler):
clients.close() clients.close()
terminated = True terminated = True
break break
elif packet.fh.MessageType == MQTTV3.PUBLISH and \ elif packet_type == publish_type and \
packet.topicName == "MQTTSAS topic" and \ packet.topicName == "MQTTSAS topic" and \
packet.data == b"TERMINATE_SERVER": packet.data == b"TERMINATE_SERVER":
print("Suspending client ", self.ids[id(clients)]) print("Suspending client ", self.ids[id(clients)])
suspended.append(clients) suspended.append(clients)
elif packet.fh.MessageType == MQTTV3.CONNECT: elif packet_type == connect_type:
self.ids[id(clients)] = packet.ClientIdentifier self.ids[id(clients)] = packet.ClientIdentifier
self.versions[id(clients)] = 3 self.versions[id(clients)] = 3
print(timestamp() , "C to S", self.ids[id(clients)], repr(packet)) print(timestamp() , "C to S", self.ids[id(clients)], str(packet))
#print([hex(b) for b in inbuf]) #print([hex(b) for b in inbuf])
#print(inbuf) #print(inbuf)
except: except:
traceback.print_exc() traceback.print_exc()
brokers.send(inbuf) # pass it on brokers.send(inbuf) # pass it on
elif s == brokers: elif s == brokers:
inbuf = MQTTV3.getPacket(brokers) # get one packet inbuf = MQTT.getPacket(brokers) # get one packet
if inbuf == None: if inbuf == None:
break break
try: try:
print(timestamp(), "S to C", self.ids[id(clients)], repr(MQTTV3.unpackPacket(inbuf))) print(timestamp(), "S to C", self.ids[id(clients)], str(MQTT.unpackPacket(inbuf)))
except: except:
traceback.print_exc() traceback.print_exc()
clients.send(inbuf) clients.send(inbuf)
......
...@@ -1166,7 +1166,7 @@ int main(int argc, char** argv) ...@@ -1166,7 +1166,7 @@ int main(int argc, char** argv)
fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1)); fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
setenv("MQTT_C_CLIENT_TRACE", "ON", 1); setenv("MQTT_C_CLIENT_TRACE", "ON", 1);
setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 0); setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 1);
getopts(argc, argv); getopts(argc, argv);
......
This diff is collapsed.
...@@ -2056,8 +2056,7 @@ int test7(struct Options options) ...@@ -2056,8 +2056,7 @@ int test7(struct Options options)
{ {
char* testname = "test7"; char* testname = "test7";
int subsqos = 2; int subsqos = 2;
AsyncTestClient tc = AsyncTestClient tc = AsyncTestClient_initializer;
AsyncTestClient_initializer;
MQTTAsync c; MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
......
...@@ -61,7 +61,7 @@ struct ...@@ -61,7 +61,7 @@ struct
int persistence; int persistence;
} opts = } opts =
{ {
"tcp://localhost:1885", "tcp://localhost:1884",
NULL, NULL,
0, 0,
"tcp://localhost:7777", "tcp://localhost:7777",
...@@ -297,22 +297,24 @@ void control_connectionLost(void* context, char* cause) ...@@ -297,22 +297,24 @@ void control_connectionLost(void* context, char* cause)
*/ */
int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{ {
MyLog(LOGA_ALWAYS, "Control message arrived: %.*s %s", MyLog(LOGA_ALWAYS, "Control message arrived: %.*s wait message: %s",
m->payloadlen, m->payload, (wait_message == NULL) ? "None" : wait_message); m->payloadlen, m->payload, (wait_message == NULL) ? "None" : wait_message);
if (strncmp(m->payload, "stop", 4) == 0) if (strncmp(m->payload, "stop", 4) == 0)
{ {
MyLog(LOGA_ALWAYS, "Stop message arrived, stopping..."); MyLog(LOGA_ALWAYS, "Stop message arrived, stopping...");
stopping = 1; stopping = 1;
} }
else if (wait_message != NULL && strncmp(wait_message, m->payload, else if (wait_message != NULL && strncmp(wait_message, m->payload,
strlen(wait_message)) == 0) strlen(wait_message)) == 0)
{ {
MyLog(LOGA_ALWAYS, "Wait message %s found", wait_message);
control_found = 1; control_found = 1;
wait_message = NULL; wait_message = NULL;
} }
else if (wait_message2 != NULL && strncmp(wait_message2, m->payload, else if (wait_message2 != NULL && strncmp(wait_message2, m->payload,
strlen(wait_message2)) == 0) strlen(wait_message2)) == 0)
{ {
MyLog(LOGA_ALWAYS, "Wait message2 %s found", wait_message);
control_found = 2; control_found = 2;
wait_message2 = NULL; wait_message2 = NULL;
} }
...@@ -351,7 +353,7 @@ int control_wait(char* message) ...@@ -351,7 +353,7 @@ int control_wait(char* message)
sprintf(buf, "waiting for: %s", message); sprintf(buf, "waiting for: %s", message);
control_send(buf); control_send(buf);
MyLog(LOGA_ALWAYS, "waiting for: %s", message); MyLog(LOGA_ALWAYS, "Waiting for: %s", message);
while (control_found == 0 && stopping == 0) while (control_found == 0 && stopping == 0)
{ {
if (++count == 300) if (++count == 300)
...@@ -362,6 +364,7 @@ int control_wait(char* message) ...@@ -362,6 +364,7 @@ int control_wait(char* message)
} }
MySleep(1000); MySleep(1000);
} }
MyLog(LOGA_ALWAYS, "Control message found: %s, control_found %d", message, control_found);
return control_found; return control_found;
} }
...@@ -377,7 +380,7 @@ int control_which(char* message1, char* message2) ...@@ -377,7 +380,7 @@ int control_which(char* message1, char* message2)
while (control_found == 0) while (control_found == 0)
{ {
if (++count == 300) if (++count == 300)
return 0; /* time out and tell the caller the message was not found */ break; /* time out and tell the caller the message was not found */
MySleep(1000); MySleep(1000);
} }
return control_found; return control_found;
...@@ -750,7 +753,7 @@ void client_onSubscribe(void* context, MQTTAsync_successData* response) ...@@ -750,7 +753,7 @@ void client_onSubscribe(void* context, MQTTAsync_successData* response)
void client_onFailure(void* context, MQTTAsync_failureData* response) void client_onFailure(void* context, MQTTAsync_failureData* response)
{ {
MQTTAsync c = (MQTTAsync)context; MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In failure callback"); MyLog(LOGA_INFO, "In failure callback");
client_subscribed = -1; client_subscribed = -1;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment