Commit 205a5a20 authored by Ian Craggs's avatar Ian Craggs

Update utilities for MQTT V5 and websockets #504

parent ea99febb
...@@ -87,9 +87,12 @@ HEADERS_A = $(HEADERS) ...@@ -87,9 +87,12 @@ HEADERS_A = $(HEADERS)
SAMPLE_FILES_C = paho_cs_pub paho_cs_sub MQTTClient_publish MQTTClient_publish_async MQTTClient_subscribe SAMPLE_FILES_C = paho_cs_pub paho_cs_sub MQTTClient_publish MQTTClient_publish_async MQTTClient_subscribe
SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}} SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
SAMPLE_FILES_A = paho_c_pub paho_c_sub MQTTAsync_subscribe MQTTAsync_publish SAMPLE_FILES_A = MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}} ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}
UTIL_FILES_AS = paho_c_pub paho_c_sub
ASYNC_UTILS = ${addprefix ${blddir}/samples/,${UTIL_FILES_AS}}
TEST_FILES_C = test1 test15 test2 sync_client_test test_mqtt4sync test10 TEST_FILES_C = test1 test15 test2 sync_client_test test_mqtt4sync test10
SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}} SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}
...@@ -127,7 +130,7 @@ MQTTLIB_A_TARGET = ${blddir}/lib${MQTTLIB_A}.so.${VERSION} ...@@ -127,7 +130,7 @@ MQTTLIB_A_TARGET = ${blddir}/lib${MQTTLIB_A}.so.${VERSION}
MQTTLIB_AS_TARGET = ${blddir}/lib${MQTTLIB_AS}.so.${VERSION} MQTTLIB_AS_TARGET = ${blddir}/lib${MQTTLIB_AS}.so.${VERSION}
MQTTVERSION_TARGET = ${blddir}/MQTTVersion MQTTVERSION_TARGET = ${blddir}/MQTTVersion
CCFLAGS_SO = -g -fPIC $(CFLAGS) -Os -Wall -fvisibility=hidden -I$(blddir_work) CCFLAGS_SO = -g -fPIC $(CFLAGS) -Os -Wall -fvisibility=hidden -I$(blddir_work) -fpermissive
FLAGS_EXE = $(LDFLAGS) -I ${srcdir} -lpthread -L ${blddir} FLAGS_EXE = $(LDFLAGS) -I ${srcdir} -lpthread -L ${blddir}
FLAGS_EXES = $(LDFLAGS) -I ${srcdir} ${START_GROUP} -lpthread -lssl -lcrypto ${END_GROUP} -L ${blddir} FLAGS_EXES = $(LDFLAGS) -I ${srcdir} ${START_GROUP} -lpthread -lssl -lcrypto ${END_GROUP} -L ${blddir}
...@@ -178,7 +181,7 @@ endif ...@@ -178,7 +181,7 @@ endif
all: build all: build
build: | mkdir ${MQTTLIB_C_TARGET} ${MQTTLIB_CS_TARGET} ${MQTTLIB_A_TARGET} ${MQTTLIB_AS_TARGET} ${MQTTVERSION_TARGET} ${SYNC_SAMPLES} ${ASYNC_SAMPLES} ${SYNC_TESTS} ${SYNC_SSL_TESTS} ${ASYNC_TESTS} ${ASYNC_SSL_TESTS} build: | mkdir ${MQTTLIB_C_TARGET} ${MQTTLIB_CS_TARGET} ${MQTTLIB_A_TARGET} ${MQTTLIB_AS_TARGET} ${MQTTVERSION_TARGET} ${SYNC_SAMPLES} ${ASYNC_SAMPLES} ${ASYNC_UTILS} ${SYNC_TESTS} ${SYNC_SSL_TESTS} ${ASYNC_TESTS} ${ASYNC_SSL_TESTS}
clean: clean:
rm -rf ${blddir}/* rm -rf ${blddir}/*
...@@ -202,10 +205,13 @@ ${ASYNC_SSL_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c $(MQTTLIB_CS_TARGET) ...@@ -202,10 +205,13 @@ ${ASYNC_SSL_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c $(MQTTLIB_CS_TARGET)
${CC} -g -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES} ${CC} -g -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES}
${SYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_C_TARGET) ${SYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_C_TARGET)
${CC} -o $@ $< -l${MQTTLIB_C} ${FLAGS_EXE} ${CC} -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES}
${ASYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_A_TARGET) ${ASYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_A_TARGET)
${CC} -o $@ $< -l${MQTTLIB_A} ${FLAGS_EXE} ${CC} -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES}
${ASYNC_UTILS}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_AS_TARGET)
${CC} -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES} ${srcdir}/samples/pubsub_opts.c
$(blddir_work)/VersionInfo.h: $(srcdir)/VersionInfo.h.in $(blddir_work)/VersionInfo.h: $(srcdir)/VersionInfo.h.in
$(SED_COMMAND) $< > $@ $(SED_COMMAND) $< > $@
......
...@@ -2730,7 +2730,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2730,7 +2730,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
rc = MQTTASYNC_BAD_MQTT_OPTIONS; rc = MQTTASYNC_BAD_MQTT_OPTIONS;
goto exit; goto exit;
} }
if (options->MQTTVersion < MQTTVERSION_5) if (options->MQTTVersion < MQTTVERSION_5 && options->struct_version >= 6)
{ {
if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 || if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 ||
options->connectProperties || options->willProperties) options->connectProperties || options->willProperties)
...@@ -2742,8 +2742,11 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2742,8 +2742,11 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->connect.onSuccess = options->onSuccess; m->connect.onSuccess = options->onSuccess;
m->connect.onFailure = options->onFailure; m->connect.onFailure = options->onFailure;
m->connect.onSuccess5 = options->onSuccess5; if (options->struct_version >= 6)
m->connect.onFailure5 = options->onFailure5; {
m->connect.onSuccess5 = options->onSuccess5;
m->connect.onFailure5 = options->onFailure5;
}
m->connect.context = options->context; m->connect.context = options->context;
m->connectTimeout = options->connectTimeout; m->connectTimeout = options->connectTimeout;
...@@ -2834,7 +2837,6 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options) ...@@ -2834,7 +2837,6 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
if (m->c->sslopts->CApath) if (m->c->sslopts->CApath)
free((void*)m->c->sslopts->CApath); free((void*)m->c->sslopts->CApath);
} }
free(m->c->sslopts);
free((void*)m->c->sslopts); free((void*)m->c->sslopts);
m->c->sslopts = NULL; m->c->sslopts = NULL;
} }
...@@ -3321,6 +3323,19 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen ...@@ -3321,6 +3323,19 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
rc = MQTTASYNC_NO_MORE_MSGIDS; rc = MQTTASYNC_NO_MORE_MSGIDS;
else if (m->createOptions && (MQTTAsync_countBufferedMessages(m) >= m->createOptions->maxBufferedMessages)) else if (m->createOptions && (MQTTAsync_countBufferedMessages(m) >= m->createOptions->maxBufferedMessages))
rc = MQTTASYNC_MAX_BUFFERED_MESSAGES; rc = MQTTASYNC_MAX_BUFFERED_MESSAGES;
else if (response)
{
if (m->c->MQTTVersion >= MQTTVERSION_5)
{
if (response->struct_version == 0 || response->onFailure || response->onSuccess)
rc = MQTTASYNC_BAD_MQTT_OPTIONS;
}
else if (m->c->MQTTVersion < MQTTVERSION_5)
{
if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
rc = MQTTASYNC_BAD_MQTT_OPTIONS;
}
}
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
goto exit; goto exit;
......
...@@ -478,7 +478,7 @@ int WebSocket_getch(networkHandles *net, char* c) ...@@ -478,7 +478,7 @@ int WebSocket_getch(networkHandles *net, char* c)
size_t actual_len = 0u; size_t actual_len = 0u;
rc = WebSocket_receiveFrame( net, 1u, &actual_len ); rc = WebSocket_receiveFrame( net, 1u, &actual_len );
if ( rc != TCPSOCKET_COMPLETE ) if ( rc != TCPSOCKET_COMPLETE )
return rc; goto exit;
/* we got a frame, let take off the top of queue */ /* we got a frame, let take off the top of queue */
if ( in_frames->first ) if ( in_frames->first )
...@@ -501,6 +501,7 @@ int WebSocket_getch(networkHandles *net, char* c) ...@@ -501,6 +501,7 @@ int WebSocket_getch(networkHandles *net, char* c)
else else
rc = Socket_getch(net->socket, c); rc = Socket_getch(net->socket, c);
exit:
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -543,7 +544,7 @@ char *WebSocket_getdata(networkHandles *net, size_t bytes, size_t* actual_len) ...@@ -543,7 +544,7 @@ char *WebSocket_getdata(networkHandles *net, size_t bytes, size_t* actual_len)
free( last_frame ); free( last_frame );
last_frame = ListDetachHead(in_frames); last_frame = ListDetachHead(in_frames);
} }
return rv; goto exit;
} }
/* no current frame, let's see if there's one in the list */ /* no current frame, let's see if there's one in the list */
...@@ -577,6 +578,7 @@ char *WebSocket_getdata(networkHandles *net, size_t bytes, size_t* actual_len) ...@@ -577,6 +578,7 @@ char *WebSocket_getdata(networkHandles *net, size_t bytes, size_t* actual_len)
else else
rv = WebSocket_getRawSocketData(net, bytes, actual_len); rv = WebSocket_getRawSocketData(net, bytes, actual_len);
exit:
rc = rv != NULL; rc = rv != NULL;
FUNC_EXIT_RC(rc); FUNC_EXIT_RC(rc);
return rv; return rv;
......
...@@ -29,15 +29,15 @@ IF (WIN32) ...@@ -29,15 +29,15 @@ IF (WIN32)
ENDIF() ENDIF()
# sample files c # sample files c
ADD_EXECUTABLE(paho_c_pub paho_c_pub.c) ADD_EXECUTABLE(paho_c_pub paho_c_pub.c pubsub_opts.c)
ADD_EXECUTABLE(paho_c_sub paho_c_sub.c) ADD_EXECUTABLE(paho_c_sub paho_c_sub.c pubsub_opts.c)
ADD_EXECUTABLE(paho_cs_pub paho_cs_pub.c) ADD_EXECUTABLE(paho_cs_pub paho_cs_pub.c)
ADD_EXECUTABLE(paho_cs_sub paho_cs_sub.c) ADD_EXECUTABLE(paho_cs_sub paho_cs_sub.c)
TARGET_LINK_LIBRARIES(paho_c_pub paho-mqtt3a) TARGET_LINK_LIBRARIES(paho_c_pub paho-mqtt3as)
TARGET_LINK_LIBRARIES(paho_c_sub paho-mqtt3a) TARGET_LINK_LIBRARIES(paho_c_sub paho-mqtt3as)
TARGET_LINK_LIBRARIES(paho_cs_pub paho-mqtt3c) TARGET_LINK_LIBRARIES(paho_cs_pub paho-mqtt3cs)
TARGET_LINK_LIBRARIES(paho_cs_sub paho-mqtt3c) TARGET_LINK_LIBRARIES(paho_cs_sub paho-mqtt3cs)
ADD_EXECUTABLE(MQTTAsync_subscribe MQTTAsync_subscribe.c) ADD_EXECUTABLE(MQTTAsync_subscribe MQTTAsync_subscribe.c)
ADD_EXECUTABLE(MQTTAsync_publish MQTTAsync_publish.c) ADD_EXECUTABLE(MQTTAsync_publish MQTTAsync_publish.c)
......
...@@ -15,29 +15,8 @@ ...@@ -15,29 +15,8 @@
* Guilherme Maciel Ferreira - add keep alive option * Guilherme Maciel Ferreira - add keep alive option
*******************************************************************************/ *******************************************************************************/
/*
stdin publisher
compulsory parameters:
--topic topic to publish on
defaulted parameters:
--host localhost
--port 1883
--qos 0
--delimiters \n
--clientid stdin-publisher-async
--maxdatalen 100
--keepalive 10
--userid none
--password none
*/
#include "MQTTAsync.h" #include "MQTTAsync.h"
#include "pubsub_opts.h"
#include <stdio.h> #include <stdio.h>
#include <signal.h> #include <signal.h>
...@@ -59,44 +38,45 @@ ...@@ -59,44 +38,45 @@
volatile int toStop = 0; volatile int toStop = 0;
struct pubsub_opts opts =
struct
{ {
char* clientid; MQTTVERSION_DEFAULT, 0,
char* delimiter; NULL, "paho-c-pub", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", NULL, 0, 10,
int maxdatalen; NULL, NULL, 0, 0, /* will options */
int qos; 0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
int retained; 0, {NULL, NULL}, /* publish properties */
char* username;
char* password;
char* host;
char* port;
int verbose;
int keepalive;
} opts =
{
"stdin-publisher-async", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0, 10
}; };
void usage(void) void usage(void)
{ {
printf("MQTT stdin publisher\n"); printf("Eclipse Paho MQTT C publisher\n"
printf("Usage: stdinpub topicname <options>, where options are:\n"); "Usage: paho_c_pub <topicname> <options>, where options are:\n"
printf(" --host <hostname> (default is %s)\n", opts.host); " -t (--topic) MQTT topic to publish to\n"
printf(" --port <port> (default is %s)\n", opts.port); " -h (--host) host to connect to (default is %s)\n"
printf(" --qos <qos> (default is %d)\n", opts.qos); " -p (--port) network port to connect to (default is %s)\n"
printf(" --retained (default is %s)\n", opts.retained ? "on" : "off"); " -c (--connection) connection string, overrides host/port e.g wss://hostname:port/ws\n"
printf(" --delimiter <delim> (default is \\n)\n"); " -q (--qos) MQTT QoS to publish on (0, 1 or 2) (default is %d)\n"
printf(" --clientid <clientid> (default is %s)\n", opts.clientid); " -r (--retained) use MQTT retain option? (default is %s)\n"
printf(" --maxdatalen <bytes> (default is %d)\n", opts.maxdatalen); " -i (--clientid) <clientid> (default is %s)\n"
printf(" --username none\n"); " -u (--username) MQTT username (default is none)\n"
printf(" --password none\n"); " -P (--password) MQTT password (default is none)\n"
printf(" --keepalive <seconds> (default is 10 seconds)\n"); " -k (--keepalive) MQTT keepalive timeout value (default is %d seconds)\n"
" --delimiter <delim> (default is \\n)\n"
" --maxdatalen <bytes> (default is %d)\n",
opts.host, opts.port, opts.qos, opts.retained ? "on" : "off",
opts.clientid, opts.maxdatalen, opts.keepalive);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
void mysleep(int ms)
{
#if defined(WIN32)
Sleep(ms);
#else
usleep(ms * 1000);
#endif
}
void cfinish(int sig) void cfinish(int sig)
{ {
...@@ -104,7 +84,6 @@ void cfinish(int sig) ...@@ -104,7 +84,6 @@ void cfinish(int sig)
toStop = 1; toStop = 1;
} }
void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{ {
...@@ -115,6 +94,11 @@ int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_messa ...@@ -115,6 +94,11 @@ int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_messa
static int disconnected = 0; static int disconnected = 0;
void onDisconnect5(void* context, MQTTAsync_successData5* response)
{
disconnected = 1;
}
void onDisconnect(void* context, MQTTAsync_successData* response) void onDisconnect(void* context, MQTTAsync_successData* response)
{ {
disconnected = 1; disconnected = 1;
...@@ -124,6 +108,15 @@ void onDisconnect(void* context, MQTTAsync_successData* response) ...@@ -124,6 +108,15 @@ void onDisconnect(void* context, MQTTAsync_successData* response)
static int connected = 0; static int connected = 0;
void myconnect(MQTTAsync* client); void myconnect(MQTTAsync* client);
void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
{
printf("Connect failed, rc %d reason code %d\n", response->code, response->reasonCode);
connected = -1;
MQTTAsync client = (MQTTAsync)context;
myconnect(client);
}
void onConnectFailure(void* context, MQTTAsync_failureData* response) void onConnectFailure(void* context, MQTTAsync_failureData* response)
{ {
printf("Connect failed, rc %d\n", response ? response->code : -1); printf("Connect failed, rc %d\n", response ? response->code : -1);
...@@ -134,72 +127,108 @@ void onConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -134,72 +127,108 @@ void onConnectFailure(void* context, MQTTAsync_failureData* response)
} }
void onConnect(void* context, MQTTAsync_successData* response) void onConnect5(void* context, MQTTAsync_successData5* response)
{ {
printf("Connected\n"); if (opts.verbose)
printf("Connected\n");
connected = 1; connected = 1;
} }
void myconnect(MQTTAsync* client) void onConnect(void* context, MQTTAsync_successData* response)
{ {
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; if (opts.verbose)
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; printf("Connected\n");
int rc = 0; connected = 1;
printf("Connecting\n");
conn_opts.keepAliveInterval = opts.keepalive;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
ssl_opts.enableServerCertAuth = 0;
//conn_opts.ssl = &ssl_opts; need to link with SSL library for this to work
conn_opts.automaticReconnect = 1;
connected = 0;
if ((rc = MQTTAsync_connect(*client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
} }
static int published = 0; static int published = 0;
void onPublishFailure5(void* context, MQTTAsync_failureData5* response)
{
if (opts.verbose)
printf("Publish failed, rc %d reason code %d\n", response->code, response->reasonCode);
published = -1;
}
void onPublishFailure(void* context, MQTTAsync_failureData* response) void onPublishFailure(void* context, MQTTAsync_failureData* response)
{ {
printf("Publish failed, rc %d\n", response ? -1 : response->code); if (opts.verbose)
printf("Publish failed, rc %d\n", response->code);
published = -1; published = -1;
} }
void onPublish5(void* context, MQTTAsync_successData5* response)
{
if (opts.verbose)
printf("Publish succeeded, reason code %d\n", response->reasonCode);
published = 1;
}
void onPublish(void* context, MQTTAsync_successData* response) void onPublish(void* context, MQTTAsync_successData* response)
{ {
if (opts.verbose)
printf("Publish succeeded\n");
published = 1; published = 1;
} }
void connectionLost(void* context, char* cause) void myconnect(MQTTAsync* client)
{ {
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
int rc = 0; int rc = 0;
printf("Connecting\n"); if (opts.verbose)
conn_opts.keepAliveInterval = 10; printf("Connecting\n");
conn_opts.keepAliveInterval = opts.keepalive;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
conn_opts.username = opts.username; conn_opts.username = opts.username;
conn_opts.password = opts.password; conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect; conn_opts.MQTTVersion = opts.MQTTVersion;
conn_opts.onFailure = onConnectFailure; if (opts.MQTTVersion == MQTTVERSION_5)
{
MQTTAsync_connectOptions conn_opts5 = MQTTAsync_connectOptions_initializer5;
conn_opts = conn_opts5;
conn_opts.onSuccess5 = onConnect5;
conn_opts.onFailure5 = onConnectFailure5;
}
else
{
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
}
conn_opts.context = client; conn_opts.context = client;
ssl_opts.enableServerCertAuth = 0; conn_opts.automaticReconnect = 1;
//conn_opts.ssl = &ssl_opts; need to link with SSL library for this to work
if (opts.will_topic) /* will options */
{
will_opts.message = opts.will_payload;
will_opts.topicName = opts.will_topic;
will_opts.qos = opts.will_qos;
will_opts.retained = opts.will_retain;
conn_opts.will = &will_opts;
}
if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
strncmp(opts.connection, "wss://", 6) == 0))
{
if (opts.insecure)
ssl_opts.enableServerCertAuth = 0;
ssl_opts.CApath = opts.capath;
ssl_opts.keyStore = opts.cert;
ssl_opts.trustStore = opts.cafile;
ssl_opts.privateKey = opts.key;
ssl_opts.privateKeyPassword = opts.keypass;
ssl_opts.enabledCipherSuites = opts.ciphers;
conn_opts.ssl = &ssl_opts;
}
connected = 0; connected = 0;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_connect(*client, &conn_opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start connect, return code %d\n", rc); printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
...@@ -207,36 +236,62 @@ void connectionLost(void* context, char* cause) ...@@ -207,36 +236,62 @@ void connectionLost(void* context, char* cause)
} }
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
printf("Trace : %d, %s\n", level, message);
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer; MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
MQTTAsync client; MQTTAsync client;
char* topic = NULL;
char* buffer = NULL; char* buffer = NULL;
char* url = NULL;
int rc = 0; int rc = 0;
char url[100];
if (argc < 2) if (argc < 2)
usage(); usage();
getopts(argc, argv); if (getopts(argc, argv, &opts) != 0)
usage();
sprintf(url, "%s:%s", opts.host, opts.port); if (opts.connection)
url = opts.connection;
else
{
url = malloc(100);
sprintf(url, "%s:%s", opts.host, opts.port);
}
if (opts.verbose) if (opts.verbose)
printf("URL is %s\n", url); printf("URL is %s\n", url);
topic = argv[1]; if (argv[1][0] != '-')
printf("Using topic %s\n", topic); {
opts.topic = argv[1];
if (opts.verbose)
printf("Topic is %s\n", opts.topic);
}
if (opts.topic == NULL)
usage();
if (opts.tracelevel > 0)
{
MQTTAsync_setTraceCallback(trace_callback);
MQTTAsync_setTraceLevel(opts.tracelevel);
}
create_opts.sendWhileDisconnected = 1; create_opts.sendWhileDisconnected = 1;
rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts); rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
signal(SIGINT, cfinish); signal(SIGINT, cfinish);
signal(SIGTERM, cfinish); signal(SIGTERM, cfinish);
signal(SIGQUIT, cfinish);
rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL); rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
myconnect(&client); myconnect(&client);
...@@ -253,129 +308,68 @@ int main(int argc, char** argv) ...@@ -253,129 +308,68 @@ int main(int argc, char** argv)
buffer[data_len++] = getchar(); buffer[data_len++] = getchar();
if (data_len > delim_len) if (data_len > delim_len)
{ {
/* printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]); */ if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0) break;
break;
} }
} while (data_len < opts.maxdatalen); } while (data_len < opts.maxdatalen);
if (opts.verbose) if (opts.verbose)
printf("Publishing data of length %d\n", data_len); printf("Publishing data of length %d\n", data_len);
pub_opts.onSuccess = onPublish; if (opts.MQTTVersion >= MQTTVERSION_5)
pub_opts.onFailure = onPublishFailure; {
do MQTTProperty property;
MQTTProperties props = MQTTProperties_initializer;
pub_opts.onSuccess5 = onPublish5;
pub_opts.onFailure5 = onPublishFailure5;
if (opts.message_expiry > 0)
{
property.identifier = MESSAGE_EXPIRY_INTERVAL;
property.value.integer4 = opts.message_expiry;
MQTTProperties_add(&props, &property);
}
if (opts.user_property.name)
{
property.identifier = USER_PROPERTY;
property.value.data.data = opts.user_property.name;
property.value.data.len = strlen(opts.user_property.name);
property.value.value.data = opts.user_property.value;
property.value.value.len = strlen(opts.user_property.value);
MQTTProperties_add(&props, &property);
}
pub_opts.properties = props;
}
else
{ {
rc = MQTTAsync_send(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts); pub_opts.onSuccess = onPublish;
pub_opts.onFailure = onPublishFailure;
} }
while (rc != MQTTASYNC_SUCCESS); rc = MQTTAsync_send(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &pub_opts);
if (opts.verbose && rc != MQTTASYNC_SUCCESS)
printf("Error from MQTTAsync_send %d\n", rc);
} }
printf("Stopping\n"); printf("Stopping\n");
free(buffer); free(buffer);
disc_opts.onSuccess = onDisconnect; if (opts.MQTTVersion >= MQTTVERSION_5)
disc_opts.onSuccess5 = onDisconnect5;
else
disc_opts.onSuccess = onDisconnect;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start disconnect, return code %d\n", rc); printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
while (!disconnected) while (!disconnected)
#if defined(WIN32) mysleep(100);
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&client); MQTTAsync_destroy(&client);
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--retained") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--maxdatalen") == 0)
{
if (++count < argc)
opts.maxdatalen = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts.delimiter = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--keepalive") == 0)
{
if (++count < argc)
opts.keepalive = atoi(argv[count]);
else
usage();
}
count++;
}
}
...@@ -16,31 +16,9 @@ ...@@ -16,31 +16,9 @@
* Guilherme Maciel Ferreira - add keep alive option * Guilherme Maciel Ferreira - add keep alive option
*******************************************************************************/ *******************************************************************************/
/*
stdout subscriber for the asynchronous client
compulsory parameters:
--topic topic to subscribe to
defaulted parameters:
--host localhost
--port 1883
--qos 2
--delimiter \n
--clientid stdout-subscriber-async
--showtopics off
--keepalive 10
--userid none
--password none
*/
#include "MQTTAsync.h" #include "MQTTAsync.h"
#include "MQTTClientPersistence.h" #include "MQTTClientPersistence.h"
#include "pubsub_opts.h"
#include <stdio.h> #include <stdio.h>
#include <signal.h> #include <signal.h>
...@@ -66,6 +44,15 @@ int subscribed = 0; ...@@ -66,6 +44,15 @@ int subscribed = 0;
int disconnected = 0; int disconnected = 0;
void mysleep(int ms)
{
#if defined(WIN32)
Sleep(ms);
#else
usleep(ms * 1000);
#endif
}
void cfinish(int sig) void cfinish(int sig)
{ {
signal(SIGINT, NULL); signal(SIGINT, NULL);
...@@ -73,24 +60,15 @@ void cfinish(int sig) ...@@ -73,24 +60,15 @@ void cfinish(int sig)
} }
struct struct pubsub_opts opts =
{ {
char* clientid; MQTTVERSION_DEFAULT, 0,
int nodelimiter; NULL, "paho-c-sub", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", NULL, 0, 10,
char delimiter; NULL, NULL, 0, 0, /* will options */
int qos; 0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
char* username; 0, {NULL, NULL}, /* publish properties */
char* password;
char* host;
char* port;
int showtopics;
int keepalive;
} opts =
{
"stdout-subscriber-async", 1, '\n', 2, NULL, NULL, "localhost", "1883", 0, 10
}; };
void usage(void) void usage(void)
{ {
printf("MQTT stdout subscriber\n"); printf("MQTT stdout subscriber\n");
...@@ -102,117 +80,61 @@ void usage(void) ...@@ -102,117 +80,61 @@ void usage(void)
printf(" --clientid <clientid> (default is %s)\n", opts.clientid); printf(" --clientid <clientid> (default is %s)\n", opts.clientid);
printf(" --username none\n"); printf(" --username none\n");
printf(" --password none\n"); printf(" --password none\n");
printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n"); printf(" --verbose <on or off> (default is on if the topic has a wildcard, else off)\n");
printf(" --keepalive <seconds> (default is 10 seconds)\n"); printf(" --keepalive <seconds> (default is 10 seconds)\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
void getopts(int argc, char** argv) void logProperties(MQTTProperties *props)
{ {
int count = 2; int i = 0;
while (count < argc) for (i = 0; i < props->count; ++i)
{ {
if (strcmp(argv[count], "--qos") == 0) int id = props->array[i].identifier;
{ const char* name = MQTTPropertyName(id);
if (++count < argc) char* intformat = "Property name %s value %d\n";
{
if (strcmp(argv[count], "0") == 0) switch (MQTTProperty_getType(id))
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
{
if (strcmp("newline", argv[count]) == 0)
opts.delimiter = '\n';
else
opts.delimiter = argv[count][0];
opts.nodelimiter = 0;
}
else
usage();
}
else if (strcmp(argv[count], "--showtopics") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "on") == 0)
opts.showtopics = 1;
else if (strcmp(argv[count], "off") == 0)
opts.showtopics = 0;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--keepalive") == 0)
{ {
if (++count < argc) case PROPERTY_TYPE_BYTE:
opts.keepalive = atoi(argv[count]); printf(intformat, name, props->array[i].value.byte);
else break;
usage(); case TWO_BYTE_INTEGER:
printf(intformat, name, props->array[i].value.integer2);
break;
case FOUR_BYTE_INTEGER:
printf(intformat, name, props->array[i].value.integer4);
break;
case VARIABLE_BYTE_INTEGER:
printf(intformat, name, props->array[i].value.integer4);
break;
case BINARY_DATA:
case UTF_8_ENCODED_STRING:
printf("Property name %s value len %.*s", name,
props->array[i].value.data.len, props->array[i].value.data.data);
break;
case UTF_8_STRING_PAIR:
printf("Property name %s key %.*s value %.*s", name,
props->array[i].value.data.len, props->array[i].value.data.data,
props->array[i].value.value.len, props->array[i].value.value.data);
break;
} }
count++;
} }
} }
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{ {
if (opts.showtopics) if (opts.verbose)
printf("%s\t", topicName); printf("%s\t", topicName);
if (opts.nodelimiter) if (opts.delimiter == NULL)
printf("%.*s", message->payloadlen, (char*)message->payload); printf("%.*s", message->payloadlen, (char*)message->payload);
else else
printf("%.*s%c", message->payloadlen, (char*)message->payload, opts.delimiter); printf("%.*s%c", message->payloadlen, (char*)message->payload, opts.delimiter[0]);
if (message->struct_version == 1 && opts.verbose)
logProperties(&message->properties);
fflush(stdout); fflush(stdout);
MQTTAsync_freeMessage(&message); MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName); MQTTAsync_free(topicName);
...@@ -226,12 +148,24 @@ void onDisconnect(void* context, MQTTAsync_successData* response) ...@@ -226,12 +148,24 @@ void onDisconnect(void* context, MQTTAsync_successData* response)
} }
void onSubscribe5(void* context, MQTTAsync_successData5* response)
{
subscribed = 1;
}
void onSubscribe(void* context, MQTTAsync_successData* response) void onSubscribe(void* context, MQTTAsync_successData* response)
{ {
subscribed = 1; subscribed = 1;
} }
void onSubscribeFailure5(void* context, MQTTAsync_failureData5* response)
{
printf("Subscribe failed, rc %d reason code %d\n", response->code, response->reasonCode);
finished = 1;
}
void onSubscribeFailure(void* context, MQTTAsync_failureData* response) void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{ {
printf("Subscribe failed, rc %d\n", response->code); printf("Subscribe failed, rc %d\n", response->code);
...@@ -239,6 +173,13 @@ void onSubscribeFailure(void* context, MQTTAsync_failureData* response) ...@@ -239,6 +173,13 @@ void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
} }
void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
{
printf("Connect failed, rc %d reason code %d\n", response->code, response->reasonCode);
finished = 1;
}
void onConnectFailure(void* context, MQTTAsync_failureData* response) void onConnectFailure(void* context, MQTTAsync_failureData* response)
{ {
printf("Connect failed, rc %d\n", response ? response->code : -99); printf("Connect failed, rc %d\n", response ? response->code : -99);
...@@ -246,13 +187,33 @@ void onConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -246,13 +187,33 @@ void onConnectFailure(void* context, MQTTAsync_failureData* response)
} }
void onConnect5(void* context, MQTTAsync_successData5* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_callOptions copts = MQTTAsync_callOptions_initializer;
int rc;
if (opts.verbose)
printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos);
copts.onSuccess5 = onSubscribe5;
copts.onFailure5 = onSubscribeFailure5;
copts.context = client;
if ((rc = MQTTAsync_subscribe(client, topic, opts.qos, &copts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
finished = 1;
}
}
void onConnect(void* context, MQTTAsync_successData* response) void onConnect(void* context, MQTTAsync_successData* response)
{ {
MQTTAsync client = (MQTTAsync)context; MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
int rc; int rc;
if (opts.showtopics) if (opts.verbose)
printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos); printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos);
ropts.onSuccess = onSubscribe; ropts.onSuccess = onSubscribe;
...@@ -265,21 +226,12 @@ void onConnect(void* context, MQTTAsync_successData* response) ...@@ -265,21 +226,12 @@ void onConnect(void* context, MQTTAsync_successData* response)
} }
} }
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
void connectionLost(void *context, char *cause) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
MQTTAsync client = (MQTTAsync)context; printf("Trace : %d, %s\n", level, message);
int rc;
printf("connectionLost called\n");
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start reconnect, return code %d\n", rc);
finished = 1;
}
} }
...@@ -287,8 +239,10 @@ int main(int argc, char** argv) ...@@ -287,8 +239,10 @@ int main(int argc, char** argv)
{ {
MQTTAsync client; MQTTAsync client;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
int rc = 0; int rc = 0;
char url[100]; char* url = NULL;
if (argc < 2) if (argc < 2)
usage(); usage();
...@@ -296,16 +250,32 @@ int main(int argc, char** argv) ...@@ -296,16 +250,32 @@ int main(int argc, char** argv)
topic = argv[1]; topic = argv[1];
if (strchr(topic, '#') || strchr(topic, '+')) if (strchr(topic, '#') || strchr(topic, '+'))
opts.showtopics = 1; opts.verbose = 1;
if (opts.showtopics) if (opts.verbose)
printf("topic is %s\n", topic); printf("topic is %s\n", topic);
getopts(argc, argv); if (getopts(argc, argv, &opts) != 0)
sprintf(url, "%s:%s", opts.host, opts.port); usage();
if (opts.connection)
url = opts.connection;
else
{
url = malloc(100);
sprintf(url, "%s:%s", opts.host, opts.port);
}
if (opts.verbose)
printf("URL is %s\n", url);
if (opts.tracelevel > 0)
{
MQTTAsync_setTraceCallback(trace_callback);
MQTTAsync_setTraceLevel(opts.tracelevel);
}
rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL); MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
signal(SIGINT, cfinish); signal(SIGINT, cfinish);
signal(SIGTERM, cfinish); signal(SIGTERM, cfinish);
...@@ -314,9 +284,45 @@ int main(int argc, char** argv) ...@@ -314,9 +284,45 @@ int main(int argc, char** argv)
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
conn_opts.username = opts.username; conn_opts.username = opts.username;
conn_opts.password = opts.password; conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect; conn_opts.MQTTVersion = opts.MQTTVersion;
conn_opts.onFailure = onConnectFailure; if (opts.MQTTVersion == MQTTVERSION_5)
{
MQTTAsync_connectOptions conn_opts5 = MQTTAsync_connectOptions_initializer5;
conn_opts = conn_opts5;
conn_opts.onSuccess5 = onConnect5;
conn_opts.onFailure5 = onConnectFailure5;
}
else
{
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
}
conn_opts.context = client; conn_opts.context = client;
conn_opts.automaticReconnect = 1;
if (opts.will_topic) /* will options */
{
will_opts.message = opts.will_payload;
will_opts.topicName = opts.will_topic;
will_opts.qos = opts.will_qos;
will_opts.retained = opts.will_retain;
conn_opts.will = &will_opts;
}
if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
strncmp(opts.connection, "wss://", 6) == 0))
{
if (opts.insecure)
ssl_opts.enableServerCertAuth = 0;
ssl_opts.CApath = opts.capath;
ssl_opts.keyStore = opts.cert;
ssl_opts.trustStore = opts.cafile;
ssl_opts.privateKey = opts.key;
ssl_opts.privateKeyPassword = opts.keypass;
ssl_opts.enabledCipherSuites = opts.ciphers;
conn_opts.ssl = &ssl_opts;
}
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start connect, return code %d\n", rc); printf("Failed to start connect, return code %d\n", rc);
...@@ -324,21 +330,13 @@ int main(int argc, char** argv) ...@@ -324,21 +330,13 @@ int main(int argc, char** argv)
} }
while (!subscribed) while (!subscribed)
#if defined(WIN32) mysleep(100);
Sleep(100);
#else
usleep(10000L);
#endif
if (finished) if (finished)
goto exit; goto exit;
while (!finished) while (!finished)
#if defined(WIN32) mysleep(100);
Sleep(100);
#else
usleep(10000L);
#endif
disc_opts.onSuccess = onDisconnect; disc_opts.onSuccess = onDisconnect;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
...@@ -347,12 +345,8 @@ int main(int argc, char** argv) ...@@ -347,12 +345,8 @@ int main(int argc, char** argv)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
while (!disconnected) while (!disconnected)
#if defined(WIN32) mysleep(100);
Sleep(100);
#else
usleep(10000L);
#endif
exit: exit:
MQTTAsync_destroy(&client); MQTTAsync_destroy(&client);
......
/*******************************************************************************
* Copyright (c) 2012, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
#include "pubsub_opts.h"
#include <string.h>
#include <stdlib.h>
int getopts(int argc, char** argv, struct pubsub_opts* opts)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--retained") == 0 || strcmp(argv[count], "-r") == 0)
opts->retained = 1;
else if (strcmp(argv[count], "--verbose") == 0 || strcmp(argv[count], "-v") == 0)
opts->verbose = 1;
else if (strcmp(argv[count], "--qos") == 0 || strcmp(argv[count], "-q") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts->qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts->qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts->qos = 2;
else
return 1;
}
else
return 1;
}
else if (strcmp(argv[count], "--connection") == 0 || strcmp(argv[count], "-c") == 0)
{
if (++count < argc)
opts->connection = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--host") == 0 || strcmp(argv[count], "-h") == 0)
{
if (++count < argc)
opts->host = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--port") == 0 || strcmp(argv[count], "-p") == 0)
{
if (++count < argc)
opts->port = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--clientid") == 0 || strcmp(argv[count], "-i") == 0)
{
if (++count < argc)
opts->clientid = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--username") == 0 || strcmp(argv[count], "-u") == 0)
{
if (++count < argc)
opts->username = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--password") == 0 || strcmp(argv[count], "-P") == 0)
{
if (++count < argc)
opts->password = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--maxdatalen") == 0)
{
if (++count < argc)
opts->maxdatalen = atoi(argv[count]);
else
return 1;
}
else if (strcmp(argv[count], "--message-expiry") == 0)
{
if (++count < argc)
opts->message_expiry = atoi(argv[count]);
else
return 1;
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts->delimiter = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--keepalive") == 0 || strcmp(argv[count], "-k") == 0)
{
if (++count < argc)
opts->keepalive = atoi(argv[count]);
else
return 1;
}
else if (strcmp(argv[count], "--topic") == 0 || strcmp(argv[count], "-t") == 0)
{
if (++count < argc)
opts->topic = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--will-topic") == 0)
{
if (++count < argc)
opts->will_topic = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--will-payload") == 0)
{
if (++count < argc)
opts->will_payload = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--will-qos") == 0)
{
if (++count < argc)
opts->will_qos = atoi(argv[count]);
else
return 1;
}
else if (strcmp(argv[count], "--will-retain") == 0)
{
if (++count < argc)
opts->will_retain = 1;
else
return 1;
}
else if (strcmp(argv[count], "--insecure") == 0)
{
if (++count < argc)
opts->insecure = 1;
else
return 1;
}
else if (strcmp(argv[count], "--capath") == 0)
{
if (++count < argc)
opts->capath = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--cafile") == 0)
{
if (++count < argc)
opts->cafile = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--cert") == 0)
{
if (++count < argc)
opts->cert = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--key") == 0)
{
if (++count < argc)
opts->key = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--keypass") == 0)
{
if (++count < argc)
opts->keypass = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "--ciphers") == 0)
{
if (++count < argc)
opts->ciphers = argv[count];
else
return 1;
}
else if (strcmp(argv[count], "-V") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "mqttv31") == 0 || strcmp(argv[count], "31") == 0)
opts->MQTTVersion = MQTTVERSION_3_1;
else if (strcmp(argv[count], "mqttv311") == 0 || strcmp(argv[count], "311") == 0)
opts->MQTTVersion = MQTTVERSION_3_1_1;
else if (strcmp(argv[count], "mqttv5") == 0 || strcmp(argv[count], "5") == 0)
opts->MQTTVersion = MQTTVERSION_5;
else
return 1;
}
else
return 1;
}
else if (strcmp(argv[count], "--trace") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "error") == 0)
opts->tracelevel = MQTTASYNC_TRACE_ERROR;
else if (strcmp(argv[count], "protocol") == 0)
opts->tracelevel = MQTTASYNC_TRACE_PROTOCOL;
else if (strcmp(argv[count], "min") == 0 || strcmp(argv[count], "on") == 0)
opts->tracelevel = MQTTASYNC_TRACE_MINIMUM;
else if (strcmp(argv[count], "max") == 0)
opts->tracelevel = MQTTASYNC_TRACE_MAXIMUM;
else
return 1;
}
else
return 1;
}
else if (strcmp(argv[count], "--user-property") == 0)
{
if (count + 2 < argc)
{
opts->user_property.name = argv[++count];
opts->user_property.value = argv[++count];
}
else
return 1;
}
else
{
printf("Unknown arg %s\n", argv[count]);
return 1;
}
count++;
}
return 0;
}
/*******************************************************************************
* Copyright (c) 2012, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
* Guilherme Maciel Ferreira - add keep alive option
*******************************************************************************/
#if !defined(PUBSUB_OPTS_H)
#define PUBSUB_OPTS_H
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
struct pubsub_opts
{
int MQTTVersion;
int tracelevel;
char* topic;
char* clientid;
char* delimiter;
int maxdatalen;
int qos;
int retained;
char* username;
char* password;
char* host;
char* port;
char* connection;
int verbose;
int keepalive;
char* will_topic;
char* will_payload;
int will_qos;
int will_retain;
int insecure;
char* capath;
char* cert;
char* cafile;
char* key;
char* keypass;
char* ciphers;
int message_expiry;
struct {
char *name;
char *value;
} user_property;
};
int getopts(int argc, char** argv, struct pubsub_opts* opts);
#endif
...@@ -84,6 +84,7 @@ struct Options ...@@ -84,6 +84,7 @@ struct Options
char* client_private_key_file; char* client_private_key_file;
int verbose; int verbose;
int test_no; int test_no;
int websockets;
} options = } options =
{ {
"ssl://m2m.eclipse.org:18883", "ssl://m2m.eclipse.org:18883",
...@@ -99,6 +100,7 @@ struct Options ...@@ -99,6 +100,7 @@ struct Options
NULL, NULL,
0, 0,
0, 0,
0,
}; };
...@@ -154,16 +156,18 @@ void getopts(int argc, char** argv) ...@@ -154,16 +156,18 @@ void getopts(int argc, char** argv)
{ {
if (++count < argc) if (++count < argc)
{ {
sprintf(options.connection, "ssl://%s:18883", argv[count]); char* prefix = (options.websockets) ? "wss" : "ssl";
sprintf(options.connection, "%s://%s:18883", prefix, argv[count]);
printf("Setting connection to %s\n", options.connection); printf("Setting connection to %s\n", options.connection);
sprintf(options.mutual_auth_connection, "ssl://%s:18884", argv[count]); sprintf(options.mutual_auth_connection, "%s://%s:18884", prefix, argv[count]);
printf("Setting mutual_auth_connection to %s\n", options.mutual_auth_connection); printf("Setting mutual_auth_connection to %s\n", options.mutual_auth_connection);
sprintf(options.nocert_mutual_auth_connection, "ssl://%s:18887", argv[count]); sprintf(options.nocert_mutual_auth_connection, "%s://%s:18887", prefix, argv[count]);
printf("Setting nocert_mutual_auth_connection to %s\n", printf("Setting nocert_mutual_auth_connection to %s\n",
options.nocert_mutual_auth_connection); options.nocert_mutual_auth_connection);
sprintf(options.server_auth_connection, "ssl://%s:18885", argv[count]); sprintf(options.server_auth_connection, "%s://%s:18885", prefix, argv[count]);
printf("Setting server_auth_connection to %s\n", options.server_auth_connection); printf("Setting server_auth_connection to %s\n", options.server_auth_connection);
sprintf(options.anon_connection, "ssl://%s:18886", argv[count]); sprintf(options.anon_connection, "%s://%s:18886", prefix, argv[count]);
printf("Setting anon_connection to %s\n", options.anon_connection); printf("Setting anon_connection to %s\n", options.anon_connection);
} }
else else
...@@ -242,9 +246,13 @@ void getopts(int argc, char** argv) ...@@ -242,9 +246,13 @@ void getopts(int argc, char** argv)
else if (strcmp(argv[count], "--verbose") == 0) else if (strcmp(argv[count], "--verbose") == 0)
{ {
options.verbose = 1; options.verbose = 1;
//TODO
printf("\nSetting verbose on\n"); printf("\nSetting verbose on\n");
} }
else if (strcmp(argv[count], "--ws") == 0)
{
options.websockets = 1;
printf("\nSetting websockets on\n");
}
count++; count++;
} }
#if defined(IOS) #if defined(IOS)
......
...@@ -66,6 +66,7 @@ struct Options ...@@ -66,6 +66,7 @@ struct Options
int verbose; int verbose;
int test_no; int test_no;
int size; int size;
int websockets;
} options = } options =
{ {
"ssl://m2m.eclipse.org:18883", "ssl://m2m.eclipse.org:18883",
...@@ -79,7 +80,8 @@ struct Options ...@@ -79,7 +80,8 @@ struct Options
NULL, NULL,
0, 0,
0, 0,
5000000 5000000,
0,
}; };
typedef struct typedef struct
...@@ -143,21 +145,28 @@ void getopts(int argc, char** argv) ...@@ -143,21 +145,28 @@ void getopts(int argc, char** argv)
{ {
if (++count < argc) if (++count < argc)
{ {
sprintf(options.connection, "ssl://%s:18883", argv[count]); char* prefix = (options.websockets) ? "wss" : "ssl";
sprintf(options.connection, "%s://%s:18883", prefix, argv[count]);
printf("Setting connection to %s\n", options.connection); printf("Setting connection to %s\n", options.connection);
sprintf(options.mutual_auth_connection, "ssl://%s:18884", argv[count]); sprintf(options.mutual_auth_connection, "%s://%s:18884", prefix, argv[count]);
printf("Setting mutual_auth_connection to %s\n", options.mutual_auth_connection); printf("Setting mutual_auth_connection to %s\n", options.mutual_auth_connection);
sprintf(options.nocert_mutual_auth_connection, "ssl://%s:18887", argv[count]); sprintf(options.nocert_mutual_auth_connection, "%s://%s:18887", prefix, argv[count]);
printf("Setting nocert_mutual_auth_connection to %s\n", printf("Setting nocert_mutual_auth_connection to %s\n",
options.nocert_mutual_auth_connection); options.nocert_mutual_auth_connection);
sprintf(options.server_auth_connection, "ssl://%s:18885", argv[count]); sprintf(options.server_auth_connection, "%s://%s:18885", prefix, argv[count]);
printf("Setting server_auth_connection to %s\n", options.server_auth_connection); printf("Setting server_auth_connection to %s\n", options.server_auth_connection);
sprintf(options.anon_connection, "ssl://%s:18886", argv[count]); sprintf(options.anon_connection, "%s://%s:18886", prefix, argv[count]);
printf("Setting anon_connection to %s\n", options.anon_connection); printf("Setting anon_connection to %s\n", options.anon_connection);
} }
else else
usage(); usage();
} }
else if (strcmp(argv[count], "--ws") == 0)
{
options.websockets = 1;
printf("\nSetting websockets on\n");
}
count++; count++;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment