Commit 6983ad7f authored by Ian Craggs's avatar Ian Craggs

MQTTClient unsubscribe and disconnect

parent 36ca298e
...@@ -2121,7 +2121,7 @@ static void MQTTAsync_closeOnly(Clients* client) ...@@ -2121,7 +2121,7 @@ static void MQTTAsync_closeOnly(Clients* client)
{ {
MQTTProtocol_checkPendingWrites(); MQTTProtocol_checkPendingWrites();
if (client->connected && Socket_noPendingWrites(client->net.socket)) if (client->connected && Socket_noPendingWrites(client->net.socket))
MQTTPacket_send_disconnect(&client->net, client->clientID); MQTTPacket_send_disconnect(client, SUCCESS, NULL);
Thread_lock_mutex(socket_mutex); Thread_lock_mutex(socket_mutex);
#if defined(OPENSSL) #if defined(OPENSSL)
SSLSocket_close(&client->net); SSLSocket_close(&client->net);
......
...@@ -284,7 +284,7 @@ static int clientSockCompare(void* a, void* b); ...@@ -284,7 +284,7 @@ static int clientSockCompare(void* a, void* b);
static thread_return_type WINAPI connectionLost_call(void* context); static thread_return_type WINAPI connectionLost_call(void* context);
static thread_return_type WINAPI MQTTClient_run(void* n); static thread_return_type WINAPI MQTTClient_run(void* n);
static void MQTTClient_stop(void); static void MQTTClient_stop(void);
static void MQTTClient_closeSession(Clients* client); static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props);
static int MQTTClient_cleanSession(Clients* client); static int MQTTClient_cleanSession(Clients* client);
static MQTTResponse MQTTClient_connectURIVersion( static MQTTResponse MQTTClient_connectURIVersion(
MQTTClient handle, MQTTClient_connectOptions* options, MQTTClient handle, MQTTClient_connectOptions* options,
...@@ -293,7 +293,7 @@ static MQTTResponse MQTTClient_connectURIVersion( ...@@ -293,7 +293,7 @@ static MQTTResponse MQTTClient_connectURIVersion(
MQTTProperties* connectProperties, MQTTProperties* willProperties); MQTTProperties* connectProperties, MQTTProperties* willProperties);
static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
MQTTProperties* connectProperties, MQTTProperties* willProperties); MQTTProperties* connectProperties, MQTTProperties* willProperties);
static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop); static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout); static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
static void MQTTClient_retry(void); static void MQTTClient_retry(void);
static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc); static MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
...@@ -777,7 +777,7 @@ int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connect ...@@ -777,7 +777,7 @@ int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connect
} }
static void MQTTClient_closeSession(Clients* client) static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
{ {
FUNC_ENTRY; FUNC_ENTRY;
client->good = 0; client->good = 0;
...@@ -785,7 +785,7 @@ static void MQTTClient_closeSession(Clients* client) ...@@ -785,7 +785,7 @@ static void MQTTClient_closeSession(Clients* client)
if (client->net.socket > 0) if (client->net.socket > 0)
{ {
if (client->connected) if (client->connected)
MQTTPacket_send_disconnect(&client->net, client->clientID); MQTTPacket_send_disconnect(client, reason, props);
Thread_lock_mutex(socket_mutex); Thread_lock_mutex(socket_mutex);
#if defined(OPENSSL) #if defined(OPENSSL)
SSLSocket_close(&client->net); SSLSocket_close(&client->net);
...@@ -1047,7 +1047,7 @@ exit: ...@@ -1047,7 +1047,7 @@ exit:
} }
} }
else else
MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3)); /* don't want to call connection lost */ MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3), SUCCESS, NULL); /* don't want to call connection lost */
resp.reasonCode = rc; resp.reasonCode = rc;
FUNC_EXIT_RC(resp.reasonCode); FUNC_EXIT_RC(resp.reasonCode);
...@@ -1335,7 +1335,8 @@ exit: ...@@ -1335,7 +1335,8 @@ exit:
/** /**
* mqttclient_mutex must be locked when you call this function, if multi threaded * mqttclient_mutex must be locked when you call this function, if multi threaded
*/ */
static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop) static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop,
enum MQTTReasonCodes reason, MQTTProperties* props)
{ {
MQTTClients* m = handle; MQTTClients* m = handle;
START_TIME_TYPE start; START_TIME_TYPE start;
...@@ -1368,7 +1369,7 @@ static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_conne ...@@ -1368,7 +1369,7 @@ static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_conne
} }
} }
MQTTClient_closeSession(m->c); MQTTClient_closeSession(m->c, reason, props);
while (Thread_check_sem(m->connect_sem)) while (Thread_check_sem(m->connect_sem))
Thread_wait_sem(m->connect_sem, 100); Thread_wait_sem(m->connect_sem, 100);
...@@ -1396,7 +1397,7 @@ exit: ...@@ -1396,7 +1397,7 @@ exit:
*/ */
static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout) static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
{ {
return MQTTClient_disconnect1(handle, timeout, 1, 1); return MQTTClient_disconnect1(handle, timeout, 1, 1, SUCCESS, NULL);
} }
...@@ -1414,7 +1415,18 @@ int MQTTClient_disconnect(MQTTClient handle, int timeout) ...@@ -1414,7 +1415,18 @@ int MQTTClient_disconnect(MQTTClient handle, int timeout)
int rc = 0; int rc = 0;
Thread_lock_mutex(mqttclient_mutex); Thread_lock_mutex(mqttclient_mutex);
rc = MQTTClient_disconnect1(handle, timeout, 0, 1); rc = MQTTClient_disconnect1(handle, timeout, 0, 1, SUCCESS, NULL);
Thread_unlock_mutex(mqttclient_mutex);
return rc;
}
int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties* props)
{
int rc = 0;
Thread_lock_mutex(mqttclient_mutex);
rc = MQTTClient_disconnect1(handle, timeout, 0, 1, reason, props);
Thread_unlock_mutex(mqttclient_mutex); Thread_unlock_mutex(mqttclient_mutex);
return rc; return rc;
} }
......
...@@ -800,6 +800,8 @@ DLLExport MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connect ...@@ -800,6 +800,8 @@ DLLExport MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connect
*/ */
DLLExport int MQTTClient_disconnect(MQTTClient handle, int timeout); DLLExport int MQTTClient_disconnect(MQTTClient handle, int timeout);
DLLExport int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties* props);
/** /**
* This function allows the client application to test whether or not a * This function allows the client application to test whether or not a
* client is currently connected to the MQTT server. * client is currently connected to the MQTT server.
......
...@@ -479,7 +479,7 @@ void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, ...@@ -479,7 +479,7 @@ void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data,
* @param socket the open socket to send the data to * @param socket the open socket to send the data to
* @return the completion code (e.g. TCPSOCKET_COMPLETE) * @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/ */
int MQTTPacket_send_disconnect(networkHandles *net, const char* clientID) int MQTTPacket_send_disconnect(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
{ {
Header header; Header header;
int rc = 0; int rc = 0;
...@@ -487,8 +487,25 @@ int MQTTPacket_send_disconnect(networkHandles *net, const char* clientID) ...@@ -487,8 +487,25 @@ int MQTTPacket_send_disconnect(networkHandles *net, const char* clientID)
FUNC_ENTRY; FUNC_ENTRY;
header.byte = 0; header.byte = 0;
header.bits.type = DISCONNECT; header.bits.type = DISCONNECT;
rc = MQTTPacket_send(net, header, NULL, 0, 0);
Log(LOG_PROTOCOL, 28, NULL, net->socket, clientID, rc); if (client->MQTTVersion >= 5)
{
if (props || reason != SUCCESS)
{
size_t buflen = 1 + ((props == NULL) ? 0 : MQTTProperties_len(props));
char *buf = malloc(buflen), *ptr = NULL;
ptr = buf;
writeChar(&ptr, reason);
if (props)
MQTTProperties_write(&ptr, props);
if ((rc = MQTTPacket_send(&client->net, header, buf, buflen, 1)) != TCPSOCKET_INTERRUPTED)
free(buf);
}
}
else
rc = MQTTPacket_send(&client->net, header, NULL, 0, 0);
Log(LOG_PROTOCOL, 28, NULL, client->net.socket, client->clientID, rc);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
......
...@@ -228,7 +228,7 @@ int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buf ...@@ -228,7 +228,7 @@ int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buf
int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees); int MQTTPacket_sends(networkHandles* net, Header header, int count, char** buffers, size_t* buflens, int* frees);
void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen); void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
int MQTTPacket_send_disconnect(networkHandles* net, const char* clientID); int MQTTPacket_send_disconnect(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props);
void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen); void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen);
void MQTTPacket_freePublish(Publish* pack); void MQTTPacket_freePublish(Publish* pack);
......
...@@ -491,17 +491,32 @@ int test1(struct Options options) ...@@ -491,17 +491,32 @@ int test1(struct Options options)
MyLog(LOGA_DEBUG, "Stopping\n"); MyLog(LOGA_DEBUG, "Stopping\n");
rc = MQTTClient_unsubscribe(c, test_topic); MQTTProperties_free(&props);
assert("Unsubscribe successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); property.identifier = USER_PROPERTY;
rc = MQTTClient_disconnect(c, 0); property.value.data.data = "User property name";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "User property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property);
response = MQTTClient_unsubscribe5(c, test_topic, &props);
assert("Unsubscribe successful", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
MQTTProperties_free(&props);
property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 0;
MQTTProperties_add(&props, &property);
rc = MQTTClient_disconnect5(c, 0, SUCCESS, &props);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
/* Just to make sure we can connect again */ /* Just to make sure we can connect again */
response = MQTTClient_connect5(c, &opts, NULL, NULL); response = MQTTClient_connect5(c, &opts, NULL, NULL);
assert("Connect successful", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode); assert("Connect successful", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
rc = MQTTClient_disconnect(c, 0); rc = MQTTClient_disconnect5(c, 0, SUCCESS, &props);
assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc); assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
MQTTProperties_free(&props);
MQTTClient_destroy(&c); MQTTClient_destroy(&c);
exit: exit:
...@@ -1251,8 +1266,8 @@ int main(int argc, char** argv) ...@@ -1251,8 +1266,8 @@ int main(int argc, char** argv)
fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1)); fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
setenv("MQTT_C_CLIENT_TRACE", "ON", 1); setenv("MQTT_C_CLIENT_TRACE", "ON", 1);
//setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 0); setenv("MQTT_C_CLIENT_TRACE_LEVEL", "ERROR", 0);
setenv("MQTT_C_CLIENT_TRACE_LEVEL", "PROTOCOL", 0); //setenv("MQTT_C_CLIENT_TRACE_LEVEL", "PROTOCOL", 0);
getopts(argc, argv); getopts(argc, argv);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment