Commit bc8a5cb7 authored by Ian Craggs's avatar Ian Craggs

Publish qos 1 and 2, unsubscribe

parent 3c0e9dba
...@@ -1280,7 +1280,7 @@ static int MQTTAsync_processCommand(void) ...@@ -1280,7 +1280,7 @@ static int MQTTAsync_processCommand(void)
for (i = 0; i < command->command.details.unsub.count; i++) for (i = 0; i < command->command.details.unsub.count; i++)
ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i])); ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i]));
rc = MQTTProtocol_unsubscribe(command->client->c, topics, command->command.token); rc = MQTTProtocol_unsubscribe(command->client->c, topics, command->command.token, NULL);
ListFreeNoContent(topics); ListFreeNoContent(topics);
} }
else if (command->command.type == PUBLISH) else if (command->command.type == PUBLISH)
......
...@@ -1563,12 +1563,13 @@ int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos) ...@@ -1563,12 +1563,13 @@ int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
} }
int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic) MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* const* topic, MQTTProperties* props)
{ {
MQTTClients* m = handle; MQTTClients* m = handle;
List* topics = NULL; List* topics = NULL;
int i = 0; int i = 0;
int rc = SOCKET_ERROR; int rc = SOCKET_ERROR;
MQTTResponse resp = {MQTTCLIENT_FAILURE, NULL};
int msgid = 0; int msgid = 0;
FUNC_ENTRY; FUNC_ENTRY;
...@@ -1602,7 +1603,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic) ...@@ -1602,7 +1603,7 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
topics = ListInitialize(); topics = ListInitialize();
for (i = 0; i < count; i++) for (i = 0; i < count; i++)
ListAppend(topics, topic[i], strlen(topic[i])); ListAppend(topics, topic[i], strlen(topic[i]));
rc = MQTTProtocol_unsubscribe(m->c, topics, msgid); rc = MQTTProtocol_unsubscribe(m->c, topics, msgid, props);
ListFreeNoContent(topics); ListFreeNoContent(topics);
if (rc == TCPSOCKET_COMPLETE) if (rc == TCPSOCKET_COMPLETE)
...@@ -1625,24 +1626,40 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic) ...@@ -1625,24 +1626,40 @@ int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
MQTTClient_disconnect_internal(handle, 0); MQTTClient_disconnect_internal(handle, 0);
exit: exit:
resp.reasonCode = rc;
Thread_unlock_mutex(mqttclient_mutex); Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(unsubscribe_mutex); Thread_unlock_mutex(unsubscribe_mutex);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(resp.reasonCode);
return rc; return resp;
} }
int MQTTClient_unsubscribe(MQTTClient handle, const char* topic) int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
{ {
int rc = 0; MQTTResponse response = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
return response.reasonCode;
}
MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char* topic, MQTTProperties* props)
{
MQTTResponse rc;
char *const topics[] = {(char*)topic}; char *const topics[] = {(char*)topic};
FUNC_ENTRY;
rc = MQTTClient_unsubscribeMany(handle, 1, topics); rc = MQTTClient_unsubscribeMany5(handle, 1, topics, props);
FUNC_EXIT_RC(rc);
return rc; return rc;
} }
int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
{
MQTTResponse response = MQTTClient_unsubscribe5(handle, topic, NULL);
return response.reasonCode;
}
MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, void* payload, MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, void* payload,
int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken) int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
{ {
......
...@@ -880,6 +880,8 @@ DLLExport MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, c ...@@ -880,6 +880,8 @@ DLLExport MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, c
*/ */
DLLExport int MQTTClient_unsubscribe(MQTTClient handle, const char* topic); DLLExport int MQTTClient_unsubscribe(MQTTClient handle, const char* topic);
DLLExport MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char* topic, MQTTProperties* props);
/** /**
* This function attempts to remove existing subscriptions to a list of topics * This function attempts to remove existing subscriptions to a list of topics
* made by the specified client. * made by the specified client.
...@@ -893,6 +895,8 @@ DLLExport int MQTTClient_unsubscribe(MQTTClient handle, const char* topic); ...@@ -893,6 +895,8 @@ DLLExport int MQTTClient_unsubscribe(MQTTClient handle, const char* topic);
*/ */
DLLExport int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic); DLLExport int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic);
DLLExport MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* const* topic, MQTTProperties* props);
/** /**
* This function attempts to publish a message to a given topic (see also * This function attempts to publish a message to a given topic (see also
* MQTTClient_publishMessage()). An ::MQTTClient_deliveryToken is issued when * MQTTClient_publishMessage()). An ::MQTTClient_deliveryToken is issued when
......
...@@ -144,7 +144,10 @@ void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error) ...@@ -144,7 +144,10 @@ void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error)
else else
{ {
if ((pack = (*new_packets[ptype])(MQTTVersion, header.byte, data, remaining_length)) == NULL) if ((pack = (*new_packets[ptype])(MQTTVersion, header.byte, data, remaining_length)) == NULL)
{
*error = SOCKET_ERROR; // was BAD_MQTT_PACKET; *error = SOCKET_ERROR; // was BAD_MQTT_PACKET;
Log(LOG_ERROR, -1, "Bad MQTT packet, type %d", ptype);
}
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
else if (header.bits.type == PUBLISH && header.bits.qos == 2) else if (header.bits.type == PUBLISH && header.bits.qos == 2)
{ {
...@@ -695,15 +698,22 @@ void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t ...@@ -695,15 +698,22 @@ void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t
if (MQTTVersion >= MQTTVERSION_5) if (MQTTVersion >= MQTTVERSION_5)
{ {
MQTTProperties props = MQTTProperties_initializer; MQTTProperties props = MQTTProperties_initializer;
pack->rc = readChar(&curdata); /* reason code */
pack->rc = SUCCESS;
pack->properties = props; pack->properties = props;
pack->properties.max_count = 10;
pack->properties.array = malloc(sizeof(MQTTProperty) * pack->properties.max_count); if (datalen > 2)
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1) pack->rc = readChar(&curdata); /* reason code */
if (datalen > 3)
{ {
free(pack); pack->properties.max_count = 10;
pack = NULL; /* signal protocol error */ pack->properties.array = malloc(sizeof(MQTTProperty) * pack->properties.max_count);
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
{
free(pack);
pack = NULL; /* signal protocol error */
}
} }
} }
FUNC_EXIT; FUNC_EXIT;
......
...@@ -316,7 +316,7 @@ void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size ...@@ -316,7 +316,7 @@ void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size
* @param clientID the string client identifier, only used for tracing * @param clientID the string client identifier, only used for tracing
* @return the completion code (e.g. TCPSOCKET_COMPLETE) * @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/ */
int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID) int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client)
{ {
Header header; Header header;
char *data, *ptr; char *data, *ptr;
...@@ -333,14 +333,20 @@ int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles ...@@ -333,14 +333,20 @@ int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles
datalen = 2 + topics->count * 2; /* utf length == 2 */ datalen = 2 + topics->count * 2; /* utf length == 2 */
while (ListNextElement(topics, &elem)) while (ListNextElement(topics, &elem))
datalen += (int)strlen((char*)(elem->content)); datalen += (int)strlen((char*)(elem->content));
if (client->MQTTVersion >= 5)
datalen += MQTTProperties_len(props);
ptr = data = malloc(datalen); ptr = data = malloc(datalen);
writeInt(&ptr, msgid); writeInt(&ptr, msgid);
if (client->MQTTVersion >= 5)
MQTTProperties_write(&ptr, props);
elem = NULL; elem = NULL;
while (ListNextElement(topics, &elem)) while (ListNextElement(topics, &elem))
writeUTF(&ptr, (char*)(elem->content)); writeUTF(&ptr, (char*)(elem->content));
rc = MQTTPacket_send(net, header, data, datalen, 1); rc = MQTTPacket_send(&client->net, header, data, datalen, 1);
Log(LOG_PROTOCOL, 25, NULL, net->socket, clientID, msgid, rc); Log(LOG_PROTOCOL, 25, NULL, client->net.socket, client->clientID, msgid, rc);
if (rc != TCPSOCKET_INTERRUPTED) if (rc != TCPSOCKET_INTERRUPTED)
free(data); free(data);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
......
...@@ -33,6 +33,6 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o ...@@ -33,6 +33,6 @@ int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* o
int msgid, int dup, Clients* client); int msgid, int dup, Clients* client);
void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen); void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
int MQTTPacket_send_unsubscribe(List* topics, int msgid, int dup, networkHandles* net, const char* clientID); int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client);
#endif #endif
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
* Rong Xiang, Ian Craggs - C++ compatibility * Rong Xiang, Ian Craggs - C++ compatibility
* Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1 * Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1
* Ian Craggs - ensure that acks are not sent if write is outstanding on socket * Ian Craggs - ensure that acks are not sent if write is outstanding on socket
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -351,6 +352,8 @@ int MQTTProtocol_handlePubacks(void* pack, int sock) ...@@ -351,6 +352,8 @@ int MQTTProtocol_handlePubacks(void* pack, int sock)
ListRemove(client->outboundMsgs, m); ListRemove(client->outboundMsgs, m);
} }
} }
if (puback->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&puback->properties);
free(pack); free(pack);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -400,6 +403,8 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock) ...@@ -400,6 +403,8 @@ int MQTTProtocol_handlePubrecs(void* pack, int sock)
time(&(m->lastTouch)); time(&(m->lastTouch));
} }
} }
if (pubrec->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&pubrec->properties);
free(pack); free(pack);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -464,6 +469,8 @@ int MQTTProtocol_handlePubrels(void* pack, int sock) ...@@ -464,6 +469,8 @@ int MQTTProtocol_handlePubrels(void* pack, int sock)
++(state.msgs_received); ++(state.msgs_received);
} }
} }
if (pubrel->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&pubrel->properties);
free(pack); free(pack);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -513,6 +520,8 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock) ...@@ -513,6 +520,8 @@ int MQTTProtocol_handlePubcomps(void* pack, int sock)
} }
} }
} }
if (pubcomp->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&pubcomp->properties);
free(pack); free(pack);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
......
...@@ -181,7 +181,6 @@ int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID, ...@@ -181,7 +181,6 @@ int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
int rc = 0; int rc = 0;
FUNC_ENTRY; FUNC_ENTRY;
/* we should stack this up for retry processing too */
rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client); rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -215,13 +214,12 @@ int MQTTProtocol_handleSubacks(void* pack, int sock) ...@@ -215,13 +214,12 @@ int MQTTProtocol_handleSubacks(void* pack, int sock)
* @param topics list of topics * @param topics list of topics
* @return completion code * @return completion code
*/ */
int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID) int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID, MQTTProperties* props)
{ {
int rc = 0; int rc = 0;
FUNC_ENTRY; FUNC_ENTRY;
/* we should stack this up for retry processing too? */ rc = MQTTPacket_send_unsubscribe(topics, props, msgID, 0, client);
rc = MQTTPacket_send_unsubscribe(topics, msgID, 0, &client->net, client->clientID);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -242,6 +240,8 @@ int MQTTProtocol_handleUnsubacks(void* pack, int sock) ...@@ -242,6 +240,8 @@ int MQTTProtocol_handleUnsubacks(void* pack, int sock)
FUNC_ENTRY; FUNC_ENTRY;
client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content); client = (Clients*)(ListFindItem(bstate->clients, &sock, clientSocketCompare)->content);
Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId); Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
if (unsuback->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&unsuback->properties);
free(unsuback); free(unsuback);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
......
...@@ -44,7 +44,7 @@ int MQTTProtocol_handlePingresps(void* pack, int sock); ...@@ -44,7 +44,7 @@ int MQTTProtocol_handlePingresps(void* pack, int sock);
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID, int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
MQTTSubscribe_options* opts, MQTTProperties* props); MQTTSubscribe_options* opts, MQTTProperties* props);
int MQTTProtocol_handleSubacks(void* pack, int sock); int MQTTProtocol_handleSubacks(void* pack, int sock);
int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID); int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID, MQTTProperties* props);
int MQTTProtocol_handleUnsubacks(void* pack, int sock); int MQTTProtocol_handleUnsubacks(void* pack, int sock);
#endif #endif
...@@ -486,13 +486,13 @@ int test1(struct Options options) ...@@ -486,13 +486,13 @@ int test1(struct Options options)
} }
test1_sendAndReceive(c, 0, test_topic); test1_sendAndReceive(c, 0, test_topic);
//test1_sendAndReceive(c, 1, test_topic); test1_sendAndReceive(c, 1, test_topic);
//test1_sendAndReceive(c, 2, test_topic); test1_sendAndReceive(c, 2, test_topic);
MyLog(LOGA_DEBUG, "Stopping\n"); MyLog(LOGA_DEBUG, "Stopping\n");
//rc = MQTTClient_unsubscribe(c, test_topic); rc = MQTTClient_unsubscribe(c, test_topic);
//assert("Unsubscribe successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Unsubscribe successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
rc = MQTTClient_disconnect(c, 0); rc = MQTTClient_disconnect(c, 0);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
...@@ -1251,7 +1251,8 @@ int main(int argc, char** argv) ...@@ -1251,7 +1251,8 @@ int main(int argc, char** argv)
fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1)); fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
setenv("MQTT_C_CLIENT_TRACE", "ON", 1); setenv("MQTT_C_CLIENT_TRACE", "ON", 1);
setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 0); //setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 0);
setenv("MQTT_C_CLIENT_TRACE_LEVEL", "PROTOCOL", 0);
getopts(argc, argv); getopts(argc, argv);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment