Commit fef7f880 authored by Ian Craggs's avatar Ian Craggs

Bug #402245: pings sent unnecessarily and messages can be reordered

parent 82239cfb
......@@ -144,6 +144,7 @@ BE*/
typedef struct
{
int socket;
time_t lastContact;
#if defined(OPENSSL)
SSL* ssl;
SSL_CTX* ctx;
......@@ -168,7 +169,6 @@ typedef struct
int keepAliveInterval;
int retryInterval;
int maxInflightMessages;
time_t lastContact;
willMessages* will;
List* inboundMsgs;
List* outboundMsgs; /**< in flight */
......
......@@ -25,6 +25,7 @@
#include "Log.h"
#include "StackTrace.h"
#include "Thread.h"
char* Broker_recordFFDC(char* symptoms);
#include <memory.h>
#include <stdlib.h>
......@@ -430,6 +431,12 @@ void Log(int log_level, int msgno, char* format, ...)
printf("Log %s", format);
}
char* Broker_recordFFDC(char* symptoms)
{
printf("recordFFDC");
return "";
}
#define malloc(x) mymalloc(__FILE__, __LINE__, x)
#define realloc(a, b) myrealloc(__FILE__, __LINE__, a, b)
#define free(x) myfree(__FILE__, __LINE__, x)
......
......@@ -31,7 +31,7 @@
#define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##"
#define CLIENT_VERSION "0.9.0.0"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##"
char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
......@@ -248,6 +248,7 @@ typedef struct
void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command);
void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command);
int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTAsync_message* mm);
#if !defined(NO_PERSISTENCE)
int MQTTAsync_restoreCommands(MQTTAsyncs* client);
int MQTTAsync_unpersistQueueEntry(Clients*, qEntry*);
......@@ -803,6 +804,8 @@ void MQTTAsync_writeComplete(int socket)
{
MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
time(&(m->c->net.lastContact));
/* see if there is a pending write flagged */
if (m->pending_write)
{
......@@ -845,22 +848,35 @@ void MQTTAsync_processCommand()
int rc = 0;
MQTTAsync_queuedCommand* command = NULL;
ListElement* cur_command = NULL;
List* ignored_clients = NULL;
FUNC_ENTRY;
Thread_lock_mutex(mqttcommand_mutex);
/* only the first command in the list must be processed for any particular client, so if we skip
a command for a client, we must skip all following commands for that client. Use a list of
ignored clients to keep track
*/
ignored_clients = ListInitialize();
/* don't try a command until there isn't a pending write for that client, and we are not connecting */
while (ListNextElement(commands, &cur_command))
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(cur_command->content);
if (ListFind(ignored_clients, cmd->client))
continue;
if (cmd->command.type == CONNECT || (cmd->client->c->connected &&
cmd->client->c->connect_state == 0 && Socket_noPendingWrites(cmd->client->c->net.socket)))
{
command = cmd;
break;
}
else
ListAppend(ignored_clients, cmd->client, sizeof(cmd->client));
}
ListFreeNoContent(ignored_clients);
if (command)
{
ListDetach(commands, command);
......@@ -1103,14 +1119,14 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
Thread_unlock_mutex(mqttasync_mutex);
while (!tostop)
{
/* int rc; */
/*int rc;*/
while (commands->count > 0)
MQTTAsync_processCommand();
#if !defined(WIN32)
/*rc = */Thread_wait_cond_timeout(send_cond, 1);
/*rc =*/ Thread_wait_cond_timeout(send_cond, 1);
#else
/*rc = */Thread_wait_sem_timeout(send_sem, 1);
/*rc =*/ Thread_wait_sem_timeout(send_sem, 1);
#endif
MQTTAsync_checkTimeouts();
......@@ -1255,12 +1271,12 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
if (m->c->connect_state == 3) /* MQTT connect sent - wait for CONNACK */
{
Connack* connack = (Connack*)pack;
Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
{
m->c->connected = 1;
m->c->good = 1;
m->c->connect_state = 0;
time(&(m->c->lastContact));
if (m->c->cleansession)
rc = MQTTAsync_cleanSession(m->c);
if (m->c->outboundMsgs->count > 0)
......@@ -1271,7 +1287,7 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
Messages* m = (Messages*)(outcurrent->content);
m->lastTouch = 0;
}
MQTTProtocol_retry(m->c->lastContact, 1);
MQTTProtocol_retry(m->c->net.lastContact, 1);
if (m->c->connected != 1)
rc = MQTTCLIENT_DISCONNECTED;
}
......@@ -1336,19 +1352,10 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
topicLen = 0;
if (m->ma)
{
Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
m->c->clientID, m->c->messageQueue->count);
Thread_unlock_mutex(mqttasync_mutex);
rc = (*(m->ma))(m->context, qe->topicName, topicLen, qe->msg);
Thread_lock_mutex(mqttasync_mutex);
/* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
* the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
* so we must be careful how we use it.
*/
}
rc = MQTTAsync_deliverMessage(m, qe->topicName, topicLen, qe->msg);
else
rc = 1;
if (rc)
{
ListRemove(m->c->messageQueue, qe);
......@@ -1411,9 +1418,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
else if (sub->qoss->count > 1)
{
ListElement* cur_qos = NULL;
int* element = array;
array = data.alt.qosList = malloc(sub->qoss->count * sizeof(int));
int* element = array = data.alt.qosList = malloc(sub->qoss->count * sizeof(int));
while (ListNextElement(sub->qoss, &cur_qos))
*element++ = *(int*)(cur_qos->content);
}
......@@ -2287,10 +2292,22 @@ int MQTTAsync_connecting(MQTTAsyncs* m)
#if defined(OPENSSL)
if (m->ssl)
{
m->c->net.ssl = SSLSocket_setSocketForSSL(m->c->net.socket, m->c->sslopts);
if ((m->c->net.ssl = SSLSocket_setSocketForSSL(m->c->net.socket, m->c->sslopts)) != NULL)
{
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket);
if (rc == -1)
m->c->connect_state = 2;
else if (rc == SSL_FATAL)
{
rc = SOCKET_ERROR;
goto exit;
}
}
else
{
rc = SOCKET_ERROR;
goto exit;
}
}
else
{
......
// Version: %Z% %W% %I% %E% %U%
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
*
......
......@@ -36,8 +36,8 @@
#define URI_TCP "tcp://"
#define BUILD_TIMESTAMP __DATE__ " " __TIME__ /* __TIMESTAMP__ */
#define CLIENT_VERSION "1.0.0.8"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##"
char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
......@@ -101,6 +101,7 @@ MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
int MQTTClient_cleanSession(Clients* client);
void MQTTClient_stop();
int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
void MQTTClient_writeComplete(int socket);
typedef struct
{
......@@ -224,6 +225,7 @@ int MQTTClient_create(MQTTClient* handle, char* serverURI, char* clientId,
Log_initialize(NULL);
bstate->clients = ListInitialize();
Socket_outInitialize();
Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
handles = ListInitialize();
#if defined(OPENSSL)
SSLSocket_initialize();
......@@ -881,8 +883,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
m->c->connected = 1;
m->c->good = 1;
m->c->connect_state = 0; //3;
time(&(m->c->lastContact));
m->c->connect_state = 0;
if (m->c->cleansession)
rc = MQTTClient_cleanSession(m->c);
if (m->c->outboundMsgs->count > 0)
......@@ -894,7 +895,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
Messages* m = (Messages*)(outcurrent->content);
m->lastTouch = 0;
}
MQTTProtocol_retry(m->c->lastContact, 1);
MQTTProtocol_retry(m->c->net.lastContact, 1);
if (m->c->connected != 1)
rc = MQTTCLIENT_DISCONNECTED;
}
......@@ -904,14 +905,14 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
}
}
if (rc == SOCKET_ERROR || rc == MQTTCLIENT_PERSISTENCE_ERROR)
exit:
if (rc != MQTTCLIENT_SUCCESS)
{
Thread_unlock_mutex(mqttclient_mutex);
MQTTClient_disconnect(handle, 0); /* not "internal" because we don't want to call connection lost */
Thread_lock_mutex(mqttclient_mutex);
}
exit:
if (m->c->will)
{
free(m->c->will);
......@@ -938,12 +939,14 @@ int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal)
rc = MQTTCLIENT_FAILURE;
goto exit;
}
if (m->c->connected == 0)
if (m->c->connected == 0 && m->c->connect_state == 0)
{
rc = MQTTCLIENT_DISCONNECTED;
goto exit;
}
was_connected = m->c->connected; /* should be 1 */
if (m->c->connected != 0)
{
start = MQTTClient_start_clock();
m->c->connect_state = -2; /* indicate disconnecting */
while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
......@@ -954,6 +957,7 @@ int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal)
MQTTClient_yield();
Thread_lock_mutex(mqttclient_mutex);
}
}
MQTTProtocol_closeSession(m->c, 0);
......@@ -1664,3 +1668,51 @@ MQTTClient_nameValue* MQTTClient_getVersionInfo()
libinfo[i].value = NULL;
return libinfo;
}
/**
* See if any pending writes have been completed, and cleanup if so.
* Cleaning up means removing any publication data that was stored because the write did
* not originally complete.
*/
void MQTTProtocol_checkPendingWrites()
{
FUNC_ENTRY;
if (state.pending_writes.count > 0)
{
ListElement* le = state.pending_writes.first;
while (le)
{
if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
{
MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
state.pending_writes.current = le;
ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
le = state.pending_writes.current;
}
else
ListNextElement(&(state.pending_writes), &le);
}
}
FUNC_EXIT;
}
void MQTTClient_writeComplete(int socket)
{
ListElement* found = NULL;
FUNC_ENTRY;
/* a partial write is now complete for a socket - this will be on a publish*/
MQTTProtocol_checkPendingWrites();
/* find the client using this socket */
if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
{
MQTTClients* m = (MQTTClients*)(found->content);
time(&(m->c->net.lastContact));
}
FUNC_EXIT;
}
// Version: %Z% %W% %I% %E% %U%
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
*
......
......@@ -189,6 +189,9 @@ int MQTTPacket_send(networkHandles* net, Header header, char* buffer, int buflen
#endif
rc = Socket_putdatas(net->socket, buf, buf0len, 1, &buffer, &buflen);
if (rc == TCPSOCKET_COMPLETE)
time(&(net->lastContact));
if (rc != TCPSOCKET_INTERRUPTED)
free(buf);
......@@ -232,6 +235,10 @@ int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffe
else
#endif
rc = Socket_putdatas(net->socket, buf, buf0len, count, buffers, buflens);
if (rc == TCPSOCKET_COMPLETE)
time(&(net->lastContact));
if (rc != TCPSOCKET_INTERRUPTED)
free(buf);
FUNC_EXIT_RC(rc);
......@@ -533,6 +540,7 @@ int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net)
* Send an MQTT PUBACK packet down a socket.
* @param msgid the MQTT message id to use
* @param socket the open socket to send the data to
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_puback(int msgid, networkHandles* net, char* clientID)
......@@ -565,6 +573,7 @@ void MQTTPacket_freeSuback(Suback* pack)
* Send an MQTT PUBREC packet down a socket.
* @param msgid the MQTT message id to use
* @param socket the open socket to send the data to
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_pubrec(int msgid, networkHandles* net, char* clientID)
......@@ -582,8 +591,9 @@ int MQTTPacket_send_pubrec(int msgid, networkHandles* net, char* clientID)
/**
* Send an MQTT PUBREL packet down a socket.
* @param msgid the MQTT message id to use
* @param dup the MQTT DUP flag
* @param dup boolean - whether to set the MQTT DUP flag
* @param socket the open socket to send the data to
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, char* clientID)
......@@ -602,6 +612,7 @@ int MQTTPacket_send_pubrel(int msgid, int dup, networkHandles* net, char* client
* Send an MQTT PUBCOMP packet down a socket.
* @param msgid the MQTT message id to use
* @param socket the open socket to send the data to
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_pubcomp(int msgid, networkHandles* net, char* clientID)
......@@ -643,6 +654,7 @@ void* MQTTPacket_ack(unsigned char aHeader, char* data, int datalen)
* @param qos the value to use for the MQTT QoS setting
* @param retained boolean - whether to set the MQTT retained flag
* @param socket the open socket to send the data to
* @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, char* clientID)
......
......@@ -128,7 +128,7 @@ int MQTTPacket_send_pingreq(networkHandles* net, char* clientID)
header.byte = 0;
header.bits.type = PINGREQ;
rc = MQTTPacket_send(net, header, NULL, 0);
Log(LOG_PROTOCOL, 20, NULL, socket, clientID, rc);
Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
FUNC_EXIT_RC(rc);
return rc;
}
......
......@@ -498,11 +498,12 @@ void MQTTProtocol_keepalive(time_t now)
{
Clients* client = (Clients*)(current->content);
ListNextElement(bstate->clients, &current);
if (client->connected && client->keepAliveInterval > 0 && Socket_noPendingWrites(client->net.socket)
&& (difftime(now, client->lastContact) >= client->keepAliveInterval))
if (client->connected && client->keepAliveInterval > 0 && client->ping_outstanding == 0 &&
Socket_noPendingWrites(client->net.socket) &&
(difftime(now, client->net.lastContact) >= client->keepAliveInterval))
{
MQTTPacket_send_pingreq(&client->net, client->clientID);
client->lastContact = now;
client->net.lastContact = now;
client->ping_outstanding = 1;
}
}
......
......@@ -92,7 +92,6 @@ int MQTTProtocol_connect(char* ip_address, Clients* aClient)
FUNC_ENTRY;
aClient->good = 1;
time(&(aClient->lastContact));
addr = MQTTProtocol_addressPort(ip_address, &port);
rc = Socket_new(addr, port, &(aClient->net.socket));
......
......@@ -18,18 +18,19 @@
#if defined(OPENSSL)
#include <openssl/err.h>
#include "Socket.h"
#include "SSLSocket.h"
#include "SocketBuffer.h"
#include "MQTTClient.h"
#include "SSLSocket.h"
#include "Log.h"
#include "StackTrace.h"
#include "Socket.h"
#include "Heap.h"
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/crypto.h>
extern Sockets s;
void SSLSocket_addPendingRead(int sock);
......
......@@ -20,15 +20,6 @@
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
/* undefine TCP constants which are defined (differently) in <openssl/err.h> */
#undef EAGAIN
#undef EINTR
#undef EINPROGRESS
#undef EWOULDBLOCK
#undef ENOTCONN
#undef ECONNRESET
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINPROGRESS WSAEINPROGRESS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment