Commit 6694ae38 authored by Ian Craggs's avatar Ian Craggs

First implementation of offline buffering/automatic reconnect

parent 5f1bfaea
...@@ -72,10 +72,10 @@ HEADERS = $(srcdir)/*.h ...@@ -72,10 +72,10 @@ HEADERS = $(srcdir)/*.h
HEADERS_C = $(filter-out $(srcdir)/MQTTAsync.h, $(HEADERS)) HEADERS_C = $(filter-out $(srcdir)/MQTTAsync.h, $(HEADERS))
HEADERS_A = $(HEADERS) HEADERS_A = $(HEADERS)
SAMPLE_FILES_C = stdinpub stdoutsub pubsync pubasync subasync SAMPLE_FILES_C = stdinpub stdoutsub pubsync pubasync subasync
SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}} SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
SAMPLE_FILES_A = stdoutsuba MQTTAsync_subscribe MQTTAsync_publish SAMPLE_FILES_A = stdinpuba stdoutsuba MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}} ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}
TEST_FILES_C = test1 sync_client_test test_mqtt4sync TEST_FILES_C = test1 sync_client_test test_mqtt4sync
......
Issue 2 - offline buffering and automatic reconnect
if we don't have automatic reconnect and we don't successfully connect, then ...
offline buffering implies automatic reconnect?
is it worth having automatic reconnect without offline buffering?
should automatic reconnect even be offered separately?
\ No newline at end of file
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp. * Copyright (c) 2009, 2016 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
* Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged * Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
* Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1 * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
* Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe * Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
* Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
*******************************************************************************/ *******************************************************************************/
/** /**
...@@ -52,12 +53,16 @@ ...@@ -52,12 +53,16 @@
#define URI_TCP "tcp://" #define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##" #define BUILD_TIMESTAMP "Sun Feb 14 19:24:53 GMT 2016"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##" #define CLIENT_VERSION "1.0.3"
char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP; char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION; char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
#if !defined(min)
#define min(a, b) (((a) < (b)) ? (a) : (b))
#endif
extern Sockets s; extern Sockets s;
static ClientStates ClientState = static ClientStates ClientState =
...@@ -283,6 +288,19 @@ typedef struct MQTTAsync_struct ...@@ -283,6 +288,19 @@ typedef struct MQTTAsync_struct
MQTTPacket* pack; MQTTPacket* pack;
/* added for offline buffering */
MQTTAsync_createOptions* createOptions;
int shouldBeConnected;
/* added for automatic reconnect */
int automaticReconnect;
int minRetryInterval;
int maxRetryInterval;
int currentInterval;
START_TIME_TYPE lastConnectionFailedTime;
int retrying;
} MQTTAsyncs; } MQTTAsyncs;
...@@ -353,8 +371,8 @@ int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client) ...@@ -353,8 +371,8 @@ int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client)
} }
int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId, int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context) int persistence_type, void* persistence_context, MQTTAsync_createOptions* options)
{ {
int rc = 0; int rc = 0;
MQTTAsyncs *m = NULL; MQTTAsyncs *m = NULL;
...@@ -374,6 +392,12 @@ int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clien ...@@ -374,6 +392,12 @@ int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clien
goto exit; goto exit;
} }
if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
}
if (!initialized) if (!initialized)
{ {
#if defined(HEAP_H) #if defined(HEAP_H)
...@@ -414,6 +438,13 @@ int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clien ...@@ -414,6 +438,13 @@ int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clien
m->c->messageQueue = ListInitialize(); m->c->messageQueue = ListInitialize();
m->c->clientID = MQTTStrdup(clientId); m->c->clientID = MQTTStrdup(clientId);
m->shouldBeConnected = 0;
if (options)
{
m->createOptions = malloc(sizeof(MQTTAsync_createOptions));
m->createOptions = options;
}
#if !defined(NO_PERSISTENCE) #if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context); rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
if (rc == 0) if (rc == 0)
...@@ -435,6 +466,14 @@ exit: ...@@ -435,6 +466,14 @@ exit:
} }
int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context)
{
return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
persistence_context, NULL);
}
void MQTTAsync_terminate(void) void MQTTAsync_terminate(void)
{ {
FUNC_ENTRY; FUNC_ENTRY;
...@@ -770,6 +809,22 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size) ...@@ -770,6 +809,22 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
} }
void MQTTAsync_startConnectRetry(MQTTAsyncs* m)
{
if (m->automaticReconnect && m->shouldBeConnected)
{
m->lastConnectionFailedTime = MQTTAsync_start_clock();
if (m->retrying)
m->currentInterval = min(m->currentInterval * 2, m->maxRetryInterval);
else
{
m->currentInterval = m->minRetryInterval;
m->retrying = 1;
}
}
}
void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command) void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
{ {
MQTTAsyncs* m = handle; MQTTAsyncs* m = handle;
...@@ -780,12 +835,16 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command) ...@@ -780,12 +835,16 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
{ {
int was_connected = m->c->connected; int was_connected = m->c->connected;
MQTTAsync_closeSession(m->c); MQTTAsync_closeSession(m->c);
if (command->details.dis.internal && m->cl && was_connected) if (command->details.dis.internal)
{ {
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID); if (m->cl && was_connected)
(*(m->cl))(m->context, NULL); {
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
(*(m->cl))(m->context, NULL);
}
MQTTAsync_startConnectRetry(m);
} }
else if (!command->details.dis.internal && command->onSuccess) else if (command->onSuccess)
{ {
Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, NULL); (*(command->onSuccess))(command->context, NULL);
...@@ -1216,6 +1275,7 @@ void MQTTAsync_checkTimeouts() ...@@ -1216,6 +1275,7 @@ void MQTTAsync_checkTimeouts()
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, NULL); (*(m->connect.onFailure))(m->connect.context, NULL);
} }
MQTTAsync_startConnectRetry(m);
} }
continue; continue;
} }
...@@ -1245,6 +1305,20 @@ void MQTTAsync_checkTimeouts() ...@@ -1245,6 +1305,20 @@ void MQTTAsync_checkTimeouts()
} }
for (i = 0; i < timed_out_count; ++i) for (i = 0; i < timed_out_count; ++i)
ListRemoveHead(m->responses); /* remove the first response in the list */ ListRemoveHead(m->responses); /* remove the first response in the list */
if (m->automaticReconnect && m->retrying)
{
if (MQTTAsync_elapsed(m->lastConnectionFailedTime) > (m->currentInterval * 1000))
{
/* put the connect command to the head of the command queue, using the next serverURI */
MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Automatically attempting to reconnect");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
}
} }
MQTTAsync_unlock_mutex(mqttasync_mutex); MQTTAsync_unlock_mutex(mqttasync_mutex);
exit: exit:
...@@ -1271,7 +1345,6 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n) ...@@ -1271,7 +1345,6 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
break; /* no commands were processed, so go into a wait */ break; /* no commands were processed, so go into a wait */
} }
#if !defined(WIN32) && !defined(WIN64) #if !defined(WIN32) && !defined(WIN64)
rc = Thread_wait_cond(send_cond, 1);
if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT) if ((rc = Thread_wait_cond(send_cond, 1)) != 0 && rc != ETIMEDOUT)
Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc); Log(LOG_ERROR, -1, "Error %d waiting for condition variable", rc);
#else #else
...@@ -1384,6 +1457,8 @@ void MQTTAsync_destroy(MQTTAsync* handle) ...@@ -1384,6 +1457,8 @@ void MQTTAsync_destroy(MQTTAsync* handle)
if (m->serverURI) if (m->serverURI)
free(m->serverURI); free(m->serverURI);
if (m->createOptions)
free(m->createOptions);
if (!ListRemove(handles, m)) if (!ListRemove(handles, m))
Log(LOG_ERROR, -1, "free error"); Log(LOG_ERROR, -1, "free error");
*handle = NULL; *handle = NULL;
...@@ -1425,6 +1500,7 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack) ...@@ -1425,6 +1500,7 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc); Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
if ((rc = connack->rc) == MQTTASYNC_SUCCESS) if ((rc = connack->rc) == MQTTASYNC_SUCCESS)
{ {
m->retrying = 0;
m->c->connected = 1; m->c->connected = 1;
m->c->good = 1; m->c->good = 1;
m->c->connect_state = 0; m->c->connect_state = 0;
...@@ -1446,6 +1522,12 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack) ...@@ -1446,6 +1522,12 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
} }
free(connack); free(connack);
m->pack = NULL; m->pack = NULL;
#if !defined(WIN32) && !defined(WIN64)
Thread_signal_cond(send_cond);
#else
if (!Thread_check_sem(send_sem))
Thread_post_sem(send_sem);
#endif
} }
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -1585,6 +1667,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n) ...@@ -1585,6 +1667,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, &data); (*(m->connect.onFailure))(m->connect.context, &data);
} }
MQTTAsync_startConnectRetry(m);
} }
} }
} }
...@@ -1924,9 +2007,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -1924,9 +2007,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
goto exit; goto exit;
} }
if (strncmp(options->struct_id, "MQTC", 4) != 0 || if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 4)
(options->struct_version != 0 && options->struct_version != 1 && options->struct_version != 2 &&
options->struct_version != 3))
{ {
rc = MQTTASYNC_BAD_STRUCTURE; rc = MQTTASYNC_BAD_STRUCTURE;
goto exit; goto exit;
...@@ -1982,10 +2063,16 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -1982,10 +2063,16 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->c->keepAliveInterval = options->keepAliveInterval; m->c->keepAliveInterval = options->keepAliveInterval;
m->c->cleansession = options->cleansession; m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = options->maxInflight; m->c->maxInflightMessages = options->maxInflight;
if (options->struct_version == 3) if (options->struct_version >= 3)
m->c->MQTTVersion = options->MQTTVersion; m->c->MQTTVersion = options->MQTTVersion;
else else
m->c->MQTTVersion = 0; m->c->MQTTVersion = 0;
if (options->struct_version >= 4)
{
m->automaticReconnect = options->automaticReconnect;
m->minRetryInterval = options->minRetryInterval;
m->maxRetryInterval = options->maxRetryInterval;
}
if (m->c->will) if (m->c->will)
{ {
...@@ -2042,6 +2129,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2042,6 +2129,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->c->username = options->username; m->c->username = options->username;
m->c->password = options->password; m->c->password = options->password;
m->c->retryInterval = options->retryInterval; m->c->retryInterval = options->retryInterval;
m->shouldBeConnected = 1;
/* Add connect request to operation queue */ /* Add connect request to operation queue */
conn = malloc(sizeof(MQTTAsync_queuedCommand)); conn = malloc(sizeof(MQTTAsync_queuedCommand));
...@@ -2086,6 +2174,8 @@ int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* o ...@@ -2086,6 +2174,8 @@ int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* o
rc = MQTTASYNC_FAILURE; rc = MQTTASYNC_FAILURE;
goto exit; goto exit;
} }
if (!internal)
m->shouldBeConnected = 0;
if (m->c->connected == 0) if (m->c->connected == 0)
{ {
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
...@@ -2357,7 +2447,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen ...@@ -2357,7 +2447,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
FUNC_ENTRY; FUNC_ENTRY;
if (m == NULL || m->c == NULL) if (m == NULL || m->c == NULL)
rc = MQTTASYNC_FAILURE; rc = MQTTASYNC_FAILURE;
else if (m->c->connected == 0) else if (m->c->connected == 0 && (m->createOptions == NULL ||
m->createOptions->send_while_disconnected == 0 || m->shouldBeConnected == 0))
rc = MQTTASYNC_DISCONNECTED; rc = MQTTASYNC_DISCONNECTED;
else if (!UTF8_validateString(destinationName)) else if (!UTF8_validateString(destinationName))
rc = MQTTASYNC_BAD_UTF8_STRING; rc = MQTTASYNC_BAD_UTF8_STRING;
...@@ -2546,6 +2637,7 @@ exit: ...@@ -2546,6 +2637,7 @@ exit:
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, NULL); (*(m->connect.onFailure))(m->connect.context, NULL);
} }
MQTTAsync_startConnectRetry(m);
} }
} }
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
...@@ -2558,7 +2650,6 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2558,7 +2650,6 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
struct timeval tp = {0L, 0L}; struct timeval tp = {0L, 0L};
static Ack ack; static Ack ack;
MQTTPacket* pack = NULL; MQTTPacket* pack = NULL;
static int nosockets_count = 0;
FUNC_ENTRY; FUNC_ENTRY;
if (timeout > 0L) if (timeout > 0L)
...@@ -2574,18 +2665,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2574,18 +2665,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
/* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */ /* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
*sock = Socket_getReadySocket(0, &tp); *sock = Socket_getReadySocket(0, &tp);
if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L)) if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
{
MQTTAsync_sleep(100L); MQTTAsync_sleep(100L);
#if 0
if (s.clientsds->count == 0)
{
if (++nosockets_count == 50) /* 5 seconds with no sockets */
tostop = 1;
}
#endif
}
else
nosockets_count = 0;
#if defined(OPENSSL) #if defined(OPENSSL)
} }
#endif #endif
...@@ -2626,6 +2706,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc) ...@@ -2626,6 +2706,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID); Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, NULL); (*(m->connect.onFailure))(m->connect.context, NULL);
} }
MQTTAsync_startConnectRetry(m);
} }
} }
} }
......
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2009, 2015 IBM Corp. * Copyright (c) 2009, 2016 IBM Corp.
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
* Ian Craggs, Allan Stockdill-Mander - SSL connections * Ian Craggs, Allan Stockdill-Mander - SSL connections
* Ian Craggs - multiple server connection support * Ian Craggs - multiple server connection support
* Ian Craggs - MQTT 3.1.1 support * Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
*******************************************************************************/ *******************************************************************************/
/********************************************************************/ /********************************************************************/
...@@ -23,7 +24,7 @@ ...@@ -23,7 +24,7 @@
* @cond MQTTAsync_main * @cond MQTTAsync_main
* @mainpage Asynchronous MQTT client library for C * @mainpage Asynchronous MQTT client library for C
* *
* &copy; Copyright IBM Corp. 2009, 2015 * &copy; Copyright IBM Corp. 2009, 2016
* *
* @brief An Asynchronous MQTT client library for C. * @brief An Asynchronous MQTT client library for C.
* *
...@@ -483,6 +484,24 @@ DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_ ...@@ -483,6 +484,24 @@ DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_
DLLExport int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId, DLLExport int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context); int persistence_type, void* persistence_context);
typedef struct
{
/** The eyecatcher for this structure. must be MQCO. */
const char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** Whether to allow messages to be sent when the client library is not connected. */
int send_while_disconnected;
/** the maximum number of messages allowed to be buffered while not connected. */
int max_buffered_messages;
} MQTTAsync_createOptions;
#define MQTTAsync_createOptions_initializer { {'M', 'Q', 'C', 'O'}, 0, 0, 100 }
DLLExport int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context, MQTTAsync_createOptions* options);
/** /**
* MQTTAsync_willOptions defines the MQTT "Last Will and Testament" (LWT) settings for * MQTTAsync_willOptions defines the MQTT "Last Will and Testament" (LWT) settings for
* the client. In the event that a client unexpectedly loses its connection to * the client. In the event that a client unexpectedly loses its connection to
...@@ -578,10 +597,11 @@ typedef struct ...@@ -578,10 +597,11 @@ typedef struct
{ {
/** The eyecatcher for this structure. must be MQTC. */ /** The eyecatcher for this structure. must be MQTC. */
const char struct_id[4]; const char struct_id[4];
/** The version number of this structure. Must be 0, 1 or 2. /** The version number of this structure. Must be 0, 1, 2, 3 or 4.
* 0 signifies no SSL options and no serverURIs * 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs * 1 signifies no serverURIs
* 2 signifies no MQTTVersion * 2 signifies no MQTTVersion
* 3 signifies no automatic reconnect options
*/ */
int struct_version; int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time /** The "keep alive" interval, measured in seconds, defines the maximum time
...@@ -690,10 +710,23 @@ typedef struct ...@@ -690,10 +710,23 @@ typedef struct
* MQTTVERSION_3_1_1 (4) = only try version 3.1.1 * MQTTVERSION_3_1_1 (4) = only try version 3.1.1
*/ */
int MQTTVersion; int MQTTVersion;
/**
* Reconnect automatically in the case of a connection being lost?
*/
int automaticReconnect;
/**
* Minimum retry interval in seconds. Doubled on each failed retry.
*/
int minRetryInterval;
/**
* Maximum retry interval in seconds. The doubling stops here on failed retries.
*/
int maxRetryInterval;
} MQTTAsync_connectOptions; } MQTTAsync_connectOptions;
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 3, 60, 1, 10, NULL, NULL, NULL, 30, 0, NULL, NULL, NULL, NULL, 0, NULL, 0} #define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 4, 60, 1, 10, NULL, NULL, NULL, 30, 0, NULL, NULL, NULL, NULL, 0, NULL, 0, \
0, 1, 60}
/** /**
* This function attempts to connect a previously-created client (see * This function attempts to connect a previously-created client (see
......
...@@ -59,8 +59,8 @@ ...@@ -59,8 +59,8 @@
#define URI_TCP "tcp://" #define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "##MQTTCLIENT_BUILD_TAG##" #define BUILD_TIMESTAMP "Sun Feb 14 19:24:42 GMT 2016"
#define CLIENT_VERSION "##MQTTCLIENT_VERSION_TAG##" #define CLIENT_VERSION "1.0.3"
char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP; char* client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION; char* client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
......
...@@ -114,11 +114,15 @@ void onDisconnect(void* context, MQTTAsync_successData* response) ...@@ -114,11 +114,15 @@ void onDisconnect(void* context, MQTTAsync_successData* response)
static int connected = 0; static int connected = 0;
void myconnect(MQTTAsync* client);
void onConnectFailure(void* context, MQTTAsync_failureData* response) void onConnectFailure(void* context, MQTTAsync_failureData* response)
{ {
printf("Connect failed, rc %d\n", response ? -1 : response->code); printf("Connect failed, rc %d\n", response ? response->code : -1);
connected = -1; connected = -1;
MQTTAsync client = (MQTTAsync)context;
//myconnect(client);
} }
...@@ -131,7 +135,6 @@ void onConnect(void* context, MQTTAsync_successData* response) ...@@ -131,7 +135,6 @@ void onConnect(void* context, MQTTAsync_successData* response)
connected = 1; connected = 1;
} }
void myconnect(MQTTAsync* client) void myconnect(MQTTAsync* client)
{ {
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
...@@ -140,7 +143,7 @@ void myconnect(MQTTAsync* client) ...@@ -140,7 +143,7 @@ void myconnect(MQTTAsync* client)
printf("Connecting\n"); printf("Connecting\n");
conn_opts.keepAliveInterval = 10; conn_opts.keepAliveInterval = 10;
conn_opts.cleansession = 1; conn_opts.cleansession = 0;
conn_opts.username = opts.username; conn_opts.username = opts.username;
conn_opts.password = opts.password; conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect; conn_opts.onSuccess = onConnect;
...@@ -148,18 +151,13 @@ void myconnect(MQTTAsync* client) ...@@ -148,18 +151,13 @@ void myconnect(MQTTAsync* client)
conn_opts.context = client; conn_opts.context = client;
ssl_opts.enableServerCertAuth = 0; ssl_opts.enableServerCertAuth = 0;
conn_opts.ssl = &ssl_opts; conn_opts.ssl = &ssl_opts;
conn_opts.automaticReconnect = 1;
connected = 0; connected = 0;
if ((rc = MQTTAsync_connect(*client, &conn_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_connect(*client, &conn_opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start connect, return code %d\n", rc); printf("Failed to start connect, return code %d\n", rc);
exit(-1); exit(-1);
} }
while (connected == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
} }
...@@ -183,135 +181,7 @@ void onPublish(void* context, MQTTAsync_successData* response) ...@@ -183,135 +181,7 @@ void onPublish(void* context, MQTTAsync_successData* response)
void connectionLost(void* context, char* cause) void connectionLost(void* context, char* cause)
{ {
MQTTAsync client = (MQTTAsync)context; MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; //myconnect(client);
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
int rc = 0;
printf("Connecting\n");
conn_opts.keepAliveInterval = 10;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
ssl_opts.enableServerCertAuth = 0;
conn_opts.ssl = &ssl_opts;
connected = 0;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(-1);
}
}
#if !defined(_WINDOWS)
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINPROGRESS WSAEINPROGRESS
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define setenv(a, b, c) _putenv_s(a, b)
#endif
#if !defined(SOCKET_ERROR)
#define SOCKET_ERROR -1
#endif
typedef struct
{
int socket;
time_t lastContact;
#if defined(OPENSSL)
SSL* ssl;
SSL_CTX* ctx;
#endif
} networkHandles;
typedef struct
{
char* clientID; /**< the string id of the client */
char* username; /**< MQTT v3.1 user name */
char* password; /**< MQTT v3.1 password */
unsigned int cleansession : 1; /**< MQTT clean session flag */
unsigned int connected : 1; /**< whether it is currently connected */
unsigned int good : 1; /**< if we have an error on the socket we turn this off */
unsigned int ping_outstanding : 1;
int connect_state : 4;
networkHandles net;
/* ... */
} Clients;
typedef struct MQTTAsync_struct
{
char* serverURI;
int ssl;
Clients* c;
/* "Global", to the client, callback definitions */
MQTTAsync_connectionLost* cl;
MQTTAsync_messageArrived* ma;
MQTTAsync_deliveryComplete* dc;
void* context; /* the context to be associated with the main callbacks*/
#if 0
MQTTAsync_command connect; /* Connect operation properties */
MQTTAsync_command disconnect; /* Disconnect operation properties */
MQTTAsync_command* pending_write; /* Is there a socket write pending? */
List* responses;
unsigned int command_seqno;
MQTTPacket* pack;
#endif
} MQTTAsyncs;
int test6_socket_error(char* aString, int sock)
{
#if defined(WIN32)
int errno;
#endif
#if defined(WIN32)
errno = WSAGetLastError();
#endif
if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
{
if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
printf("Socket error %d in %s for socket %d", errno, aString, sock);
}
return errno;
}
int test6_socket_close(int socket)
{
int rc;
#if defined(WIN32)
if (shutdown(socket, SD_BOTH) == SOCKET_ERROR)
test6_socket_error("shutdown", socket);
if ((rc = closesocket(socket)) == SOCKET_ERROR)
test6_socket_error("close", socket);
#else
if (shutdown(socket, SHUT_RDWR) == SOCKET_ERROR)
test6_socket_error("shutdown", socket);
if ((rc = close(socket)) == SOCKET_ERROR)
test6_socket_error("close", socket);
#endif
return rc;
} }
...@@ -319,6 +189,7 @@ int main(int argc, char** argv) ...@@ -319,6 +189,7 @@ int main(int argc, char** argv)
{ {
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
MQTTAsync client; MQTTAsync client;
char* topic = NULL; char* topic = NULL;
char* buffer = NULL; char* buffer = NULL;
...@@ -337,7 +208,8 @@ int main(int argc, char** argv) ...@@ -337,7 +208,8 @@ int main(int argc, char** argv)
topic = argv[1]; topic = argv[1];
printf("Using topic %s\n", topic); printf("Using topic %s\n", topic);
rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); create_opts.send_while_disconnected = 1;
rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
signal(SIGINT, cfinish); signal(SIGINT, cfinish);
signal(SIGTERM, cfinish); signal(SIGTERM, cfinish);
...@@ -371,19 +243,9 @@ int main(int argc, char** argv) ...@@ -371,19 +243,9 @@ int main(int argc, char** argv)
pub_opts.onFailure = onPublishFailure; pub_opts.onFailure = onPublishFailure;
do do
{ {
published = 0;
rc = MQTTAsync_send(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts); rc = MQTTAsync_send(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts);
while (published == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(1000L);
#endif
if (published == -1)
myconnect(&client);
test6_socket_close(((MQTTAsyncs*)client)->c->net.socket);
} }
while (published != 1); while (rc != MQTTASYNC_SUCCESS);
} }
printf("Stopping\n"); printf("Stopping\n");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment