Commit 34c34466 authored by Ian Craggs's avatar Ian Craggs

fix for bug 413429 - connectionLost not called

parent 6b8984fb
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* Contributors: * Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - add SSL support * Ian Craggs - add SSL support
* Ian Craggs - fix for bug 413429 - connectionLost not called
*******************************************************************************/ *******************************************************************************/
#if !defined(CLIENTS_H) #if !defined(CLIENTS_H)
...@@ -180,6 +181,7 @@ typedef struct ...@@ -180,6 +181,7 @@ typedef struct
unsigned int qentry_seqno; unsigned int qentry_seqno;
void* phandle; /* the persistence handle */ void* phandle; /* the persistence handle */
MQTTClient_persistence* persistence; /* a persistence implementation */ MQTTClient_persistence* persistence; /* a persistence implementation */
void* context; /* calling context - used when calling disconnect_internal */
#if defined(OPENSSL) #if defined(OPENSSL)
MQTTClient_SSLOptions *sslopts; MQTTClient_SSLOptions *sslopts;
SSL_SESSION* session; /***< SSL session pointer for fast handhake */ SSL_SESSION* session; /***< SSL session pointer for fast handhake */
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
* Ian Craggs - initial implementation and documentation * Ian Craggs - initial implementation and documentation
* Ian Craggs, Allan Stockdill-Mander - SSL support * Ian Craggs, Allan Stockdill-Mander - SSL support
* Ian Craggs - multiple server connection support * Ian Craggs - multiple server connection support
* Ian Craggs - fix for bug 413429 - connectionLost not called
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -42,8 +43,8 @@ ...@@ -42,8 +43,8 @@
#define URI_TCP "tcp://" #define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##" #define BUILD_TIMESTAMP "201307221023"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##" #define CLIENT_VERSION "1.0.0.2"
char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP; char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION; char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
...@@ -123,6 +124,7 @@ int MQTTAsync_cleanSession(Clients* client); ...@@ -123,6 +124,7 @@ int MQTTAsync_cleanSession(Clients* client);
void MQTTAsync_stop(); void MQTTAsync_stop();
int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout); int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout);
void MQTTAsync_closeOnly(Clients* client); void MQTTAsync_closeOnly(Clients* client);
void MQTTAsync_closeSession(Clients* client);
void MQTTProtocol_closeSession(Clients* client, int sendwill); void MQTTProtocol_closeSession(Clients* client, int sendwill);
void MQTTAsync_writeComplete(int socket); void MQTTAsync_writeComplete(int socket);
...@@ -357,6 +359,7 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId, ...@@ -357,6 +359,7 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
m->c = malloc(sizeof(Clients)); m->c = malloc(sizeof(Clients));
memset(m->c, '\0', sizeof(Clients)); memset(m->c, '\0', sizeof(Clients));
m->c->context = m;
m->c->outboundMsgs = ListInitialize(); m->c->outboundMsgs = ListInitialize();
m->c->inboundMsgs = ListInitialize(); m->c->inboundMsgs = ListInitialize();
m->c->messageQueue = ListInitialize(); m->c->messageQueue = ListInitialize();
...@@ -378,8 +381,7 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId, ...@@ -378,8 +381,7 @@ int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List)); ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
exit: exit:
if (Thread_unlock_mutex(mqttasync_mutex) != 0) Thread_unlock_mutex(mqttasync_mutex);
/* FFDC? */;
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -722,7 +724,7 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command) ...@@ -722,7 +724,7 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
if (m->c->outboundMsgs->count == 0 || MQTTAsync_elapsed(command->start_time) >= command->details.dis.timeout) if (m->c->outboundMsgs->count == 0 || MQTTAsync_elapsed(command->start_time) >= command->details.dis.timeout)
{ {
int was_connected = m->c->connected; int was_connected = m->c->connected;
MQTTProtocol_closeSession(m->c, 0); MQTTAsync_closeSession(m->c);
if (command->details.dis.internal && m->cl && was_connected) if (command->details.dis.internal && m->cl && was_connected)
{ {
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
...@@ -1145,7 +1147,7 @@ void MQTTAsync_checkTimeouts() ...@@ -1145,7 +1147,7 @@ void MQTTAsync_checkTimeouts()
} }
else else
{ {
MQTTProtocol_closeSession(m->c, 0); MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect); MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
...@@ -1485,7 +1487,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1485,7 +1487,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
} }
else else
{ {
MQTTProtocol_closeSession(m->c, 0); MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect); MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
...@@ -1686,7 +1688,7 @@ void MQTTAsync_closeOnly(Clients* client) ...@@ -1686,7 +1688,7 @@ void MQTTAsync_closeOnly(Clients* client)
} }
void MQTTProtocol_closeSession(Clients* client, int sendwill) void MQTTAsync_closeSession(Clients* client)
{ {
FUNC_ENTRY; FUNC_ENTRY;
MQTTAsync_closeOnly(client); MQTTAsync_closeOnly(client);
...@@ -2158,6 +2160,12 @@ int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout) ...@@ -2158,6 +2160,12 @@ int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout)
} }
void MQTTProtocol_closeSession(Clients* c, int sendwill)
{
MQTTAsync_disconnect_internal((MQTTAsync)c->context, 0);
}
int MQTTAsync_disconnect(MQTTAsync handle, MQTTAsync_disconnectOptions* options) int MQTTAsync_disconnect(MQTTAsync handle, MQTTAsync_disconnectOptions* options)
{ {
return MQTTAsync_disconnect1(handle, options, 0); return MQTTAsync_disconnect1(handle, options, 0);
...@@ -2489,7 +2497,7 @@ exit: ...@@ -2489,7 +2497,7 @@ exit:
} }
else else
{ {
MQTTProtocol_closeSession(m->c, 0); MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect); MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
...@@ -2570,7 +2578,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2570,7 +2578,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
} }
else else
{ {
MQTTProtocol_closeSession(m->c, 0); MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect); MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure) if (m->connect.onFailure)
{ {
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
* Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error * Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
* Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL * Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
* Ian Craggs - multiple server connection support * Ian Craggs - multiple server connection support
* Ian Craggs - fix for bug 413429 - connectionLost not called
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -47,8 +48,8 @@ ...@@ -47,8 +48,8 @@
#define URI_TCP "tcp://" #define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##" #define BUILD_TIMESTAMP "201307221023"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##" #define CLIENT_VERSION "1.0.0.2"
char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP; char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION; char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
...@@ -106,8 +107,6 @@ static int tostop = 0; ...@@ -106,8 +107,6 @@ static int tostop = 0;
static thread_id_type run_id = 0; static thread_id_type run_id = 0;
MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout); MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, long timeout);
int MQTTClient_receiveOrComplete(MQTTClient handle, char** topicName, int* topicLen, MQTTClient_message** message,
unsigned long timeout, MQTTPacket** packetaddr);
MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc); MQTTPacket* MQTTClient_cycle(int* sock, unsigned long timeout, int* rc);
int MQTTClient_cleanSession(Clients* client); int MQTTClient_cleanSession(Clients* client);
void MQTTClient_stop(); void MQTTClient_stop();
...@@ -261,6 +260,7 @@ int MQTTClient_create(MQTTClient* handle, char* serverURI, char* clientId, ...@@ -261,6 +260,7 @@ int MQTTClient_create(MQTTClient* handle, char* serverURI, char* clientId,
m->c = malloc(sizeof(Clients)); m->c = malloc(sizeof(Clients));
memset(m->c, '\0', sizeof(Clients)); memset(m->c, '\0', sizeof(Clients));
m->c->context = m;
m->c->outboundMsgs = ListInitialize(); m->c->outboundMsgs = ListInitialize();
m->c->inboundMsgs = ListInitialize(); m->c->inboundMsgs = ListInitialize();
m->c->messageQueue = ListInitialize(); m->c->messageQueue = ListInitialize();
...@@ -279,8 +279,7 @@ int MQTTClient_create(MQTTClient* handle, char* serverURI, char* clientId, ...@@ -279,8 +279,7 @@ int MQTTClient_create(MQTTClient* handle, char* serverURI, char* clientId,
ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List)); ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
exit: exit:
if (Thread_unlock_mutex(mqttclient_mutex) != 0) Thread_unlock_mutex(mqttclient_mutex);
/* FFDC? */;
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -625,7 +624,7 @@ int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connect ...@@ -625,7 +624,7 @@ int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connect
} }
void MQTTProtocol_closeSession(Clients* client, int sendwill) void MQTTClient_closeSession(Clients* client)
{ {
FUNC_ENTRY; FUNC_ENTRY;
client->good = 0; client->good = 0;
...@@ -1009,7 +1008,7 @@ int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal) ...@@ -1009,7 +1008,7 @@ int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal)
} }
} }
MQTTProtocol_closeSession(m->c, 0); MQTTClient_closeSession(m->c);
if (Thread_check_sem(m->connect_sem)) if (Thread_check_sem(m->connect_sem))
Thread_post_sem(m->connect_sem); Thread_post_sem(m->connect_sem);
...@@ -1039,6 +1038,12 @@ int MQTTClient_disconnect_internal(MQTTClient handle, int timeout) ...@@ -1039,6 +1038,12 @@ int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
} }
void MQTTProtocol_closeSession(Clients* c, int sendwill)
{
MQTTClient_disconnect_internal((MQTTClient)c->context, 0);
}
int MQTTClient_disconnect(MQTTClient handle, int timeout) int MQTTClient_disconnect(MQTTClient handle, int timeout)
{ {
return MQTTClient_disconnect1(handle, timeout, 0); return MQTTClient_disconnect1(handle, timeout, 0);
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* Contributors: * Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs, Allan Stockdill-Mander - SSL updates * Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - fix for bug 413429 - connectionLost not called
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -508,11 +509,18 @@ void MQTTProtocol_keepalive(time_t now) ...@@ -508,11 +509,18 @@ void MQTTProtocol_keepalive(time_t now)
{ {
if (Socket_noPendingWrites(client->net.socket)) if (Socket_noPendingWrites(client->net.socket))
{ {
MQTTPacket_send_pingreq(&client->net, client->clientID); if (MQTTPacket_send_pingreq(&client->net, client->clientID) != TCPSOCKET_COMPLETE)
{
Log(TRACE_MIN, -1, "Error sending PINGREQ for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
MQTTProtocol_closeSession(client, 1);
}
else
{
client->net.lastContact = now; client->net.lastContact = now;
client->ping_outstanding = 1; client->ping_outstanding = 1;
} }
} }
}
else else
{ {
Log(TRACE_MIN, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket); Log(TRACE_MIN, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
* *
* Contributors: * Contributors:
* Ian Craggs - initial contribution * Ian Craggs - initial contribution
* Ian Craggs - fix for bug 413429 - connectionLost not called
*******************************************************************************/ *******************************************************************************/
/* /*
...@@ -188,22 +189,6 @@ void getopts(int argc, char** argv) ...@@ -188,22 +189,6 @@ void getopts(int argc, char** argv)
} }
void connectionLost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start reconnect, return code %d\n", rc);
finished = 1;
}
}
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{ {
if (opts.showtopics) if (opts.showtopics)
...@@ -266,10 +251,26 @@ void onConnect(void* context, MQTTAsync_successData* response) ...@@ -266,10 +251,26 @@ void onConnect(void* context, MQTTAsync_successData* response)
} }
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
void connectionLost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
int rc;
printf("connectionLost called\n");
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start reconnect, return code %d\n", rc);
finished = 1;
}
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
MQTTAsync client; MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc = 0; int rc = 0;
char url[100]; char url[100];
...@@ -289,7 +290,7 @@ int main(int argc, char** argv) ...@@ -289,7 +290,7 @@ int main(int argc, char** argv)
rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL);
signal(SIGINT, cfinish); signal(SIGINT, cfinish);
signal(SIGTERM, cfinish); signal(SIGTERM, cfinish);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment