Commit 0ec33aee authored by Ian Craggs's avatar Ian Craggs

Add subscription id test for MQTTClient

parent 00ae8572
......@@ -56,7 +56,7 @@ static struct nameToType
{USER_PROPERTY, UTF_8_STRING_PAIR},
{MAXIMUM_PACKET_SIZE, FOUR_BYTE_INTEGER},
{WILDCARD_SUBSCRIPTION_AVAILABLE, PROPERTY_TYPE_BYTE},
{SUBSCRIPTION_IDENTIFIER_AVAILABLE, PROPERTY_TYPE_BYTE},
{SUBSCRIPTION_IDENTIFIERS_AVAILABLE, PROPERTY_TYPE_BYTE},
{SHARED_SUBSCRIPTION_AVAILABLE, PROPERTY_TYPE_BYTE}
};
......@@ -345,7 +345,7 @@ struct {
{USER_PROPERTY, "USER_PROPERTY"},
{MAXIMUM_PACKET_SIZE, "MAXIMUM_PACKET_SIZE"},
{WILDCARD_SUBSCRIPTION_AVAILABLE, "WILDCARD_SUBSCRIPTION_AVAILABLE"},
{SUBSCRIPTION_IDENTIFIER_AVAILABLE, "SUBSCRIPTION_IDENTIFIER_AVAILABLE"},
{SUBSCRIPTION_IDENTIFIERS_AVAILABLE, "SUBSCRIPTION_IDENTIFIERS_AVAILABLE"},
{SHARED_SUBSCRIPTION_AVAILABLE, "SHARED_SUBSCRIPTION_AVAILABLE"}
};
......@@ -433,10 +433,25 @@ int MQTTProperties_hasProperty(MQTTProperties *props, int propid)
}
int MQTTProperties_getNumericValue(MQTTProperties *props, int propid)
int MQTTProperties_propertyCount(MQTTProperties *props, int propid)
{
int i = 0;
int count = 0;
for (i = 0; i < props->count; ++i)
{
if (propid == props->array[i].identifier)
count++;
}
return count;
}
int MQTTProperties_getNumericValueAt(MQTTProperties *props, int propid, int index)
{
int i = 0;
int rc = -9999999;
int cur_index = 0;
for (i = 0; i < props->count; ++i)
{
......@@ -444,6 +459,11 @@ int MQTTProperties_getNumericValue(MQTTProperties *props, int propid)
if (id == propid)
{
if (cur_index < index)
{
cur_index++;
continue;
}
switch (MQTTProperty_getType(id))
{
case PROPERTY_TYPE_BYTE:
......@@ -460,7 +480,45 @@ int MQTTProperties_getNumericValue(MQTTProperties *props, int propid)
rc = -999999;
break;
}
break;
}
}
return rc;
}
int MQTTProperties_getNumericValue(MQTTProperties *props, int propid)
{
return MQTTProperties_getNumericValueAt(props, propid, 0);
}
MQTTProperty* MQTTProperties_getPropertyAt(MQTTProperties *props, int propid, int index)
{
int i = 0;
MQTTProperty* result = NULL;
int cur_index = 0;
for (i = 0; i < props->count; ++i)
{
int id = props->array[i].identifier;
if (id == propid)
{
if (cur_index == index)
{
result = &props->array[i];
break;
}
else
cur_index++;
}
}
return result;
}
MQTTProperty* MQTTProperties_getProperty(MQTTProperties *props, int propid)
{
return MQTTProperties_getPropertyAt(props, propid, 0);
}
......@@ -45,7 +45,7 @@ enum PropertyNames {
USER_PROPERTY = 38,
MAXIMUM_PACKET_SIZE = 39,
WILDCARD_SUBSCRIPTION_AVAILABLE = 40,
SUBSCRIPTION_IDENTIFIER_AVAILABLE = 41,
SUBSCRIPTION_IDENTIFIERS_AVAILABLE = 41,
SHARED_SUBSCRIPTION_AVAILABLE = 42
};
......@@ -120,6 +120,10 @@ DLLExport void MQTTProperties_free(MQTTProperties* properties);
MQTTProperties MQTTProperties_copy(const MQTTProperties* props);
DLLExport int MQTTProperties_hasProperty(MQTTProperties *props, int propid);
DLLExport int MQTTProperties_propertyCount(MQTTProperties *props, int propid);
DLLExport int MQTTProperties_getNumericValue(MQTTProperties *props, int propid);
DLLExport int MQTTProperties_getNumericValueAt(MQTTProperties *props, int propid, int index);
DLLExport MQTTProperty* MQTTProperties_getProperty(MQTTProperties *props, int propid);
DLLExport MQTTProperty* MQTTProperties_getPropertyAt(MQTTProperties *props, int propid, int index);
#endif /* MQTTPROPERTIES_H */
......@@ -584,9 +584,15 @@ ADD_TEST(
COMMAND "test10" "--test_no" "2" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
ADD_TEST(
NAME test10-3-subscription_ids
COMMAND "test10" "--test_no" "3" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
SET_TESTS_PROPERTIES(
test10-1-client_topic_aliases
test10-2-server_topic_aliases
test10-3-subscription_ids
PROPERTIES TIMEOUT 540
)
......
......@@ -648,7 +648,7 @@ int test_server_topic_aliases(struct Options options)
int qos = 0;
const int msg_count = 3;
fprintf(xml, "<testcase classname=\"test_server_topic_aliases\" name=\"client topic aliases\"");
fprintf(xml, "<testcase classname=\"test_server_topic_aliases\" name=\"server topic aliases\"");
global_start_time = start_clock();
failures = 0;
MyLog(LOGA_INFO, "Starting test 2 - server topic aliases");
......@@ -734,11 +734,155 @@ exit:
}
int main(int argc, char** argv)
int test_subscription_ids_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
{
static int received = 0;
static int first_topic_alias = 0;
int topicAlias = 0;
received++;
MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
topicName, message->payloadlen, (char*)(message->payload));
assert("Message structure version should be 1", message->struct_version == 1,
"message->struct_version was %d", message->struct_version);
if (message->struct_version == 1)
{
int subsidcount = 0, i = 0;
subsidcount = MQTTProperties_propertyCount(&message->properties, SUBSCRIPTION_IDENTIFIER);
for (i = 0; i < subsidcount; ++i)
{
int subsid = MQTTProperties_getNumericValueAt(&message->properties, SUBSCRIPTION_IDENTIFIER, i);
assert("Subsid is i+1", subsid == i+1, "subsid is not correct %d\n", subsid);
}
logProperties(&message->properties);
}
messages_arrived++;
MQTTClient_free(topicName);
MQTTClient_freeMessage(&message);
return 1;
}
int test_subscription_ids(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTProperties connect_props = MQTTProperties_initializer;
MQTTProperties subs_props = MQTTProperties_initializer;
MQTTProperty property;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTClient_deliveryToken dt;
int rc = 0;
int (*tests[])() = {NULL, test_client_topic_aliases, test_server_topic_aliases};
int i;
int count = 0;
char* test_topic = "test_subscription_ids";
const int msg_count = 1;
int subsids = 1;
fprintf(xml, "<testcase classname=\"test_subscription_ids\" name=\"subscription ids\"");
global_start_time = start_clock();
failures = 0;
MyLog(LOGA_INFO, "Starting test 3 - subscription ids");
rc = MQTTClient_create(&c, options.connection, "subscription_ids",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTCLIENT_SUCCESS)
{
MQTTClient_destroy(&c);
goto exit;
}
rc = MQTTClient_setCallbacks(c, NULL, NULL, test_subscription_ids_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
MyLog(LOGA_DEBUG, "Connecting");
response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
if (response.reasonCode != MQTTCLIENT_SUCCESS)
goto exit;
if (response.properties)
{
if (MQTTProperties_hasProperty(response.properties, SUBSCRIPTION_IDENTIFIERS_AVAILABLE))
subsids = MQTTProperties_getNumericValue(response.properties, SUBSCRIPTION_IDENTIFIERS_AVAILABLE);
logProperties(response.properties);
MQTTResponse_free(response);
}
assert("Subscription ids must be available", subsids == 1, "subsids is %d", subsids);
/* subscribe to the test topic */
property.identifier = SUBSCRIPTION_IDENTIFIER;
property.value.integer4 = 1;
MQTTProperties_add(&subs_props, &property);
response = MQTTClient_subscribe5(c, test_topic, 2, NULL, &subs_props);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
/* now to an overlapping topic */
property.value.integer4 = 2;
subs_props.array[0].value.integer4 = 2;
response = MQTTClient_subscribe5(c, "+", 2, NULL, &subs_props);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
messages_arrived = 0;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.retained = 0;
pubmsg.qos = 2;
response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
/* should get responses */
while (messages_arrived < msg_count && ++count < 10)
{
#if defined(WIN32)
Sleep(1000);
#else
usleep(1000000L);
#endif
}
assert("1 message should have arrived", messages_arrived == msg_count, "was %d", messages_arrived);
rc = MQTTClient_disconnect5(c, 1000, SUCCESS, NULL);
MQTTProperties_free(&pubmsg.properties);
MQTTProperties_free(&subs_props);
MQTTProperties_free(&connect_props);
MQTTClient_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
int main(int argc, char** argv)
{
int rc = 0,
i;
int (*tests[])() = {NULL,
test_client_topic_aliases,
test_server_topic_aliases,
test_subscription_ids};
xml = fopen("TEST-test1.xml", "w");
fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment