Commit e93f8006 authored by Ian Craggs's avatar Ian Craggs

Call failure callbacks when commands are incomplete

Bug: 444103
parent ae855dd5
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp. * Copyright (c) 2009, 2015 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
* Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1 * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
* Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe * Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
* Ian Craggs - fix for bug 465369 - longer latency than expected * Ian Craggs - fix for bug 465369 - longer latency than expected
* Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -1327,11 +1328,25 @@ void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m) ...@@ -1327,11 +1328,25 @@ void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
FUNC_ENTRY; FUNC_ENTRY;
if (m->responses) if (m->responses)
{ {
ListElement* elem = NULL; ListElement* cur_response = NULL;
while (ListNextElement(m->responses, &cur_response))
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(cur_response->content);
while (ListNextElement(m->responses, &elem)) if (command->command.onFailure)
{ {
MQTTAsync_freeCommand1((MQTTAsync_queuedCommand*) (elem->content)); MQTTAsync_failureData data;
data.token = command->command.token;
data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(command->command.type), m->c->clientID);
(*(command->command.onFailure))(command->command.context, &data);
}
MQTTAsync_freeCommand1(command);
count++; count++;
} }
} }
...@@ -1344,12 +1359,25 @@ void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m) ...@@ -1344,12 +1359,25 @@ void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
ListNextElement(commands, &next); ListNextElement(commands, &next);
while (current) while (current)
{ {
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content); MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (cmd->client == m) if (command->client == m)
{
ListDetach(commands, command);
if (command->command.onFailure)
{ {
ListDetach(commands, cmd); MQTTAsync_failureData data;
MQTTAsync_freeCommand(cmd);
data.token = command->command.token;
data.code = MQTTASYNC_OPERATION_INCOMPLETE; /* interrupted return code */
Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(command->command.type), m->c->clientID);
(*(command->command.onFailure))(command->command.context, &data);
}
MQTTAsync_freeCommand(command);
count++; count++;
} }
current = next; current = next;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
* Ian Craggs, Allan Stockdill-Mander - SSL connections * Ian Craggs, Allan Stockdill-Mander - SSL connections
* Ian Craggs - multiple server connection support * Ian Craggs - multiple server connection support
* Ian Craggs - MQTT 3.1.1 support * Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
*******************************************************************************/ *******************************************************************************/
/********************************************************************/ /********************************************************************/
...@@ -149,6 +150,10 @@ ...@@ -149,6 +150,10 @@
* Return code: All 65535 MQTT msgids are being used * Return code: All 65535 MQTT msgids are being used
*/ */
#define MQTTASYNC_NO_MORE_MSGIDS -10 #define MQTTASYNC_NO_MORE_MSGIDS -10
/**
* Return code: the request is being discarded when not complete
*/
#define MQTTASYNC_OPERATION_INCOMPLETE -11
/** /**
* Default MQTT version to connect with. Use 3.1.1 then fall back to 3.1 * Default MQTT version to connect with. Use 3.1.1 then fall back to 3.1
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* Contributors: * Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - MQTT 3.1.1 support * Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - test8 - failure callbacks
*******************************************************************************/ *******************************************************************************/
...@@ -22,12 +23,6 @@ ...@@ -22,12 +23,6 @@
*/ */
/*
#if !defined(_RTSHEADER)
#include <rts.h>
#endif
*/
#include "MQTTAsync.h" #include "MQTTAsync.h"
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -1440,6 +1435,258 @@ exit: ...@@ -1440,6 +1435,258 @@ exit:
/*********************************************************************
Test8: Incomplete commands and requests
*********************************************************************/
char* test8_topic = "C client test8";
int test8_messageCount = 0;
int test8_subscribed = 0;
int test8_publishFailures = 0;
void test8_onPublish(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In publish onSuccess callback %p token %d", c, response->token);
}
void test8_onPublishFailure(void* context, MQTTAsync_failureData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onPublish failure callback %p", c);
assert("Response code should be interrupted", response->code == MQTTASYNC_OPERATION_INCOMPLETE,
"rc was %d", response->code);
test8_publishFailures++;
}
void test8_onDisconnectFailure(void* context, MQTTAsync_failureData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect failure callback %p", c);
assert("Successful disconnect", 0, "disconnect failed", 0);
test_finished = 1;
}
void test8_onDisconnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
test_finished = 1;
}
void test8_onSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
test8_subscribed = 1;
}
void test8_onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
opts.onSuccess = test8_onSubscribe;
opts.context = c;
rc = MQTTAsync_subscribe(c, test8_topic, 2, &opts);
assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
test_finished = 1;
}
int test8_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
MyLog(LOGA_DEBUG, "Test8: received message id %d", message->msgid);
test8_messageCount++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
int test8(struct Options options)
{
int subsqos = 2;
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
int rc = 0;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_token* tokens = NULL;
int msg_count = 6;
MyLog(LOGA_INFO, "Starting test 8 - incomplete commands");
fprintf(xml, "<testcase classname=\"test4\" name=\"incomplete commands\"");
global_start_time = start_clock();
test_finished = 0;
rc = MQTTAsync_create(&c, options.connection, "async_test8",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_setCallbacks(c, c, NULL, test8_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.keepAliveInterval = 20;
opts.username = "testuser";
opts.password = "testpassword";
opts.MQTTVersion = options.MQTTVersion;
opts.onFailure = NULL;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 1;
opts.onSuccess = test8_onConnect;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test8_subscribed)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
int i = 0;
pubmsg.qos = 2;
ropts.onSuccess = test8_onPublish;
ropts.onFailure = test8_onPublishFailure;
ropts.context = c;
for (i = 0; i < msg_count; ++i)
{
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = (pubmsg.qos == 2) ? 1 : 2; /* alternate */
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test8_topic, &pubmsg, &ropts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
/* disconnect immediately without completing the commands */
dopts.timeout = 0;
dopts.onSuccess = test8_onDisconnect;
dopts.context = c;
rc = MQTTAsync_disconnect(c, &dopts); /* now there should be incomplete commands */
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
test_finished = 0;
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
assert("should get no tokens back", tokens == NULL, "tokens was %p", tokens);
assert("test8_publishFailures > 0", test8_publishFailures > 0,
"test8_publishFailures = %d", test8_publishFailures);
/* Now elicit failure callbacks on destroy */
test8_subscribed = test8_publishFailures = 0;
MyLog(LOGA_DEBUG, "Connecting");
opts.cleansession = 0;
opts.onSuccess = test8_onConnect;
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test8_subscribed)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
i = 0;
pubmsg.qos = 2;
ropts.onSuccess = test8_onPublish;
ropts.onFailure = test8_onPublishFailure;
ropts.context = c;
for (i = 0; i < msg_count; ++i)
{
pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = (pubmsg.qos == 2) ? 1 : 2; /* alternate */
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test8_topic, &pubmsg, &ropts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
/* disconnect immediately without completing the commands */
dopts.timeout = 0;
dopts.onSuccess = test8_onDisconnect;
dopts.context = c;
rc = MQTTAsync_disconnect(c, &dopts); /* now there should be incomplete commands */
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
test_finished = 0;
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("getPendingTokens rc == 0", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
assert("should get some tokens back", tokens != NULL, "tokens was %p", tokens);
MQTTAsync_free(tokens);
assert("test8_publishFailures == 0", test8_publishFailures == 0,
"test8_publishFailures = %d", test8_publishFailures);
MQTTAsync_destroy(&c);
assert("test8_publishFailures > 0", test8_publishFailures > 0,
"test8_publishFailures = %d", test8_publishFailures);
exit:
MyLog(LOGA_INFO, "TEST8: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
write_test_result();
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("Trace : %d, %s\n", level, message); printf("Trace : %d, %s\n", level, message);
...@@ -1451,7 +1698,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) ...@@ -1451,7 +1698,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int rc = 0; int rc = 0;
int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test7}; /* indexed starting from 1 */ int (*tests[])() = {NULL, test1, test2, test3, test4, test5, test6, test7, test8}; /* indexed starting from 1 */
MQTTAsync_nameValue* info; MQTTAsync_nameValue* info;
int i; int i;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment