Commit ced0d0ef authored by Ian Craggs's avatar Ian Craggs

Make all tests in test45.c work with MQTT 5

parent 167278c0
......@@ -1007,6 +1007,11 @@ static void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* comma
Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, NULL);
}
else if (command->onSuccess5)
{
Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
(*(command->onSuccess5))(command->context, NULL);
}
}
FUNC_EXIT;
}
......@@ -1141,28 +1146,58 @@ static void MQTTAsync_writeComplete(int socket, int rc)
{
if (command->type == PUBLISH)
{
if (rc == 1 && command->onSuccess)
if (rc == 1)
{
MQTTAsync_successData data;
data.token = command->token;
data.alt.pub.destinationName = command->details.pub.destinationName;
data.alt.pub.message.payload = command->details.pub.payload;
data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
data.alt.pub.message.qos = command->details.pub.qos;
data.alt.pub.message.retained = command->details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data);
if (command->onSuccess)
{
MQTTAsync_successData data;
data.token = command->token;
data.alt.pub.destinationName = command->details.pub.destinationName;
data.alt.pub.message.payload = command->details.pub.payload;
data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
data.alt.pub.message.qos = command->details.pub.qos;
data.alt.pub.message.retained = command->details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data);
}
else if (command->onSuccess5)
{
MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
data.token = command->token;
data.alt.pub.destinationName = command->details.pub.destinationName;
data.alt.pub.message.payload = command->details.pub.payload;
data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
data.alt.pub.message.qos = command->details.pub.qos;
data.alt.pub.message.retained = command->details.pub.retained;
data.props = command->properties;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess5))(command->context, &data);
}
}
else if (rc == -1 && command->onFailure)
else if (rc == -1)
{
MQTTAsync_failureData data;
if (command->onFailure)
{
MQTTAsync_failureData data;
data.token = command->token;
data.code = rc;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->onFailure))(command->context, &data);
}
else if (command->onFailure5)
{
MQTTAsync_failureData5 data;
data.token = command->token;
data.code = rc;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->onFailure))(command->context, &data);
data.token = command->token;
data.code = rc;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->onFailure5))(command->context, &data);
}
}
}
if (com)
......@@ -1269,10 +1304,10 @@ static int MQTTAsync_processCommand(void)
Log(TRACE_PROTOCOL, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, command->command.details.conn.MQTTVersion);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->command.details.conn.MQTTVersion,
NULL, NULL);
command->client->connectProps, command->client->willProps);
#else
rc = MQTTProtocol_connect(serverURI, command->client->c, command->command.details.conn.MQTTVersion,
NULL, NULL);
command->client->connectProps, command->client->willProps);
#endif
if (command->client->c->connect_state == 0)
rc = SOCKET_ERROR;
......@@ -1358,6 +1393,20 @@ static int MQTTAsync_processCommand(void)
Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
(*(command->command.onSuccess))(command->command.context, &data);
}
else if (command->command.onSuccess5)
{
MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
data.token = command->command.token;
data.alt.pub.destinationName = command->command.details.pub.destinationName;
data.alt.pub.message.payload = command->command.details.pub.payload;
data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained;
data.props = command->command.properties;
Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
(*(command->command.onSuccess5))(command->command.context, &data);
}
}
else
{
......@@ -1382,11 +1431,21 @@ static int MQTTAsync_processCommand(void)
MQTTAsync_failureData data;
data.token = 0;
data.code = -2;
data.code = MQTTASYNC_OPERATION_INCOMPLETE;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling connect failure for client %s", command->client->c->clientID);
(*(command->client->connect.onFailure))(command->client->connect.context, &data);
}
else if (command->client->connect.onFailure5)
{
MQTTAsync_failureData5 data;
data.token = 0;
data.code = MQTTASYNC_OPERATION_INCOMPLETE;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling connect failure for client %s", command->client->c->clientID);
(*(command->client->connect.onFailure5))(command->client->connect.context, &data);
}
}
MQTTAsync_checkDisconnect(command->client, &command->command);
}
......@@ -1446,6 +1505,11 @@ static int MQTTAsync_processCommand(void)
Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
(*(command->command.onFailure))(command->command.context, NULL);
}
else if (command->command.onFailure5)
{
Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
(*(command->command.onFailure5))(command->command.context, NULL);
}
if (command->command.type == CONNECT)
{
command->client->connect = command->command;
......@@ -1505,6 +1569,16 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, &data);
}
else if (m->connect.onFailure5)
{
MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
data.token = 0;
data.code = rc;
data.message = message;
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure5))(m->connect.context, &data);
}
MQTTAsync_startConnectRetry(m);
}
}
......@@ -1558,6 +1632,12 @@ static void MQTTAsync_checkTimeouts(void)
MQTTPacket_name(com->command.type), m->c->clientID);
(*(com->command.onFailure))(com->command.context, NULL);
}
else if (com->command.onFailure5)
{
Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(com->command.type), m->c->clientID);
(*(com->command.onFailure5))(com->command.context, NULL);
}
timed_out_count++;
}
}
......@@ -1671,6 +1751,18 @@ static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
MQTTPacket_name(command->command.type), m->c->clientID);
(*(command->command.onFailure))(command->command.context, &data);
}
else if (command->command.onFailure5)
{
MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
data.token = command->command.token;
data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
data.message = NULL;
Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(command->command.type), m->c->clientID);
(*(command->command.onFailure5))(command->command.context, &data);
}
MQTTAsync_freeCommand1(command);
count++;
......@@ -1703,6 +1795,18 @@ static void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
MQTTPacket_name(command->command.type), m->c->clientID);
(*(command->command.onFailure))(command->command.context, &data);
}
else if (command->command.onFailure5)
{
MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
data.token = command->command.token;
data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
data.message = NULL;
Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(command->command.type), m->c->clientID);
(*(command->command.onFailure5))(command->command.context, &data);
}
MQTTAsync_freeCommand(command);
count++;
......@@ -3283,6 +3387,20 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->command.onSuccess))(command->command.context, &data);
}
else if (command->command.onSuccess5)
{
MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
data.token = command->command.token;
data.alt.pub.destinationName = command->command.details.pub.destinationName;
data.alt.pub.message.payload = command->command.details.pub.payload;
data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained;
data.props = command->command.properties;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->command.onSuccess5))(command->command.context, &data);
}
MQTTAsync_freeCommand(command);
break;
}
......
......@@ -416,11 +416,13 @@ typedef struct
enum MQTTReasonCodes reasonCode;
/** The MQTT properties on the ack, if any. */
MQTTProperties properties;
/** A numeric code identifying the MQTT client library error. */
int code;
/** Optional further text explaining the error. Can be NULL. */
const char *message;
} MQTTAsync_failureData5;
#define MQTTAsync_failureData5_initializer {{'M', 'Q', 'F', 'D'}, 0, 0, SUCCESS, MQTTProperties_initializer, NULL}
#define MQTTAsync_failureData5_initializer {{'M', 'Q', 'F', 'D'}, 0, 0, SUCCESS, MQTTProperties_initializer, 0, NULL}
/** The data returned on completion of a successful API call in the response callback onSuccess. */
typedef struct
......@@ -959,6 +961,10 @@ typedef struct
int len; /**< binary password length */
const void* data; /**< binary password data */
} binarypwd;
/*
* MQTT V5 clean start flag. Only clears state at the beginning of the session.
*/
int cleanstart;
/**
* MQTT V5 properties for connect
*/
......@@ -983,7 +989,7 @@ typedef struct
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 10, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, 0, 0, 1, 60, {0, NULL}, NULL, NULL, NULL, NULL}
NULL, NULL, NULL, NULL, 0, NULL, 0, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL}
/**
* This function attempts to connect a previously-created client (see
......@@ -1179,9 +1185,6 @@ DLLExport int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const
DLLExport int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen, void* payload, int qos,
int retained, MQTTAsync_responseOptions* response);
DLLExport int MQTTAsync_send5(MQTTAsync handle, const char* destinationName, int payloadlen, void* payload, int qos,
int retained, MQTTProperties* props, MQTTAsync_responseOptions* response);
/**
* This function attempts to publish a message to a given topic (see also
......
......@@ -66,7 +66,7 @@ int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
if (MQTTVersion >= MQTTVERSION_5)
{
len += MQTTProperties_len(connectProperties);
if (client->will && willProperties)
if (client->will)
len += MQTTProperties_len(willProperties);
}
......
......@@ -289,8 +289,50 @@ ADD_TEST(
COMMAND test45 "--test_no" "1" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test45-2-connect-timeout
COMMAND test45 "--test_no" "2" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test45-3-multiple-client-objs-simultaneous-working
COMMAND test45 "--test_no" "3" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test45-4-send-receive-big-messages
COMMAND test45 "--test_no" "4" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test45-5-connack-return-codes
COMMAND test45 "--test_no" "5" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test45-6-ha-connections
COMMAND test45 "--test_no" "6" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test45-7-pending-tokens
COMMAND test45 "--test_no" "7" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test45-8-incomplete-commands-requests
COMMAND test45 "--test_no" "8" "--connection" ${MQTT_TEST_BROKER}
)
SET_TESTS_PROPERTIES(
test45-1-basic-connect-subscribe-receive
test45-2-connect-timeout
test45-3-multiple-client-objs-simultaneous-working
test45-4-send-receive-big-messages
test45-5-connack-return-codes
test45-6-ha-connections
test45-7-pending-tokens
test45-8-incomplete-commands-requests
PROPERTIES TIMEOUT 540
)
......
......@@ -288,6 +288,44 @@ void logProperties(MQTTProperties *props)
}
}
int getNumericPropertyValue(MQTTProperties *props, int propid)
{
int i = 0;
int rc = 0;
for (i = 0; i < props->count; ++i)
{
int id = props->array[i].identifier;
const char* name = MQTTPropertyName(id);
char* intformat = "Got property name %s value %d";
if (id == propid)
{
switch (MQTTProperty_getType(id))
{
case PROPERTY_TYPE_BYTE:
rc = props->array[i].value.byte;
MyLog(LOGA_INFO, intformat, name, props->array[i].value.byte);
break;
case TWO_BYTE_INTEGER:
rc = props->array[i].value.integer2;
MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer2);
break;
case FOUR_BYTE_INTEGER:
case VARIABLE_BYTE_INTEGER:
rc = props->array[i].value.integer4;
MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer4);
break;
default:
rc = -999999;
break;
}
}
}
return rc;
}
volatile int test_finished = 0;
char* test_topic = "async test topic";
......@@ -505,7 +543,7 @@ int test1(struct Options options)
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess5 = test1_onConnect;
opts.onFailure = NULL;
opts.onFailure5 = NULL;
opts.context = c;
property.identifier = SESSION_EXPIRY_INTERVAL;
......@@ -524,7 +562,6 @@ int test1(struct Options options)
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -549,7 +586,7 @@ exit:
int test2_onFailure_called = 0;
void test2_onFailure(void* context, MQTTAsync_failureData* response)
void test2_onFailure(void* context, MQTTAsync_failureData5* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
......@@ -558,7 +595,7 @@ void test2_onFailure(void* context, MQTTAsync_failureData* response)
}
void test2_onConnect(void* context, MQTTAsync_successData* response)
void test2_onConnect(void* context, MQTTAsync_successData5* response)
{
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context);
......@@ -614,8 +651,8 @@ int test2(struct Options options)
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test2_onConnect;
opts.onFailure = test2_onFailure;
opts.onSuccess5 = test2_onConnect;
opts.onFailure5 = test2_onFailure;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting");
......@@ -655,7 +692,7 @@ typedef struct
} client_data;
void test3_onDisconnect(void* context, MQTTAsync_successData* response)
void test3_onDisconnect(void* context, MQTTAsync_successData5* response)
{
client_data* cd = (client_data*)context;
MyLog(LOGA_DEBUG, "In onDisconnect callback for client \"%s\"", cd->clientid);
......@@ -663,21 +700,21 @@ void test3_onDisconnect(void* context, MQTTAsync_successData* response)
}
void test3_onPublish(void* context, MQTTAsync_successData* response)
void test3_onPublish(void* context, MQTTAsync_successData5* response)
{
client_data* cd = (client_data*)context;
MyLog(LOGA_DEBUG, "In QoS 0 onPublish callback for client \"%s\"", cd->clientid);
}
void test3_onUnsubscribe(void* context, MQTTAsync_successData* response)
void test3_onUnsubscribe(void* context, MQTTAsync_successData5* response)
{
client_data* cd = (client_data*)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback \"%s\"", cd->clientid);
opts.onSuccess = test3_onDisconnect;
opts.onSuccess5 = test3_onDisconnect;
opts.context = cd;
rc = MQTTAsync_disconnect(cd->c, &opts);
......@@ -712,7 +749,7 @@ int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
pubmsg.qos = 0;
pubmsg.retained = 0;
opts.context = cd;
opts.onSuccess = test3_onPublish;
opts.onSuccess5 = test3_onPublish;
rc = MQTTAsync_sendMessage(cd->c, cd->test_topic, &pubmsg, &opts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -721,7 +758,7 @@ int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = test3_onUnsubscribe;
opts.onSuccess5 = test3_onUnsubscribe;
opts.context = cd;
rc = MQTTAsync_unsubscribe(cd->c, cd->test_topic, &opts);
assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -731,7 +768,7 @@ int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
return 1;
}
void test3_onSubscribe(void* context, MQTTAsync_successData* response)
void test3_onSubscribe(void* context, MQTTAsync_successData5* response)
{
client_data* cd = (client_data*)context;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
......@@ -749,14 +786,14 @@ void test3_onSubscribe(void* context, MQTTAsync_successData* response)
}
void test3_onConnect(void* context, MQTTAsync_successData* response)
void test3_onConnect(void* context, MQTTAsync_successData5* response)
{
client_data* cd = (client_data*)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, \"%s\"", cd->clientid);
opts.onSuccess = test3_onSubscribe;
opts.onSuccess5 = test3_onSubscribe;
opts.context = cd;
rc = MQTTAsync_subscribe(cd->c, cd->test_topic, 2, &opts);
......@@ -766,13 +803,13 @@ void test3_onConnect(void* context, MQTTAsync_successData* response)
}
void test3_onFailure(void* context, MQTTAsync_failureData* response)
void test3_onFailure(void* context, MQTTAsync_failureData5* response)
{
client_data* cd = (client_data*)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
assert("Should have connected", 0, "%s failed to connect\n", cd->clientid);
MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\" rc %d\n", cd->clientid, response ? response->code : -999);
MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\" rc %d\n", cd->clientid, response ? response->reasonCode : -999);
if (response && response->message)
MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\"\n", response->message);
......@@ -825,8 +862,8 @@ int test3(struct Options options)
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.onSuccess = test3_onConnect;
opts.onFailure = test3_onFailure;
opts.onSuccess5 = test3_onConnect;
opts.onFailure5 = test3_onFailure;
opts.context = &clientdata[i];
MyLog(LOGA_DEBUG, "Connecting");
......@@ -860,7 +897,7 @@ int test3(struct Options options)
void* test4_payload = NULL;
int test4_payloadlen = 0;
void test4_onPublish(void* context, MQTTAsync_successData* response)
void test4_onPublish(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
......@@ -891,13 +928,13 @@ int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
if (++message_count == 1)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
pubmsg.payload = test4_payload;
pubmsg.payloadlen = test4_payloadlen;
pubmsg.qos = 1;
pubmsg.retained = 0;
opts.onSuccess = test4_onPublish;
opts.onSuccess5 = test4_onPublish;
opts.context = c;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
......@@ -905,19 +942,19 @@ int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
else if (message_count == 2)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
pubmsg.payload = test4_payload;
pubmsg.payloadlen = test4_payloadlen;
pubmsg.qos = 0;
pubmsg.retained = 0;
opts.onSuccess = test4_onPublish;
opts.onSuccess5 = test4_onPublish;
opts.context = c;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
}
else
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
opts.onSuccess5 = test1_onUnsubscribe;
opts.context = c;
......@@ -931,20 +968,27 @@ int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
return 1;
}
int test4_packet_size = 10000;
void test4_onSubscribe(void* context, MQTTAsync_successData* response)
#if !defined(min)
#define min(a, b) ((a < b) ? a : b)
#endif
void test4_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc, i;
int max_packet_size = min(test4_packet_size, options.size);
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p", c);
pubmsg.payload = test4_payload = malloc(options.size);
pubmsg.payloadlen = test4_payloadlen = options.size;
MyLog(LOGA_INFO, "Max packet size %d", max_packet_size);
srand(33);
for (i = 0; i < options.size; ++i)
for (i = 0; i < (max_packet_size-100); ++i)
((char*)pubmsg.payload)[i] = rand() % 256;
pubmsg.qos = 2;
......@@ -954,14 +998,16 @@ void test4_onSubscribe(void* context, MQTTAsync_successData* response)
}
void test4_onConnect(void* context, MQTTAsync_successData* response)
void test4_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
int rc;
test4_packet_size = getNumericPropertyValue(&response->props, MAXIMUM_PACKET_SIZE);
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
opts.onSuccess = test4_onSubscribe;
opts.onSuccess5 = test4_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
......@@ -1014,13 +1060,12 @@ int test4(struct Options options)
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onSuccess = test4_onConnect;
opts.onFailure = NULL;
opts.onSuccess5 = test4_onConnect;
opts.onFailure5 = NULL;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
......@@ -1042,20 +1087,20 @@ exit:
}
void test5_onConnectFailure(void* context, MQTTAsync_failureData* response)
void test5_onConnectFailure(void* context, MQTTAsync_failureData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
MyLog(LOGA_INFO, "Connack rc is %d", response ? response->code : -999);
MyLog(LOGA_INFO, "Connack rc is %d", response ? response->reasonCode : -999);
test_finished = 1;
}
void test5_onConnect(void* context, MQTTAsync_successData* response)
void test5_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
......@@ -1097,8 +1142,8 @@ int test5(struct Options options)
rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.onSuccess = test5_onConnect;
opts.onFailure = test5_onConnectFailure;
opts.onSuccess5 = test5_onConnect;
opts.onFailure5 = test5_onConnectFailure;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting");
......@@ -1131,14 +1176,14 @@ typedef struct
int should_fail;
} test6_client_info;
void test6_onConnectFailure(void* context, MQTTAsync_failureData* response)
void test6_onConnectFailure(void* context, MQTTAsync_failureData5* response)
{
test6_client_info cinfo = *(test6_client_info*)context;
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
if (response)
MyLog(LOGA_INFO, "Connack rc is %d", response->code);
MyLog(LOGA_INFO, "Connack rc is %d", response->reasonCode);
assert("Should fail to connect", cinfo.should_fail, "should_fail was %d", cinfo.should_fail);
......@@ -1146,7 +1191,7 @@ void test6_onConnectFailure(void* context, MQTTAsync_failureData* response)
}
void test6_onConnect(void* context, MQTTAsync_successData* response)
void test6_onConnect(void* context, MQTTAsync_successData5* response)
{
test6_client_info cinfo = *(test6_client_info*)context;
......@@ -1192,8 +1237,8 @@ int test6(struct Options options)
rc = MQTTAsync_setCallbacks(cinfo.c, cinfo.c, NULL, test1_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.onSuccess = test6_onConnect;
opts.onFailure = test6_onConnectFailure;
opts.onSuccess5 = test6_onConnect;
opts.onFailure5 = test6_onConnectFailure;
opts.context = &cinfo;
opts.MQTTVersion = options.MQTTVersion;
......@@ -1225,8 +1270,8 @@ int test6(struct Options options)
rc = MQTTAsync_setCallbacks(cinfo.c, cinfo.c, NULL, test1_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.onSuccess = test6_onConnect;
opts.onFailure = test6_onConnectFailure;
opts.onSuccess5 = test6_onConnect;
opts.onFailure5 = test6_onConnectFailure;
opts.context = &cinfo;
opts.serverURIs = uris;
opts.serverURIcount = 2;
......@@ -1265,7 +1310,7 @@ Test7: Persistence
char* test7_topic = "C client test7";
int test7_messageCount = 0;
void test7_onDisconnectFailure(void* context, MQTTAsync_failureData* response)
void test7_onDisconnectFailure(void* context, MQTTAsync_failureData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect failure callback %p", c);
......@@ -1275,7 +1320,7 @@ void test7_onDisconnectFailure(void* context, MQTTAsync_failureData* response)
test_finished = 1;
}
void test7_onDisconnect(void* context, MQTTAsync_successData* response)
void test7_onDisconnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
......@@ -1283,14 +1328,14 @@ void test7_onDisconnect(void* context, MQTTAsync_successData* response)
}
void test7_onUnsubscribe(void* context, MQTTAsync_successData* response)
void test7_onUnsubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
opts.onSuccess = test7_onDisconnect;
opts.onSuccess5 = test7_onDisconnect;
opts.context = c;
rc = MQTTAsync_disconnect(c, &opts);
......@@ -1316,7 +1361,7 @@ int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
static int test7_subscribed = 0;
void test7_onSubscribe(void* context, MQTTAsync_successData* response)
void test7_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
......@@ -1326,14 +1371,14 @@ void test7_onSubscribe(void* context, MQTTAsync_successData* response)
}
void test7_onConnect(void* context, MQTTAsync_successData* response)
void test7_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
opts.onSuccess = test7_onSubscribe;
opts.onSuccess5 = test7_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test7_topic, 2, &opts);
......@@ -1343,7 +1388,7 @@ void test7_onConnect(void* context, MQTTAsync_successData* response)
}
void test7_onConnectOnly(void* context, MQTTAsync_successData* response)
void test7_onConnectOnly(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
......@@ -1352,7 +1397,7 @@ void test7_onConnectOnly(void* context, MQTTAsync_successData* response)
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
dopts.context = context;
dopts.timeout = 1000;
dopts.onSuccess = test7_onDisconnect;
dopts.onSuccess5 = test7_onDisconnect;
rc = MQTTAsync_disconnect(c, &dopts);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -1378,6 +1423,8 @@ int test7(struct Options options)
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_token* tokens = NULL;
int msg_count = 6;
MQTTProperty property;
MQTTProperties props = MQTTProperties_initializer;
MyLog(LOGA_INFO, "Starting test 7 - pending tokens");
fprintf(xml, "<testcase classname=\"test4\" name=\"pending tokens\"");
......@@ -1408,11 +1455,12 @@ int test7(struct Options options)
opts.will->topicName = "will topic";
opts.will = NULL;
opts.onFailure = NULL;
opts.onFailure5 = NULL;
opts.context = c;
/* connect to clean up state only */
opts.cleansession = 1;
opts.onSuccess = test7_onConnectOnly;
opts.onSuccess5 = test7_onConnectOnly;
MyLog(LOGA_DEBUG, "Connecting to clean up");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
......@@ -1427,10 +1475,15 @@ int test7(struct Options options)
usleep(10000L);
#endif
/* now connect and leave messages lying around */
test_finished = 0;
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 0;
opts.onSuccess = test7_onConnect;
opts.connectProperties = &props;
property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 999999;
MQTTProperties_add(opts.connectProperties, &property);
opts.onSuccess5 = test7_onConnect;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -1448,7 +1501,7 @@ int test7(struct Options options)
pubmsg.payloadlen = 11;
pubmsg.qos = 2;
pubmsg.retained = 0;
rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, &ropts);
rc = MQTTAsync_send(c, test7_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, &ropts);
MyLog(LOGA_DEBUG, "Token was %d", ropts.token);
rc = MQTTAsync_isComplete(c, ropts.token);
/*assert("0 rc from isComplete", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);*/
......@@ -1466,11 +1519,11 @@ int test7(struct Options options)
pubmsg.payloadlen = 11;
//pubmsg.qos = (pubmsg.qos == 2) ? 1 : 2;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &ropts);
rc = MQTTAsync_sendMessage(c, test7_topic, &pubmsg, &ropts);
}
/* disconnect immediately without receiving the incoming messages */
dopts.timeout = 0;
dopts.onSuccess = test7_onDisconnect;
dopts.onSuccess5 = test7_onDisconnect;
dopts.context = c;
MQTTAsync_disconnect(c, &dopts); /* now there should be "orphaned" publications */
......@@ -1487,6 +1540,7 @@ int test7(struct Options options)
assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
MQTTAsync_free(tokens);
MQTTProperties_free(opts.connectProperties);
MQTTAsync_destroy(&c); /* force re-reading persistence on create */
......@@ -1509,8 +1563,7 @@ int test7(struct Options options)
while (tokens[i] != -1)
MyLog(LOGA_DEBUG, "Delivery token %d", tokens[i++]);
MQTTAsync_free(tokens);
//The following assertion should work, does with RSMB, but not Mosquitto
//assert1("no of tokens should be count", i == msg_count, "no of tokens %d count %d", i, msg_count);
assert("no of tokens should be > 0", i > 0, "no of tokens %d", i);
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
......@@ -1518,6 +1571,7 @@ int test7(struct Options options)
MyLog(LOGA_DEBUG, "Reconnecting");
opts.context = c;
opts.cleansession = 0;
if (MQTTAsync_connect(c, &opts) != 0)
{
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -1532,17 +1586,18 @@ int test7(struct Options options)
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
assert1("no of messages should be count", test7_messageCount == msg_count, "no of tokens %d count %d",
test7_messageCount, msg_count);
/* following assertions fail against Mosquitto - needs testing */
assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
assertions fail against Mosquitto - needs testing */
assert("no of messages should be count", test7_messageCount == msg_count, "messages received %d\n",
test7_messageCount);
dopts.onFailure = test7_onDisconnectFailure;
dopts.onSuccess = test7_onDisconnect;
dopts.onFailure5 = test7_onDisconnectFailure;
dopts.onSuccess5 = test7_onDisconnect;
dopts.timeout = 1000;
MQTTAsync_disconnect(c, &dopts);
rc = MQTTAsync_disconnect(c, &dopts);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
while (!test_finished)
#if defined(WIN32)
......@@ -1573,7 +1628,7 @@ int test8_messageCount = 0;
int test8_subscribed = 0;
int test8_publishFailures = 0;
void test8_onPublish(void* context, MQTTAsync_successData* response)
void test8_onPublish(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
......@@ -1581,7 +1636,7 @@ void test8_onPublish(void* context, MQTTAsync_successData* response)
}
void test8_onPublishFailure(void* context, MQTTAsync_failureData* response)
void test8_onPublishFailure(void* context, MQTTAsync_failureData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onPublish failure callback %p", c);
......@@ -1593,7 +1648,7 @@ void test8_onPublishFailure(void* context, MQTTAsync_failureData* response)
}
void test8_onDisconnectFailure(void* context, MQTTAsync_failureData* response)
void test8_onDisconnectFailure(void* context, MQTTAsync_failureData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect failure callback %p", c);
......@@ -1604,7 +1659,7 @@ void test8_onDisconnectFailure(void* context, MQTTAsync_failureData* response)
}
void test8_onDisconnect(void* context, MQTTAsync_successData* response)
void test8_onDisconnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
......@@ -1612,7 +1667,7 @@ void test8_onDisconnect(void* context, MQTTAsync_successData* response)
}
void test8_onSubscribe(void* context, MQTTAsync_successData* response)
void test8_onSubscribe(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
......@@ -1622,14 +1677,14 @@ void test8_onSubscribe(void* context, MQTTAsync_successData* response)
}
void test8_onConnect(void* context, MQTTAsync_successData* response)
void test8_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
opts.onSuccess = test8_onSubscribe;
opts.onSuccess5 = test8_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test8_topic, 2, &opts);
......@@ -1693,7 +1748,7 @@ int test8(struct Options options)
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 1;
opts.onSuccess = test8_onConnect;
opts.onSuccess5 = test8_onConnect;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -1709,8 +1764,8 @@ int test8(struct Options options)
int i = 0;
pubmsg.qos = 2;
ropts.onSuccess = test8_onPublish;
ropts.onFailure = test8_onPublishFailure;
ropts.onSuccess5 = test8_onPublish;
ropts.onFailure5 = test8_onPublishFailure;
ropts.context = c;
for (i = 0; i < msg_count; ++i)
{
......@@ -1723,7 +1778,7 @@ int test8(struct Options options)
}
/* disconnect immediately without completing the commands */
dopts.timeout = 0;
dopts.onSuccess = test8_onDisconnect;
dopts.onSuccess5 = test8_onDisconnect;
dopts.context = c;
rc = MQTTAsync_disconnect(c, &dopts); /* now there should be incomplete commands */
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -1749,7 +1804,7 @@ int test8(struct Options options)
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 0;
opts.onSuccess = test8_onConnect;
opts.onSuccess5 = test8_onConnect;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......@@ -1765,8 +1820,8 @@ int test8(struct Options options)
i = 0;
pubmsg.qos = 2;
ropts.onSuccess = test8_onPublish;
ropts.onFailure = test8_onPublishFailure;
ropts.onSuccess5 = test8_onPublish;
ropts.onFailure5 = test8_onPublishFailure;
ropts.context = c;
for (i = 0; i < msg_count; ++i)
{
......@@ -1779,7 +1834,7 @@ int test8(struct Options options)
}
/* disconnect immediately without completing the commands */
dopts.timeout = 0;
dopts.onSuccess = test8_onDisconnect;
dopts.onSuccess5 = test8_onDisconnect;
dopts.context = c;
rc = MQTTAsync_disconnect(c, &dopts); /* now there should be incomplete commands */
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment