Commit db037c2d authored by Ian Craggs's avatar Ian Craggs

Check for all msgids used, bug #416747

parent 76684fcb
......@@ -140,6 +140,10 @@
* Return code: A qos parameter is not 0, 1 or 2
*/
#define MQTTASYNC_BAD_QOS -9
/**
* Return code: All 65535 MQTT msgids are being used
*/
#define MQTTASYNC_NO_MORE_MSGIDS -10
/**
* A handle representing an MQTT client. A valid client handle is available
......
......@@ -61,16 +61,28 @@ int messageIDCompare(void* a, void* b)
* Assign a new message id for a client. Make sure it isn't already being used and does
* not exceed the maximum.
* @param client a client structure
* @return the next message id to use
* @return the next message id to use, or 0 if none available
*/
int MQTTProtocol_assignMsgId(Clients* client)
{
int start_msgid = client->msgID;
int msgid = start_msgid;
FUNC_ENTRY;
client->msgID = (client->msgID == MAX_MSG_ID) ? 1 : client->msgID + 1;
while (ListFindItem(client->outboundMsgs, &(client->msgID), messageIDCompare) != NULL)
client->msgID = (client->msgID == MAX_MSG_ID) ? 1 : client->msgID + 1;
FUNC_EXIT_RC(client->msgID);
return client->msgID;
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
while (ListFindItem(client->outboundMsgs, &msgid, messageIDCompare) != NULL)
{
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
if (msgid == start_msgid)
{ /* we've tried them all - none free */
msgid = 0;
break;
}
}
if (msgid != 0)
client->msgID = msgid;
FUNC_EXIT_RC(msgid);
return msgid;
}
......@@ -340,9 +352,17 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock)
Pubrec* pubrec = (Pubrec*)pack;
Clients* client = NULL;
int rc = TCPSOCKET_COMPLETE;
ListElement* elem = NULL;
FUNC_ENTRY;
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
elem = ListFindItem(bstate->clients, &sock, clientSocketCompare);
if (!elem)
{
printf("pubrec: couldn't find client for socket %d\n", sock);
rc = SOCKET_ERROR;
goto exit;
}
client = (Clients*)(elem->content);
Log(LOG_PROTOCOL, 15, NULL, sock, client->clientID, pubrec->msgId);
/* look for the message by message id in the records of outbound messages for this client */
......@@ -372,6 +392,7 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock)
time(&(m->lastTouch));
}
}
exit:
free(pack);
FUNC_EXIT_RC(rc);
return rc;
......@@ -449,9 +470,18 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock)
Pubcomp* pubcomp = (Pubcomp*)pack;
Clients* client = NULL;
int rc = TCPSOCKET_COMPLETE;
ListElement* elem = NULL;
FUNC_ENTRY;
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
elem = ListFindItem(bstate->clients, &sock, clientSocketCompare);
if (!elem)
{
printf("pubrec: couldn't find client for socket %d\n", sock);
rc = SOCKET_ERROR;
goto exit;
}
client = (Clients*)(elem->content);
//client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
Log(LOG_PROTOCOL, 19, NULL, sock, client->clientID, pubcomp->msgId);
/* look for the message by message id in the records of outbound messages for this client */
......@@ -481,6 +511,7 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock)
}
}
}
exit:
free(pack);
FUNC_EXIT_RC(rc);
return rc;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment