Commit d5b49f7e authored by Ian Craggs's avatar Ian Craggs

Add remaining interops tests to sync_client_test.c

parent 86dcb196
...@@ -1246,6 +1246,14 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char** topic, int* qo ...@@ -1246,6 +1246,14 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char** topic, int* qo
Thread_lock_mutex(mqttclient_mutex); Thread_lock_mutex(mqttclient_mutex);
if (pack != NULL) if (pack != NULL)
{ {
Suback* sub = (Suback*)pack;
ListElement* current = NULL;
i = 0;
while (ListNextElement(sub->qoss, &current))
{
int* reqqos = (int*)(current->content);
qos[i++] = *reqqos;
}
rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket); rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
m->pack = NULL; m->pack = NULL;
} }
......
...@@ -384,9 +384,9 @@ char* readUTF(char** pptr, char* enddata) ...@@ -384,9 +384,9 @@ char* readUTF(char** pptr, char* enddata)
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
* @return the character read * @return the character read
*/ */
char readChar(char** pptr) unsigned char readChar(char** pptr)
{ {
char c = **pptr; unsigned char c = **pptr;
(*pptr)++; (*pptr)++;
return c; return c;
} }
...@@ -533,8 +533,8 @@ int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net) ...@@ -533,8 +533,8 @@ int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net)
header.byte = 0; header.byte = 0;
header.bits.type = type; header.bits.type = type;
header.bits.dup = dup; header.bits.dup = dup;
if (type == PUBREL) if (type == PUBREL)
header.bits.qos = 1; header.bits.qos = 1;
writeInt(&ptr, msgid); writeInt(&ptr, msgid);
if ((rc = MQTTPacket_send(net, header, buf, 2, 1)) != TCPSOCKET_INTERRUPTED) if ((rc = MQTTPacket_send(net, header, buf, 2, 1)) != TCPSOCKET_INTERRUPTED)
free(buf); free(buf);
......
...@@ -202,7 +202,7 @@ int MQTTPacket_encode(char* buf, int length); ...@@ -202,7 +202,7 @@ int MQTTPacket_encode(char* buf, int length);
int MQTTPacket_decode(networkHandles* net, int* value); int MQTTPacket_decode(networkHandles* net, int* value);
int readInt(char** pptr); int readInt(char** pptr);
char* readUTF(char** pptr, char* enddata); char* readUTF(char** pptr, char* enddata);
char readChar(char** pptr); unsigned char readChar(char** pptr);
void writeChar(char** pptr, char c); void writeChar(char** pptr, char c);
void writeInt(char** pptr, int anInt); void writeInt(char** pptr, int anInt);
void writeUTF(char** pptr, char* string); void writeUTF(char** pptr, char* string);
......
...@@ -127,11 +127,17 @@ int UTF8_validate(int len, char* data) ...@@ -127,11 +127,17 @@ int UTF8_validate(int len, char* data)
int rc = 0; int rc = 0;
FUNC_ENTRY; FUNC_ENTRY;
if (len == 0)
{
rc = 1;
goto exit;
}
curdata = UTF8_char_validate(len, data); curdata = UTF8_char_validate(len, data);
while (curdata && (curdata < data + len)) while (curdata && (curdata < data + len))
curdata = UTF8_char_validate(len, curdata); curdata = UTF8_char_validate(len, curdata);
rc = curdata != NULL; rc = curdata != NULL;
exit:
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
......
...@@ -50,9 +50,10 @@ struct Options ...@@ -50,9 +50,10 @@ struct Options
char* username; char* username;
char* password; char* password;
int verbose; int verbose;
int test_no;
int MQTTVersion; int MQTTVersion;
int iterations; int iterations;
int run_dollar_topics_test;
int run_subscribe_failure_test;
} options = } options =
{ {
"tcp://localhost:1883", "tcp://localhost:1883",
...@@ -61,9 +62,10 @@ struct Options ...@@ -61,9 +62,10 @@ struct Options
NULL, NULL,
NULL, NULL,
0, 0,
0,
MQTTVERSION_3_1_1, MQTTVERSION_3_1_1,
1, 1,
0,
0,
}; };
...@@ -79,19 +81,22 @@ void getopts(int argc, char** argv) ...@@ -79,19 +81,22 @@ void getopts(int argc, char** argv)
while (count < argc) while (count < argc)
{ {
if (strcmp(argv[count], "--test_no") == 0) if (strcmp(argv[count], "--dollar_topics_test") == 0 || strcmp(argv[count], "--$") == 0)
{ {
if (++count < argc) options.run_dollar_topics_test = 1;
options.test_no = atoi(argv[count]); printf("Running $ topics test\n");
else }
usage(); else if (strcmp(argv[count], "--subscribe_failure_test") == 0 || strcmp(argv[count], "-s") == 0)
{
options.run_subscribe_failure_test = 1;
printf("Running subscribe failure test\n");
} }
else if (strcmp(argv[count], "--connection") == 0) else if (strcmp(argv[count], "--connection") == 0)
{ {
if (++count < argc) if (++count < argc)
{ {
options.connection = argv[count]; options.connection = argv[count];
printf("\nSetting connection to %s\n", options.connection); printf("Setting connection to %s\n", options.connection);
} }
else else
usage(); usage();
...@@ -101,7 +106,7 @@ void getopts(int argc, char** argv) ...@@ -101,7 +106,7 @@ void getopts(int argc, char** argv)
if (++count < argc) if (++count < argc)
{ {
options.clientid1 = argv[count]; options.clientid1 = argv[count];
printf("\nSetting clientid1 to %s\n", options.clientid1); printf("Setting clientid1 to %s\n", options.clientid1);
} }
else else
usage(); usage();
...@@ -111,7 +116,7 @@ void getopts(int argc, char** argv) ...@@ -111,7 +116,7 @@ void getopts(int argc, char** argv)
if (++count < argc) if (++count < argc)
{ {
options.clientid2 = argv[count]; options.clientid2 = argv[count];
printf("\nSetting clientid2 to %s\n", options.clientid2); printf("Setting clientid2 to %s\n", options.clientid2);
} }
else else
usage(); usage();
...@@ -121,7 +126,7 @@ void getopts(int argc, char** argv) ...@@ -121,7 +126,7 @@ void getopts(int argc, char** argv)
if (++count < argc) if (++count < argc)
{ {
options.username = argv[count]; options.username = argv[count];
printf("\nSetting username to %s\n", options.username); printf("Setting username to %s\n", options.username);
} }
else else
usage(); usage();
...@@ -131,7 +136,7 @@ void getopts(int argc, char** argv) ...@@ -131,7 +136,7 @@ void getopts(int argc, char** argv)
if (++count < argc) if (++count < argc)
{ {
options.password = argv[count]; options.password = argv[count];
printf("\nSetting password to %s\n", options.password); printf("Setting password to %s\n", options.password);
} }
else else
usage(); usage();
...@@ -141,7 +146,7 @@ void getopts(int argc, char** argv) ...@@ -141,7 +146,7 @@ void getopts(int argc, char** argv)
if (++count < argc) if (++count < argc)
{ {
options.MQTTVersion = atoi(argv[count]); options.MQTTVersion = atoi(argv[count]);
printf("setting MQTT version to %d\n", options.MQTTVersion); printf("Setting MQTT version to %d\n", options.MQTTVersion);
} }
else else
usage(); usage();
...@@ -149,7 +154,10 @@ void getopts(int argc, char** argv) ...@@ -149,7 +154,10 @@ void getopts(int argc, char** argv)
else if (strcmp(argv[count], "--iterations") == 0) else if (strcmp(argv[count], "--iterations") == 0)
{ {
if (++count < argc) if (++count < argc)
{
options.iterations = atoi(argv[count]); options.iterations = atoi(argv[count]);
printf("Setting iterations to %d\n", options.iterations);
}
else else
usage(); usage();
} }
...@@ -232,14 +240,17 @@ void myassert(char* filename, int lineno, char* description, int value, char* fo ...@@ -232,14 +240,17 @@ void myassert(char* filename, int lineno, char* description, int value, char* fo
++tests; ++tests;
if (!value) if (!value)
{ {
int count;
va_list args; va_list args;
++failures; ++failures;
printf("Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description); printf("Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
va_start(args, format); va_start(args, format);
vprintf(format, args); count = vprintf(format, args);
va_end(args); va_end(args);
if (count)
printf("\n");
//cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n", //cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
// description, filename, lineno); // description, filename, lineno);
...@@ -273,6 +284,18 @@ int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_mess ...@@ -273,6 +284,18 @@ int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_mess
} }
void clearMessages()
{
int i;
for (i = 0; i < messageCount; ++i)
{
MQTTClient_free(messagesArrived[i].topicName);
MQTTClient_freeMessage(&messagesArrived[i].m);
}
messageCount = 0;
}
void cleanup() void cleanup()
{ {
// clean all client state // clean all client state
...@@ -336,12 +359,7 @@ void cleanup() ...@@ -336,12 +359,7 @@ void cleanup()
MQTTClient_destroy(&aclient); MQTTClient_destroy(&aclient);
for (i = 0; i < messageCount; ++i) clearMessages();
{
MQTTClient_free(messagesArrived[i].topicName);
MQTTClient_freeMessage(&messagesArrived[i].m);
}
messageCount = 0;
MyLog(LOGA_INFO, "Finished cleaning up"); MyLog(LOGA_INFO, "Finished cleaning up");
} }
...@@ -394,12 +412,7 @@ int basic_test() ...@@ -394,12 +412,7 @@ int basic_test()
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
assert("3 Messages received", messageCount == 3, "messageCount was %d", messageCount); assert("3 Messages received", messageCount == 3, "messageCount was %d", messageCount);
for (i = 0; i < messageCount; ++i) clearMessages();
{
MQTTClient_free(messagesArrived[i].topicName);
MQTTClient_freeMessage(&messagesArrived[i].m);
}
messageCount = 0;
rc = MQTTClient_connect(aclient, &opts); rc = MQTTClient_connect(aclient, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
...@@ -484,12 +497,7 @@ int offline_message_queueing_test() ...@@ -484,12 +497,7 @@ int offline_message_queueing_test()
MyLog(LOGA_INFO, "This server %s queueing QoS 0 messages for offline clients", (messageCount == 3) ? "is" : "is not"); MyLog(LOGA_INFO, "This server %s queueing QoS 0 messages for offline clients", (messageCount == 3) ? "is" : "is not");
for (i = 0; i < messageCount; ++i) clearMessages();
{
MQTTClient_free(messagesArrived[i].topicName);
MQTTClient_freeMessage(&messagesArrived[i].m);
}
messageCount = 0;
MyLog(LOGA_INFO, "Offline message queueing test %s", (failures == 0) ? "succeeded" : "failed"); MyLog(LOGA_INFO, "Offline message queueing test %s", (failures == 0) ? "succeeded" : "failed");
return failures; return failures;
...@@ -667,7 +675,7 @@ typedef struct ...@@ -667,7 +675,7 @@ typedef struct
int will_message_test() int will_message_test()
{ {
int i, rc; int i, rc, count = 0;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer; MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer; MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTClient aclient, bclient; MQTTClient aclient, bclient;
...@@ -711,7 +719,9 @@ int will_message_test() ...@@ -711,7 +719,9 @@ int will_message_test()
msleep(100); msleep(100);
test6_socket_close(((MQTTClients*)aclient)->c->net.socket); test6_socket_close(((MQTTClients*)aclient)->c->net.socket);
msleep(5000);
while (messageCount == 0 && ++count < 10)
msleep(1000);
rc = MQTTClient_disconnect(bclient, 100); rc = MQTTClient_disconnect(bclient, 100);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
...@@ -729,145 +739,335 @@ int will_message_test() ...@@ -729,145 +739,335 @@ int will_message_test()
} }
#if 0 int overlapping_subscriptions_test()
def overlapping_subscriptions_test(): {
# overlapping subscriptions. When there is more than one matching subscription for the same client for a topic, /* overlapping subscriptions. When there is more than one matching subscription for the same client for a topic,
# the server may send back one message with the highest QoS of any matching subscription, or one message for the server may send back one message with the highest QoS of any matching subscription, or one message for
# each subscription with a matching QoS. each subscription with a matching QoS. */
succeeded = True
try: int i, rc;
callback.clear() MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
callback2.clear() MQTTClient aclient;
aclient.connect(host=host, port=port) char* topicList[] = {wildtopics[6], wildtopics[0]};
aclient.subscribe([wildtopics[6], wildtopics[0]], [2, 1]) int qosList[] = {2, 1};
aclient.publish(topics[3], b"overlapping topic filters", 2)
time.sleep(1) MyLog(LOGA_INFO, "Starting overlapping subscriptions test");
assert len(callback.messages) in [1, 2]
if len(callback.messages) == 1: clearMessages();
print("This server is publishing one message for all matching overlapping subscriptions, not one for each.") tests = failures = 0;
assert callback.messages[0][2] == 2
else: opts.keepAliveInterval = 20;
print("This server is publishing one message per each matching overlapping subscription.") opts.cleansession = 1;
assert (callback.messages[0][2] == 2 and callback.messages[1][2] == 1) or \ opts.username = options.username;
(callback.messages[0][2] == 1 and callback.messages[1][2] == 2), callback.messages opts.password = options.password;
aclient.disconnect() opts.MQTTVersion = options.MQTTVersion;
except:
traceback.print_exc() rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
succeeded = False assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
print("Overlapping subscriptions test", "succeeded" if succeeded else "failed")
return succeeded rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_connect(aclient, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_subscribeMany(aclient, 2, topicList, qosList);
assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_publish(aclient, topics[3], strlen("overlapping topic filters") + 1,
"overlapping topic filters", 2, 0, NULL);
assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
msleep(1000);
assert("1 or 2 messages received", messageCount == 1 || messageCount == 2, "messageCount was %d", messageCount);
if (messageCount == 1)
{
MyLog(LOGA_INFO, "This server is publishing one message for all matching overlapping subscriptions, not one for each.");
assert("QoS should be 2", messagesArrived[0].m->qos == 2, "QoS was %d", messagesArrived[0].m->qos);
}
else
{
MyLog(LOGA_INFO, "This server is publishing one message per each matching overlapping subscription.");
assert1("QoSs should be 1 and 2",
(messagesArrived[0].m->qos == 2 && messagesArrived[1].m->qos == 1) ||
(messagesArrived[0].m->qos == 1 && messagesArrived[1].m->qos == 2),
"QoSs were %d %d", messagesArrived[0].m->qos, messagesArrived[1].m->qos);
}
rc = MQTTClient_disconnect(aclient, 100);
MQTTClient_destroy(&aclient);
MyLog(LOGA_INFO, "Overlapping subscription test %s", (failures == 0) ? "succeeded" : "failed");
return failures;
}
int keepalive_test()
{
/* keepalive processing. We should be kicked off by the server if we don't send or receive any data, and don't send
any pings either. */
int i, rc, count = 0;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
MQTTClient aclient, bclient;
MyLog(LOGA_INFO, "Starting keepalive test");
tests = failures = 0;
clearMessages();
opts.keepAliveInterval = 5;
opts.cleansession = 1;
opts.username = options.username;
opts.password = options.password;
opts.MQTTVersion = options.MQTTVersion;
opts.will = &wopts;
opts.will->message = "keepalive expiry";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = topics[4];
rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
rc = MQTTClient_connect(aclient, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_create(&bclient, options.connection, options.clientid2, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
rc = MQTTClient_setCallbacks(bclient, NULL, NULL, messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_connect(bclient, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_subscribe(bclient, topics[4], 2);
assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
while (messageCount == 0 && ++count < 15)
msleep(1000);
rc = MQTTClient_disconnect(bclient, 100);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
assert("Should have will message", messageCount == 1, "messageCount was %d", messageCount);
rc = MQTTClient_disconnect(aclient, 100);
MQTTClient_destroy(&aclient);
MyLog(LOGA_INFO, "Keepalive test %s", (failures == 0) ? "succeeded" : "failed");
return failures;
}
int redelivery_on_reconnect_test()
{
/* redelivery on reconnect. When a QoS 1 or 2 exchange has not been completed, the server should retry the
appropriate MQTT packets */
int i, rc, count = 0;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient aclient;
MyLog(LOGA_INFO, "Starting redelivery on reconnect test");
tests = failures = 0;
clearMessages();
opts.keepAliveInterval = 0;
opts.cleansession = 0;
opts.username = options.username;
opts.password = options.password;
opts.MQTTVersion = options.MQTTVersion;
rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
rc = MQTTClient_connect(aclient, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_subscribe(aclient, wildtopics[6], 2);
assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
MQTTClient_yield();
// no background processing because no callback has been set
rc = MQTTClient_publish(aclient, topics[1], 6, "qos 1", 2, 0, NULL);
assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_publish(aclient, topics[3], 6, "qos 2", 2, 0, NULL);
assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_disconnect(aclient, 0);
assert("No messages should have been received yet", messageCount == 0, "messageCount was %d", messageCount);
rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_connect(aclient, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
while (messageCount < 2 && ++count < 5)
msleep(1000);
assert("Should have 2 messages", messageCount == 2, "messageCount was %d", messageCount);
rc = MQTTClient_disconnect(aclient, 100);
MQTTClient_destroy(&aclient);
MyLog(LOGA_INFO, "Redelivery on reconnect test %s", (failures == 0) ? "succeeded" : "failed");
return failures;
}
int zero_length_clientid_test()
{
int i, rc, count = 0;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient aclient;
MyLog(LOGA_INFO, "Starting zero length clientid test");
tests = failures = 0;
clearMessages();
opts.keepAliveInterval = 0;
opts.cleansession = 0;
opts.username = options.username;
opts.password = options.password;
opts.MQTTVersion = options.MQTTVersion;
rc = MQTTClient_create(&aclient, options.connection, "", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
rc = MQTTClient_connect(aclient, &opts);
assert("rc 2 from connect", rc == 2, "rc was %d", rc); // this should always fail
opts.cleansession = 1;
rc = MQTTClient_connect(aclient, &opts);
assert("Connack rc should be 0 or 2", rc == MQTTCLIENT_SUCCESS || rc == 2, "rc was %d", rc);
MyLog(LOGA_INFO, "This server %s support zero length clientids", (rc == 2) ? "does not" : "does");
if (rc == MQTTCLIENT_SUCCESS)
rc = MQTTClient_disconnect(aclient, 100);
MQTTClient_destroy(&aclient);
MyLog(LOGA_INFO, "Zero length clientid test %s", (failures == 0) ? "succeeded" : "failed");
return failures;
}
int dollar_topics_test()
{
/* $ topics. The specification says that a topic filter which starts with a wildcard does not match topic names that
begin with a $. Publishing to a topic which starts with a $ may not be allowed on some servers (which is entirely valid),
so this test will not work and should be omitted in that case.
*/
int i, rc, count = 0;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient aclient;
char dollartopic[20];
MyLog(LOGA_INFO, "Starting $ topics test");
sprintf(dollartopic, "$%s", topics[1]);
clearMessages();
opts.keepAliveInterval = 5;
opts.cleansession = 1;
opts.username = options.username;
opts.password = options.password;
opts.MQTTVersion = options.MQTTVersion;
rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_connect(aclient, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
msleep(1000); // wait for any retained messages, hopefully
clearMessages();
rc = MQTTClient_publish(aclient, topics[1], 20, "not sent to dollar topic", 1, 0, NULL);
assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_publish(aclient, dollartopic, 20, "sent to dollar topic", 1, 0, NULL);
assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
msleep(1000);
assert("Should have 1 message", messageCount == 1, "messageCount was %d", messageCount);
rc = MQTTClient_disconnect(aclient, 100);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
MQTTClient_destroy(&aclient);
MyLog(LOGA_INFO, "$ topics test %s", (failures == 0) ? "succeeded" : "failed");
return failures;
}
int subscribe_failure_test()
{
/* Subscribe failure. A new feature of MQTT 3.1.1 is the ability to send back negative reponses to subscribe
requests. One way of doing this is to subscribe to a topic which is not allowed to be subscribed to.
*/
int i, rc, count = 0;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTClient aclient;
unsigned int subqos = 2;
MyLog(LOGA_INFO, "Starting subscribe failure test");
clearMessages();
opts.keepAliveInterval = 5;
opts.cleansession = 1;
opts.username = options.username;
opts.password = options.password;
opts.MQTTVersion = options.MQTTVersion;
rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_connect(aclient, &opts);
assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
def keepalive_test(): rc = MQTTClient_subscribeMany(aclient, 1, &nosubscribe_topics[0], &subqos);
# keepalive processing. We should be kicked off by the server if we don't send or receive any data, and don't send assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
# any pings either. assert("0x80 rc from subscribe", subqos == 0x80, "subqos was %d", subqos);
succeeded = True
try: rc = MQTTClient_disconnect(aclient, 100);
callback2.clear() assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
aclient.connect(host=host, port=port, cleansession=True, keepalive=5, willFlag=True,
willTopic=topics[4], willMessage=b"keepalive expiry") MQTTClient_destroy(&aclient);
bclient.connect(host=host, port=port, cleansession=True, keepalive=0)
bclient.subscribe([topics[4]], [2]) MyLog(LOGA_INFO, "Subscribe failure test %s", (failures == 0) ? "succeeded" : "failed");
time.sleep(15) return failures;
bclient.disconnect() }
assert len(callback2.messages) == 1, "length should be 1: %s" % callback2.messages # should have the will message
except:
traceback.print_exc()
succeeded = False
print("Keepalive test", "succeeded" if succeeded else "failed")
return succeeded
def redelivery_on_reconnect_test():
# redelivery on reconnect. When a QoS 1 or 2 exchange has not been completed, the server should retry the
# appropriate MQTT packets
succeeded = True
try:
callback.clear()
callback2.clear()
bclient.connect(host=host, port=port, cleansession=False)
bclient.subscribe([wildtopics[6]], [2])
bclient.pause() # stops background processing
bclient.publish(topics[1], b"", 1, retained=False)
bclient.publish(topics[3], b"", 2, retained=False)
time.sleep(1)
bclient.disconnect()
assert len(callback2.messages) == 0
bclient.connect(host=host, port=port, cleansession=False)
bclient.resume()
time.sleep(3)
assert len(callback2.messages) == 2, "length should be 2: %s" % callback2.messages
bclient.disconnect()
except:
traceback.print_exc()
succeeded = False
print("Redelivery on reconnect test", "succeeded" if succeeded else "failed")
return succeeded
# 0 length clientid
def zero_length_clientid_test():
succeeded = True
try:
client0 = mqtt.client.Client("")
fails = False
try:
client0.connect(host=host, port=port, cleansession=False) # should be rejected
except:
fails = True
assert fails == True
fails = False
try:
client0.connect(host=host, port=port, cleansession=True) # should work
except:
fails = True
assert fails == False
client0.disconnect()
except:
traceback.print_exc()
succeeded = False
print("Zero length clientid test", "succeeded" if succeeded else "failed")
return succeeded
def subscribe_failure_test():
# Subscribe failure. A new feature of MQTT 3.1.1 is the ability to send back negative reponses to subscribe
# requests. One way of doing this is to subscribe to a topic which is not allowed to be subscribed to.
succeeded = True
try:
callback.clear()
aclient.connect(host=host, port=port)
aclient.subscribe([nosubscribe_topics[0]], [2])
time.sleep(.2)
# subscribeds is a list of (msgid, [qos])
assert callback.subscribeds[0][1][0] == 0x80, "return code should be 0x80 %s" % callback.subscribeds
except:
traceback.print_exc()
succeeded = False
print("Subscribe failure test", "succeeded" if succeeded else "failed")
return succeeded
def dollar_topics_test():
# $ topics. The specification says that a topic filter which starts with a wildcard does not match topic names that
# begin with a $. Publishing to a topic which starts with a $ may not be allowed on some servers (which is entirely valid),
# so this test will not work and should be omitted in that case.
succeeded = True
try:
callback2.clear()
bclient.connect(host=host, port=port, cleansession=True, keepalive=0)
bclient.subscribe([wildtopics[5]], [2])
time.sleep(1) # wait for all retained messages, hopefully
callback2.clear()
bclient.publish("$"+topics[1], b"", 1, retained=False)
time.sleep(.2)
assert len(callback2.messages) == 0, callback2.messages
bclient.disconnect()
except:
traceback.print_exc()
succeeded = False
print("$ topics test", "succeeded" if succeeded else "failed")
return succeeded
#endif
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
...@@ -879,57 +1079,23 @@ int main(int argc, char** argv) ...@@ -879,57 +1079,23 @@ int main(int argc, char** argv)
for (i = 0; i < options.iterations; ++i) for (i = 0; i < options.iterations; ++i)
{ {
cleanup(); cleanup();
all_failures += basic_test(); all_failures += basic_test() +
all_failures += offline_message_queueing_test(); offline_message_queueing_test() +
all_failures += retained_message_test(); retained_message_test() +
all_failures += will_message_test(); will_message_test() +
overlapping_subscriptions_test() +
keepalive_test() +
redelivery_on_reconnect_test() +
zero_length_clientid_test();
if (options.run_dollar_topics_test)
all_failures += dollar_topics_test();
if (options.run_subscribe_failure_test)
all_failures += subscribe_failure_test();
} }
MyLog(LOGA_INFO, "Test suite %s", (failures == 0) ? "succeeded" : "failed"); MyLog(LOGA_INFO, "Test suite %s", (all_failures == 0) ? "succeeded" : "failed");
#if 0
run_dollar_topics_test = run_zero_length_clientid_test = run_subscribe_failure_test = False
iterations = 1
host = "localhost"
port = 1883
for o, a in opts:
if o in ("--help"):
usage()
sys.exit()
elif o in ("-z", "--zero_length_clientid"):
run_zero_length_clientid_test = True
elif o in ("-d", "--dollar_topics"):
run_dollar_topics_test = True
elif o in ("-s", "--subscribe_failure"):
run_subscribe_failure_test = True
elif o in ("-n", "--nosubscribe_topic_filter"):
nosubscribe_topic_filter = a
elif o in ("-h", "--hostname"):
host = a
elif o in ("-p", "--port"):
port = int(a)
elif o in ("--iterations"):
iterations = int(a)
else:
assert False, "unhandled option"
tests = [basic_test, retained_message_test, offline_message_queueing_test, will_message_test,
overlapping_subscriptions_test, keepalive_test, redelivery_on_reconnect_test]
if run_zero_length_clientid_test:
tests.append(zero_length_clientid_test)
if run_subscribe_failure_test:
tests.append(subscribe_failure_test)
if run_dollar_topics_test:
tests.append(dollar_topics_test)
for i in range(iterations):
print("test suite", "succeeded" if False not in [test() for test in tests] else "failed")
#endif
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment