Commit f563d800 authored by Ian Craggs's avatar Ian Craggs

Assign msgids at command entry time

parent 31e1f25b
...@@ -486,13 +486,16 @@ int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd) ...@@ -486,13 +486,16 @@ int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd)
switch (command->type) switch (command->type)
{ {
case SUBSCRIBE: case SUBSCRIBE:
nbufs = 2 + (command->details.sub.count * 2); nbufs = 3 + (command->details.sub.count * 2);
lens = (int*)malloc(nbufs * sizeof(int)); lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *)); bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type; bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type); lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = &command->token;
lens[bufindex++] = sizeof(command->token);
bufs[bufindex] = &command->details.sub.count; bufs[bufindex] = &command->details.sub.count;
lens[bufindex++] = sizeof(command->details.sub.count); lens[bufindex++] = sizeof(command->details.sub.count);
...@@ -508,13 +511,16 @@ int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd) ...@@ -508,13 +511,16 @@ int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd)
break; break;
case UNSUBSCRIBE: case UNSUBSCRIBE:
nbufs = 2 + command->details.unsub.count; nbufs = 3 + command->details.unsub.count;
lens = (int*)malloc(nbufs * sizeof(int)); lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *)); bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type; bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type); lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = &command->token;
lens[bufindex++] = sizeof(command->token);
bufs[bufindex] = &command->details.unsub.count; bufs[bufindex] = &command->details.unsub.count;
lens[bufindex++] = sizeof(command->details.unsub.count); lens[bufindex++] = sizeof(command->details.unsub.count);
...@@ -528,13 +534,16 @@ int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd) ...@@ -528,13 +534,16 @@ int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd)
break; break;
case PUBLISH: case PUBLISH:
nbufs = 6; nbufs = 7;
lens = (int*)malloc(nbufs * sizeof(int)); lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *)); bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type; bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type); lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = &command->token;
lens[bufindex++] = sizeof(command->token);
bufs[bufindex] = command->details.pub.destinationName; bufs[bufindex] = command->details.pub.destinationName;
lens[bufindex++] = strlen(command->details.pub.destinationName) + 1; lens[bufindex++] = strlen(command->details.pub.destinationName) + 1;
...@@ -584,6 +593,9 @@ MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int buflen) ...@@ -584,6 +593,9 @@ MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int buflen)
command->type = *(int*)ptr; command->type = *(int*)ptr;
ptr += sizeof(int); ptr += sizeof(int);
command->token = *(MQTTAsync_token*)ptr;
ptr += sizeof(MQTTAsync_token);
switch (command->type) switch (command->type)
{ {
case SUBSCRIBE: case SUBSCRIBE:
...@@ -1146,12 +1158,8 @@ void MQTTAsync_processCommand() ...@@ -1146,12 +1158,8 @@ void MQTTAsync_processCommand()
MQTTAsync_freeCommand(command); /* free up the command if necessary */ MQTTAsync_freeCommand(command); /* free up the command if necessary */
} }
} }
else else /* put the command into a waiting for response queue for each client, indexed by msgid */
{
/* put the command into a waiting for response queue for each client, indexed by msgid */
command->command.token = command->client->c->msgID;
ListAppend(command->client->responses, command, sizeof(command)); ListAppend(command->client->responses, command, sizeof(command));
}
exit: exit:
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
...@@ -2147,7 +2155,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic, ...@@ -2147,7 +2155,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic,
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
goto exit; goto exit;
} }
if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1) if (MQTTProtocol_assignMsgId(m->c) == 0)
{ {
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit; goto exit;
...@@ -2170,11 +2178,13 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic, ...@@ -2170,11 +2178,13 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic,
sub = malloc(sizeof(MQTTAsync_queuedCommand)); sub = malloc(sizeof(MQTTAsync_queuedCommand));
memset(sub, '\0', sizeof(MQTTAsync_queuedCommand)); memset(sub, '\0', sizeof(MQTTAsync_queuedCommand));
sub->client = m; sub->client = m;
sub->command.token = m->c->msgID;
if (response) if (response)
{ {
sub->command.onSuccess = response->onSuccess; sub->command.onSuccess = response->onSuccess;
sub->command.onFailure = response->onFailure; sub->command.onFailure = response->onFailure;
sub->command.context = response->context; sub->command.context = response->context;
response->token = sub->command.token;
} }
sub->command.type = SUBSCRIBE; sub->command.type = SUBSCRIBE;
sub->command.details.sub.count = count; sub->command.details.sub.count = count;
...@@ -2222,7 +2232,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic ...@@ -2222,7 +2232,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
goto exit; goto exit;
} }
if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1) if (MQTTProtocol_assignMsgId(m->c) == 0)
{ {
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit; goto exit;
...@@ -2241,11 +2251,13 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic ...@@ -2241,11 +2251,13 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic
memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand)); memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
unsub->client = m; unsub->client = m;
unsub->command.type = UNSUBSCRIBE; unsub->command.type = UNSUBSCRIBE;
unsub->command.token = m->c->msgID;
if (response) if (response)
{ {
unsub->command.onSuccess = response->onSuccess; unsub->command.onSuccess = response->onSuccess;
unsub->command.onFailure = response->onFailure; unsub->command.onFailure = response->onFailure;
unsub->command.context = response->context; unsub->command.context = response->context;
response->token = unsub->command.token;
} }
unsub->command.details.unsub.count = count; unsub->command.details.unsub.count = count;
unsub->command.details.unsub.topics = malloc(sizeof(char*) * count); unsub->command.details.unsub.topics = malloc(sizeof(char*) * count);
...@@ -2286,7 +2298,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload ...@@ -2286,7 +2298,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload
rc = MQTTASYNC_BAD_UTF8_STRING; rc = MQTTASYNC_BAD_UTF8_STRING;
else if (qos < 0 || qos > 2) else if (qos < 0 || qos > 2)
rc = MQTTASYNC_BAD_QOS; rc = MQTTASYNC_BAD_QOS;
else if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1) else if (MQTTProtocol_assignMsgId(m->c) == 0)
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
...@@ -2297,11 +2309,13 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload ...@@ -2297,11 +2309,13 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload
memset(pub, '\0', sizeof(MQTTAsync_queuedCommand)); memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
pub->client = m; pub->client = m;
pub->command.type = PUBLISH; pub->command.type = PUBLISH;
pub->command.token = m->c->msgID;
if (response) if (response)
{ {
pub->command.onSuccess = response->onSuccess; pub->command.onSuccess = response->onSuccess;
pub->command.onFailure = response->onFailure; pub->command.onFailure = response->onFailure;
pub->command.context = response->context; pub->command.context = response->context;
response->token = pub->command.token;
} }
pub->command.details.pub.destinationName = MQTTStrdup(destinationName); pub->command.details.pub.destinationName = MQTTStrdup(destinationName);
pub->command.details.pub.payloadlen = payloadlen; pub->command.details.pub.payloadlen = payloadlen;
......
...@@ -1235,12 +1235,18 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i ...@@ -1235,12 +1235,18 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
goto exit; goto exit;
} }
} }
if (MQTTProtocol_assignMsgId(m->c) == 0)
{
rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
goto exit;
}
for (i = 0; i < count; i++) for (i = 0; i < count; i++)
{ {
ListAppend(topics, topic[i], strlen(topic[i])); ListAppend(topics, topic[i], strlen(topic[i]));
ListAppend(qoss, &qos[i], sizeof(int)); ListAppend(qoss, &qos[i], sizeof(int));
} }
rc = MQTTProtocol_subscribe(m->c, topics, qoss); rc = MQTTProtocol_subscribe(m->c, topics, qoss);
ListFreeNoContent(topics); ListFreeNoContent(topics);
ListFreeNoContent(qoss); ListFreeNoContent(qoss);
...@@ -1319,7 +1325,6 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic) ...@@ -1319,7 +1325,6 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
rc = MQTTCLIENT_DISCONNECTED; rc = MQTTCLIENT_DISCONNECTED;
goto exit; goto exit;
} }
for (i = 0; i < count; i++) for (i = 0; i < count; i++)
{ {
if (!UTF8_validateString(topic[i])) if (!UTF8_validateString(topic[i]))
...@@ -1328,6 +1333,11 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic) ...@@ -1328,6 +1333,11 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
goto exit; goto exit;
} }
} }
if (MQTTProtocol_assignMsgId(m->c) == 0)
{
rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
goto exit;
}
for (i = 0; i < count; i++) for (i = 0; i < count; i++)
ListAppend(topics, topic[i], strlen(topic[i])); ListAppend(topics, topic[i], strlen(topic[i]));
...@@ -1416,6 +1426,11 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, ...@@ -1416,6 +1426,11 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
} }
if (blocked == 1) if (blocked == 1)
Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
if (MQTTProtocol_assignMsgId(m->c) == 0)
{ /* this should never happen as we've waited for spaces in the queue */
rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
goto exit;
}
p = malloc(sizeof(Publish)); p = malloc(sizeof(Publish));
......
...@@ -146,7 +146,7 @@ int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int ...@@ -146,7 +146,7 @@ int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int
FUNC_ENTRY; FUNC_ENTRY;
if (qos > 0) if (qos > 0)
{ {
p.msgId = publish->msgId = MQTTProtocol_assignMsgId(pubclient); p.msgId = publish->msgId = pubclient->msgID;
*mm = MQTTProtocol_createMessage(publish, mm, qos, retained); *mm = MQTTProtocol_createMessage(publish, mm, qos, retained);
ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len); ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len);
/* we change these pointers to the saved message location just in case the packet could not be written /* we change these pointers to the saved message location just in case the packet could not be written
......
...@@ -163,7 +163,7 @@ int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss) ...@@ -163,7 +163,7 @@ int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss)
FUNC_ENTRY; FUNC_ENTRY;
/* we should stack this up for retry processing too */ /* we should stack this up for retry processing too */
rc = MQTTPacket_send_subscribe(topics, qoss, MQTTProtocol_assignMsgId(client), 0, &client->net, client->clientID); rc = MQTTPacket_send_subscribe(topics, qoss, client->msgID, 0, &client->net, client->clientID);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -202,7 +202,7 @@ int MQTTProtocol_unsubscribe(Clients* client, List* topics) ...@@ -202,7 +202,7 @@ int MQTTProtocol_unsubscribe(Clients* client, List* topics)
FUNC_ENTRY; FUNC_ENTRY;
/* we should stack this up for retry processing too? */ /* we should stack this up for retry processing too? */
rc = MQTTPacket_send_unsubscribe(topics, MQTTProtocol_assignMsgId(client), 0, &client->net, client->clientID); rc = MQTTPacket_send_unsubscribe(topics, client->msgID, 0, &client->net, client->clientID);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment