Commit 1104e0e5 authored by Ian Craggs's avatar Ian Craggs

Persist the message queue in the synchronous client

Bug: 432903
parent 5cb1dc07
...@@ -15,9 +15,10 @@ ...@@ -15,9 +15,10 @@
* Ian Craggs, Allan Stockdill-Mander - SSL support * Ian Craggs, Allan Stockdill-Mander - SSL support
* Ian Craggs - multiple server connection support * Ian Craggs - multiple server connection support
* Ian Craggs - fix for bug 413429 - connectionLost not called * Ian Craggs - fix for bug 413429 - connectionLost not called
* Ian Craggs - fix for bug# 415042 - using already freed structure * Ian Craggs - fix for bug 415042 - using already freed structure
* Ian Craggs - fix for bug 419233 - mutexes not reporting errors * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
* Ian Craggs - fix for bug #420851 * Ian Craggs - fix for bug 420851
* Ian Craggs - fix for bug 432903 - queue persistence
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -289,8 +290,6 @@ void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command); ...@@ -289,8 +290,6 @@ void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command);
int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTAsync_message* mm); int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTAsync_message* mm);
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
int MQTTAsync_restoreCommands(MQTTAsyncs* client); int MQTTAsync_restoreCommands(MQTTAsyncs* client);
int MQTTAsync_unpersistQueueEntry(Clients*, qEntry*);
int MQTTAsync_restoreMessageQueue(MQTTAsyncs* client);
#endif #endif
void MQTTAsync_sleep(long milliseconds) void MQTTAsync_sleep(long milliseconds)
...@@ -405,7 +404,7 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId, ...@@ -405,7 +404,7 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
if (rc == 0) if (rc == 0)
{ {
MQTTAsync_restoreCommands(m); MQTTAsync_restoreCommands(m);
MQTTAsync_restoreMessageQueue(m); MQTTPersistence_restoreMessageQueue(m->c);
} }
} }
#endif #endif
...@@ -1460,7 +1459,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1460,7 +1459,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
ListRemove(m->c->messageQueue, qe); ListRemove(m->c->messageQueue, qe);
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
if (m->c->persistence) if (m->c->persistence)
MQTTAsync_unpersistQueueEntry(m->c, qe); MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
#endif #endif
} }
else else
...@@ -1749,161 +1748,7 @@ int MQTTAsync_cleanSession(Clients* client) ...@@ -1749,161 +1748,7 @@ int MQTTAsync_cleanSession(Clients* client)
} }
#if !defined(NO_PERSISTENCE)
int MQTTAsync_unpersistQueueEntry(Clients* client, qEntry* qe)
{
int rc = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
FUNC_ENTRY;
sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, qe->seqno);
if ((rc = client->persistence->premove(client->phandle, key)) != 0)
Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_persistQueueEntry(Clients* aclient, qEntry* qe)
{
int rc = 0;
int nbufs = 8;
int bufindex = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
int* lens = NULL;
void** bufs = NULL;
FUNC_ENTRY;
lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &qe->msg->payloadlen;
lens[bufindex++] = sizeof(qe->msg->payloadlen);
bufs[bufindex] = qe->msg->payload;
lens[bufindex++] = qe->msg->payloadlen;
bufs[bufindex] = &qe->msg->qos;
lens[bufindex++] = sizeof(qe->msg->qos);
bufs[bufindex] = &qe->msg->retained;
lens[bufindex++] = sizeof(qe->msg->retained);
bufs[bufindex] = &qe->msg->dup;
lens[bufindex++] = sizeof(qe->msg->dup);
bufs[bufindex] = &qe->msg->msgid;
lens[bufindex++] = sizeof(qe->msg->msgid);
bufs[bufindex] = qe->topicName;
lens[bufindex++] = strlen(qe->topicName) + 1;
bufs[bufindex] = &qe->topicLen;
lens[bufindex++] = sizeof(qe->topicLen);
sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, ++aclient->qentry_seqno);
qe->seqno = aclient->qentry_seqno;
if ((rc = aclient->persistence->pput(aclient->phandle, key, nbufs, (char**)bufs, lens)) != 0)
Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
free(lens);
free(bufs);
FUNC_EXIT_RC(rc);
return rc;
}
qEntry* MQTTAsync_restoreQueueEntry(char* buffer, int buflen)
{
qEntry* qe = NULL;
char* ptr = buffer;
int data_size;
FUNC_ENTRY;
qe = malloc(sizeof(qEntry));
memset(qe, '\0', sizeof(qEntry));
qe->msg = malloc(sizeof(MQTTAsync_message));
memset(qe->msg, '\0', sizeof(MQTTAsync_message));
qe->msg->payloadlen = *(int*)ptr;
ptr += sizeof(int);
data_size = qe->msg->payloadlen;
qe->msg->payload = malloc(data_size);
memcpy(qe->msg->payload, ptr, data_size);
ptr += data_size;
qe->msg->qos = *(int*)ptr;
ptr += sizeof(int);
qe->msg->retained = *(int*)ptr;
ptr += sizeof(int);
qe->msg->dup = *(int*)ptr;
ptr += sizeof(int);
qe->msg->msgid = *(int*)ptr;
ptr += sizeof(int);
data_size = strlen(ptr) + 1;
qe->topicName = malloc(data_size);
strcpy(qe->topicName, ptr);
ptr += data_size;
qe->topicLen = *(int*)ptr;
ptr += sizeof(int);
FUNC_EXIT;
return qe;
}
int MQTTAsync_restoreMessageQueue(MQTTAsyncs* client)
{
int rc = 0;
char **msgkeys;
int nkeys;
int i = 0;
Clients* c = client->c;
int entries_restored = 0;
FUNC_ENTRY;
if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
{
while (rc == 0 && i < nkeys)
{
char *buffer = NULL;
int buflen;
if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0)
;
else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
{
qEntry* qe = MQTTAsync_restoreQueueEntry(buffer, buflen);
if (qe)
{
qe->seqno = atoi(msgkeys[i]+2);
MQTTPersistence_insertInOrder(c->messageQueue, qe, sizeof(qEntry));
free(buffer);
c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
entries_restored++;
}
}
if (msgkeys[i])
free(msgkeys[i]);
i++;
}
if (msgkeys != NULL)
free(msgkeys);
}
Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
FUNC_EXIT_RC(rc);
return rc;
}
#endif
int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTAsync_message* mm) int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTAsync_message* mm)
...@@ -1973,7 +1818,7 @@ void Protocol_processPublication(Publish* publish, Clients* client) ...@@ -1973,7 +1818,7 @@ void Protocol_processPublication(Publish* publish, Clients* client)
ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1); ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
if (client->persistence) if (client->persistence)
MQTTAsync_persistQueueEntry(client, qe); MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
#endif #endif
} }
publish->topic = NULL; publish->topic = NULL;
......
...@@ -19,7 +19,8 @@ ...@@ -19,7 +19,8 @@
* Ian Craggs - fix for bug 413429 - connectionLost not called * Ian Craggs - fix for bug 413429 - connectionLost not called
* Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries * Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
* Ian Craggs - fix for bug 419233 - mutexes not reporting errors * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
* Ian Craggs - fix for bug #420851 * Ian Craggs - fix for bug 420851
* Ian Craggs - fix for bug 432903 - queue persistence
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -34,10 +35,11 @@ ...@@ -34,10 +35,11 @@
#include <sys/time.h> #include <sys/time.h>
#endif #endif
#include "MQTTClient.h"
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
#include "MQTTPersistence.h" #include "MQTTPersistence.h"
#endif #endif
#include "MQTTClient.h"
#include "utf-8.h" #include "utf-8.h"
#include "MQTTProtocol.h" #include "MQTTProtocol.h"
#include "MQTTProtocolOut.h" #include "MQTTProtocolOut.h"
...@@ -134,6 +136,7 @@ typedef struct ...@@ -134,6 +136,7 @@ typedef struct
MQTTClient_message* msg; MQTTClient_message* msg;
char* topicName; char* topicName;
int topicLen; int topicLen;
unsigned int seqno; /* only used on restore */
} qEntry; } qEntry;
...@@ -290,7 +293,11 @@ int MQTTClient_create(MQTTClient* handle, char* serverURI, char* clientId, ...@@ -290,7 +293,11 @@ int MQTTClient_create(MQTTClient* handle, char* serverURI, char* clientId,
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context); rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
if (rc == 0) if (rc == 0)
{
rc = MQTTPersistence_initialize(m->c, m->serverURI); rc = MQTTPersistence_initialize(m->c, m->serverURI);
if (rc == 0)
MQTTPersistence_restoreMessageQueue(m->c);
}
#endif #endif
ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List)); ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
...@@ -417,6 +424,10 @@ int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* top ...@@ -417,6 +424,10 @@ int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* top
if (strlen(*topicName) != *topicLen) if (strlen(*topicName) != *topicLen)
rc = MQTTCLIENT_TOPICNAME_TRUNCATED; rc = MQTTCLIENT_TOPICNAME_TRUNCATED;
ListRemove(m->c->messageQueue, m->c->messageQueue->first->content); ListRemove(m->c->messageQueue, m->c->messageQueue->first->content);
#if !defined(NO_PERSISTENCE)
if (m->c->persistence)
MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
#endif
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -722,6 +733,10 @@ void Protocol_processPublication(Publish* publish, Clients* client) ...@@ -722,6 +733,10 @@ void Protocol_processPublication(Publish* publish, Clients* client)
mm->msgid = publish->msgId; mm->msgid = publish->msgId;
ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1); ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
#if !defined(NO_PERSISTENCE)
if (client->persistence)
MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
#endif
FUNC_EXIT; FUNC_EXIT;
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* Contributors: * Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - async client updates * Ian Craggs - async client updates
* Ian Craggs - fix for bug 432903 - queue persistence
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -462,3 +463,181 @@ void MQTTPersistence_wrapMsgID(Clients *client) ...@@ -462,3 +463,181 @@ void MQTTPersistence_wrapMsgID(Clients *client)
} }
FUNC_EXIT; FUNC_EXIT;
} }
#if !defined(NO_PERSISTENCE)
int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
{
int rc = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
FUNC_ENTRY;
sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, qe->seqno);
if ((rc = client->persistence->premove(client->phandle, key)) != 0)
Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
{
int rc = 0;
int nbufs = 8;
int bufindex = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
int* lens = NULL;
void** bufs = NULL;
FUNC_ENTRY;
lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &qe->msg->payloadlen;
lens[bufindex++] = sizeof(qe->msg->payloadlen);
bufs[bufindex] = qe->msg->payload;
lens[bufindex++] = qe->msg->payloadlen;
bufs[bufindex] = &qe->msg->qos;
lens[bufindex++] = sizeof(qe->msg->qos);
bufs[bufindex] = &qe->msg->retained;
lens[bufindex++] = sizeof(qe->msg->retained);
bufs[bufindex] = &qe->msg->dup;
lens[bufindex++] = sizeof(qe->msg->dup);
bufs[bufindex] = &qe->msg->msgid;
lens[bufindex++] = sizeof(qe->msg->msgid);
bufs[bufindex] = qe->topicName;
lens[bufindex++] = strlen(qe->topicName) + 1;
bufs[bufindex] = &qe->topicLen;
lens[bufindex++] = sizeof(qe->topicLen);
sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, ++aclient->qentry_seqno);
qe->seqno = aclient->qentry_seqno;
if ((rc = aclient->persistence->pput(aclient->phandle, key, nbufs, (char**)bufs, lens)) != 0)
Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
free(lens);
free(bufs);
FUNC_EXIT_RC(rc);
return rc;
}
MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, int buflen)
{
MQTTPersistence_qEntry* qe = NULL;
char* ptr = buffer;
int data_size;
FUNC_ENTRY;
qe = malloc(sizeof(MQTTPersistence_qEntry));
memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
qe->msg = malloc(sizeof(MQTTPersistence_message));
memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
qe->msg->payloadlen = *(int*)ptr;
ptr += sizeof(int);
data_size = qe->msg->payloadlen;
qe->msg->payload = malloc(data_size);
memcpy(qe->msg->payload, ptr, data_size);
ptr += data_size;
qe->msg->qos = *(int*)ptr;
ptr += sizeof(int);
qe->msg->retained = *(int*)ptr;
ptr += sizeof(int);
qe->msg->dup = *(int*)ptr;
ptr += sizeof(int);
qe->msg->msgid = *(int*)ptr;
ptr += sizeof(int);
data_size = strlen(ptr) + 1;
qe->topicName = malloc(data_size);
strcpy(qe->topicName, ptr);
ptr += data_size;
qe->topicLen = *(int*)ptr;
ptr += sizeof(int);
FUNC_EXIT;
return qe;
}
void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, int size)
{
ListElement* index = NULL;
ListElement* current = NULL;
FUNC_ENTRY;
while (ListNextElement(list, &current) != NULL && index == NULL)
{
if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
index = current;
}
ListInsert(list, qEntry, size, index);
FUNC_EXIT;
}
/**
* Restores a queue of messages from persistence to memory
* @param c the client as ::Clients - the client object to restore the messages to
* @return return code, 0 if successful
*/
int MQTTPersistence_restoreMessageQueue(Clients* c)
{
int rc = 0;
char **msgkeys;
int nkeys;
int i = 0;
int entries_restored = 0;
FUNC_ENTRY;
if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
{
while (rc == 0 && i < nkeys)
{
char *buffer = NULL;
int buflen;
if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0)
;
else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
{
MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen);
if (qe)
{
qe->seqno = atoi(msgkeys[i]+2);
MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
free(buffer);
c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
entries_restored++;
}
}
if (msgkeys[i])
free(msgkeys[i]);
i++;
}
if (msgkeys != NULL)
free(msgkeys);
}
Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
FUNC_EXIT_RC(rc);
return rc;
}
#endif
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* Contributors: * Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - async client updates * Ian Craggs - async client updates
* Ian Craggs - fix for bug 432903 - queue persistence
*******************************************************************************/ *******************************************************************************/
#include "Clients.h" #include "Clients.h"
...@@ -40,3 +41,27 @@ int MQTTPersistence_put(int socket, char* buf0, int buf0len, int count, ...@@ -40,3 +41,27 @@ int MQTTPersistence_put(int socket, char* buf0, int buf0len, int count,
char** buffers, int* buflens, int htype, int msgId, int scr); char** buffers, int* buflens, int htype, int msgId, int scr);
int MQTTPersistence_remove(Clients* c, char* type, int qos, int msgId); int MQTTPersistence_remove(Clients* c, char* type, int qos, int msgId);
void MQTTPersistence_wrapMsgID(Clients *c); void MQTTPersistence_wrapMsgID(Clients *c);
typedef struct
{
char struct_id[4];
int struct_version;
int payloadlen;
void* payload;
int qos;
int retained;
int dup;
int msgid;
} MQTTPersistence_message;
typedef struct
{
MQTTPersistence_message* msg;
char* topicName;
int topicLen;
unsigned int seqno; /* only used on restore */
} MQTTPersistence_qEntry;
int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe);
int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe);
int MQTTPersistence_restoreMessageQueue(Clients* c);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment