Commit 2bc9c403 authored by Ian Craggs's avatar Ian Craggs

MQTTV5 flow control (receive maximum)

parent 5bfa672b
...@@ -1194,7 +1194,7 @@ static void MQTTAsync_writeComplete(int socket, int rc) ...@@ -1194,7 +1194,7 @@ static void MQTTAsync_writeComplete(int socket, int rc)
data.alt.pub.message.payloadlen = command->details.pub.payloadlen; data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
data.alt.pub.message.qos = command->details.pub.qos; data.alt.pub.message.qos = command->details.pub.qos;
data.alt.pub.message.retained = command->details.pub.retained; data.alt.pub.message.retained = command->details.pub.retained;
data.props = command->properties; data.properties = command->properties;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess5))(command->context, &data); (*(command->onSuccess5))(command->context, &data);
} }
...@@ -1269,6 +1269,14 @@ static int MQTTAsync_processCommand(void) ...@@ -1269,6 +1269,14 @@ static int MQTTAsync_processCommand(void)
{ {
; /* no more message ids available */ ; /* no more message ids available */
} }
else if (cmd->client->c->MQTTVersion >= MQTTVERSION_5 &&
((cmd->command.type == PUBLISH && cmd->command.details.pub.qos > 0) ||
cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
(cmd->client->c->outboundMsgs->count >= cmd->client->c->maxInflightMessages))
{
Log(TRACE_MIN, -1, "Blocking on server receive maximum for client %s",
cmd->client->c->clientID); /* flow control */
}
else else
{ {
command = cmd; command = cmd;
...@@ -1437,7 +1445,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1437,7 +1445,7 @@ static int MQTTAsync_processCommand(void)
data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen; data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
data.alt.pub.message.qos = command->command.details.pub.qos; data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained; data.alt.pub.message.retained = command->command.details.pub.retained;
data.props = command->command.properties; data.properties = command->command.properties;
Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
(*(command->command.onSuccess5))(command->command.context, &data); (*(command->command.onSuccess5))(command->command.context, &data);
} }
...@@ -2100,7 +2108,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -2100,7 +2108,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
data.alt.connect.serverURI = m->serverURI; data.alt.connect.serverURI = m->serverURI;
data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion; data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion;
data.alt.connect.sessionPresent = sessionPresent; data.alt.connect.sessionPresent = sessionPresent;
data.props = connack->properties; data.properties = connack->properties;
data.reasonCode = connack->rc; data.reasonCode = connack->rc;
(*(m->connect.onSuccess5))(m->connect.context, &data); (*(m->connect.onSuccess5))(m->connect.context, &data);
m->connect.onSuccess5 = NULL; /* don't accidentally call it again */ m->connect.onSuccess5 = NULL; /* don't accidentally call it again */
...@@ -2111,6 +2119,15 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -2111,6 +2119,15 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
Log(TRACE_MIN, -1, "Calling connected for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connected for client %s", m->c->clientID);
(*(m->connected))(m->connected_context, reason); (*(m->connected))(m->connected_context, reason);
} }
if (m->c->MQTTVersion >= MQTTVERSION_5)
{
if (MQTTProperties_hasProperty(&connack->properties, RECEIVE_MAXIMUM))
{
int recv_max = MQTTProperties_getNumericValue(&connack->properties, RECEIVE_MAXIMUM);
if (m->c->maxInflightMessages > recv_max)
m->c->maxInflightMessages = recv_max;
}
}
MQTTPacket_freeConnack(connack); MQTTPacket_freeConnack(connack);
} }
else else
...@@ -2166,7 +2183,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -2166,7 +2183,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
*element++ = *(int*)(cur_qos->content); *element++ = *(int*)(cur_qos->content);
} }
data.token = command->command.token; data.token = command->command.token;
data.props = sub->properties; data.properties = sub->properties;
Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
(*(command->command.onSuccess5))(command->command.context, &data); (*(command->command.onSuccess5))(command->command.context, &data);
if (array) if (array)
...@@ -2245,7 +2262,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -2245,7 +2262,7 @@ static thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
*element++ = *(enum MQTTReasonCodes*)(cur_rc->content); *element++ = *(enum MQTTReasonCodes*)(cur_rc->content);
} }
data.token = command->command.token; data.token = command->command.token;
data.props = unsub->properties; data.properties = unsub->properties;
Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID);
(*(command->command.onSuccess5))(command->command.context, &data); (*(command->command.onSuccess5))(command->command.context, &data);
if (array) if (array)
...@@ -3539,7 +3556,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -3539,7 +3556,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen; data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
data.alt.pub.message.qos = command->command.details.pub.qos; data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained; data.alt.pub.message.retained = command->command.details.pub.retained;
data.props = command->command.properties; data.properties = command->command.properties;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->command.onSuccess5))(command->command.context, &data); (*(command->command.onSuccess5))(command->command.context, &data);
} }
......
...@@ -495,7 +495,7 @@ typedef struct ...@@ -495,7 +495,7 @@ typedef struct
/** MQTT V5 reason code returned */ /** MQTT V5 reason code returned */
enum MQTTReasonCodes reasonCode; enum MQTTReasonCodes reasonCode;
/** MQTT V5 properties returned, if any */ /** MQTT V5 properties returned, if any */
MQTTProperties props; MQTTProperties properties;
/** A union of the different values that can be returned for subscribe, unsubscribe and publish. */ /** A union of the different values that can be returned for subscribe, unsubscribe and publish. */
union union
{ {
...@@ -1018,10 +1018,10 @@ typedef struct ...@@ -1018,10 +1018,10 @@ typedef struct
} MQTTAsync_connectOptions; } MQTTAsync_connectOptions;
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 10, NULL, NULL, NULL, 30, 0,\ #define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL} NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL}
#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 6, 60, 0, 10, NULL, NULL, NULL, 30, 0,\ #define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 6, 60, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL} NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL}
......
...@@ -1302,8 +1302,13 @@ static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectO ...@@ -1302,8 +1302,13 @@ static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectO
m->c->keepAliveInterval = options->keepAliveInterval; m->c->keepAliveInterval = options->keepAliveInterval;
setRetryLoopInterval(options->keepAliveInterval); setRetryLoopInterval(options->keepAliveInterval);
m->c->cleansession = options->cleansession; m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
m->c->MQTTVersion = options->MQTTVersion; m->c->MQTTVersion = options->MQTTVersion;
m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
if (options->struct_version >= 6)
{
if (options->maxInflightMessages > 0)
m->c->maxInflightMessages = options->maxInflightMessages;
}
if (m->c->will) if (m->c->will)
{ {
...@@ -1458,7 +1463,7 @@ MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* o ...@@ -1458,7 +1463,7 @@ MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* o
goto exit; goto exit;
} }
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 5) if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 6)
{ {
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE; rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit; goto exit;
...@@ -1545,10 +1550,19 @@ MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* o ...@@ -1545,10 +1550,19 @@ MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* o
} }
#endif #endif
rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties); rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
if (rc.reasonCode == MQTTCLIENT_SUCCESS) if (rc.reasonCode == SUCCESS)
break; break;
} }
} }
if (rc.reasonCode == SUCCESS)
{
if (rc.properties && MQTTProperties_hasProperty(rc.properties, RECEIVE_MAXIMUM))
{
int recv_max = MQTTProperties_getNumericValue(rc.properties, RECEIVE_MAXIMUM);
if (m->c->maxInflightMessages > recv_max)
m->c->maxInflightMessages = recv_max;
}
}
exit: exit:
if (m && m->c && m->c->will) if (m && m->c && m->c->will)
...@@ -2470,6 +2484,19 @@ exit: ...@@ -2470,6 +2484,19 @@ exit:
return rc; return rc;
} }
void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
{
Log_setTraceLevel((enum LOG_LEVELS)level);
}
void MQTTClient_setTraceCallback(MQTTClient_traceCallback* callback)
{
Log_setTraceCallback((Log_traceCallback*)callback);
}
MQTTClient_nameValue* MQTTClient_getVersionInfo(void) MQTTClient_nameValue* MQTTClient_getVersionInfo(void)
{ {
#define MAX_INFO_STRINGS 8 #define MAX_INFO_STRINGS 8
......
...@@ -672,12 +672,13 @@ typedef struct ...@@ -672,12 +672,13 @@ typedef struct
{ {
/** The eyecatcher for this structure. must be MQTC. */ /** The eyecatcher for this structure. must be MQTC. */
char struct_id[4]; char struct_id[4];
/** The version number of this structure. Must be 0, 1, 2, 3, 4 or 5. /** The version number of this structure. Must be 0, 1, 2, 3, 4, 5 or 6.
* 0 signifies no SSL options and no serverURIs * 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs * 1 signifies no serverURIs
* 2 signifies no MQTTVersion * 2 signifies no MQTTVersion
* 3 signifies no returned values * 3 signifies no returned values
* 4 signifies no binary password option * 4 signifies no binary password option
* 5 signifies no maxInflightMessages
*/ */
int struct_version; int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time /** The "keep alive" interval, measured in seconds, defines the maximum time
...@@ -718,7 +719,7 @@ typedef struct ...@@ -718,7 +719,7 @@ typedef struct
* message must be completed (acknowledgements received) before another * message must be completed (acknowledgements received) before another
* can be sent. Attempts to publish additional messages receive an * can be sent. Attempts to publish additional messages receive an
* ::MQTTCLIENT_MAX_MESSAGES_INFLIGHT return code. Setting this flag to * ::MQTTCLIENT_MAX_MESSAGES_INFLIGHT return code. Setting this flag to
* false allows up to 10 messages to be in-flight. This can increase * false allows up to 10 messages to be in-flight. This can increase
* overall throughput in some circumstances. * overall throughput in some circumstances.
*/ */
int reliable; int reliable;
...@@ -793,9 +794,13 @@ typedef struct ...@@ -793,9 +794,13 @@ typedef struct
int len; /**< binary password length */ int len; /**< binary password length */
const void* data; /**< binary password data */ const void* data; /**< binary password data */
} binarypwd; } binarypwd;
/**
* The maximum number of messages in flight
*/
int maxInflightMessages;
} MQTTClient_connectOptions; } MQTTClient_connectOptions;
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 5, 60, 1, 1, NULL, NULL, NULL, 30, 20, NULL, 0, NULL, 0, {NULL, 0, 0}, {0, NULL} } #define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 1, NULL, NULL, NULL, 30, 20, NULL, 0, NULL, 0, {NULL, 0, 0}, {0, NULL}, -1}
/** /**
* MQTTClient_libraryInfo is used to store details relating to the currently used * MQTTClient_libraryInfo is used to store details relating to the currently used
...@@ -1118,6 +1123,45 @@ DLLExport void MQTTClient_free(void* ptr); ...@@ -1118,6 +1123,45 @@ DLLExport void MQTTClient_free(void* ptr);
*/ */
DLLExport void MQTTClient_destroy(MQTTClient* handle); DLLExport void MQTTClient_destroy(MQTTClient* handle);
enum MQTTCLIENT_TRACE_LEVELS
{
MQTTCLIENT_TRACE_MAXIMUM = 1,
MQTTCLIENT_TRACE_MEDIUM,
MQTTCLIENT_TRACE_MINIMUM,
MQTTCLIENT_TRACE_PROTOCOL,
MQTTCLIENT_TRACE_ERROR,
MQTTCLIENT_TRACE_SEVERE,
MQTTCLIENT_TRACE_FATAL,
};
/**
* This function sets the level of trace information which will be
* returned in the trace callback.
* @param level the trace level required
*/
DLLExport void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level);
/**
* This is a callback function prototype which must be implemented if you want
* to receive trace information.
* @param level the trace level of the message returned
* @param meesage the trace message. This is a pointer to a static buffer which
* will be overwritten on each call. You must copy the data if you want to keep
* it for later.
*/
typedef void MQTTClient_traceCallback(enum MQTTCLIENT_TRACE_LEVELS level, char* message);
/**
* This function sets the trace callback if needed. If set to NULL,
* no trace information will be returned. The default trace level is
* MQTTASYNC_TRACE_MINIMUM.
* @param callback a pointer to the function which will handle the trace information
*/
DLLExport void MQTTClient_setTraceCallback(MQTTClient_traceCallback* callback);
/** /**
* Returns a pointer to the string representation of the error or NULL. * Returns a pointer to the string representation of the error or NULL.
* *
......
...@@ -589,10 +589,16 @@ ADD_TEST( ...@@ -589,10 +589,16 @@ ADD_TEST(
COMMAND "test10" "--test_no" "3" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY} COMMAND "test10" "--test_no" "3" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
) )
ADD_TEST(
NAME test10-4-flow_control
COMMAND "test10" "--test_no" "4" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
SET_TESTS_PROPERTIES( SET_TESTS_PROPERTIES(
test10-1-client_topic_aliases test10-1-client_topic_aliases
test10-2-server_topic_aliases test10-2-server_topic_aliases
test10-3-subscription_ids test10-3-subscription_ids
test10-4-flow_control
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
...@@ -621,10 +627,16 @@ ADD_TEST( ...@@ -621,10 +627,16 @@ ADD_TEST(
COMMAND "test11" "--test_no" "3" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY} COMMAND "test11" "--test_no" "3" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
) )
ADD_TEST(
NAME test11-4-flow_control
COMMAND "test11" "--test_no" "4" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
SET_TESTS_PROPERTIES( SET_TESTS_PROPERTIES(
test11-1-client_topic_aliases test11-1-client_topic_aliases
test11-2-server_topic_aliases test11-2-server_topic_aliases
test11-3-subscription_ids test11-3-subscription_ids
test11-4-flow_control
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
......
...@@ -333,7 +333,7 @@ void logProperties(MQTTProperties *props) ...@@ -333,7 +333,7 @@ void logProperties(MQTTProperties *props)
} }
} }
struct aa struct
{ {
int disconnected; int disconnected;
} test_topic_aliases_globals = } test_topic_aliases_globals =
...@@ -875,6 +875,133 @@ exit: ...@@ -875,6 +875,133 @@ exit:
} }
int test_flow_control_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
{
static int received = 0;
static int first_topic_alias = 0;
int topicAlias = 0;
received++;
MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
topicName, message->payloadlen, (char*)(message->payload));
assert("Message structure version should be 1", message->struct_version == 1,
"message->struct_version was %d", message->struct_version);
messages_arrived++;
MQTTClient_free(topicName);
MQTTClient_freeMessage(&message);
return 1;
}
static int blocking_found = 0;
void test_flow_control_trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
{
static char* msg = "Blocking publish on queue full";
if (strstr(message, msg) != NULL)
blocking_found = 1;
}
int test_flow_control(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTProperties connect_props = MQTTProperties_initializer;
MQTTProperty property;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTResponse response = {SUCCESS, NULL};
MQTTClient_deliveryToken dt;
int rc = 0, i = 0, count = 0;
char* test_topic = "test_flow_control";
int receive_maximum = 65535;
fprintf(xml, "<testcase classname=\"test_flow_control\" name=\"flow control\"");
global_start_time = start_clock();
failures = 0;
MyLog(LOGA_INFO, "Starting test - flow control");
//MQTTClient_setTraceCallback(test_flow_control_trace_callback);
rc = MQTTClient_create(&c, options.connection, "flow_control",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTCLIENT_SUCCESS)
goto exit;
rc = MQTTClient_setCallbacks(c, NULL, NULL, test_flow_control_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.MQTTVersion = options.MQTTVersion;
opts.reliable = 0;
opts.maxInflightMessages = 100;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
MyLog(LOGA_DEBUG, "Connecting");
response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
if (response.reasonCode != MQTTCLIENT_SUCCESS)
goto exit;
if (response.properties)
{
if (MQTTProperties_hasProperty(response.properties, RECEIVE_MAXIMUM))
receive_maximum = MQTTProperties_getNumericValue(response.properties, RECEIVE_MAXIMUM);
logProperties(response.properties);
MQTTResponse_free(response);
}
response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
assert("Good rc from subscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
messages_arrived = 0;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.retained = 0;
pubmsg.qos = 2;
for (i = 0; i < receive_maximum + 2; ++i)
{
response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
}
/* should get responses */
while (messages_arrived < receive_maximum + 2 && ++count < 10)
{
#if defined(WIN32)
Sleep(1000);
#else
usleep(1000000L);
#endif
}
assert("messages should have arrived", messages_arrived == receive_maximum + 2, "was %d", messages_arrived);
assert("should have blocked", blocking_found == 1, "was %d\n", blocking_found);
rc = MQTTClient_disconnect5(c, 1000, SUCCESS, NULL);
exit:
MQTTClient_setTraceCallback(NULL);
MQTTProperties_free(&pubmsg.properties);
MQTTProperties_free(&connect_props);
MQTTClient_destroy(&c);
MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int rc = 0, int rc = 0,
...@@ -882,13 +1009,16 @@ int main(int argc, char** argv) ...@@ -882,13 +1009,16 @@ int main(int argc, char** argv)
int (*tests[])() = {NULL, int (*tests[])() = {NULL,
test_client_topic_aliases, test_client_topic_aliases,
test_server_topic_aliases, test_server_topic_aliases,
test_subscription_ids}; test_subscription_ids,
test_flow_control,
};
xml = fopen("TEST-test1.xml", "w"); xml = fopen("TEST-test1.xml", "w");
fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1)); fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
setenv("MQTT_C_CLIENT_TRACE", "ON", 1); //setenv("MQTT_C_CLIENT_TRACE", "ON", 1);
setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 1); //setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 1);
MQTTClient_setTraceCallback(test_flow_control_trace_callback);
getopts(argc, argv); getopts(argc, argv);
......
...@@ -349,7 +349,7 @@ void test_client_topic_aliases_onSubscribe(void* context, MQTTAsync_successData5 ...@@ -349,7 +349,7 @@ void test_client_topic_aliases_onSubscribe(void* context, MQTTAsync_successData5
int rc; int rc;
MyLog(LOGA_INFO, "Suback properties:"); MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->props); logProperties(&response->properties);
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11"; pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11; pubmsg.payloadlen = 11;
...@@ -401,7 +401,7 @@ void test_client_topic_aliases_onConnect(void* context, MQTTAsync_successData5* ...@@ -401,7 +401,7 @@ void test_client_topic_aliases_onConnect(void* context, MQTTAsync_successData5*
"Reason code was %d\n", response->reasonCode); "Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:"); MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->props); logProperties(&response->properties);
if (first) if (first)
{ {
...@@ -606,7 +606,7 @@ void test_server_topic_aliases_onSubscribe(void* context, MQTTAsync_successData5 ...@@ -606,7 +606,7 @@ void test_server_topic_aliases_onSubscribe(void* context, MQTTAsync_successData5
int qos = 0, rc; int qos = 0, rc;
MyLog(LOGA_INFO, "Suback properties:"); MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->props); logProperties(&response->properties);
test_server_topic_aliases_globals.messages_arrived = 0; test_server_topic_aliases_globals.messages_arrived = 0;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11"; pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
...@@ -639,7 +639,7 @@ void test_server_topic_aliases_onConnect(void* context, MQTTAsync_successData5* ...@@ -639,7 +639,7 @@ void test_server_topic_aliases_onConnect(void* context, MQTTAsync_successData5*
"Reason code was %d\n", response->reasonCode); "Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:"); MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->props); logProperties(&response->properties);
opts.onSuccess5 = test_server_topic_aliases_onSubscribe; opts.onSuccess5 = test_server_topic_aliases_onSubscribe;
opts.context = c; opts.context = c;
...@@ -779,7 +779,7 @@ void test_subscription_ids_onSubscribe(void* context, MQTTAsync_successData5* re ...@@ -779,7 +779,7 @@ void test_subscription_ids_onSubscribe(void* context, MQTTAsync_successData5* re
static int subs_count = 0; static int subs_count = 0;
MyLog(LOGA_INFO, "Suback properties:"); MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->props); logProperties(&response->properties);
if (++subs_count == 1) if (++subs_count == 1)
{ {
...@@ -826,7 +826,7 @@ void test_subscription_ids_onConnect(void* context, MQTTAsync_successData5* resp ...@@ -826,7 +826,7 @@ void test_subscription_ids_onConnect(void* context, MQTTAsync_successData5* resp
"Reason code was %d\n", response->reasonCode); "Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:"); MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->props); logProperties(&response->properties);
opts.onSuccess5 = test_subscription_ids_onSubscribe; opts.onSuccess5 = test_subscription_ids_onSubscribe;
opts.context = c; opts.context = c;
...@@ -893,6 +893,169 @@ exit: ...@@ -893,6 +893,169 @@ exit:
return failures; return failures;
} }
/*********************************************************************
Test: flow control
*********************************************************************/
struct
{
char* test_topic;
int test_finished;
int messages_arrived;
int receive_maximum;
int blocking_found;
} test_flow_control_globals =
{
"flow control topic", 0, 0, 65535, 0
};
int test_flow_control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
test_flow_control_globals.messages_arrived++;
assert("Message structure version should be 1", message->struct_version == 1,
"message->struct_version was %d", message->struct_version);
MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
topicName, message->payloadlen, (char*)(message->payload));
if (message->struct_version == 1)
logProperties(&message->properties);
if (test_flow_control_globals.messages_arrived == test_flow_control_globals.receive_maximum + 2)
test_flow_control_globals.test_finished = 1;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void test_flow_control_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
int i = 0;
MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->properties);
test_flow_control_globals.messages_arrived = 0;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.retained = 0;
pubmsg.qos = 2;
for (i = 0; i < test_flow_control_globals.receive_maximum + 2; ++i)
{
rc = MQTTAsync_sendMessage(c, test_flow_control_globals.test_topic, &pubmsg, &opts);
assert("Good rc from send", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
test_flow_control_globals.test_finished = 1;
}
}
void test_flow_control_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
assert("Reason code should be 0", response->reasonCode == SUCCESS,
"Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->properties);
if (MQTTProperties_hasProperty(&response->properties, RECEIVE_MAXIMUM))
test_flow_control_globals.receive_maximum = MQTTProperties_getNumericValue(&response->properties, RECEIVE_MAXIMUM);
opts.onSuccess5 = test_flow_control_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test_flow_control_globals.test_topic, 2, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_flow_control_globals.test_finished = 1;
MQTTProperties_free(&opts.properties);
}
void flow_control_trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
static char* msg = "Blocking on server receive maximum";
if (strstr(message, msg) != NULL)
test_flow_control_globals.blocking_found = 1;
}
int test_flow_control(struct Options options)
{
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
int rc = 0;
MyLog(LOGA_INFO, "Starting V5 test - flow control");
fprintf(xml, "<testcase classname=\"test11\" name=\"flow control\"");
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "flow_control",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
MQTTAsync_setTraceCallback(flow_control_trace_callback);
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_MINIMUM);
rc = MQTTAsync_setCallbacks(c, c, NULL, test_flow_control_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.MQTTVersion = options.MQTTVersion;
opts.onSuccess5 = test_flow_control_onConnect;
opts.context = c;
opts.cleanstart = 1;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (test_flow_control_globals.test_finished == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
assert("should have blocked", test_flow_control_globals.blocking_found == 1, "was %d\n",
test_flow_control_globals.blocking_found);
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST4: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
...@@ -907,6 +1070,7 @@ int main(int argc, char** argv) ...@@ -907,6 +1070,7 @@ int main(int argc, char** argv)
test_client_topic_aliases, test_client_topic_aliases,
test_server_topic_aliases, test_server_topic_aliases,
test_subscription_ids, test_subscription_ids,
test_flow_control,
}; /* indexed starting from 1 */ }; /* indexed starting from 1 */
MQTTAsync_nameValue* info; MQTTAsync_nameValue* info;
int i; int i;
......
...@@ -315,7 +315,7 @@ void test1_onUnsubscribe(void* context, MQTTAsync_successData5* response) ...@@ -315,7 +315,7 @@ void test1_onUnsubscribe(void* context, MQTTAsync_successData5* response)
MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c); MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
MyLog(LOGA_INFO, "Unsuback properties:"); MyLog(LOGA_INFO, "Unsuback properties:");
logProperties(&response->props); logProperties(&response->properties);
opts.onSuccess = test1_onDisconnect; opts.onSuccess = test1_onDisconnect;
opts.context = c; opts.context = c;
...@@ -410,7 +410,7 @@ void test1_onSubscribe(void* context, MQTTAsync_successData5* response) ...@@ -410,7 +410,7 @@ void test1_onSubscribe(void* context, MQTTAsync_successData5* response)
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos); MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
MyLog(LOGA_INFO, "Suback properties:"); MyLog(LOGA_INFO, "Suback properties:");
logProperties(&response->props); logProperties(&response->properties);
property.identifier = USER_PROPERTY; property.identifier = USER_PROPERTY;
property.value.data.data = "test user property"; property.value.data.data = "test user property";
...@@ -444,7 +444,7 @@ void test1_onConnect(void* context, MQTTAsync_successData5* response) ...@@ -444,7 +444,7 @@ void test1_onConnect(void* context, MQTTAsync_successData5* response)
"Reason code was %d\n", response->reasonCode); "Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:"); MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->props); logProperties(&response->properties);
opts.onSuccess5 = test1_onSubscribe; opts.onSuccess5 = test1_onSubscribe;
opts.context = c; opts.context = c;
...@@ -967,7 +967,7 @@ void test4_onConnect(void* context, MQTTAsync_successData5* response) ...@@ -967,7 +967,7 @@ void test4_onConnect(void* context, MQTTAsync_successData5* response)
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer; MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
int rc; int rc;
test4_packet_size = MQTTProperties_getNumericValue(&response->props, MAXIMUM_PACKET_SIZE); test4_packet_size = MQTTProperties_getNumericValue(&response->properties, MAXIMUM_PACKET_SIZE);
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context); MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
opts.onSuccess5 = test4_onSubscribe; opts.onSuccess5 = test4_onSubscribe;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment