Commit fca13278 authored by Ian Craggs's avatar Ian Craggs Committed by GitHub

Merge pull request #290 from Lance0312/fix-serverURIs-iterating

Fix `serverURIs` iterating in MQTTAsync.c
parents 149a9ea2 42f5c135
...@@ -430,7 +430,7 @@ static int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client) ...@@ -430,7 +430,7 @@ static int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client)
int rc; int rc;
FUNC_ENTRY; FUNC_ENTRY;
rc = command->details.conn.currentURI < client->serverURIcount || rc = command->details.conn.currentURI + 1 < client->serverURIcount ||
(command->details.conn.MQTTVersion == 4 && client->c->MQTTVersion == MQTTVERSION_DEFAULT); (command->details.conn.MQTTVersion == 4 && client->c->MQTTVersion == MQTTVERSION_DEFAULT);
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
...@@ -1162,17 +1162,6 @@ static int MQTTAsync_processCommand(void) ...@@ -1162,17 +1162,6 @@ static int MQTTAsync_processCommand(void)
if (command->client->serverURIcount > 0) if (command->client->serverURIcount > 0)
{ {
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
command->command.details.conn.currentURI++;
serverURI = command->client->serverURIs[command->command.details.conn.currentURI]; serverURI = command->client->serverURIs[command->command.details.conn.currentURI];
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0) if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
...@@ -1321,6 +1310,18 @@ static int MQTTAsync_processCommand(void) ...@@ -1321,6 +1310,18 @@ static int MQTTAsync_processCommand(void)
if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command, command->client)) if (command->command.type == CONNECT && MQTTAsync_checkConn(&command->command, command->client))
{ {
Log(TRACE_MIN, -1, "Connect failed, more to try"); Log(TRACE_MIN, -1, "Connect failed, more to try");
if (command->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (command->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
command->command.details.conn.currentURI++;
command->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
command->command.details.conn.currentURI++;
/* put the connect command back to the head of the command queue, using the next serverURI */ /* put the connect command back to the head of the command queue, using the next serverURI */
rc = MQTTAsync_addCommand(command, sizeof(command->command.details.conn)); rc = MQTTAsync_addCommand(command, sizeof(command->command.details.conn));
} }
...@@ -1358,6 +1359,18 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message) ...@@ -1358,6 +1359,18 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
conn->client = m; conn->client = m;
conn->command = m->connect; conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, more to try"); Log(TRACE_MIN, -1, "Connect failed, more to try");
if (conn->client->c->MQTTVersion == MQTTVERSION_DEFAULT)
{
if (conn->command.details.conn.MQTTVersion == MQTTVERSION_3_1)
{
conn->command.details.conn.currentURI++;
conn->command.details.conn.MQTTVersion = MQTTVERSION_DEFAULT;
}
}
else
conn->command.details.conn.currentURI++;
MQTTAsync_addCommand(conn, sizeof(m->connect)); MQTTAsync_addCommand(conn, sizeof(m->connect));
} }
else else
......
...@@ -373,11 +373,29 @@ ADD_TEST( ...@@ -373,11 +373,29 @@ ADD_TEST(
COMMAND test8 "--test_no" "4" "--connection" ${MQTT_TEST_BROKER} "--size" "500000" COMMAND test8 "--test_no" "4" "--connection" ${MQTT_TEST_BROKER} "--size" "500000"
) )
ADD_TEST(
NAME test8-5a-all-ha-connections-out-of-service
COMMAND test8 "--test_no" "5" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test8-5b-all-ha-connections-out-of-service-except-the-last-one
COMMAND test8 "--test_no" "6" "--connection" ${MQTT_TEST_BROKER}
)
ADD_TEST(
NAME test8-5c-all-ha-connections-out-of-service-except-the-first-one
COMMAND test8 "--test_no" "7" "--connection" ${MQTT_TEST_BROKER}
)
SET_TESTS_PROPERTIES( SET_TESTS_PROPERTIES(
test8-1-basic-connect-subscribe-receive test8-1-basic-connect-subscribe-receive
test8-2-connect-timeout test8-2-connect-timeout
test8-3-multiple-client-objects-simultaneous-working test8-3-multiple-client-objects-simultaneous-working
test8-4-send-receive-big-messages test8-4-send-receive-big-messages
test8-5a-all-ha-connections-out-of-service
test8-5b-all-ha-connections-out-of-service-except-the-last-one
test8-5c-all-ha-connections-out-of-service-except-the-first-one
PROPERTIES TIMEOUT 540 PROPERTIES TIMEOUT 540
) )
......
...@@ -876,6 +876,216 @@ exit: ...@@ -876,6 +876,216 @@ exit:
} }
int test5_onConnect_called = 0;
int test5_onFailure_called = 0;
void test5_onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
test5_onConnect_called++;
test_finished = 1;
}
void test5_onConnectFailure(void* context, MQTTAsync_failureData* response)
{
MQTTAsync c = (MQTTAsync)context;
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test5_onFailure_called++;
test_finished = 1;
}
/*********************************************************************
Test5a: All HA connections out of service.
*********************************************************************/
int test5a(struct Options options)
{
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
int rc = 0;
char* test_topic = "C client test5a";
char* serverURIs[3] = {"tcp://localhost:1880", "tcp://localhost:1881", "tcp://localhost:1882"};
failures = 0;
MyLog(LOGA_INFO, "Starting test 5a - All HA connections out of service");
rc = MQTTAsync_create(&c, "rubbish", "all_ha_down",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.onSuccess = test5_onConnect;
opts.onFailure = test5_onConnectFailure;
opts.context = c;
opts.serverURIcount = 3;
opts.serverURIs = serverURIs;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
assert("Connect onFailure should be called once", test5_onFailure_called == 1,
"connect onFailure was called %d times", test5_onFailure_called);
MyLog(LOGA_INFO, "TEST5a: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
return failures;
}
/*********************************************************************
Test5b: All HA connections out of service except the last one.
*********************************************************************/
int test5b(struct Options options)
{
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
int rc = 0;
char* test_topic = "C client test5b";
char* serverURIs[3] = {"tcp://localhost:1880", "tcp://localhost:1881", options.connection};
failures = 0;
MyLog(LOGA_INFO, "Starting test 5b - All HA connections out of service except the last one");
rc = MQTTAsync_create(&c, "rubbish", "all_ha_down_except_last_one",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.onSuccess = test5_onConnect;
opts.onFailure = test5_onConnectFailure;
opts.context = c;
opts.serverURIcount = 3;
opts.serverURIs = serverURIs;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
assert("Connect onConnect should be called once", test5_onConnect_called == 1,
"connect onConnect was called %d times", test5_onConnect_called);
MyLog(LOGA_INFO, "TEST5b: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
return failures;
}
/*********************************************************************
Test5c: All HA connections out of service except the first one.
*********************************************************************/
int test5c(struct Options options)
{
MQTTAsync c;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
int rc = 0;
char* test_topic = "C client test5c";
char* serverURIs[3] = {options.connection, "tcp://localhost:1881", "tcp://localhost:1882"};
failures = 0;
MyLog(LOGA_INFO, "Starting test 5c - All HA connections out of service except the first one");
rc = MQTTAsync_create(&c, "rubbish", "all_ha_down_except_first_one",
MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.onSuccess = test5_onConnect;
opts.onFailure = test5_onConnectFailure;
opts.context = c;
opts.serverURIcount = 3;
opts.serverURIs = serverURIs;
MyLog(LOGA_DEBUG, "Connecting");
rc = MQTTAsync_connect(c, &opts);
rc = 0;
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
if (rc != MQTTASYNC_SUCCESS)
goto exit;
while (!test_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&c);
exit:
assert("Connect onConnect should be called once", test5_onConnect_called == 1,
"connect onConnect was called %d times", test5_onConnect_called);
MyLog(LOGA_INFO, "TEST5c: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", tests, failures);
return failures;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
if (strstr(message, "onnect") && !strstr(message, "isconnect")) if (strstr(message, "onnect") && !strstr(message, "isconnect"))
...@@ -886,7 +1096,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) ...@@ -886,7 +1096,7 @@ void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int rc = 0; int rc = 0;
int (*tests[])() = {NULL, test1, test2, test3, test4}; /* indexed starting from 1 */ int (*tests[])() = {NULL, test1, test2, test3, test4, test5a, test5b, test5c}; /* indexed starting from 1 */
MQTTAsync_nameValue* info; MQTTAsync_nameValue* info;
getopts(argc, argv); getopts(argc, argv);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment