Commit f804a18e authored by Ian Craggs's avatar Ian Craggs

More msgid changes

parent 404909dd
...@@ -1048,7 +1048,7 @@ void MQTTAsync_processCommand() ...@@ -1048,7 +1048,7 @@ void MQTTAsync_processCommand()
ListAppend(topics, command->command.details.sub.topics[i], strlen(command->command.details.sub.topics[i])); ListAppend(topics, command->command.details.sub.topics[i], strlen(command->command.details.sub.topics[i]));
ListAppend(qoss, &command->command.details.sub.qoss[i], sizeof(int)); ListAppend(qoss, &command->command.details.sub.qoss[i], sizeof(int));
} }
rc = MQTTProtocol_subscribe(command->client->c, topics, qoss); rc = MQTTProtocol_subscribe(command->client->c, topics, qoss, command->command.token);
ListFreeNoContent(topics); ListFreeNoContent(topics);
ListFreeNoContent(qoss); ListFreeNoContent(qoss);
} }
...@@ -1060,7 +1060,7 @@ void MQTTAsync_processCommand() ...@@ -1060,7 +1060,7 @@ void MQTTAsync_processCommand()
for (i = 0; i < command->command.details.unsub.count; i++) for (i = 0; i < command->command.details.unsub.count; i++)
ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i])); ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i]));
rc = MQTTProtocol_unsubscribe(command->client->c, topics); rc = MQTTProtocol_unsubscribe(command->client->c, topics, command->command.token);
ListFreeNoContent(topics); ListFreeNoContent(topics);
} }
else if (command->command.type == PUBLISH) else if (command->command.type == PUBLISH)
...@@ -1073,7 +1073,7 @@ void MQTTAsync_processCommand() ...@@ -1073,7 +1073,7 @@ void MQTTAsync_processCommand()
p->payload = command->command.details.pub.payload; p->payload = command->command.details.pub.payload;
p->payloadlen = command->command.details.pub.payloadlen; p->payloadlen = command->command.details.pub.payloadlen;
p->topic = command->command.details.pub.destinationName; p->topic = command->command.details.pub.destinationName;
p->msgId = -1; p->msgId = command->command.token;
rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg); rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg);
...@@ -2144,6 +2144,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic, ...@@ -2144,6 +2144,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic,
size_t i = 0; size_t i = 0;
int rc = MQTTASYNC_FAILURE; int rc = MQTTASYNC_FAILURE;
MQTTAsync_queuedCommand* sub; MQTTAsync_queuedCommand* sub;
int msgid = 0;
FUNC_ENTRY; FUNC_ENTRY;
if (m == NULL || m->c == NULL) if (m == NULL || m->c == NULL)
...@@ -2156,11 +2157,6 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic, ...@@ -2156,11 +2157,6 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic,
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
goto exit; goto exit;
} }
if (MQTTProtocol_assignMsgId(m->c) == 0)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
for (i = 0; i < count; i++) for (i = 0; i < count; i++)
{ {
if (!UTF8_validateString(topic[i])) if (!UTF8_validateString(topic[i]))
...@@ -2174,12 +2170,17 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic, ...@@ -2174,12 +2170,17 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, size_t count, char* const* topic,
goto exit; goto exit;
} }
} }
if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
/* Add subscribe request to operation queue */ /* Add subscribe request to operation queue */
sub = malloc(sizeof(MQTTAsync_queuedCommand)); sub = malloc(sizeof(MQTTAsync_queuedCommand));
memset(sub, '\0', sizeof(MQTTAsync_queuedCommand)); memset(sub, '\0', sizeof(MQTTAsync_queuedCommand));
sub->client = m; sub->client = m;
sub->command.token = m->c->msgID; sub->command.token = msgid;
if (response) if (response)
{ {
sub->command.onSuccess = response->onSuccess; sub->command.onSuccess = response->onSuccess;
...@@ -2221,6 +2222,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic ...@@ -2221,6 +2222,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic
size_t i = 0; size_t i = 0;
int rc = SOCKET_ERROR; int rc = SOCKET_ERROR;
MQTTAsync_queuedCommand* unsub; MQTTAsync_queuedCommand* unsub;
int msgid = 0;
FUNC_ENTRY; FUNC_ENTRY;
if (m == NULL || m->c == NULL) if (m == NULL || m->c == NULL)
...@@ -2233,11 +2235,6 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic ...@@ -2233,11 +2235,6 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
goto exit; goto exit;
} }
if (MQTTProtocol_assignMsgId(m->c) == 0)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
for (i = 0; i < count; i++) for (i = 0; i < count; i++)
{ {
if (!UTF8_validateString(topic[i])) if (!UTF8_validateString(topic[i]))
...@@ -2246,13 +2243,18 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic ...@@ -2246,13 +2243,18 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, size_t count, char* const* topic
goto exit; goto exit;
} }
} }
if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
/* Add unsubscribe request to operation queue */ /* Add unsubscribe request to operation queue */
unsub = malloc(sizeof(MQTTAsync_queuedCommand)); unsub = malloc(sizeof(MQTTAsync_queuedCommand));
memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand)); memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
unsub->client = m; unsub->client = m;
unsub->command.type = UNSUBSCRIBE; unsub->command.type = UNSUBSCRIBE;
unsub->command.token = m->c->msgID; unsub->command.token = msgid;
if (response) if (response)
{ {
unsub->command.onSuccess = response->onSuccess; unsub->command.onSuccess = response->onSuccess;
...@@ -2289,6 +2291,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload ...@@ -2289,6 +2291,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload
int rc = MQTTASYNC_SUCCESS; int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle; MQTTAsyncs* m = handle;
MQTTAsync_queuedCommand* pub; MQTTAsync_queuedCommand* pub;
int msgid = 0;
FUNC_ENTRY; FUNC_ENTRY;
if (m == NULL || m->c == NULL) if (m == NULL || m->c == NULL)
...@@ -2299,7 +2302,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload ...@@ -2299,7 +2302,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload
rc = MQTTASYNC_BAD_UTF8_STRING; rc = MQTTASYNC_BAD_UTF8_STRING;
else if (qos < 0 || qos > 2) else if (qos < 0 || qos > 2)
rc = MQTTASYNC_BAD_QOS; rc = MQTTASYNC_BAD_QOS;
else if (qos > 0 && MQTTProtocol_assignMsgId(m->c) == 0) else if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
...@@ -2310,7 +2313,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload ...@@ -2310,7 +2313,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, size_t payload
memset(pub, '\0', sizeof(MQTTAsync_queuedCommand)); memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
pub->client = m; pub->client = m;
pub->command.type = PUBLISH; pub->command.type = PUBLISH;
pub->command.token = m->c->msgID; pub->command.token = msgid;
if (response) if (response)
{ {
pub->command.onSuccess = response->onSuccess; pub->command.onSuccess = response->onSuccess;
......
...@@ -1207,6 +1207,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i ...@@ -1207,6 +1207,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
List* qoss = ListInitialize(); List* qoss = ListInitialize();
int i = 0; int i = 0;
int rc = MQTTCLIENT_FAILURE; int rc = MQTTCLIENT_FAILURE;
int msgid = 0;
FUNC_ENTRY; FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex); Thread_lock_mutex(mqttclient_mutex);
...@@ -1235,7 +1236,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i ...@@ -1235,7 +1236,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
goto exit; goto exit;
} }
} }
if (MQTTProtocol_assignMsgId(m->c) == 0) if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
{ {
rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT; rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
goto exit; goto exit;
...@@ -1247,7 +1248,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i ...@@ -1247,7 +1248,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
ListAppend(qoss, &qos[i], sizeof(int)); ListAppend(qoss, &qos[i], sizeof(int));
} }
rc = MQTTProtocol_subscribe(m->c, topics, qoss); rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid);
ListFreeNoContent(topics); ListFreeNoContent(topics);
ListFreeNoContent(qoss); ListFreeNoContent(qoss);
...@@ -1311,6 +1312,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic) ...@@ -1311,6 +1312,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
List* topics = ListInitialize(); List* topics = ListInitialize();
int i = 0; int i = 0;
int rc = SOCKET_ERROR; int rc = SOCKET_ERROR;
int msgid = 0;
FUNC_ENTRY; FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex); Thread_lock_mutex(mqttclient_mutex);
...@@ -1333,7 +1335,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic) ...@@ -1333,7 +1335,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
goto exit; goto exit;
} }
} }
if (MQTTProtocol_assignMsgId(m->c) == 0) if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
{ {
rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT; rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
goto exit; goto exit;
...@@ -1341,7 +1343,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic) ...@@ -1341,7 +1343,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
for (i = 0; i < count; i++) for (i = 0; i < count; i++)
ListAppend(topics, topic[i], strlen(topic[i])); ListAppend(topics, topic[i], strlen(topic[i]));
rc = MQTTProtocol_unsubscribe(m->c, topics); rc = MQTTProtocol_unsubscribe(m->c, topics, msgid);
ListFreeNoContent(topics); ListFreeNoContent(topics);
if (rc == TCPSOCKET_COMPLETE) if (rc == TCPSOCKET_COMPLETE)
...@@ -1393,6 +1395,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, ...@@ -1393,6 +1395,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
Messages* msg = NULL; Messages* msg = NULL;
Publish* p = NULL; Publish* p = NULL;
int blocked = 0; int blocked = 0;
int msgid = 0;
FUNC_ENTRY; FUNC_ENTRY;
Thread_lock_mutex(mqttclient_mutex); Thread_lock_mutex(mqttclient_mutex);
...@@ -1426,7 +1429,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, ...@@ -1426,7 +1429,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
} }
if (blocked == 1) if (blocked == 1)
Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
if (MQTTProtocol_assignMsgId(m->c) == 0) if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
{ /* this should never happen as we've waited for spaces in the queue */ { /* this should never happen as we've waited for spaces in the queue */
rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT; rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
goto exit; goto exit;
...@@ -1437,7 +1440,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, ...@@ -1437,7 +1440,7 @@ int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen,
p->payload = payload; p->payload = payload;
p->payloadlen = payloadlen; p->payloadlen = payloadlen;
p->topic = (char*)topicName; p->topic = (char*)topicName;
p->msgId = -1; p->msgId = msgid;
rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg); rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
......
...@@ -146,7 +146,6 @@ int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int ...@@ -146,7 +146,6 @@ int MQTTProtocol_startPublish(Clients* pubclient, Publish* publish, int qos, int
FUNC_ENTRY; FUNC_ENTRY;
if (qos > 0) if (qos > 0)
{ {
p.msgId = publish->msgId = pubclient->msgID;
*mm = MQTTProtocol_createMessage(publish, mm, qos, retained); *mm = MQTTProtocol_createMessage(publish, mm, qos, retained);
ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len); ListAppend(pubclient->outboundMsgs, *mm, (*mm)->len);
/* we change these pointers to the saved message location just in case the packet could not be written /* we change these pointers to the saved message location just in case the packet could not be written
......
...@@ -157,13 +157,13 @@ int MQTTProtocol_handlePingresps(void* pack, int sock) ...@@ -157,13 +157,13 @@ int MQTTProtocol_handlePingresps(void* pack, int sock)
* @param qoss corresponding list of QoSs * @param qoss corresponding list of QoSs
* @return completion code * @return completion code
*/ */
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss) int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID)
{ {
int rc = 0; int rc = 0;
FUNC_ENTRY; FUNC_ENTRY;
/* we should stack this up for retry processing too */ /* we should stack this up for retry processing too */
rc = MQTTPacket_send_subscribe(topics, qoss, client->msgID, 0, &client->net, client->clientID); rc = MQTTPacket_send_subscribe(topics, qoss, msgID, 0, &client->net, client->clientID);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -196,13 +196,13 @@ int MQTTProtocol_handleSubacks(void* pack, int sock) ...@@ -196,13 +196,13 @@ int MQTTProtocol_handleSubacks(void* pack, int sock)
* @param topics list of topics * @param topics list of topics
* @return completion code * @return completion code
*/ */
int MQTTProtocol_unsubscribe(Clients* client, List* topics) int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID)
{ {
int rc = 0; int rc = 0;
FUNC_ENTRY; FUNC_ENTRY;
/* we should stack this up for retry processing too? */ /* we should stack this up for retry processing too? */
rc = MQTTPacket_send_unsubscribe(topics, client->msgID, 0, &client->net, client->clientID); rc = MQTTPacket_send_unsubscribe(topics, msgID, 0, &client->net, client->clientID);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
......
...@@ -36,9 +36,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int ssl, in ...@@ -36,9 +36,9 @@ int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int ssl, in
int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int MQTTVersion); int MQTTProtocol_connect(const char* ip_address, Clients* acClients, int MQTTVersion);
#endif #endif
int MQTTProtocol_handlePingresps(void* pack, int sock); int MQTTProtocol_handlePingresps(void* pack, int sock);
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss); int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID);
int MQTTProtocol_handleSubacks(void* pack, int sock); int MQTTProtocol_handleSubacks(void* pack, int sock);
int MQTTProtocol_unsubscribe(Clients* client, List* topics); int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID);
int MQTTProtocol_handleUnsubacks(void* pack, int sock); int MQTTProtocol_handleUnsubacks(void* pack, int sock);
#endif #endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment