Commit bc1399a6 authored by Ian Craggs's avatar Ian Craggs

Merge branch '373' into develop

parents 6aa07f57 edab27ad
......@@ -96,7 +96,7 @@ SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
TEST_FILES_CS = test3
SYNC_SSL_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_CS}}
TEST_FILES_A = test4 test9 test_mqtt4async
TEST_FILES_A = test4 test6 test9 test_mqtt4async test_issue373
ASYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_A}}
TEST_FILES_AS = test5
......
......@@ -361,7 +361,7 @@ static void MQTTProtocol_checkPendingWrites(void);
static void MQTTAsync_freeServerURIs(MQTTAsyncs* m);
static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command);
static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command);
static void MQTTAsync_writeComplete(int socket);
static void MQTTAsync_writeComplete(int socket, int rc);
static int MQTTAsync_processCommand(void);
static void MQTTAsync_checkTimeouts(void);
static thread_return_type WINAPI MQTTAsync_sendThread(void* n);
......@@ -1061,7 +1061,7 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
}
static void MQTTAsync_writeComplete(int socket)
static void MQTTAsync_writeComplete(int socket, int rc)
{
ListElement* found = NULL;
......@@ -1091,23 +1091,41 @@ static void MQTTAsync_writeComplete(int socket)
break;
}
if (cur_response && command->onSuccess)
if (cur_response) /* we found a response */
{
MQTTAsync_successData data;
data.token = command->token;
data.alt.pub.destinationName = command->details.pub.destinationName;
data.alt.pub.message.payload = command->details.pub.payload;
data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
data.alt.pub.message.qos = command->details.pub.qos;
data.alt.pub.message.retained = command->details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data);
if (command->type == PUBLISH)
{
if (rc == 1 && command->onSuccess)
{
MQTTAsync_successData data;
data.token = command->token;
data.alt.pub.destinationName = command->details.pub.destinationName;
data.alt.pub.message.payload = command->details.pub.payload;
data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
data.alt.pub.message.qos = command->details.pub.qos;
data.alt.pub.message.retained = command->details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data);
}
else if (rc == -1 && command->onFailure)
{
MQTTAsync_failureData data;
data.token = command->token;
data.code = rc;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->onFailure))(command->context, &data);
}
}
if (com)
{
ListDetach(m->responses, com);
MQTTAsync_freeCommand(com);
}
}
m->pending_write = NULL;
ListDetach(m->responses, com);
MQTTAsync_freeCommand(com);
}
}
FUNC_EXIT;
......@@ -1279,7 +1297,8 @@ static int MQTTAsync_processCommand(void)
}
else
{
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
if (rc != SOCKET_ERROR)
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
command->client->pending_write = &command->command;
}
}
......@@ -1291,7 +1310,20 @@ static int MQTTAsync_processCommand(void)
{
if (command->client->c->connect_state != 0 || command->client->c->connected != 0)
{
command->client->c->connect_state = -2;
if (command->client->c->connect_state != 0)
{
command->client->c->connect_state = -2;
if (command->client->connect.onFailure)
{
MQTTAsync_failureData data;
data.token = 0;
data.code = -2;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling connect failure for client %s", command->client->c->clientID);
(*(command->client->connect.onFailure))(command->client->connect.context, &data);
}
}
MQTTAsync_checkDisconnect(command->client, &command->command);
}
}
......@@ -1306,7 +1338,8 @@ static int MQTTAsync_processCommand(void)
command->client->disconnect = command->command;
MQTTAsync_freeCommand(command);
}
else if (command->command.type == PUBLISH && command->command.details.pub.qos == 0)
else if (command->command.type == PUBLISH && command->command.details.pub.qos == 0 &&
rc != SOCKET_ERROR && rc != MQTTASYNC_PERSISTENCE_ERROR)
{
if (rc == TCPSOCKET_INTERRUPTED)
ListAppend(command->client->responses, command, sizeof(command));
......@@ -1324,25 +1357,25 @@ static int MQTTAsync_processCommand(void)
else
MQTTAsync_disconnect_internal(command->client, 0);
if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command, command->client))
if (command->command.type == CONNECT
&& MQTTAsync_checkConn(&command->command, command->client))
{
Log(TRACE_MIN, -1, "Connect failed, more to try");
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
command->command.details.conn.currentURI++;
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
} else
command->command.details.conn.currentURI++;
/* put the connect command back to the head of the command queue, using the next serverURI */
rc = MQTTAsync_addCommand(command, sizeof(command->command.details.conn));
}
else
rc = MQTTAsync_addCommand(command,
sizeof(command->command.details.conn));
} else
{
if (command->command.onFailure)
{
......@@ -1377,16 +1410,16 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try");
if (conn->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (conn->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
conn->command.details.conn.currentURI++;
conn->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
conn->command.details.conn.currentURI++;
if (conn->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (conn->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
conn->command.details.conn.currentURI++;
conn->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
conn->command.details.conn.currentURI++;
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
......@@ -1623,6 +1656,8 @@ void MQTTAsync_destroy(MQTTAsync* handle)
if (m == NULL)
goto exit;
MQTTAsync_closeSession(m->c);
MQTTAsync_removeResponsesAndCommands(m);
ListFree(m->responses);
......@@ -2040,7 +2075,8 @@ static void MQTTAsync_closeOnly(Clients* client)
client->ping_outstanding = 0;
if (client->net.socket > 0)
{
if (client->connected)
MQTTProtocol_checkPendingWrites();
if (client->connected && Socket_noPendingWrites(client->net.socket))
MQTTPacket_send_disconnect(&client->net, client->clientID);
Thread_lock_mutex(socket_mutex);
#if defined(OPENSSL)
......
......@@ -295,7 +295,7 @@ static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout);
/*static int pubCompare(void* a, void* b); */
static void MQTTProtocol_checkPendingWrites(void);
static void MQTTClient_writeComplete(int socket);
static void MQTTClient_writeComplete(int socket, int rc);
int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
......@@ -2082,7 +2082,7 @@ static void MQTTProtocol_checkPendingWrites(void)
}
static void MQTTClient_writeComplete(int socket)
static void MQTTClient_writeComplete(int socket, int rc)
{
ListElement* found = NULL;
......
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
......@@ -17,6 +17,7 @@
* Ian Craggs - fix for bug 421103 - trying to write to same socket, in retry
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1
* Ian Craggs - ensure that acks are not sent if write is outstanding on socket
*******************************************************************************/
/**
......@@ -83,7 +84,7 @@ int MQTTProtocol_assignMsgId(Clients* client)
while (ListFindItem(client->outboundMsgs, &msgid, messageIDCompare) != NULL)
{
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
if (msgid == start_msgid)
if (msgid == start_msgid)
{ /* we've tried them all - none free */
msgid = 0;
break;
......@@ -275,12 +276,14 @@ int MQTTProtocol_handlePublishes(void* pack, int sock)
if (publish->header.bits.qos == 0)
Protocol_processPublication(publish, client);
else if (!Socket_noPendingWrites(sock))
rc = SOCKET_ERROR; /* queue acks? */
else if (publish->header.bits.qos == 1)
{
/* send puback before processing the publications because a lot of return publications could fill up the socket buffer */
rc = MQTTPacket_send_puback(publish->msgId, &client->net, client->clientID);
/* if we get a socket error from sending the puback, should we ignore the publication? */
Protocol_processPublication(publish, client);
/* send puback before processing the publications because a lot of return publications could fill up the socket buffer */
rc = MQTTPacket_send_puback(publish->msgId, &client->net, client->clientID);
/* if we get a socket error from sending the puback, should we ignore the publication? */
Protocol_processPublication(publish, client);
}
else if (publish->header.bits.qos == 2)
{
......@@ -420,6 +423,8 @@ int MQTTProtocol_handlePubrels(void* pack, int sock)
{
if (pubrel->header.bits.dup == 0)
Log(TRACE_MIN, 3, NULL, "PUBREL", client->clientID, pubrel->msgId);
else if (!Socket_noPendingWrites(sock))
rc = SOCKET_ERROR; /* queue acks? */
else
/* Apparently this is "normal" behaviour, so we don't need to issue a warning */
rc = MQTTPacket_send_pubcomp(pubrel->msgId, &client->net, client->clientID);
......@@ -431,6 +436,8 @@ int MQTTProtocol_handlePubrels(void* pack, int sock)
Log(TRACE_MIN, 4, NULL, "PUBREL", client->clientID, pubrel->msgId, m->qos);
else if (m->nextMessageType != PUBREL)
Log(TRACE_MIN, 5, NULL, "PUBREL", client->clientID, pubrel->msgId);
else if (!Socket_noPendingWrites(sock))
rc = SOCKET_ERROR; /* queue acks? */
else
{
Publish publish;
......@@ -521,7 +528,7 @@ void MQTTProtocol_keepalive(time_t now)
while (current)
{
Clients* client = (Clients*)(current->content);
ListNextElement(bstate->clients, &current);
ListNextElement(bstate->clients, &current);
if (client->connected && client->keepAliveInterval > 0 &&
(difftime(now, client->net.lastSent) >= client->keepAliveInterval ||
difftime(now, client->net.lastReceived) >= client->keepAliveInterval))
......@@ -740,7 +747,7 @@ char* MQTTStrncpy(char *dest, const char *src, size_t dest_size)
size_t count = dest_size;
char *temp = dest;
FUNC_ENTRY;
FUNC_ENTRY;
if (dest_size < strlen(src))
Log(TRACE_MIN, -1, "the src string is truncated");
......
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
......@@ -15,6 +15,7 @@
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - MQTT 3.1.1 updates
* Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - add debug definition of MQTTStrdup for when needed
*******************************************************************************/
#if !defined(MQTTPROTOCOLCLIENT_H)
......@@ -52,4 +53,7 @@ void MQTTProtocol_freeMessageList(List* msgList);
char* MQTTStrncpy(char *dest, const char* src, size_t num);
char* MQTTStrdup(const char* src);
//#define MQTTStrdup(src) MQTTStrncpy(malloc(strlen(src)+1), src, strlen(src)+1)
#endif
......@@ -52,6 +52,7 @@ int Socket_close_only(int socket);
int Socket_continueWrite(int socket);
int Socket_continueWrites(fd_set* pwset);
char* Socket_getaddrname(struct sockaddr* sa, int sock);
int Socket_abortWrite(int socket);
#if defined(WIN32) || defined(WIN64)
#define iov_len len
......@@ -418,6 +419,7 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
int rc;
FUNC_ENTRY;
*bytes = 0L;
#if defined(WIN32) || defined(WIN64)
rc = WSASend(socket, iovecs, count, (LPDWORD)bytes, 0, NULL, NULL);
if (rc == SOCKET_ERROR)
......@@ -427,7 +429,34 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
rc = TCPSOCKET_INTERRUPTED;
}
#else
*bytes = 0L;
/*#define TCPSOCKET_INTERRUPTED_TESTING
This section forces the occasional return of TCPSOCKET_INTERRUPTED,
for testing purposes only!
*/
#if defined(TCPSOCKET_INTERRUPTED_TESTING)
static int i = 0;
if (++i >= 10 && i < 21)
{
if (1)
{
printf("Deliberately simulating TCPSOCKET_INTERRUPTED\n");
rc = TCPSOCKET_INTERRUPTED; /* simulate a network wait */
}
else
{
printf("Deliberately simulating SOCKET_ERROR\n");
rc = SOCKET_ERROR;
}
/* should *bytes always be 0? */
if (i == 20)
{
printf("Shutdown socket\n");
shutdown(socket, SHUT_WR);
}
}
else
{
#endif
rc = writev(socket, iovecs, count);
if (rc == SOCKET_ERROR)
{
......@@ -437,6 +466,9 @@ int Socket_writev(int socket, iobuf* iovecs, int count, unsigned long* bytes)
}
else
*bytes = rc;
#if defined(TCPSOCKET_INTERRUPTED_TESTING)
}
#endif
#endif
FUNC_EXIT_RC(rc);
return rc;
......@@ -466,6 +498,7 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
if (!Socket_noPendingWrites(socket))
{
Log(LOG_SEVERE, -1, "Trying to write to socket %d for which there is already pending output", socket);
StackTrace_printStack(stdout);
rc = SOCKET_ERROR;
goto exit;
}
......@@ -475,7 +508,7 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
iovecs[0].iov_base = buf0;
iovecs[0].iov_len = (ULONG)buf0len;
frees1[0] = 1;
frees1[0] = 1; /* this buffer should be freed by SocketBuffer if the write is interrupted */
for (i = 0; i < count; i++)
{
iovecs[i+1].iov_base = buffers[i];
......@@ -495,6 +528,7 @@ int Socket_putdatas(int socket, char* buf0, size_t buf0len, int count, char** bu
#if defined(OPENSSL)
SocketBuffer_pendingWrite(socket, NULL, count+1, iovecs, frees1, total, bytes);
#else
StackTrace_printStack(stdout);
SocketBuffer_pendingWrite(socket, count+1, iovecs, frees1, total, bytes);
#endif
*sockmem = socket;
......@@ -574,9 +608,10 @@ void Socket_close(int socket)
FD_CLR(socket, &(s.pending_wset));
if (s.cur_clientsds != NULL && *(int*)(s.cur_clientsds->content) == socket)
s.cur_clientsds = s.cur_clientsds->next;
Socket_abortWrite(socket);
SocketBuffer_cleanup(socket);
ListRemoveItem(s.connect_pending, &socket, intcompare);
ListRemoveItem(s.write_pending, &socket, intcompare);
SocketBuffer_cleanup(socket);
if (ListRemoveItem(s.clientsds, &socket, intcompare))
Log(TRACE_MIN, -1, "Removed socket %d", socket);
......@@ -679,7 +714,19 @@ int Socket_new(char* addr, int port, int* sock)
if (setsockopt(*sock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
Log(LOG_ERROR, -1, "Could not set SO_NOSIGPIPE for socket %d", *sock);
#endif
/*#define SMALL_TCP_BUFFER_TESTING
This section sets the TCP send buffer to a small amount to provoke TCPSOCKET_INTERRUPTED
return codes from send, for testing only!
*/
#if defined(SMALL_TCP_BUFFER_TESTING)
if (1)
{
int optsend = 100; //2 * 1440;
printf("Setting optsend to %d\n", optsend);
if (setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (void*)&optsend, sizeof(optsend)) != 0)
Log(LOG_ERROR, -1, "Could not set SO_SNDBUF for socket %d", *sock);
}
#endif
Log(TRACE_MIN, -1, "New socket %d for %s, port %d", *sock, addr, port);
if (Socket_addSocket(*sock) == SOCKET_ERROR)
rc = Socket_error("addSocket", *sock);
......@@ -725,17 +772,19 @@ void Socket_setWriteCompleteCallback(Socket_writeComplete* mywritecomplete)
writecomplete = mywritecomplete;
}
/**
* Continue an outstanding write for a particular socket
* @param socket that socket
* @return completion code
* @return completion code: 0=incomplete, 1=complete, -1=socket error
*/
int Socket_continueWrite(int socket)
{
int rc = 0;
pending_writes* pw;
unsigned long curbuflen = 0L, /* cumulative total of buffer lengths */
bytes;
bytes = 0L;
int curbuf = -1, i;
iobuf iovecs1[5];
......@@ -779,10 +828,22 @@ int Socket_continueWrite(int socket)
if (pw->frees[i])
free(pw->iovecs[i].iov_base);
}
rc = 1; /* signal complete */
Log(TRACE_MIN, -1, "ContinueWrite: partial write now complete for socket %d", socket);
}
else
{
rc = 0; /* signal not complete */
Log(TRACE_MIN, -1, "ContinueWrite wrote +%lu bytes on socket %d", bytes, socket);
}
}
else /* if we got SOCKET_ERROR we need to clean up anyway - a partial write is no good anymore */
{
for (i = 0; i < pw->count; i++)
{
if (pw->frees[i])
free(pw->iovecs[i].iov_base);
}
}
#if defined(OPENSSL)
exit:
......@@ -792,6 +853,40 @@ exit:
}
/**
* Continue an outstanding write for a particular socket
* @param socket that socket
* @return completion code: 0=incomplete, 1=complete, -1=socket error
*/
int Socket_abortWrite(int socket)
{
int i = -1, rc = 0;
pending_writes* pw;
FUNC_ENTRY;
if ((pw = SocketBuffer_getWrite(socket)) == NULL)
goto exit;
#if defined(OPENSSL)
if (pw->ssl)
goto exit;
#endif
for (i = 0; i < pw->count; i++)
{
if (pw->frees[i])
{
printf("cleaning in abortwrite for socket %d\n", socket);
free(pw->iovecs[i].iov_base);
}
}
exit:
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Continue any outstanding writes for a socket set
* @param pwset the set of sockets
......@@ -806,7 +901,9 @@ int Socket_continueWrites(fd_set* pwset)
while (curpending)
{
int socket = *(int*)(curpending->content);
if (FD_ISSET(socket, pwset) && Socket_continueWrite(socket))
int rc = 0;
if (FD_ISSET(socket, pwset) && ((rc = Socket_continueWrite(socket)) != 0))
{
if (!SocketBuffer_writeComplete(socket))
Log(LOG_SEVERE, -1, "Failed to remove pending write from socket buffer list");
......@@ -819,7 +916,7 @@ int Socket_continueWrites(fd_set* pwset)
curpending = s.write_pending->current;
if (writecomplete)
(*writecomplete)(socket);
(*writecomplete)(socket, rc);
}
else
ListNextElement(s.write_pending, &curpending);
......
......@@ -137,7 +137,7 @@ char* Socket_getpeer(int sock);
void Socket_addPendingWrite(int socket);
void Socket_clearPendingWrite(int socket);
typedef void Socket_writeComplete(int socket);
typedef void Socket_writeComplete(int socket, int rc);
void Socket_setWriteCompleteCallback(Socket_writeComplete*);
#endif /* SOCKET_H */
......@@ -318,6 +318,7 @@ void SocketBuffer_queueChar(int socket, char c)
* @param socket the socket for which the write was interrupted
* @param count the number of iovec buffers
* @param iovecs buffer array
* @param frees a set of flags indicating which of the iovecs array should be freed
* @param total total data length to be written
* @param bytes actual data length that was written
*/
......
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
......@@ -170,13 +170,11 @@ void StackTrace_printStack(FILE* dest)
}
char* StackTrace_get(thread_id_type threadid)
char* StackTrace_get(thread_id_type threadid, char* buf, int bufsize)
{
int bufsize = 256;
char* buf = NULL;
int t = 0;
if ((buf = malloc(bufsize)) == NULL)
if (bufsize < 100)
goto exit;
buf[0] = '\0';
for (t = 0; t < thread_count; ++t)
......@@ -204,4 +202,3 @@ char* StackTrace_get(thread_id_type threadid)
exit:
return buf;
}
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
......@@ -66,6 +66,6 @@ void StackTrace_entry(const char* name, int line, enum LOG_LEVELS trace);
void StackTrace_exit(const char* name, int line, void* return_value, enum LOG_LEVELS trace);
void StackTrace_printStack(FILE* dest);
char* StackTrace_get(thread_id_type);
char* StackTrace_get(thread_id_type, char* buf, int bufsize);
#endif /* STACKTRACE_H_ */
This diff is collapsed.
......@@ -1656,13 +1656,13 @@ int test6(struct Options options)
char clientidd[50];
int i = 0;
MQTTAsync_token *tokens;
test5_will_message_received = 0;
test5_messages_received = 0;
test5Finished = 0;
test5OnFailureCalled = 0;
test5c_connected = 0;
sprintf(willTopic, "paho-test9-6-%s", unique);
sprintf(clientidc, "paho-test9-6-c-%s", unique);
sprintf(clientidd, "paho-test9-6-d-%s", unique);
......@@ -1820,6 +1820,296 @@ exit:
}
/*********************************************************************
Test7: Fill up TCP buffer with QoS 0 messages
*********************************************************************/
int test7c_connected = 0;
int test7_will_message_received = 0;
int test7_messages_received = 0;
int test7Finished = 0;
int test7OnFailureCalled = 0;
int test7dReady = 0;
int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
MQTTAsync c = (MQTTAsync)context;
static int message_count = 0;
MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
if (memcmp(message->payload, "will message", message->payloadlen) == 0)
test7_will_message_received = 1;
else
test7_messages_received++;
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void test7cConnected(void* context, char* cause)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
test7c_connected = 1;
}
void test7cOnConnectFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In c connect onFailure callback, context %p", context);
test7OnFailureCalled++;
test7Finished = 1;
}
void test7cOnConnectSuccess(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
/* send a message to the proxy to break the connection */
pubmsg.payload = "TERMINATE";
pubmsg.payloadlen = (int)strlen(pubmsg.payload);
pubmsg.qos = 0;
pubmsg.retained = 0;
//rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
//assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
void test7dOnConnectFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test7OnFailureCalled++;
test7Finished = 1;
}
void test7donSubscribe(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
test7dReady = 1;
}
void test7dOnConnectSuccess(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
int qoss[2] = {2, 2};
char* topics[2] = {willTopic, test_topic};
MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
opts.onSuccess = test7donSubscribe;
opts.context = c;
//rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
//assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
//if (rc != MQTTASYNC_SUCCESS)
// test5Finished = 1;
test7dReady = 1;
}
int test7(struct Options options)
{
char* testname = "test7";
int subsqos = 2;
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
//MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
int rc = 0;
int count = 0;
char clientidc[50];
char clientidd[50];
int i = 0;
MQTTAsync_token *tokens;
test7_will_message_received = 0;
test7_messages_received = 0;
test7Finished = 0;
test7OnFailureCalled = 0;
test7c_connected = 0;
sprintf(willTopic, "paho-test9-7-%s", unique);
sprintf(clientidc, "paho-test9-7-c-%s", unique);
sprintf(clientidd, "paho-test9-7-d-%s", unique);
sprintf(test_topic, "longer paho-test9-7-test topic %s", unique);
test7Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 7 - fill TCP buffer");
fprintf(xml, "<testcase classname=\"test7\" name=\"%s\"", testname);
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
rc = MQTTAsync_setCallbacks(d, d, NULL, test7_messageArrived, NULL);
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
opts.onSuccess = test7dOnConnectSuccess;
opts.onFailure = test7dOnConnectFailure;
MyLog(LOGA_DEBUG, "Connecting client d");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait until d is ready: connected and subscribed */
count = 0;
while (!test7dReady && ++count < 10000)
{
if (test7Finished)
goto exit;
MySleep(100);
}
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
rc = MQTTAsync_setConnected(c, c, test7cConnected);
assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
/* let client c go: connect, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->payload.data = "will message";
opts.will->payload.len = strlen(opts.will->payload.data) + 1;
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = willTopic;
opts.onSuccess = test7cOnConnectSuccess;
opts.onFailure = test7cOnConnectFailure;
opts.context = c;
opts.cleansession = 0;
/*opts.automaticReconnect = 1;
opts.minRetryInterval = 3;
opts.maxRetryInterval = 6;*/
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
count = 0;
while (!test7c_connected && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
/* wait for will message */
//while (test7_will_message_received == 0 && ++count < 10000)
// MySleep(100);
MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
test7c_connected = 0;
char buf[5000000];
/* send some messages. Then reconnect (check connected callback), and check that those messages are received */
for (i = 0; i < 50000; ++i)
{
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions pubopts = MQTTAsync_responseOptions_initializer;
pubmsg.qos = 0; /*i % 3;*/
sprintf(buf, "QoS %d message", pubmsg.qos);
pubmsg.payload = buf;
pubmsg.payloadlen = 5000000; //(int)(strlen(pubmsg.payload) + 1);
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &pubopts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != 0)
{
//MyLog(LOGA_DEBUG, "Connecting client c");
//rc = MQTTAsync_connect(c, &opts);
//MySleep(1000);
break;
}
}
#if 0
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
rc = MQTTAsync_reconnect(c);
assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
/* wait for client to be reconnected */
while (!test5c_connected && ++count < 10000)
MySleep(100);
/* wait for success or failure callback */
while (test5_messages_received < 3 && ++count < 10000)
MySleep(100);
rc = MQTTAsync_getPendingTokens(c, &tokens);
assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
i = 0;
if (tokens)
{
while (tokens[i] != -1)
++i;
MQTTAsync_free(tokens);
}
assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
#endif
exit:
rc = MQTTAsync_disconnect(c, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
MySleep(200);
MQTTAsync_destroy(&c);
MQTTAsync_destroy(&d);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
......@@ -1831,8 +2121,10 @@ int main(int argc, char** argv)
{
int* numtests = &tests;
int rc = 0;
int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6};
int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6, test7};
time_t randtime;
srand((unsigned) time(&randtime));
sprintf(unique, "%u", rand());
MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment