Commit 28e3c018 authored by Ian Craggs's avatar Ian Craggs

MQTT 3.1.1

parent 2b9cff6a
......@@ -248,6 +248,7 @@ typedef struct
int serverURIcount;
char** serverURIs;
int currentURI;
int MQTTVersion;
} conn;
} details;
} MQTTAsync_command;
......@@ -334,6 +335,18 @@ void MQTTAsync_unlock_mutex(mutex_type amutex)
}
int MQTTAsync_checkConn(MQTTAsync_command* command)
{
int rc;
FUNC_ENTRY;
rc = command->details.conn.currentURI < command->details.conn.serverURIcount ||
command->details.conn.MQTTVersion == 4;
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
int persistence_type, void* persistence_context)
{
......@@ -963,7 +976,13 @@ void MQTTAsync_processCommand()
if (command->command.details.conn.serverURIcount > 0)
{
serverURI = command->command.details.conn.serverURIs[command->command.details.conn.currentURI++];
if (command->command.details.conn.MQTTVersion == 3)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = 4;
}
serverURI = command->command.details.conn.serverURIs[command->command.details.conn.currentURI];
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
......@@ -976,11 +995,16 @@ void MQTTAsync_processCommand()
#endif
}
Log(TRACE_MIN, -1, "Connecting to serverURI %s", serverURI);
if (command->command.details.conn.MQTTVersion == 0)
command->command.details.conn.MQTTVersion = 4;
else if (command->command.details.conn.MQTTVersion == 4)
command->command.details.conn.MQTTVersion = 3;
Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, command->command.details.conn.MQTTVersion);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl);
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->command.details.conn.MQTTVersion);
#else
rc = MQTTProtocol_connect(serverURI, command->client->c);
rc = MQTTProtocol_connect(serverURI, command->client->c, command->command.details.conn.MQTTVersion);
#endif
if (command->client->c->connect_state == 0)
rc = SOCKET_ERROR;
......@@ -1097,12 +1121,10 @@ void MQTTAsync_processCommand()
else
MQTTAsync_disconnect_internal(command->client, 0);
if (command->command.type == CONNECT &&
command->command.details.conn.currentURI < command->command.details.conn.serverURIcount)
if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command))
{
Log(TRACE_MIN, -1, "Connect failed, more to try");
/* put the connect command back to the head of the command queue, using the next serverURI */
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
command->command.details.conn.serverURIs[command->command.details.conn.currentURI]);
rc = MQTTAsync_addCommand(command, sizeof(command->command.details.conn));
}
else
......@@ -1153,7 +1175,7 @@ void MQTTAsync_checkTimeouts()
/* check connect timeout */
if (m->c->connect_state != 0 && MQTTAsync_elapsed(m->connect.start_time) > (m->connect.details.conn.timeout * 1000))
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
if (MQTTAsync_checkConn(&m->connect))
{
MQTTAsync_queuedCommand* conn;
......@@ -1163,8 +1185,7 @@ void MQTTAsync_checkTimeouts()
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
......@@ -1477,7 +1498,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{
if (m->connect.details.conn.serverURIcount > 0)
Log(TRACE_MIN, -1, "Connect succeeded to %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI - 1]);
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onSuccess)
{
......@@ -1487,7 +1508,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
}
else
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
if (MQTTAsync_checkConn(&m->connect))
{
MQTTAsync_queuedCommand* conn;
......@@ -1497,8 +1518,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
......@@ -2506,10 +2526,11 @@ int MQTTAsync_connecting(MQTTAsyncs* m)
rc = SOCKET_ERROR;
goto exit;
}
else if (rc == 1) {
else if (rc == 1)
{
rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3;
if (MQTTPacket_send_connect(m->c) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
......@@ -2528,7 +2549,7 @@ int MQTTAsync_connecting(MQTTAsyncs* m)
{
#endif
m->c->connect_state = 3; /* TCP/SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c)) == SOCKET_ERROR)
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR)
goto exit;
#if defined(OPENSSL)
}
......@@ -2543,7 +2564,7 @@ int MQTTAsync_connecting(MQTTAsyncs* m)
if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; /* SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c)) == SOCKET_ERROR)
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR)
goto exit;
}
#endif
......@@ -2551,7 +2572,7 @@ int MQTTAsync_connecting(MQTTAsyncs* m)
exit:
if ((rc != 0 && m->c->connect_state != 2) || (rc == SSL_FATAL))
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
if (MQTTAsync_checkConn(&m->connect))
{
MQTTAsync_queuedCommand* conn;
......@@ -2561,8 +2582,7 @@ exit:
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
......@@ -2630,7 +2650,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
if ((m->c->connect_state == 3) && (*rc == SOCKET_ERROR))
{
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
if (MQTTAsync_checkConn(&m->connect))
{
MQTTAsync_queuedCommand* conn;
......@@ -2640,8 +2660,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
Log(TRACE_MIN, -1, "Connect failed, more to try");
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
......
This diff is collapsed.
......@@ -518,7 +518,7 @@ void MQTTPacket_freePublish(Publish* pack)
* @param type the MQTT packet type e.g. SUBACK
* @param msgid the MQTT message id to use
* @param dup boolean - whether to set the MQTT DUP flag
* @param socket the open socket to send the data to
* @param net the network handle to send the data to
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net)
......@@ -532,6 +532,8 @@ int MQTTPacket_send_ack(int type, int msgid, int dup, networkHandles *net)
header.byte = 0;
header.bits.type = type;
header.bits.dup = dup;
if (type == PUBREL)
header.bits.qos = 1;
writeInt(&ptr, msgid);
if ((rc = MQTTPacket_send(net, header, buf, 2)) != TCPSOCKET_INTERRUPTED)
free(buf);
......
......@@ -36,20 +36,20 @@
/**
* Send an MQTT CONNECT packet down a socket.
* @param client a structure from which to get all the required values
* @param MQTTVersion the MQTT version to connect with
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
*/
int MQTTPacket_send_connect(Clients* client)
int MQTTPacket_send_connect(Clients* client, int MQTTVersion)
{
char *buf, *ptr;
Connect packet;
int rc, len;
int rc = -1, len;
FUNC_ENTRY;
packet.header.byte = 0;
packet.header.bits.type = CONNECT;
packet.header.bits.qos = 1;
len = 12 + strlen(client->clientID)+2;
len = ((MQTTVersion == 3) ? 12 : 10) + strlen(client->clientID)+2;
if (client->will)
len += strlen(client->will->topic)+2 + strlen(client->will->msg)+2;
if (client->username)
......@@ -58,8 +58,18 @@ int MQTTPacket_send_connect(Clients* client)
len += strlen(client->password)+2;
ptr = buf = malloc(len);
if (MQTTVersion == 3)
{
writeUTF(&ptr, "MQIsdp");
writeChar(&ptr, (char)3);
}
else if (MQTTVersion == 4)
{
writeUTF(&ptr, "MQTT");
writeChar(&ptr, (char)4);
}
else
goto exit;
packet.flags.all = 0;
packet.flags.bits.cleanstart = client->cleansession;
......@@ -90,6 +100,7 @@ int MQTTPacket_send_connect(Clients* client)
rc = MQTTPacket_send(&client->net, packet.header, buf, len);
Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID, client->cleansession, rc);
exit:
free(buf);
FUNC_EXIT_RC(rc);
return rc;
......
......@@ -20,7 +20,7 @@
#include "MQTTPacket.h"
int MQTTPacket_send_connect(Clients* client);
int MQTTPacket_send_connect(Clients* client, int MQTTVersion);
void* MQTTPacket_connack(unsigned char aHeader, char* data, int datalen);
int MQTTPacket_send_pingreq(networkHandles* net, char* clientID);
......
......@@ -77,18 +77,15 @@ char* MQTTProtocol_addressPort(char* ip_address, int* port)
/**
* MQTT outgoing connect processing for a client
* @param ip_address the TCP address:port to connect to
* @param clientID the MQTT client id to use
* @param cleansession MQTT cleansession flag
* @param keepalive MQTT keepalive timeout in seconds
* @param willMessage pointer to the will message to be used, if any
* @param username MQTT 3.1 username, or NULL
* @param password MQTT 3.1 password, or NULL
* @return the new client structure
* @param aClient a structure with all MQTT data needed
* @param int ssl
* @param int MQTTVersion the MQTT version to connect with (3 or 4)
* @return return code
*/
#if defined(OPENSSL)
int MQTTProtocol_connect(char* ip_address, Clients* aClient, int ssl)
int MQTTProtocol_connect(char* ip_address, Clients* aClient, int ssl, int MQTTVersion)
#else
int MQTTProtocol_connect(char* ip_address, Clients* aClient)
int MQTTProtocol_connect(char* ip_address, Clients* aClient, int MQTTVersion)
#endif
{
int rc, port;
......@@ -120,7 +117,7 @@ int MQTTProtocol_connect(char* ip_address, Clients* aClient)
if (rc == 0)
{
/* Now send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(aClient)) == 0)
if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion)) == 0)
aClient->connect_state = 3; /* MQTT Connect sent - wait for CONNACK */
else
aClient->connect_state = 0;
......
......@@ -30,9 +30,9 @@
void MQTTProtocol_reconnect(char* ip_address, Clients* client);
#if defined(OPENSSL)
int MQTTProtocol_connect(char* ip_address, Clients* acClients, int ssl);
int MQTTProtocol_connect(char* ip_address, Clients* acClients, int ssl, int MQTTVersion);
#else
int MQTTProtocol_connect(char* ip_address, Clients* acClients);
int MQTTProtocol_connect(char* ip_address, Clients* acClients, int MQTTVersion);
#endif
int MQTTProtocol_handlePingresps(void* pack, int sock);
int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss);
......
......@@ -948,6 +948,17 @@ int test6_socket_close(int socket)
return rc;
}
typedef struct
{
int socket;
time_t lastContact;
#if defined(OPENSSL)
SSL* ssl;
SSL_CTX* ctx;
#endif
} networkHandles;
typedef struct
{
char* clientID; /**< the string id of the client */
......@@ -957,22 +968,12 @@ typedef struct
unsigned int connected : 1; /**< whether it is currently connected */
unsigned int good : 1; /**< if we have an error on the socket we turn this off */
unsigned int ping_outstanding : 1;
unsigned int connect_state : 2;
int socket;
int msgID;
int keepAliveInterval;
int retryInterval;
int maxInflightMessages;
time_t lastContact;
void* will;
void* inboundMsgs;
void* outboundMsgs; /**< in flight */
void* messageQueue;
void* phandle; /* the persistence handle */
MQTTClient_persistence* persistence; /* a persistence implementation */
int connectOptionsVersion;
int connect_state : 4;
networkHandles net;
/* ... */
} Clients;
typedef struct
{
char* serverURI;
......@@ -1056,7 +1057,7 @@ int test6(struct Options options)
/* now send the command which will break the connection and cause the will message to be sent */
/*rc = MQTTClient_publish(test6_c1, mqttsas_topic, strlen("TERMINATE"), "TERMINATE", 0, 0, NULL);
assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);*/
test6_socket_close(((MQTTClients*)test6_c1)->c->socket);
test6_socket_close(((MQTTClients*)test6_c1)->c->net.socket);
MyLog(LOGA_INFO, "Waiting to receive the will message");
count = 0;
......
......@@ -1095,7 +1095,6 @@ int test6(struct Options options)
usleep(10000L);
#endif
MQTTAsync_destroy(&cinfo.c);
exit:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment