Commit d57c84b5 authored by Ian Craggs's avatar Ian Craggs

Add QoS 2 error reporting #466

parent 9bc5fbc4
...@@ -1170,7 +1170,7 @@ static void MQTTAsync_writeComplete(int socket, int rc) ...@@ -1170,7 +1170,7 @@ static void MQTTAsync_writeComplete(int socket, int rc)
{ {
if (command->type == PUBLISH) if (command->type == PUBLISH)
{ {
if (rc == 1) if (rc == 1 && command->details.pub.qos == 0)
{ {
if (command->onSuccess) if (command->onSuccess)
{ {
...@@ -3523,17 +3523,32 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -3523,17 +3523,32 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
/* Note that these handle... functions free the packet structure that they are dealing with */ /* Note that these handle... functions free the packet structure that they are dealing with */
if (pack->header.bits.type == PUBLISH) if (pack->header.bits.type == PUBLISH)
*rc = MQTTProtocol_handlePublishes(pack, *sock); *rc = MQTTProtocol_handlePublishes(pack, *sock);
else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP) else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP ||
pack->header.bits.type == PUBREC)
{ {
int msgid; int msgid;
ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack; ack = *(Ack*)pack;
msgid = ack.msgId; msgid = ack.msgId;
*rc = (pack->header.bits.type == PUBCOMP) ?
MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock); if (pack->header.bits.type == PUBCOMP)
{
//ack = *(Pubcomp*)pack;
*rc = MQTTProtocol_handlePubcomps(pack, *sock);
}
else if (pack->header.bits.type == PUBREC)
{
//ack = *(Pubrec*)pack;
*rc = MQTTProtocol_handlePubrecs(pack, *sock);
}
else if (pack->header.bits.type == PUBACK)
{
//ack = *(Puback*)pack;
*rc = MQTTProtocol_handlePubacks(pack, *sock);
}
if (!m) if (!m)
Log(LOG_ERROR, -1, "PUBCOMP or PUBACK received for no client, msgid %d", msgid); Log(LOG_ERROR, -1, "PUBCOMP, PUBACK or PUBREC received for no client, msgid %d", msgid);
if (m) if (m && (pack->header.bits.type != PUBREC || ack.rc >= UNSPECIFIED_ERROR))
{ {
ListElement* current = NULL; ListElement* current = NULL;
...@@ -3563,7 +3578,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -3563,7 +3578,7 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->command.onSuccess))(command->command.context, &data); (*(command->command.onSuccess))(command->command.context, &data);
} }
else if (command->command.onSuccess5) else if (command->command.onSuccess5 && ack.rc < UNSPECIFIED_ERROR)
{ {
MQTTAsync_successData5 data = MQTTAsync_successData5_initializer; MQTTAsync_successData5 data = MQTTAsync_successData5_initializer;
...@@ -3577,14 +3592,22 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -3577,14 +3592,22 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->command.onSuccess5))(command->command.context, &data); (*(command->command.onSuccess5))(command->command.context, &data);
} }
else if (command->command.onFailure5 && ack.rc >= UNSPECIFIED_ERROR)
{
MQTTAsync_failureData5 data = MQTTAsync_failureData5_initializer;
data.token = command->command.token;
data.reasonCode = ack.rc;
data.properties = ack.properties;
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->command.onFailure5))(command->command.context, &data);
}
MQTTAsync_freeCommand(command); MQTTAsync_freeCommand(command);
break; break;
} }
} }
} }
} }
else if (pack->header.bits.type == PUBREC)
*rc = MQTTProtocol_handlePubrecs(pack, *sock);
else if (pack->header.bits.type == PUBREL) else if (pack->header.bits.type == PUBREL)
*rc = MQTTProtocol_handlePubrels(pack, *sock); *rc = MQTTProtocol_handlePubrels(pack, *sock);
else if (pack->header.bits.type == PINGRESP) else if (pack->header.bits.type == PINGRESP)
......
...@@ -420,6 +420,11 @@ typedef void MQTTAsync_disconnected(void* context, MQTTProperties* properties, ...@@ -420,6 +420,11 @@ typedef void MQTTAsync_disconnected(void* context, MQTTProperties* properties,
*/ */
DLLExport int MQTTAsync_setDisconnected(MQTTAsync handle, void* context, MQTTAsync_disconnected* co); DLLExport int MQTTAsync_setDisconnected(MQTTAsync handle, void* context, MQTTAsync_disconnected* co);
typedef void MQTTAsync_published(void* context, int msgid, int packet_type, MQTTProperties* properties,
enum MQTTReasonCodes reasonCode);
DLLExport int MQTTAsync_setPublished(MQTTAsync handle, void* context, MQTTAsync_published* co);
/** The data returned on completion of an unsuccessful API call in the response callback onFailure. */ /** The data returned on completion of an unsuccessful API call in the response callback onFailure. */
typedef struct typedef struct
......
...@@ -208,8 +208,13 @@ typedef struct ...@@ -208,8 +208,13 @@ typedef struct
MQTTClient_disconnected* disconnected; MQTTClient_disconnected* disconnected;
void* disconnected_context; /* the context to be associated with the disconnected callback*/ void* disconnected_context; /* the context to be associated with the disconnected callback*/
MQTTClient_published* published;
void* published_context; /* the context to be associated with the disconnected callback*/
#if 0
MQTTClient_authHandle* auth_handle; MQTTClient_authHandle* auth_handle;
void* auth_handle_context; /* the context to be associated with the authHandle callback*/ void* auth_handle_context; /* the context to be associated with the authHandle callback*/
#endif
sem_type connect_sem; sem_type connect_sem;
int rc; /* getsockopt return code in connect */ int rc; /* getsockopt return code in connect */
...@@ -643,6 +648,7 @@ int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disc ...@@ -643,6 +648,7 @@ int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disc
} }
/** /**
* Wrapper function to call disconnected on a separate thread. A separate thread is needed to allow the * Wrapper function to call disconnected on a separate thread. A separate thread is needed to allow the
* disconnected function to make API calls (e.g. connect) * disconnected function to make API calls (e.g. connect)
...@@ -661,7 +667,30 @@ static thread_return_type WINAPI call_disconnected(void* context) ...@@ -661,7 +667,30 @@ static thread_return_type WINAPI call_disconnected(void* context)
} }
int MQTTClient_setAuthHandle(MQTTClient handle, void* context, MQTTClient_authHandle* auth_handle) int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTClient_published* published)
{
int rc = MQTTCLIENT_SUCCESS;
MQTTClients* m = handle;
FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex);
if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
rc = MQTTCLIENT_FAILURE;
else
{
m->published_context = context;
m->published = published;
}
Thread_unlock_mutex(mqttclient_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
#if 0
int MQTTClient_setHandleAuth(MQTTClient handle, void* context, MQTTClient_handleAuth* auth_handle)
{ {
int rc = MQTTCLIENT_SUCCESS; int rc = MQTTCLIENT_SUCCESS;
MQTTClients* m = handle; MQTTClients* m = handle;
...@@ -694,8 +723,12 @@ static thread_return_type WINAPI call_auth_handle(void* context) ...@@ -694,8 +723,12 @@ static thread_return_type WINAPI call_auth_handle(void* context)
struct props_rc_parms* pr = (struct props_rc_parms*)context; struct props_rc_parms* pr = (struct props_rc_parms*)context;
(*(pr->m->auth_handle))(pr->m->auth_handle_context, pr->properties, pr->reasonCode); (*(pr->m->auth_handle))(pr->m->auth_handle_context, pr->properties, pr->reasonCode);
MQTTProperties_free(pr->properties);
free(pr->properties);
free(pr);
return 0; return 0;
} }
#endif
/* This is the thread function that handles the calling of callback functions if set */ /* This is the thread function that handles the calling of callback functions if set */
...@@ -820,6 +853,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -820,6 +853,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
Thread_start(call_disconnected, dp); Thread_start(call_disconnected, dp);
} }
#if 0
if (pack->header.bits.type == AUTH && m->auth_handle) if (pack->header.bits.type == AUTH && m->auth_handle)
{ {
struct props_rc_parms dp; struct props_rc_parms dp;
...@@ -832,6 +866,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n) ...@@ -832,6 +866,7 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID);
Thread_start(call_auth_handle, &dp); Thread_start(call_auth_handle, &dp);
} }
#endif
} }
} }
else if (m->c->connect_state == TCP_IN_PROGRESS && !Thread_check_sem(m->connect_sem)) else if (m->c->connect_state == TCP_IN_PROGRESS && !Thread_check_sem(m->connect_sem))
...@@ -2091,6 +2126,7 @@ MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int p ...@@ -2091,6 +2126,7 @@ MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int p
{ {
while (m->c->connected == 1 && SocketBuffer_getWrite(m->c->net.socket)) while (m->c->connected == 1 && SocketBuffer_getWrite(m->c->net.socket))
{ {
printf("getwrite %p\n", SocketBuffer_getWrite(m->c->net.socket));
Thread_unlock_mutex(mqttclient_mutex); Thread_unlock_mutex(mqttclient_mutex);
MQTTClient_yield(); MQTTClient_yield();
Thread_lock_mutex(mqttclient_mutex); Thread_lock_mutex(mqttclient_mutex);
...@@ -2255,8 +2291,13 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2255,8 +2291,13 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc)
ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack; ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
msgid = ack.msgId; msgid = ack.msgId;
if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published)
{
Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, msgid);
(*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
}
*rc = (pack->header.bits.type == PUBCOMP) ? *rc = (pack->header.bits.type == PUBCOMP) ?
MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock); MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock);
if (m && m->dc) if (m && m->dc)
{ {
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid); Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
...@@ -2264,7 +2305,17 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2264,7 +2305,17 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc)
} }
} }
else if (pack->header.bits.type == PUBREC) else if (pack->header.bits.type == PUBREC)
{
Pubrec* pubrec = (Pubrec*)pack;
if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published && pubrec->rc >= UNSPECIFIED_ERROR)
{
Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, ack.msgId);
(*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
&pubrec->properties, pubrec->rc);
}
*rc = MQTTProtocol_handlePubrecs(pack, *sock); *rc = MQTTProtocol_handlePubrecs(pack, *sock);
}
else if (pack->header.bits.type == PUBREL) else if (pack->header.bits.type == PUBREL)
*rc = MQTTProtocol_handlePubrels(pack, *sock); *rc = MQTTProtocol_handlePubrels(pack, *sock);
else if (pack->header.bits.type == PINGRESP) else if (pack->header.bits.type == PINGRESP)
......
...@@ -448,37 +448,10 @@ typedef void MQTTClient_disconnected(void* context, MQTTProperties* properties, ...@@ -448,37 +448,10 @@ typedef void MQTTClient_disconnected(void* context, MQTTProperties* properties,
*/ */
DLLExport int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disconnected* co); DLLExport int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disconnected* co);
typedef void MQTTClient_published(void* context, int msgid, int packet_type, MQTTProperties* properties,
/**
* This is a callback function, which will be called when the client
* library successfully connects. This is superfluous when the connection
* is made in response to a MQTTAsync_connect call, because the onSuccess
* callback can be used. It is intended for use when automatic reconnect
* is enabled, so that when a reconnection attempt succeeds in the background,
* the application is notified and can take any required actions.
* @param context A pointer to the <i>context</i> value originally passed to
* MQTTAsync_setCallbacks(), which contains any application-specific context.
* @param cause The reason for the disconnection.
* Currently, <i>cause</i> is always set to NULL.
*/
typedef void MQTTClient_authHandle(void* context, MQTTProperties* properties,
enum MQTTReasonCodes reasonCode); enum MQTTReasonCodes reasonCode);
DLLExport int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTClient_published* co);
/**
* Sets the MQTTClient_authHandle() callback function for a client.
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param context A pointer to any application-specific context. The
* the <i>context</i> pointer is passed to each of the callback functions to
* provide access to the context information in the callback.
* @param co A pointer to an MQTTAsync_connected() callback
* function. NULL removes the callback setting.
* @return ::MQTTASYNC_SUCCESS if the callbacks were correctly set,
* ::MQTTASYNC_FAILURE if an error occurred.
*/
DLLExport int MQTTClient_setAuthHandle(MQTTClient handle, void* context, MQTTClient_authHandle* ah);
/** /**
* This function creates an MQTT client ready for connection to the * This function creates an MQTT client ready for connection to the
......
...@@ -426,9 +426,25 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock) ...@@ -426,9 +426,25 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock)
} }
else else
{ {
rc = MQTTPacket_send_pubrel(pubrec->msgId, 0, &client->net, client->clientID); if (pubrec->MQTTVersion >= MQTTVERSION_5 && pubrec->rc >= UNSPECIFIED_ERROR)
m->nextMessageType = PUBCOMP; {
time(&(m->lastTouch)); Log(TRACE_MIN, -1, "Pubrec error %d received for client %s msgid %d, not sending PUBREL",
pubrec->rc, client->clientID, pubrec->msgId);
#if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_remove(client, PERSISTENCE_PUBLISH_SENT, m->qos, pubrec->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m);
(++state.msgs_sent);
}
else
{
rc = MQTTPacket_send_pubrel(pubrec->msgId, 0, &client->net, client->clientID);
m->nextMessageType = PUBCOMP;
time(&(m->lastTouch));
}
} }
} }
if (pubrec->MQTTVersion >= MQTTVERSION_5) if (pubrec->MQTTVersion >= MQTTVERSION_5)
......
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp. * Copyright (c) 2009, 2018 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -80,7 +80,6 @@ static const char* UTF8_char_validate(int len, const char* data) ...@@ -80,7 +80,6 @@ static const char* UTF8_char_validate(int len, const char* data)
int i, j; int i, j;
const char *rc = NULL; const char *rc = NULL;
FUNC_ENTRY;
/* first work out how many bytes this char is encoded in */ /* first work out how many bytes this char is encoded in */
if ((data[0] & 128) == 0) if ((data[0] & 128) == 0)
charlen = 1; charlen = 1;
...@@ -114,7 +113,6 @@ static const char* UTF8_char_validate(int len, const char* data) ...@@ -114,7 +113,6 @@ static const char* UTF8_char_validate(int len, const char* data)
if (good) if (good)
rc = data + charlen; rc = data + charlen;
exit: exit:
FUNC_EXIT;
return rc; return rc;
} }
......
...@@ -599,12 +599,18 @@ ADD_TEST( ...@@ -599,12 +599,18 @@ ADD_TEST(
COMMAND "test10" "--test_no" "5" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY} COMMAND "test10" "--test_no" "5" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
) )
ADD_TEST(
NAME test10-6-qos_1_2_errors
COMMAND "test10" "--test_no" "6" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
SET_TESTS_PROPERTIES( SET_TESTS_PROPERTIES(
test10-1-client_topic_aliases test10-1-client_topic_aliases
test10-2-server_topic_aliases test10-2-server_topic_aliases
test10-3-subscription_ids test10-3-subscription_ids
test10-4-flow_control test10-4-flow_control
test10-5-error_handling test10-5-error_handling
test10-6-qos_1_2_errors
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
...@@ -643,12 +649,18 @@ ADD_TEST( ...@@ -643,12 +649,18 @@ ADD_TEST(
COMMAND "test11" "--test_no" "5" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY} COMMAND "test11" "--test_no" "5" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
) )
ADD_TEST(
NAME test11-6-qos_1_2_errors
COMMAND "test11" "--test_no" "6" "--connection" ${MQTT_TEST_BROKER} "--proxy_connection" ${MQTT_TEST_PROXY}
)
SET_TESTS_PROPERTIES( SET_TESTS_PROPERTIES(
test11-1-client_topic_aliases test11-1-client_topic_aliases
test11-2-server_topic_aliases test11-2-server_topic_aliases
test11-3-subscription_ids test11-3-subscription_ids
test11-4-flow_control test11-4-flow_control
test11-5-error_handling test11-5-error_handling
test11-6-qos_1_2_errors
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
......
...@@ -1090,6 +1090,188 @@ exit: ...@@ -1090,6 +1090,188 @@ exit:
return failures; return failures;
} }
struct
{
int published;
int packet_type;
enum MQTTReasonCodes rc;
} test_qos_1_2_errors_globals =
{
0, -1, SUCCESS
};
void published(void* context, int msgid, int packet_type, MQTTProperties* props, enum MQTTReasonCodes rc)
{
MQTTClient c = (MQTTClient)context;
MyLog(LOGA_INFO, "Callback: published, reason code \"%s\" msgid: %d packet type: %d",
MQTTReasonCodeString(rc), msgid, packet_type);
test_qos_1_2_errors_globals.packet_type = packet_type;
test_qos_1_2_errors_globals.rc = rc;
if (props)
{
MyLog(LOGA_INFO, "Callback: published, properties:");
logProperties(props);
}
test_qos_1_2_errors_globals.published = 1;
}
void test_trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
{
printf("%s\n", message);
}
enum msgTypes
{
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
PINGREQ, PINGRESP, DISCONNECT, AUTH
};
int test_qos_1_2_errors(struct Options options)
{
int subsqos = 2;
MQTTClient c;
MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperty property;
MQTTResponse response = MQTTResponse_initializer;
MQTTClient_deliveryToken dt;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
int rc = 0, i = 0, count = 0;
char* test_topic = "test_qos_1_2_errors";
int receive_maximum = 65535;
fprintf(xml, "<testcase classname=\"test_qos_1_2_errors\" name=\"qos 1 2 errors\"");
global_start_time = start_clock();
failures = 0;
MyLog(LOGA_INFO, "Starting test - qos 1 and 2 errors");
//MQTTClient_setTraceCallback(test_trace_callback);
rc = MQTTClient_create(&c, options.connection, "error_reporting",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTCLIENT_SUCCESS)
goto exit;
rc = MQTTClient_setCallbacks(c, NULL, NULL, test_flow_control_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_setPublished(c, c, published);
assert("Good rc from setPublished", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
opts.MQTTVersion = options.MQTTVersion;
if (options.haconnections != NULL)
{
opts.serverURIs = options.haconnections;
opts.serverURIcount = options.hacount;
}
MyLog(LOGA_DEBUG, "Connecting");
response = MQTTClient_connect5(c, &opts, NULL, NULL);
assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
if (response.reasonCode != MQTTCLIENT_SUCCESS)
goto exit;
if (response.properties)
{
if (MQTTProperties_hasProperty(response.properties, RECEIVE_MAXIMUM))
receive_maximum = MQTTProperties_getNumericValue(response.properties, RECEIVE_MAXIMUM);
logProperties(response.properties);
MQTTResponse_free(response);
}
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = 1;
pubmsg.retained = 0;
property.identifier = USER_PROPERTY;
property.value.data.data = "unsub user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "unsub user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&pubmsg.properties, &property);
response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
count = 0;
while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
{
#if defined(WIN32)
Sleep(1000);
#else
usleep(1000000L);
#endif
}
assert("Published called", test_qos_1_2_errors_globals.published == 1,
"published was %d", test_qos_1_2_errors_globals.published);
assert("Reason code was packet identifier not found",
test_qos_1_2_errors_globals.rc == NOT_AUTHORIZED,
"Reason code was %d", test_qos_1_2_errors_globals.rc);
assert("Packet type was PUBACK", test_qos_1_2_errors_globals.packet_type == PUBACK,
"packet type was %d", test_qos_1_2_errors_globals.packet_type);
test_qos_1_2_errors_globals.published = 0;
pubmsg.qos = 2;
response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
count = 0;
while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
{
#if defined(WIN32)
Sleep(1000);
#else
usleep(1000000L);
#endif
}
assert("Published called", test_qos_1_2_errors_globals.published == 1,
"published was %d", test_qos_1_2_errors_globals.published);
assert("Reason code was packet identifier not found",
test_qos_1_2_errors_globals.rc == NOT_AUTHORIZED,
"Reason code was %d", test_qos_1_2_errors_globals.rc);
assert("Packet type was PUBREC", test_qos_1_2_errors_globals.packet_type == PUBREC,
"packet type was %d", test_qos_1_2_errors_globals.packet_type);
test_qos_1_2_errors_globals.published = 0;
response = MQTTClient_publishMessage5(c, "test_qos_1_2_errors_pubcomp", &pubmsg, &dt);
assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
count = 0;
while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
{
#if defined(WIN32)
Sleep(1000);
#else
usleep(1000000L);
#endif
}
assert("Published called", test_qos_1_2_errors_globals.published == 1,
"published was %d", test_qos_1_2_errors_globals.published);
assert("Reason code was packet identifier not found",
test_qos_1_2_errors_globals.rc == PACKET_IDENTIFIER_NOT_FOUND,
"Reason code was %d", test_qos_1_2_errors_globals.rc);
assert("Packet type was PUBCOMP", test_qos_1_2_errors_globals.packet_type == PUBCOMP,
"packet type was %d", test_qos_1_2_errors_globals.packet_type);
rc = MQTTClient_disconnect5(c, 1000, SUCCESS, NULL);
exit:
MQTTClient_setTraceCallback(NULL);
MQTTProperties_free(&props);
MQTTClient_destroy(&c);
MyLog(LOGA_INFO, "TEST6: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
...@@ -1100,14 +1282,13 @@ int main(int argc, char** argv) ...@@ -1100,14 +1282,13 @@ int main(int argc, char** argv)
test_server_topic_aliases, test_server_topic_aliases,
test_subscription_ids, test_subscription_ids,
test_flow_control, test_flow_control,
test_error_reporting test_error_reporting,
test_qos_1_2_errors
}; };
xml = fopen("TEST-test1.xml", "w"); xml = fopen("TEST-test1.xml", "w");
fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1)); fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
//setenv("MQTT_C_CLIENT_TRACE", "ON", 1);
//setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 1);
MQTTClient_setTraceCallback(test_flow_control_trace_callback); MQTTClient_setTraceCallback(test_flow_control_trace_callback);
getopts(argc, argv); getopts(argc, argv);
......
...@@ -1137,7 +1137,6 @@ void test_error_reporting_onSubscribe(void* context, MQTTAsync_successData5* res ...@@ -1137,7 +1137,6 @@ void test_error_reporting_onSubscribe(void* context, MQTTAsync_successData5* res
MQTTProperties_free(&opts.properties); MQTTProperties_free(&opts.properties);
} }
void test_error_reporting_onConnect(void* context, MQTTAsync_successData5* response) void test_error_reporting_onConnect(void* context, MQTTAsync_successData5* response)
{ {
MQTTAsync c = (MQTTAsync)context; MQTTAsync c = (MQTTAsync)context;
...@@ -1228,6 +1227,211 @@ exit: ...@@ -1228,6 +1227,211 @@ exit:
} }
struct
{
int test_finished;
char* test_topic;
} test_qos_1_2_errors_globals =
{
0, "test_qos_1_2_errors"
};
void test_qos_1_2_errors_onPublishSuccess(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_INFO, "Callback: publish success, reason code \"%s\" msgid: %d packet type: ",
MQTTReasonCodeString(response->reasonCode), response->token);
logProperties(&response->properties);
test_qos_1_2_errors_globals.test_finished = 1;
}
void test_qos_1_2_errors_onPublishFailure3(void* context, MQTTAsync_failureData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTProperty property;
int rc;
MyLog(LOGA_INFO, "Callback: publish failure, reason code \"%s\" msgid: %d packet type: ",
MQTTReasonCodeString(response->reasonCode), response->token);
logProperties(&response->properties);
test_qos_1_2_errors_globals.test_finished = 1;
}
void test_qos_1_2_errors_onPublishFailure2(void* context, MQTTAsync_failureData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTProperty property;
int rc;
MyLog(LOGA_INFO, "Callback: publish failure, reason code \"%s\" msgid: %d packet type: ",
MQTTReasonCodeString(response->reasonCode), response->token);
logProperties(&response->properties);
opts.onSuccess5 = test_qos_1_2_errors_onPublishSuccess;
opts.onFailure5 = test_qos_1_2_errors_onPublishFailure3;
opts.context = c;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = 2;
pubmsg.retained = 0;
property.identifier = USER_PROPERTY;
property.value.data.data = "pub user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "pub user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&pubmsg.properties, &property);
rc = MQTTAsync_sendMessage(c, "test_qos_1_2_errors_pubcomp", &pubmsg, &opts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != SUCCESS)
test_qos_1_2_errors_globals.test_finished = 1;
MQTTProperties_free(&pubmsg.properties);
}
void test_qos_1_2_errors_onPublishFailure(void* context, MQTTAsync_failureData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTProperty property;
int rc;
MyLog(LOGA_INFO, "Callback: publish failure, reason code \"%s\" msgid: %d packet type: ",
MQTTReasonCodeString(response->reasonCode), response->token);
logProperties(&response->properties);
opts.onSuccess5 = test_qos_1_2_errors_onPublishSuccess;
opts.onFailure5 = test_qos_1_2_errors_onPublishFailure2;
opts.context = c;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = 2;
pubmsg.retained = 0;
property.identifier = USER_PROPERTY;
property.value.data.data = "pub user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "pub user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&pubmsg.properties, &property);
rc = MQTTAsync_sendMessage(c, test_qos_1_2_errors_globals.test_topic, &pubmsg, &opts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != SUCCESS)
test_qos_1_2_errors_globals.test_finished = 1;
MQTTProperties_free(&pubmsg.properties);
}
void test_qos_1_2_errors_onConnect(void* context, MQTTAsync_successData5* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTProperty property;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
assert("Reason code should be 0", response->reasonCode == SUCCESS,
"Reason code was %d\n", response->reasonCode);
MyLog(LOGA_INFO, "Connack properties:");
logProperties(&response->properties);
opts.onSuccess5 = test_qos_1_2_errors_onPublishSuccess;
opts.onFailure5 = test_qos_1_2_errors_onPublishFailure;
opts.context = c;
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = 1;
pubmsg.retained = 0;
property.identifier = USER_PROPERTY;
property.value.data.data = "pub user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "pub user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&pubmsg.properties, &property);
rc = MQTTAsync_sendMessage(c, test_qos_1_2_errors_globals.test_topic, &pubmsg, &opts);
assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != SUCCESS)
test_qos_1_2_errors_globals.test_finished = 1;
MQTTProperties_free(&pubmsg.properties);
}
int test_qos_1_2_errors(struct Options options)
{
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
int rc = 0;
MyLog(LOGA_INFO, "Starting V5 test - qos 1 and 2 errors");
fprintf(xml, "<testcase classname=\"test11\" name=\"qos 1 and 2 errors\"");
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "qos 1 and 2 errors",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test_flow_control_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.MQTTVersion = options.MQTTVersion;
opts.onSuccess5 = test_qos_1_2_errors_onConnect;
opts.context = c;
opts.cleanstart = 1;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (test_qos_1_2_errors_globals.test_finished == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
MyLog(LOGA_INFO, "TEST6: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("Trace : %d, %s\n", level, message); printf("Trace : %d, %s\n", level, message);
...@@ -1243,6 +1447,7 @@ int main(int argc, char** argv) ...@@ -1243,6 +1447,7 @@ int main(int argc, char** argv)
test_subscription_ids, test_subscription_ids,
test_flow_control, test_flow_control,
test_error_reporting, test_error_reporting,
test_qos_1_2_errors,
}; /* indexed starting from 1 */ }; /* indexed starting from 1 */
MQTTAsync_nameValue* info; MQTTAsync_nameValue* info;
int i; int i;
...@@ -1267,7 +1472,7 @@ int main(int argc, char** argv) ...@@ -1267,7 +1472,7 @@ int main(int argc, char** argv)
{ /* run all the tests */ { /* run all the tests */
for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no) for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
{ {
failures = 0; failures = rc = 0;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR); MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */ rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
} }
......
...@@ -1851,7 +1851,7 @@ int main(int argc, char** argv) ...@@ -1851,7 +1851,7 @@ int main(int argc, char** argv)
{ /* run all the tests */ { /* run all the tests */
for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no) for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
{ {
failures = 0; failures = rc = 0;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR); MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */ rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment