Commit f5aeca49 authored by Juergen Kosel's avatar Juergen Kosel

Merge branch '373' of https://github.com/eclipse/paho.mqtt.c.git into 373

* '373' of https://github.com/eclipse/paho.mqtt.c.git:
  Forgot to remove test failure code!
  Make sure writeComplete deals with socket error #373
  Some corrections for failing socket writes #385
  Take account of not finding a response in writeComplete #385
  Add another continuous write test
  Add continuous send test
  More memory cleanup on network errors - #373
parents bde04063 c640561c
...@@ -361,7 +361,7 @@ static void MQTTProtocol_checkPendingWrites(void); ...@@ -361,7 +361,7 @@ static void MQTTProtocol_checkPendingWrites(void);
static void MQTTAsync_freeServerURIs(MQTTAsyncs* m); static void MQTTAsync_freeServerURIs(MQTTAsyncs* m);
static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command); static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command);
static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command); static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command);
static void MQTTAsync_writeComplete(int socket); static void MQTTAsync_writeComplete(int socket, int rc);
static int MQTTAsync_processCommand(void); static int MQTTAsync_processCommand(void);
static void MQTTAsync_checkTimeouts(void); static void MQTTAsync_checkTimeouts(void);
static thread_return_type WINAPI MQTTAsync_sendThread(void* n); static thread_return_type WINAPI MQTTAsync_sendThread(void* n);
...@@ -1050,7 +1050,7 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command) ...@@ -1050,7 +1050,7 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
} }
static void MQTTAsync_writeComplete(int socket) static void MQTTAsync_writeComplete(int socket, int rc)
{ {
ListElement* found = NULL; ListElement* found = NULL;
...@@ -1080,7 +1080,11 @@ static void MQTTAsync_writeComplete(int socket) ...@@ -1080,7 +1080,11 @@ static void MQTTAsync_writeComplete(int socket)
break; break;
} }
if (cur_response && command->onSuccess) if (cur_response) /* we found a response */
{
if (command->type == PUBLISH)
{
if (rc == 1 && command->onSuccess)
{ {
MQTTAsync_successData data; MQTTAsync_successData data;
...@@ -1093,12 +1097,26 @@ static void MQTTAsync_writeComplete(int socket) ...@@ -1093,12 +1097,26 @@ static void MQTTAsync_writeComplete(int socket)
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data); (*(command->onSuccess))(command->context, &data);
} }
m->pending_write = NULL; else if (rc == -1 && command->onFailure)
{
MQTTAsync_failureData data;
data.token = command->token;
data.code = rc;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->onFailure))(command->context, &data);
}
}
if (com)
{
ListDetach(m->responses, com); ListDetach(m->responses, com);
MQTTAsync_freeCommand(com); MQTTAsync_freeCommand(com);
} }
} }
m->pending_write = NULL;
}
}
FUNC_EXIT; FUNC_EXIT;
} }
...@@ -1268,7 +1286,8 @@ static int MQTTAsync_processCommand(void) ...@@ -1268,7 +1286,8 @@ static int MQTTAsync_processCommand(void)
} }
else else
{ {
//command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */ if (rc != SOCKET_ERROR)
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
command->client->pending_write = &command->command; command->client->pending_write = &command->command;
} }
} }
......
...@@ -295,7 +295,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc); ...@@ -295,7 +295,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout); static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout);
/*static int pubCompare(void* a, void* b); */ /*static int pubCompare(void* a, void* b); */
static void MQTTProtocol_checkPendingWrites(void); static void MQTTProtocol_checkPendingWrites(void);
static void MQTTClient_writeComplete(int socket); static void MQTTClient_writeComplete(int socket, int rc);
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId, int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
...@@ -2082,7 +2082,7 @@ static void MQTTProtocol_checkPendingWrites(void) ...@@ -2082,7 +2082,7 @@ static void MQTTProtocol_checkPendingWrites(void)
} }
static void MQTTClient_writeComplete(int socket) static void MQTTClient_writeComplete(int socket, int rc)
{ {
ListElement* found = NULL; ListElement* found = NULL;
......
...@@ -428,6 +428,31 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes) ...@@ -428,6 +428,31 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
rc = TCPSOCKET_INTERRUPTED; rc = TCPSOCKET_INTERRUPTED;
} }
#else #else
//#define TESTING
#if defined(TESTING)
static int i = 0;
if (++i >= 10 && i < 21)
{
if (1)
{
printf("Deliberately simulating TCPSOCKET_INTERRUPTED\n");
rc = TCPSOCKET_INTERRUPTED; /* simulate a network wait */
}
else
{
printf("Deliberately simulating SOCKET_ERROR\n");
rc = SOCKET_ERROR;
}
/* should *bytes always be 0? */
if (i == 20)
{
printf("Shutdown socket\n");
shutdown(socket, SHUT_WR);
}
}
else
{
#endif
rc = writev(socket, iovecs, count); rc = writev(socket, iovecs, count);
if (rc == SOCKET_ERROR) if (rc == SOCKET_ERROR)
{ {
...@@ -437,6 +462,9 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes) ...@@ -437,6 +462,9 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
} }
else else
*bytes = rc; *bytes = rc;
#if defined(TESTING)
}
#endif
#endif #endif
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -728,7 +756,7 @@ void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete) ...@@ -728,7 +756,7 @@ void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete)
/** /**
* Continue an outstanding write for a particular socket * Continue an outstanding write for a particular socket
* @param socket that socket * @param socket that socket
* @return completion code * @return completion code: 0=incomplete, 1=complete, -1=socket error
*/ */
int Socket_continueWrite(int socket) int Socket_continueWrite(int socket)
{ {
...@@ -779,11 +807,23 @@ int Socket_continueWrite(int socket) ...@@ -779,11 +807,23 @@ int Socket_continueWrite(int socket)
if (pw->frees[i]) if (pw->frees[i])
free(pw->iovecs[i].iov_base); free(pw->iovecs[i].iov_base);
} }
rc = 1; /* signal complete */
Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket); Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
} }
else else
{
rc = 0; /* signal not complete */
Log(TRACE_MIN, -1, "ContinueWrite wrote +%lu bytes on socket %d", bytes, socket); Log(TRACE_MIN, -1, "ContinueWrite wrote +%lu bytes on socket %d", bytes, socket);
} }
}
else /* if we got SOCKET_ERROR we need to clean up anyway - a partial write is no good anymore */
{
for (i = 0; i < pw->count; i++)
{
if (pw->frees[i])
free(pw->iovecs[i].iov_base);
}
}
#if defined(OPENSSL) #if defined(OPENSSL)
exit: exit:
#endif #endif
...@@ -806,7 +846,9 @@ int Socket_continueWrites(fd_set* pwset) ...@@ -806,7 +846,9 @@ int Socket_continueWrites(fd_set* pwset)
while (curpending) while (curpending)
{ {
int socket = *(int*)(curpending->content); int socket = *(int*)(curpending->content);
if (FD_ISSET(socket, pwset) && Socket_continueWrite(socket)) int rc = 0;
if (FD_ISSET(socket, pwset) && ((rc = Socket_continueWrite(socket)) != 0))
{ {
if (!SocketBuffer_writeComplete(socket)) if (!SocketBuffer_writeComplete(socket))
Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list"); Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list");
...@@ -819,7 +861,7 @@ int Socket_continueWrites(fd_set* pwset) ...@@ -819,7 +861,7 @@ int Socket_continueWrites(fd_set* pwset)
curpending = s.write_pending->current; curpending = s.write_pending->current;
if (writecomplete) if (writecomplete)
(*writecomplete)(socket); (*writecomplete)(socket, rc);
} }
else else
ListNextElement(s.write_pending, &curpending); ListNextElement(s.write_pending, &curpending);
......
...@@ -137,7 +137,7 @@ char* Socket_getpeer(int sock); ...@@ -137,7 +137,7 @@ char* Socket_getpeer(int sock);
void Socket_addPendingWrite(int socket); void Socket_addPendingWrite(int socket);
void Socket_clearPendingWrite(int socket); void Socket_clearPendingWrite(int socket);
typedef void Socket_writeComplete(int socket); typedef void Socket_writeComplete(int socket, int rc);
void Socket_setWriteCompleteCallback(Socket_writeComplete*); void Socket_setWriteCompleteCallback(Socket_writeComplete*);
#endif /* SOCKET_H */ #endif /* SOCKET_H */
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2011, 2014 IBM Corp. * Copyright (c) 2011, 2017 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -17,14 +17,9 @@ ...@@ -17,14 +17,9 @@
/** /**
* @file * @file
* Async C client program for the MQTT v3 restart/recovery test suite. * Async C client program for the MQTT restart/recovery test suite
*/ */
/*
#if !defined(_RTSHEADER)
#include <rts.h>
#endif
*/
#include "MQTTAsync.h" #include "MQTTAsync.h"
#include <string.h> #include <string.h>
...@@ -53,8 +48,8 @@ struct ...@@ -53,8 +48,8 @@ struct
char** connections; /**< HA connection list */ char** connections; /**< HA connection list */
int connection_count; int connection_count;
char* control_connection; /**< MQTT control connection, for test sync */ char* control_connection; /**< MQTT control connection, for test sync */
char* topic; char* topic; /**< test message topic */
char* control_topic; char* control_topic; /**< topic for control messages */
char* clientid; char* clientid;
int slot_no; int slot_no;
int qos; int qos;
...@@ -65,20 +60,20 @@ struct ...@@ -65,20 +60,20 @@ struct
int persistence; int persistence;
} opts = } opts =
{ {
"tcp://localhost:1885", "tcp://localhost:1884",
NULL, NULL,
0, 0,
"tcp://localhost:7777", "tcp://localhost:7777",
"XR9TT3", "Eclipse/Paho/restart_test",
"XR9TT3/control", "Eclipse/Paho/restart_test/control",
"C_broken_client", "C_broken_client",
1, 1, /* slot_no */
2, 2, /* QoS */
0, 0, /* retained */
NULL, NULL,
NULL, NULL,
0, 0,
0, 1,
}; };
void getopts(int argc, char** argv) void getopts(int argc, char** argv)
...@@ -211,7 +206,7 @@ void MyLog(int log_level, char* format, ...) ...@@ -211,7 +206,7 @@ void MyLog(int log_level, char* format, ...)
#if defined(WIN32) || defined(_WINDOWS) #if defined(WIN32) || defined(_WINDOWS)
#define mqsleep(A) Sleep(1000*A) #define mysleep(A) Sleep(1000*A)
#define START_TIME_TYPE DWORD #define START_TIME_TYPE DWORD
static DWORD start_time = 0; static DWORD start_time = 0;
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
...@@ -219,7 +214,7 @@ START_TIME_TYPE start_clock(void) ...@@ -219,7 +214,7 @@ START_TIME_TYPE start_clock(void)
return GetTickCount(); return GetTickCount();
} }
#elif defined(AIX) #elif defined(AIX)
#define mqsleep sleep #define mysleep sleep
#define START_TIME_TYPE struct timespec #define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
{ {
...@@ -228,7 +223,7 @@ START_TIME_TYPE start_clock(void) ...@@ -228,7 +223,7 @@ START_TIME_TYPE start_clock(void)
return start; return start;
} }
#else #else
#define mqsleep sleep #define mysleep sleep
#define START_TIME_TYPE struct timeval #define START_TIME_TYPE struct timeval
static struct timeval start_time; static struct timeval start_time;
START_TIME_TYPE start_clock(void) START_TIME_TYPE start_clock(void)
...@@ -302,8 +297,7 @@ void control_connectionLost(void* context, char* cause) ...@@ -302,8 +297,7 @@ void control_connectionLost(void* context, char* cause)
*/ */
int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) int control_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{ {
MyLog(LOGA_DEBUG, "Control message arrived: %.*s %s", MyLog(LOGA_INFO, "Control message arrived: %.*s", m->payloadlen, m->payload);
m->payloadlen, m->payload, wait_message);
if (strcmp(m->payload, "stop") == 0) if (strcmp(m->payload, "stop") == 0)
stopping = 1; stopping = 1;
else if (wait_message != NULL && strncmp(wait_message, m->payload, else if (wait_message != NULL && strncmp(wait_message, m->payload,
...@@ -331,6 +325,7 @@ int control_send(char* message) ...@@ -331,6 +325,7 @@ int control_send(char* message)
int rc = 0; int rc = 0;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MyLog(LOGA_ALWAYS, "Sending control message: %s", message);
sprintf(buf, "%s: %s", opts.clientid, message); sprintf(buf, "%s: %s", opts.clientid, message);
rc = MQTTAsync_send(control_client, pub_topic, (int)strlen(buf), rc = MQTTAsync_send(control_client, pub_topic, (int)strlen(buf),
buf, 1, 0, &ropts); buf, 1, 0, &ropts);
...@@ -350,6 +345,7 @@ int control_wait(char* message) ...@@ -350,6 +345,7 @@ int control_wait(char* message)
wait_message = message; wait_message = message;
sprintf(buf, "waiting for: %s", message); sprintf(buf, "waiting for: %s", message);
MyLog(LOGA_ALWAYS, "%s", buf);
control_send(buf); control_send(buf);
while (control_found == 0 && stopping == 0) while (control_found == 0 && stopping == 0)
...@@ -360,8 +356,10 @@ int control_wait(char* message) ...@@ -360,8 +356,10 @@ int control_wait(char* message)
MyLog(LOGA_ALWAYS, "Failed to receive message %s, stopping ", message); MyLog(LOGA_ALWAYS, "Failed to receive message %s, stopping ", message);
return 0; /* time out and tell the caller the message was not found */ return 0; /* time out and tell the caller the message was not found */
} }
mqsleep(1); mysleep(1);
} }
MyLog(LOGA_ALWAYS, (control_found == 0) ?
"Waited... not found" : "Waited... found %d" , control_found);
return control_found; return control_found;
} }
...@@ -378,7 +376,7 @@ int control_which(char* message1, char* message2) ...@@ -378,7 +376,7 @@ int control_which(char* message1, char* message2)
{ {
if (++count == 300) if (++count == 300)
return 0; /* time out and tell the caller the message was not found */ return 0; /* time out and tell the caller the message was not found */
mqsleep(1); mysleep(1);
} }
return control_found; return control_found;
} }
...@@ -483,13 +481,13 @@ void connectionLost(void* context, char* cause) ...@@ -483,13 +481,13 @@ void connectionLost(void* context, char* cause)
{ {
conn_opts.serverURIcount = opts.connection_count; conn_opts.serverURIcount = opts.connection_count;
conn_opts.serverURIs = opts.connections; conn_opts.serverURIs = opts.connections;
printf("reconnecting to first serverURI %s\n", conn_opts.serverURIs[0]);
} }
else else
{ {
conn_opts.serverURIcount = 0; conn_opts.serverURIcount = 0;
conn_opts.serverURIs = NULL; conn_opts.serverURIs = NULL;
} }
printf("reconnecting to first serverURI %s\n", conn_opts.serverURIs[0]);
rc = MQTTAsync_connect(context, &conn_opts); rc = MQTTAsync_connect(context, &conn_opts);
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
{ {
...@@ -578,7 +576,7 @@ int waitForCompletion(START_TIME_TYPE start_time) ...@@ -578,7 +576,7 @@ int waitForCompletion(START_TIME_TYPE start_time)
int wait_count = 0; int wait_count = 0;
int limit = 120; int limit = 120;
mqsleep(1); mysleep(1);
while (arrivedCount < expectedCount) while (arrivedCount < expectedCount)
{ {
if (arrivedCount > lastreport) if (arrivedCount > lastreport)
...@@ -587,7 +585,7 @@ int waitForCompletion(START_TIME_TYPE start_time) ...@@ -587,7 +585,7 @@ int waitForCompletion(START_TIME_TYPE start_time)
arrivedCount, expectedCount, elapsed(start_time) / 1000); arrivedCount, expectedCount, elapsed(start_time) / 1000);
lastreport = arrivedCount; lastreport = arrivedCount;
} }
mqsleep(1); mysleep(1);
if (opts.persistence && connection_lost) if (opts.persistence && connection_lost)
recreateReconnect(); recreateReconnect();
if (++wait_count > limit || stopping) if (++wait_count > limit || stopping)
...@@ -595,7 +593,7 @@ int waitForCompletion(START_TIME_TYPE start_time) ...@@ -595,7 +593,7 @@ int waitForCompletion(START_TIME_TYPE start_time)
} }
last_completion_time = elapsed(start_time) / 1000; last_completion_time = elapsed(start_time) / 1000;
MyLog(LOGA_ALWAYS, "Extra wait to see if any duplicates arrive"); MyLog(LOGA_ALWAYS, "Extra wait to see if any duplicates arrive");
mqsleep(10); /* check if any duplicate messages arrive */ mysleep(10); /* check if any duplicate messages arrive */
MyLog(LOGA_ALWAYS, "%d messages arrived out of %d expected, in %d seconds", MyLog(LOGA_ALWAYS, "%d messages arrived out of %d expected, in %d seconds",
arrivedCount, expectedCount, elapsed(start_time) / 1000); arrivedCount, expectedCount, elapsed(start_time) / 1000);
return success(expectedCount); return success(expectedCount);
...@@ -644,7 +642,7 @@ void one_iteration(void) ...@@ -644,7 +642,7 @@ void one_iteration(void)
recreateReconnect(); recreateReconnect();
if (stopping) if (stopping)
goto exit; goto exit;
mqsleep(1); mysleep(1);
rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload, rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
opts.qos, opts.retained, NULL); opts.qos, opts.retained, NULL);
} }
...@@ -654,7 +652,7 @@ void one_iteration(void) ...@@ -654,7 +652,7 @@ void one_iteration(void)
{ {
if (stopping) if (stopping)
goto exit; goto exit;
mqsleep(1); mysleep(1);
printf("arrivedCount %d\n", arrivedCount); printf("arrivedCount %d\n", arrivedCount);
} }
measuring = 0; measuring = 0;
...@@ -696,13 +694,13 @@ void one_iteration(void) ...@@ -696,13 +694,13 @@ void one_iteration(void)
recreateReconnect(); recreateReconnect();
if (stopping) if (stopping)
goto exit; goto exit;
mqsleep(1); mysleep(1);
rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload, rc = MQTTAsync_send(client, opts.topic, (int)(strlen(payload)+1), payload,
opts.qos, opts.retained, &ropts); opts.qos, opts.retained, &ropts);
} }
//MyLog(LOGA_DEBUG, "Successful publish with payload %s", payload); //MyLog(LOGA_DEBUG, "Successful publish with payload %s", payload);
while (seqno - messagesSent > 2000) while (seqno - messagesSent > 2000)
mqsleep(1); mysleep(1);
} }
MyLog(LOGA_ALWAYS, "%d messages sent in %d seconds", expectedCount, elapsed(start_time) / 1000); MyLog(LOGA_ALWAYS, "%d messages sent in %d seconds", expectedCount, elapsed(start_time) / 1000);
...@@ -833,7 +831,7 @@ int sendAndReceive(void) ...@@ -833,7 +831,7 @@ int sendAndReceive(void)
} }
while (client_cleaned == 0) while (client_cleaned == 0)
mqsleep(1); mysleep(1);
MyLog(LOGA_ALWAYS, "Client state cleaned up"); MyLog(LOGA_ALWAYS, "Client state cleaned up");
...@@ -851,7 +849,7 @@ int sendAndReceive(void) ...@@ -851,7 +849,7 @@ int sendAndReceive(void)
/* wait until subscribed */ /* wait until subscribed */
while (client_subscribed == 0) while (client_subscribed == 0)
mqsleep(1); mysleep(1);
if (client_subscribed != 1) if (client_subscribed != 1)
goto disconnect_exit; goto disconnect_exit;
...@@ -986,7 +984,7 @@ int main(int argc, char** argv) ...@@ -986,7 +984,7 @@ int main(int argc, char** argv)
} }
while (control_subscribed == 0) while (control_subscribed == 0)
mqsleep(1); mysleep(1);
if (control_subscribed != 1) if (control_subscribed != 1)
goto destroy_exit; goto destroy_exit;
......
...@@ -1820,6 +1820,284 @@ exit: ...@@ -1820,6 +1820,284 @@ exit:
} }
/*********************************************************************
Test7: Fill up TCP buffer with QoS 0 messages
*********************************************************************/
int test7c_connected = 0;
int test7_will_message_received = 0;
int test7_messages_received = 0;
int test7Finished = 0;
int test7OnFailureCalled = 0;
int test7dReady = 0;
int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
if (memcmp(message->payload, "will message", message->payloadlen) == 0)
test7_will_message_received = 1;
else
test7_messages_received++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void test7cConnected(void* context, char* cause)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
test7c_connected = 1;
}
void test7cOnConnectFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test7OnFailureCalled++;
test7Finished = 1;
}
void test7cOnConnectSuccess(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
/* send a message to the proxy to break the connection */
pubmsg.payload = "TERMINATE";
pubmsg.payloadlen = (int)strlen(pubmsg.payload);
pubmsg.qos = 0;
pubmsg.retained = 0;
//rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
//assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
void test7dOnConnectFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test7OnFailureCalled++;
test7Finished = 1;
}
void test7donSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
test7dReady = 1;
}
void test7dOnConnectSuccess(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
int qoss[2] = {2, 2};
char* topics[2] = {willTopic, test_topic};
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
opts.onSuccess = test7donSubscribe;
opts.context = c;
//rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
//assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
//if (rc != MQTTASYNC_SUCCESS)
// test5Finished = 1;
test7dReady = 1;
}
int test7(struct Options options)
{
char* testname = "test7";
int subsqos = 2;
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
//MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
int rc = 0;
int count = 0;
char clientidc[50];
char clientidd[50];
int i = 0;
MQTTAsync_token *tokens;
test7_will_message_received = 0;
test7_messages_received = 0;
test7Finished = 0;
test7OnFailureCalled = 0;
test7c_connected = 0;
sprintf(willTopic, "paho-test9-7-%s", unique);
sprintf(clientidc, "paho-test9-7-c-%s", unique);
sprintf(clientidd, "paho-test9-7-d-%s", unique);
sprintf(test_topic, "longer paho-test9-7-test topic %s", unique);
test7Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 7 - fill TCP buffer");
fprintf(xml, "<testcase classname=\"test7\" name=\"%s\"", testname);
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
rc = MQTTAsync_setCallbacks(d, d, NULL, test7_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
opts.onSuccess = test7dOnConnectSuccess;
opts.onFailure = test7dOnConnectFailure;
MyLog(LOGA_DEBUG, "Connecting client d");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait until d is ready: connected and subscribed */
count = 0;
while (!test7dReady && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
rc = MQTTAsync_setConnected(c, c, test7cConnected);
assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* let client c go: connect, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->payload.data = "will message";
opts.will->payload.len = strlen(opts.will->payload.data) + 1;
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = willTopic;
opts.onSuccess = test7cOnConnectSuccess;
opts.onFailure = test7cOnConnectFailure;
opts.context = c;
opts.cleansession = 0;
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
count = 0;
while (!test7c_connected && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
/* wait for will message */
//while (test7_will_message_received == 0 && ++count < 10000)
// MySleep(100);
MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
test7c_connected = 0;
char buf[20000];
/* send some messages. Then reconnect (check connected callback), and check that those messages are received */
for (i = 0; i < 50000; ++i)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
pubmsg.qos = 0; /*i % 3;*/
sprintf(buf, "QoS %d message", pubmsg.qos);
pubmsg.payload = buf;
pubmsg.payloadlen = 20000; //(int)(strlen(pubmsg.payload) + 1);
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != 0)
break;
}
#if 0
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
rc = MQTTAsync_reconnect(c);
assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
/* wait for client to be reconnected */
while (!test5c_connected && ++count < 10000)
MySleep(100);
/* wait for success or failure callback */
while (test5_messages_received < 3 && ++count < 10000)
MySleep(100);
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
#endif
rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
exit:
MySleep(200);
MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message) void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
...@@ -1831,8 +2109,10 @@ int main(int argc, char** argv) ...@@ -1831,8 +2109,10 @@ int main(int argc, char** argv)
{ {
int* numtests = &tests; int* numtests = &tests;
int rc = 0; int rc = 0;
int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6}; int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6, test7};
time_t randtime;
srand((unsigned) time(&randtime));
sprintf(unique, "%u", rand()); sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique); MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment