Commit d5c96a27 authored by Ian Craggs's avatar Ian Craggs

More sample updates #504

parent 6f0edc21
...@@ -210,13 +210,13 @@ ${ASYNC_SSL_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c $(MQTTLIB_CS_TARGET) ...@@ -210,13 +210,13 @@ ${ASYNC_SSL_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c $(MQTTLIB_CS_TARGET)
${SYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_C_TARGET) ${SYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_C_TARGET)
${CC} -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES} ${CC} -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES}
${SYNC_UTILS}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_CS_TARGET) ${SYNC_UTILS}: ${blddir}/samples/%: ${srcdir}/samples/%.c ${srcdir}/samples/pubsub_opts.c $(MQTTLIB_CS_TARGET)
${CC} -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES} ${srcdir}/samples/pubsub_opts.c ${CC} -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES} ${srcdir}/samples/pubsub_opts.c
${ASYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_A_TARGET) ${ASYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_A_TARGET)
${CC} -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES} ${CC} -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES}
${ASYNC_UTILS}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_AS_TARGET) ${ASYNC_UTILS}: ${blddir}/samples/%: ${srcdir}/samples/%.c ${srcdir}/samples/pubsub_opts.c $(MQTTLIB_AS_TARGET)
${CC} -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES} ${srcdir}/samples/pubsub_opts.c ${CC} -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES} ${srcdir}/samples/pubsub_opts.c
$(blddir_work)/VersionInfo.h: $(srcdir)/VersionInfo.h.in $(blddir_work)/VersionInfo.h: $(srcdir)/VersionInfo.h.in
......
...@@ -381,10 +381,12 @@ int Thread_signal_cond(cond_type condvar) ...@@ -381,10 +381,12 @@ int Thread_signal_cond(cond_type condvar)
{ {
int rc = 0; int rc = 0;
FUNC_ENTRY;
pthread_mutex_lock(&condvar->mutex); pthread_mutex_lock(&condvar->mutex);
rc = pthread_cond_signal(&condvar->cond); rc = pthread_cond_signal(&condvar->cond);
pthread_mutex_unlock(&condvar->mutex); pthread_mutex_unlock(&condvar->mutex);
FUNC_EXIT_RC(rc);
return rc; return rc;
} }
...@@ -394,10 +396,10 @@ int Thread_signal_cond(cond_type condvar) ...@@ -394,10 +396,10 @@ int Thread_signal_cond(cond_type condvar)
*/ */
int Thread_wait_cond(cond_type condvar, int timeout) int Thread_wait_cond(cond_type condvar, int timeout)
{ {
FUNC_ENTRY;
int rc = 0; int rc = 0;
struct timespec cond_timeout; struct timespec cond_timeout;
FUNC_ENTRY;
#if defined(OSX) #if defined(OSX)
clock_gettime(CLOCK_REALTIME, &cond_timeout); clock_gettime(CLOCK_REALTIME, &cond_timeout);
#else #else
......
...@@ -40,33 +40,17 @@ volatile int toStop = 0; ...@@ -40,33 +40,17 @@ volatile int toStop = 0;
struct pubsub_opts opts = struct pubsub_opts opts =
{ {
MQTTVERSION_DEFAULT, 0, 1, 0, 0, "\n", 100, /* debug/app options */
NULL, "paho-c-pub", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", NULL, 0, 10, NULL, NULL, 1, 0, 0, /* message options */
MQTTVERSION_DEFAULT, NULL, "paho-c-pub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
NULL, NULL, 0, 0, /* will options */ NULL, NULL, 0, 0, /* will options */
0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */ 0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
0, {NULL, NULL}, /* publish properties */ 0, {NULL, NULL}, /* MQTT V5 options */
}; };
void usage(void) MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
{ MQTTProperty property;
printf("Eclipse Paho MQTT C publisher\n" MQTTProperties props = MQTTProperties_initializer;
"Usage: paho_c_pub <topicname> <options>, where options are:\n"
" -t (--topic) MQTT topic to publish to\n"
" -h (--host) host to connect to (default is %s)\n"
" -p (--port) network port to connect to (default is %s)\n"
" -c (--connection) connection string, overrides host/port e.g wss://hostname:port/ws\n"
" -q (--qos) MQTT QoS to publish on (0, 1 or 2) (default is %d)\n"
" -r (--retained) use MQTT retain option? (default is %s)\n"
" -i (--clientid) <clientid> (default is %s)\n"
" -u (--username) MQTT username (default is none)\n"
" -P (--password) MQTT password (default is none)\n"
" -k (--keepalive) MQTT keepalive timeout value (default is %d seconds)\n"
" --delimiter <delim> (default is \\n)\n"
" --maxdatalen <bytes> (default is %d)\n",
opts.host, opts.port, opts.qos, opts.retained ? "on" : "off",
opts.clientid, opts.maxdatalen, opts.keepalive);
exit(EXIT_FAILURE);
}
void mysleep(int ms) void mysleep(int ms)
...@@ -107,6 +91,7 @@ void onDisconnect(void* context, MQTTAsync_successData* response) ...@@ -107,6 +91,7 @@ void onDisconnect(void* context, MQTTAsync_successData* response)
static int connected = 0; static int connected = 0;
void myconnect(MQTTAsync client); void myconnect(MQTTAsync client);
int mypublish(MQTTAsync client, int datalen, char* data);
void onConnectFailure5(void* context, MQTTAsync_failureData5* response) void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
{ {
...@@ -129,15 +114,33 @@ void onConnectFailure(void* context, MQTTAsync_failureData* response) ...@@ -129,15 +114,33 @@ void onConnectFailure(void* context, MQTTAsync_failureData* response)
void onConnect5(void* context, MQTTAsync_successData5* response) void onConnect5(void* context, MQTTAsync_successData5* response)
{ {
MQTTAsync client = (MQTTAsync)context;
int rc = 0;
if (opts.verbose) if (opts.verbose)
printf("Connected\n"); printf("Connected\n");
if (opts.null_message == 1)
rc = mypublish(client, 0, "");
else if (opts.message)
rc = mypublish(client, strlen(opts.message), opts.message);
connected = 1; connected = 1;
} }
void onConnect(void* context, MQTTAsync_successData* response) void onConnect(void* context, MQTTAsync_successData* response)
{ {
MQTTAsync client = (MQTTAsync)context;
int rc = 0;
if (opts.verbose) if (opts.verbose)
printf("Connected\n"); printf("Connected\n");
if (opts.null_message == 1)
rc = mypublish(client, 0, "");
else if (opts.message)
rc = mypublish(client, strlen(opts.message), opts.message);
connected = 1; connected = 1;
} }
...@@ -163,6 +166,10 @@ void onPublish5(void* context, MQTTAsync_successData5* response) ...@@ -163,6 +166,10 @@ void onPublish5(void* context, MQTTAsync_successData5* response)
{ {
if (opts.verbose) if (opts.verbose)
printf("Publish succeeded, reason code %d\n", response->reasonCode); printf("Publish succeeded, reason code %d\n", response->reasonCode);
if (opts.null_message || opts.message || opts.filename)
toStop = 1;
published = 1; published = 1;
} }
...@@ -171,6 +178,10 @@ void onPublish(void* context, MQTTAsync_successData* response) ...@@ -171,6 +178,10 @@ void onPublish(void* context, MQTTAsync_successData* response)
{ {
if (opts.verbose) if (opts.verbose)
printf("Publish succeeded\n"); printf("Publish succeeded\n");
if (opts.null_message || opts.message || opts.filename)
toStop = 1;
published = 1; published = 1;
} }
...@@ -237,27 +248,53 @@ void myconnect(MQTTAsync client) ...@@ -237,27 +248,53 @@ void myconnect(MQTTAsync client)
} }
int mypublish(MQTTAsync client, int datalen, char* data)
{
int rc;
if (opts.verbose)
printf("Publishing data of length %d\n", datalen);
rc = MQTTAsync_send(client, opts.topic, datalen, data, opts.qos, opts.retained, &pub_opts);
if (opts.verbose && rc != MQTTASYNC_SUCCESS)
fprintf(stderr, "Error from MQTTAsync_send %d\n", rc);
return rc;
}
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("Trace : %d, %s\n", level, message); fprintf(stderr, "Trace : %d, %s\n", level, message);
} }
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer; MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
MQTTAsync client; MQTTAsync client;
char* buffer = NULL; char* buffer = NULL;
char* url = NULL; char* url = NULL;
int rc = 0; int rc = 0;
MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo();
const char* version = NULL;
while (infos->name)
{
if (strcmp(infos->name, "Version") == 0)
{
version = infos->value;
break;
}
++infos;
}
if (argc < 2) if (argc < 2)
usage(); usage(&opts, version);
if (getopts(argc, argv, &opts) != 0) if (getopts(argc, argv, &opts) != 0)
usage(); usage(&opts, version);
if (opts.connection) if (opts.connection)
url = opts.connection; url = opts.connection;
...@@ -281,69 +318,70 @@ int main(int argc, char** argv) ...@@ -281,69 +318,70 @@ int main(int argc, char** argv)
signal(SIGINT, cfinish); signal(SIGINT, cfinish);
signal(SIGTERM, cfinish); signal(SIGTERM, cfinish);
rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL); rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
myconnect(client); if (opts.MQTTVersion >= MQTTVERSION_5)
{
pub_opts.onSuccess5 = onPublish5;
pub_opts.onFailure5 = onPublishFailure5;
buffer = malloc(opts.maxdatalen); if (opts.message_expiry > 0)
{
property.identifier = MESSAGE_EXPIRY_INTERVAL;
property.value.integer4 = opts.message_expiry;
MQTTProperties_add(&props, &property);
}
if (opts.user_property.name)
{
property.identifier = USER_PROPERTY;
property.value.data.data = opts.user_property.name;
property.value.data.len = strlen(opts.user_property.name);
property.value.value.data = opts.user_property.value;
property.value.value.len = strlen(opts.user_property.value);
MQTTProperties_add(&props, &property);
}
pub_opts.properties = props;
}
else
{
pub_opts.onSuccess = onPublish;
pub_opts.onFailure = onPublishFailure;
}
myconnect(client);
while (!toStop) while (!toStop)
{ {
int data_len = 0; int data_len = 0;
int delim_len = 0; int delim_len = 0;
static int count = 0;
delim_len = (int)strlen(opts.delimiter); if (opts.stdin_lines)
do
{
buffer[data_len++] = getchar();
if (data_len > delim_len)
{
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
break;
}
} while (data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
if (opts.MQTTVersion >= MQTTVERSION_5)
{ {
MQTTProperty property; buffer = malloc(opts.maxdatalen);
MQTTProperties props = MQTTProperties_initializer;
pub_opts.onSuccess5 = onPublish5;
pub_opts.onFailure5 = onPublishFailure5;
if (opts.message_expiry > 0) delim_len = (int)strlen(opts.delimiter);
{ do
property.identifier = MESSAGE_EXPIRY_INTERVAL;
property.value.integer4 = opts.message_expiry;
MQTTProperties_add(&props, &property);
}
if (opts.user_property.name)
{ {
property.identifier = USER_PROPERTY; buffer[data_len++] = getchar();
property.value.data.data = opts.user_property.name; if (data_len > delim_len)
property.value.data.len = strlen(opts.user_property.name); {
property.value.value.data = opts.user_property.value; if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
property.value.value.len = strlen(opts.user_property.value); break;
MQTTProperties_add(&props, &property); }
} } while (data_len < opts.maxdatalen);
pub_opts.properties = props;
rc = mypublish(client, data_len, buffer);
} }
else else
{ mysleep(100);
pub_opts.onSuccess = onPublish;
pub_opts.onFailure = onPublishFailure;
}
rc = MQTTAsync_send(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &pub_opts);
if (opts.verbose && rc != MQTTASYNC_SUCCESS)
printf("Error from MQTTAsync_send %d\n", rc);
} }
printf("Stopping\n"); if (opts.verbose)
printf("Stopping\n");
free(buffer); if (opts.message == 0 && opts.null_message == 0 && opts.filename == 0)
free(buffer);
if (opts.MQTTVersion >= MQTTVERSION_5) if (opts.MQTTVersion >= MQTTVERSION_5)
disc_opts.onSuccess5 = onDisconnect5; disc_opts.onSuccess5 = onDisconnect5;
...@@ -351,7 +389,7 @@ int main(int argc, char** argv) ...@@ -351,7 +389,7 @@ int main(int argc, char** argv)
disc_opts.onSuccess = onDisconnect; disc_opts.onSuccess = onDisconnect;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start disconnect, return code %d\n", rc); fprintf(stderr, "Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
......
...@@ -39,7 +39,6 @@ ...@@ -39,7 +39,6 @@
#endif #endif
volatile int finished = 0; volatile int finished = 0;
char* topic = NULL;
int subscribed = 0; int subscribed = 0;
int disconnected = 0; int disconnected = 0;
...@@ -62,29 +61,14 @@ void cfinish(int sig) ...@@ -62,29 +61,14 @@ void cfinish(int sig)
struct pubsub_opts opts = struct pubsub_opts opts =
{ {
MQTTVERSION_DEFAULT, 0, 0, 0, 0, "\n", 100, /* debug/app options */
NULL, "paho-c-sub", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", NULL, 0, 10, NULL, NULL, 1, 0, 0, /* message options */
MQTTVERSION_DEFAULT, NULL, "paho-c-sub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
NULL, NULL, 0, 0, /* will options */ NULL, NULL, 0, 0, /* will options */
0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */ 0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
0, {NULL, NULL}, /* publish properties */ 0, {NULL, NULL}, /* MQTT V5 options */
}; };
void usage(void)
{
printf("MQTT stdout subscriber\n");
printf("Usage: stdoutsub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is %s)\n", opts.host);
printf(" --port <port> (default is %s)\n", opts.port);
printf(" --qos <qos> (default is %d)\n", opts.qos);
printf(" --delimiter <delim> (default is no delimiter)\n");
printf(" --clientid <clientid> (default is %s)\n", opts.clientid);
printf(" --username none\n");
printf(" --password none\n");
printf(" --verbose <on or off> (default is on if the topic has a wildcard, else off)\n");
printf(" --keepalive <seconds> (default is 10 seconds)\n");
exit(EXIT_FAILURE);
}
void logProperties(MQTTProperties *props) void logProperties(MQTTProperties *props)
{ {
...@@ -161,28 +145,28 @@ void onSubscribe(void* context, MQTTAsync_successData* response) ...@@ -161,28 +145,28 @@ void onSubscribe(void* context, MQTTAsync_successData* response)
void onSubscribeFailure5(void* context, MQTTAsync_failureData5* response) void onSubscribeFailure5(void* context, MQTTAsync_failureData5* response)
{ {
printf("Subscribe failed, rc %d reason code %d\n", response->code, response->reasonCode); fprintf(stderr, "Subscribe failed, rc %d reason code %d\n", response->code, response->reasonCode);
finished = 1; finished = 1;
} }
void onSubscribeFailure(void* context, MQTTAsync_failureData* response) void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{ {
printf("Subscribe failed, rc %d\n", response->code); fprintf(stderr, "Subscribe failed, rc %d\n", response->code);
finished = 1; finished = 1;
} }
void onConnectFailure5(void* context, MQTTAsync_failureData5* response) void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
{ {
printf("Connect failed, rc %d reason code %d\n", response->code, response->reasonCode); fprintf(stderr, "Connect failed, rc %d reason code %d\n", response->code, response->reasonCode);
finished = 1; finished = 1;
} }
void onConnectFailure(void* context, MQTTAsync_failureData* response) void onConnectFailure(void* context, MQTTAsync_failureData* response)
{ {
printf("Connect failed, rc %d\n", response ? response->code : -99); fprintf(stderr, "Connect failed, rc %d\n", response ? response->code : -99);
finished = 1; finished = 1;
} }
...@@ -194,14 +178,14 @@ void onConnect5(void* context, MQTTAsync_successData5* response) ...@@ -194,14 +178,14 @@ void onConnect5(void* context, MQTTAsync_successData5* response)
int rc; int rc;
if (opts.verbose) if (opts.verbose)
printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos); printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);
copts.onSuccess5 = onSubscribe5; copts.onSuccess5 = onSubscribe5;
copts.onFailure5 = onSubscribeFailure5; copts.onFailure5 = onSubscribeFailure5;
copts.context = client; copts.context = client;
if ((rc = MQTTAsync_subscribe(client, topic, opts.qos, &copts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_subscribe(client, opts.topic, opts.qos, &copts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start subscribe, return code %d\n", rc); fprintf(stderr, "Failed to start subscribe, return code %d\n", rc);
finished = 1; finished = 1;
} }
} }
...@@ -214,14 +198,14 @@ void onConnect(void* context, MQTTAsync_successData* response) ...@@ -214,14 +198,14 @@ void onConnect(void* context, MQTTAsync_successData* response)
int rc; int rc;
if (opts.verbose) if (opts.verbose)
printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos); printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);
ropts.onSuccess = onSubscribe; ropts.onSuccess = onSubscribe;
ropts.onFailure = onSubscribeFailure; ropts.onFailure = onSubscribeFailure;
ropts.context = client; ropts.context = client;
if ((rc = MQTTAsync_subscribe(client, topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_subscribe(client, opts.topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start subscribe, return code %d\n", rc); fprintf(stderr, "Failed to start subscribe, return code %d\n", rc);
finished = 1; finished = 1;
} }
} }
...@@ -231,7 +215,7 @@ MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; ...@@ -231,7 +215,7 @@ MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{ {
printf("Trace : %d, %s\n", level, message); fprintf(stderr, "Trace : %d, %s\n", level, message);
} }
...@@ -243,14 +227,26 @@ int main(int argc, char** argv) ...@@ -243,14 +227,26 @@ int main(int argc, char** argv)
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
int rc = 0; int rc = 0;
char* url = NULL; char* url = NULL;
MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo();
const char* version = NULL;
while (infos->name)
{
if (strcmp(infos->name, "Version") == 0)
{
version = infos->value;
break;
}
++infos;
}
if (argc < 2) if (argc < 2)
usage(); usage(&opts, version);
if (getopts(argc, argv, &opts) != 0) if (getopts(argc, argv, &opts) != 0)
usage(); usage(&opts, version);
if (strchr(topic, '#') || strchr(topic, '+')) if (strchr(opts.topic, '#') || strchr(opts.topic, '+'))
opts.verbose = 1; opts.verbose = 1;
if (opts.connection) if (opts.connection)
...@@ -321,7 +317,7 @@ int main(int argc, char** argv) ...@@ -321,7 +317,7 @@ int main(int argc, char** argv)
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start connect, return code %d\n", rc); fprintf(stderr, "Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
...@@ -337,7 +333,7 @@ int main(int argc, char** argv) ...@@ -337,7 +333,7 @@ int main(int argc, char** argv)
disc_opts.onSuccess = onDisconnect; disc_opts.onSuccess = onDisconnect;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{ {
printf("Failed to start disconnect, return code %d\n", rc); fprintf(stderr, "Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
......
...@@ -47,31 +47,15 @@ void cfinish(int sig) ...@@ -47,31 +47,15 @@ void cfinish(int sig)
struct pubsub_opts opts = struct pubsub_opts opts =
{ {
MQTTVERSION_DEFAULT, 0, 1, 0, 0, "\n", 100, /* debug/app options */
NULL, "paho-cs-pub", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", NULL, 0, 10, NULL, NULL, 1, 0, 0, /* message options */
MQTTVERSION_DEFAULT, NULL, "paho-cs-pub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
NULL, NULL, 0, 0, /* will options */ NULL, NULL, 0, 0, /* will options */
0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */ 0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
0, {NULL, NULL}, /* publish properties */ 0, {NULL, NULL}, /* MQTT V5 options */
}; };
void usage(void)
{
printf("MQTT stdout subscriber\n");
printf("Usage: stdoutsub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is %s)\n", opts.host);
printf(" --port <port> (default is %s)\n", opts.port);
printf(" --qos <qos> (default is %d)\n", opts.qos);
printf(" --delimiter <delim> (default is no delimiter)\n");
printf(" --clientid <clientid> (default is %s)\n", opts.clientid);
printf(" --username none\n");
printf(" --password none\n");
printf(" --verbose <on or off> (default is on if the topic has a wildcard, else off)\n");
printf(" --keepalive <seconds> (default is 10 seconds)\n");
exit(EXIT_FAILURE);
}
int myconnect(MQTTClient* client) int myconnect(MQTTClient* client)
{ {
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
...@@ -159,12 +143,24 @@ int main(int argc, char** argv) ...@@ -159,12 +143,24 @@ int main(int argc, char** argv)
char* buffer = NULL; char* buffer = NULL;
int rc = 0; int rc = 0;
char* url; char* url;
MQTTClient_nameValue* infos = MQTTClient_getVersionInfo();
const char* version = NULL;
while (infos->name)
{
if (strcmp(infos->name, "Version") == 0)
{
version = infos->value;
break;
}
++infos;
}
if (argc < 2) if (argc < 2)
usage(); usage(&opts, version);
if (getopts(argc, argv, &opts) != 0) if (getopts(argc, argv, &opts) != 0)
usage(); usage(&opts, version);
if (opts.connection) if (opts.connection)
url = opts.connection; url = opts.connection;
......
...@@ -43,31 +43,15 @@ volatile int toStop = 0; ...@@ -43,31 +43,15 @@ volatile int toStop = 0;
struct pubsub_opts opts = struct pubsub_opts opts =
{ {
MQTTVERSION_DEFAULT, 0, 0, 0, 0, "\n", 100, /* debug/app options */
NULL, "paho-cs-sub", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", NULL, 0, 10, NULL, NULL, 1, 0, 0, /* message options */
MQTTVERSION_DEFAULT, NULL, "paho-cs-sub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
NULL, NULL, 0, 0, /* will options */ NULL, NULL, 0, 0, /* will options */
0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */ 0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
0, {NULL, NULL}, /* publish properties */ 0, {NULL, NULL}, /* MQTT V5 options */
}; };
void usage(void)
{
printf("MQTT stdout subscriber\n");
printf("Usage: stdoutsub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is %s)\n", opts.host);
printf(" --port <port> (default is %s)\n", opts.port);
printf(" --qos <qos> (default is %d)\n", opts.qos);
printf(" --delimiter <delim> (default is \\n)\n");
printf(" --clientid <clientid> (default is %s)\n", opts.clientid);
printf(" --username none\n");
printf(" --password none\n");
printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n");
printf(" --keepalive <seconds> (default is %d seconds)\n", opts.keepalive);
exit(EXIT_FAILURE);
}
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts) void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{ {
int rc = 0; int rc = 0;
...@@ -93,9 +77,21 @@ int main(int argc, char** argv) ...@@ -93,9 +77,21 @@ int main(int argc, char** argv)
char* topic = NULL; char* topic = NULL;
int rc = 0; int rc = 0;
char url[100]; char url[100];
MQTTClient_nameValue* infos = MQTTClient_getVersionInfo();
const char* version = NULL;
while (infos->name)
{
if (strcmp(infos->name, "Version") == 0)
{
version = infos->value;
break;
}
++infos;
}
if (argc < 2) if (argc < 2)
usage(); usage(&opts, version);
topic = argv[1]; topic = argv[1];
...@@ -105,7 +101,7 @@ int main(int argc, char** argv) ...@@ -105,7 +101,7 @@ int main(int argc, char** argv)
printf("topic is %s\n", topic); printf("topic is %s\n", topic);
if (getopts(argc, argv, &opts) != 0) if (getopts(argc, argv, &opts) != 0)
usage(); usage(&opts, version);
sprintf(url, "%s:%s", opts.host, opts.port); sprintf(url, "%s:%s", opts.host, opts.port);
......
...@@ -22,6 +22,31 @@ ...@@ -22,6 +22,31 @@
#include <stdlib.h> #include <stdlib.h>
void usage(struct pubsub_opts* opts, const char* version)
{
printf("Eclipse Paho MQTT C %s\n", opts->publisher ? "publisher" : "subscriber");
printf("Eclipse Paho C library version %s\n", version);
printf("Usage: paho_c_pub <topicname> <options>, where options are:\n"
" -t (--topic) MQTT topic to publish to\n"
" -h (--host) host to connect to (default is %s)\n"
" -p (--port) network port to connect to (default is %s)\n"
" -c (--connection) connection string, overrides host/port e.g wss://hostname:port/ws\n"
" -q (--qos) MQTT QoS to publish on (0, 1 or 2) (default is %d)\n"
" -r (--retained) use MQTT retain option? (default is %s)\n"
" -i (--clientid) <clientid> (default is %s)\n"
" -u (--username) MQTT username (default is none)\n"
" -P (--password) MQTT password (default is none)\n"
" -k (--keepalive) MQTT keepalive timeout value (default is %d seconds)\n"
" --delimiter <delim> (default is \\n)\n"
" --maxdatalen <bytes> (default is %d)\n",
opts->host, opts->port, opts->qos, opts->retained ? "on" : "off",
opts->clientid, opts->maxdatalen, opts->keepalive);
exit(EXIT_FAILURE);
}
int getopts(int argc, char** argv, struct pubsub_opts* opts) int getopts(int argc, char** argv, struct pubsub_opts* opts)
{ {
int count = 1; int count = 1;
...@@ -34,9 +59,7 @@ int getopts(int argc, char** argv, struct pubsub_opts* opts) ...@@ -34,9 +59,7 @@ int getopts(int argc, char** argv, struct pubsub_opts* opts)
while (count < argc) while (count < argc)
{ {
if (strcmp(argv[count], "--retained") == 0 || strcmp(argv[count], "-r") == 0) if (strcmp(argv[count], "--verbose") == 0 || strcmp(argv[count], "-v") == 0)
opts->retained = 1;
else if (strcmp(argv[count], "--verbose") == 0 || strcmp(argv[count], "-v") == 0)
opts->verbose = 1; opts->verbose = 1;
else if (strcmp(argv[count], "--qos") == 0 || strcmp(argv[count], "-q") == 0) else if (strcmp(argv[count], "--qos") == 0 || strcmp(argv[count], "-q") == 0)
{ {
...@@ -103,13 +126,6 @@ int getopts(int argc, char** argv, struct pubsub_opts* opts) ...@@ -103,13 +126,6 @@ int getopts(int argc, char** argv, struct pubsub_opts* opts)
else else
return 1; return 1;
} }
else if (strcmp(argv[count], "--message-expiry") == 0)
{
if (++count < argc)
opts->message_expiry = atoi(argv[count]);
else
return 1;
}
else if (strcmp(argv[count], "--delimiter") == 0) else if (strcmp(argv[count], "--delimiter") == 0)
{ {
if (++count < argc) if (++count < argc)
...@@ -242,19 +258,71 @@ int getopts(int argc, char** argv, struct pubsub_opts* opts) ...@@ -242,19 +258,71 @@ int getopts(int argc, char** argv, struct pubsub_opts* opts)
else else
return 1; return 1;
} }
else if (strcmp(argv[count], "--user-property") == 0) else if (opts->publisher == 0)
{ {
if (count + 2 < argc) if (strcmp(argv[count], "--no-print-retained") == 0 || strcmp(argv[count], "-R") == 0)
opts->retained = 1;
else
{ {
opts->user_property.name = argv[++count]; fprintf(stderr, "Unknown option %s\n", argv[count]);
opts->user_property.value = argv[++count]; return 1;
}
}
else if (opts->publisher == 1)
{
if (strcmp(argv[count], "--retained") == 0 || strcmp(argv[count], "-r") == 0)
opts->retained = 1;
else if (strcmp(argv[count], "--user-property") == 0)
{
if (count + 2 < argc)
{
opts->user_property.name = argv[++count];
opts->user_property.value = argv[++count];
}
else
return 1;
}
else if (strcmp(argv[count], "--message-expiry") == 0)
{
if (++count < argc)
opts->message_expiry = atoi(argv[count]);
else
return 1;
}
else if (strcmp(argv[count], "-m") == 0 || strcmp(argv[count], "--message") == 0)
{
if (++count < argc)
{
opts->stdin_lines = 0;
opts->message = argv[count];
}
else
return 1;
}
else if (strcmp(argv[count], "-f") == 0 || strcmp(argv[count], "--filename") == 0)
{
if (++count < argc)
{
opts->stdin_lines = 0;
opts->message = argv[count];
}
else
return 1;
}
else if (strcmp(argv[count], "-n") == 0 || strcmp(argv[count], "--null-message") == 0)
{
opts->stdin_lines = 0;
opts->null_message = 1;
} }
else else
{
fprintf(stderr, "Unknown option %s\n", argv[count]);
return 1; return 1;
}
} }
else else
{ {
printf("Unknown arg %s\n", argv[count]); fprintf(stderr, "Unknown option %s\n", argv[count]);
return 1; return 1;
} }
......
...@@ -23,12 +23,22 @@ ...@@ -23,12 +23,22 @@
struct pubsub_opts struct pubsub_opts
{ {
int MQTTVersion; /* debug app options */
int publisher; /* publisher app? */
int verbose;
int tracelevel; int tracelevel;
char* topic;
char* clientid;
char* delimiter; char* delimiter;
int maxdatalen; int maxdatalen;
/* message options */
char* message;
char* filename;
int stdin_lines;
int stdlin_complete;
int null_message;
/* MQTT options */
int MQTTVersion;
char* topic;
char* clientid;
int qos; int qos;
int retained; int retained;
char* username; char* username;
...@@ -36,12 +46,13 @@ struct pubsub_opts ...@@ -36,12 +46,13 @@ struct pubsub_opts
char* host; char* host;
char* port; char* port;
char* connection; char* connection;
int verbose;
int keepalive; int keepalive;
/* will options */
char* will_topic; char* will_topic;
char* will_payload; char* will_payload;
int will_qos; int will_qos;
int will_retain; int will_retain;
/* TLS options */
int insecure; int insecure;
char* capath; char* capath;
char* cert; char* cert;
...@@ -49,6 +60,7 @@ struct pubsub_opts ...@@ -49,6 +60,7 @@ struct pubsub_opts
char* key; char* key;
char* keypass; char* keypass;
char* ciphers; char* ciphers;
/* MQTT V5 options */
int message_expiry; int message_expiry;
struct { struct {
char *name; char *name;
...@@ -56,6 +68,7 @@ struct pubsub_opts ...@@ -56,6 +68,7 @@ struct pubsub_opts
} user_property; } user_property;
}; };
void usage(struct pubsub_opts* opts, const char* version);
int getopts(int argc, char** argv, struct pubsub_opts* opts); int getopts(int argc, char** argv, struct pubsub_opts* opts);
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment