Commit 6f0edc21 authored by Ian Craggs's avatar Ian Craggs

Update synchronous publish sample

parent 7702f614
......@@ -84,9 +84,12 @@ HEADERS = $(srcdir)/*.h
HEADERS_C = $(filter-out $(srcdir)/MQTTAsync.h, $(HEADERS))
HEADERS_A = $(HEADERS)
SAMPLE_FILES_C = paho_cs_pub paho_cs_sub MQTTClient_publish MQTTClient_publish_async MQTTClient_subscribe
SAMPLE_FILES_C = MQTTClient_publish MQTTClient_publish_async MQTTClient_subscribe
SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
UTIL_FILES_CS = paho_cs_pub paho_cs_sub
SYNC_UTILS = ${addprefix ${blddir}/samples/,${UTIL_FILES_CS}}
SAMPLE_FILES_A = MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}
......@@ -181,7 +184,7 @@ endif
all: build
build: | mkdir ${MQTTLIB_C_TARGET} ${MQTTLIB_CS_TARGET} ${MQTTLIB_A_TARGET} ${MQTTLIB_AS_TARGET} ${MQTTVERSION_TARGET} ${SYNC_SAMPLES} ${ASYNC_SAMPLES} ${ASYNC_UTILS} ${SYNC_TESTS} ${SYNC_SSL_TESTS} ${ASYNC_TESTS} ${ASYNC_SSL_TESTS}
build: | mkdir ${MQTTLIB_C_TARGET} ${MQTTLIB_CS_TARGET} ${MQTTLIB_A_TARGET} ${MQTTLIB_AS_TARGET} ${MQTTVERSION_TARGET} ${SYNC_SAMPLES} ${SYNC_UTILS} ${ASYNC_SAMPLES} ${ASYNC_UTILS} ${SYNC_TESTS} ${SYNC_SSL_TESTS} ${ASYNC_TESTS} ${ASYNC_SSL_TESTS}
clean:
rm -rf ${blddir}/*
......@@ -205,7 +208,10 @@ ${ASYNC_SSL_TESTS}: ${blddir}/test/%: ${srcdir}/../test/%.c $(MQTTLIB_CS_TARGET)
${CC} -g -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES}
${SYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_C_TARGET)
${CC} -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES}
${CC} -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES}
${SYNC_UTILS}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_CS_TARGET)
${CC} -o $@ $< -l${MQTTLIB_CS} ${FLAGS_EXES} ${srcdir}/samples/pubsub_opts.c
${ASYNC_SAMPLES}: ${blddir}/samples/%: ${srcdir}/samples/%.c $(MQTTLIB_A_TARGET)
${CC} -o $@ $< -l${MQTTLIB_AS} ${FLAGS_EXES}
......
......@@ -31,8 +31,8 @@ ENDIF()
# sample files c
ADD_EXECUTABLE(paho_c_pub paho_c_pub.c pubsub_opts.c)
ADD_EXECUTABLE(paho_c_sub paho_c_sub.c pubsub_opts.c)
ADD_EXECUTABLE(paho_cs_pub paho_cs_pub.c)
ADD_EXECUTABLE(paho_cs_sub paho_cs_sub.c)
ADD_EXECUTABLE(paho_cs_pub paho_cs_pub.c pubsub_opts.c)
ADD_EXECUTABLE(paho_cs_sub paho_cs_sub.c pubsub_opts.c)
TARGET_LINK_LIBRARIES(paho_c_pub paho-mqtt3as)
TARGET_LINK_LIBRARIES(paho_c_sub paho-mqtt3as)
......
......@@ -106,7 +106,7 @@ void onDisconnect(void* context, MQTTAsync_successData* response)
static int connected = 0;
void myconnect(MQTTAsync* client);
void myconnect(MQTTAsync client);
void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
{
......@@ -175,7 +175,7 @@ void onPublish(void* context, MQTTAsync_successData* response)
}
void myconnect(MQTTAsync* client)
void myconnect(MQTTAsync client)
{
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
......@@ -185,7 +185,6 @@ void myconnect(MQTTAsync* client)
if (opts.verbose)
printf("Connecting\n");
conn_opts.keepAliveInterval = opts.keepalive;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
conn_opts.MQTTVersion = opts.MQTTVersion;
......@@ -195,11 +194,13 @@ void myconnect(MQTTAsync* client)
conn_opts = conn_opts5;
conn_opts.onSuccess5 = onConnect5;
conn_opts.onFailure5 = onConnectFailure5;
conn_opts.cleanstart = 1;
}
else
{
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.cleansession = 1;
}
conn_opts.context = client;
conn_opts.automaticReconnect = 1;
......@@ -228,7 +229,7 @@ void myconnect(MQTTAsync* client)
}
connected = 0;
if ((rc = MQTTAsync_connect(*client, &conn_opts)) != MQTTASYNC_SUCCESS)
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
......@@ -268,16 +269,6 @@ int main(int argc, char** argv)
if (opts.verbose)
printf("URL is %s\n", url);
if (argv[1][0] != '-')
{
opts.topic = argv[1];
if (opts.verbose)
printf("Topic is %s\n", opts.topic);
}
if (opts.topic == NULL)
usage();
if (opts.tracelevel > 0)
{
MQTTAsync_setTraceCallback(trace_callback);
......@@ -290,9 +281,10 @@ int main(int argc, char** argv)
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
myconnect(&client);
myconnect(client);
buffer = malloc(opts.maxdatalen);
......
......@@ -247,15 +247,11 @@ int main(int argc, char** argv)
if (argc < 2)
usage();
topic = argv[1];
if (getopts(argc, argv, &opts) != 0)
usage();
if (strchr(topic, '#') || strchr(topic, '+'))
opts.verbose = 1;
if (opts.verbose)
printf("topic is %s\n", topic);
if (getopts(argc, argv, &opts) != 0)
usage();
if (opts.connection)
url = opts.connection;
......
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
* Copyright (c) 2012, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -15,28 +15,14 @@
*******************************************************************************/
/*
stdin publisher
compulsory parameters:
--topic topic to publish on
defaulted parameters:
--host localhost
--port 1883
--qos 0
--delimiters \n
--clientid stdin_publisher
--maxdatalen 100
--userid none
--password none
Synchronous API version of paho_c_pub.c
*/
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include "pubsub_opts.h"
#include <stdio.h>
#include <signal.h>
......@@ -49,64 +35,109 @@
#include <sys/time.h>
#endif
volatile int toStop = 0;
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
struct pubsub_opts opts =
{
MQTTVERSION_DEFAULT, 0,
NULL, "paho-cs-pub", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", NULL, 0, 10,
NULL, NULL, 0, 0, /* will options */
0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
0, {NULL, NULL}, /* publish properties */
};
void usage(void)
{
printf("MQTT stdin publisher\n");
printf("Usage: stdinpub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is localhost)\n");
printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 0)\n");
printf(" --retained (default is off)\n");
printf(" --delimiter <delim> (default is \\n)");
printf(" --clientid <clientid> (default is hostname+timestamp)");
printf(" --maxdatalen 100\n");
printf("MQTT stdout subscriber\n");
printf("Usage: stdoutsub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is %s)\n", opts.host);
printf(" --port <port> (default is %s)\n", opts.port);
printf(" --qos <qos> (default is %d)\n", opts.qos);
printf(" --delimiter <delim> (default is no delimiter)\n");
printf(" --clientid <clientid> (default is %s)\n", opts.clientid);
printf(" --username none\n");
printf(" --password none\n");
printf(" --verbose <on or off> (default is on if the topic has a wildcard, else off)\n");
printf(" --keepalive <seconds> (default is 10 seconds)\n");
exit(EXIT_FAILURE);
}
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
int myconnect(MQTTClient* client)
{
printf("Connecting\n");
if (MQTTClient_connect(*client, opts) != 0)
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer;
int rc = 0;
if (opts.verbose)
printf("Connecting\n");
if (opts.MQTTVersion == MQTTVERSION_5)
{
printf("Failed to connect\n");
exit(EXIT_FAILURE);
MQTTClient_connectOptions conn_opts5 = MQTTClient_connectOptions_initializer5;
conn_opts = conn_opts5;
}
printf("Connected\n");
}
conn_opts.keepAliveInterval = opts.keepalive;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
conn_opts.MQTTVersion = opts.MQTTVersion;
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
if (opts.will_topic) /* will options */
{
will_opts.message = opts.will_payload;
will_opts.topicName = opts.will_topic;
will_opts.qos = opts.will_qos;
will_opts.retained = opts.will_retain;
conn_opts.will = &will_opts;
}
if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
strncmp(opts.connection, "wss://", 6) == 0))
{
if (opts.insecure)
ssl_opts.enableServerCertAuth = 0;
ssl_opts.CApath = opts.capath;
ssl_opts.keyStore = opts.cert;
ssl_opts.trustStore = opts.cafile;
ssl_opts.privateKey = opts.key;
ssl_opts.privateKeyPassword = opts.keypass;
ssl_opts.enabledCipherSuites = opts.ciphers;
conn_opts.ssl = &ssl_opts;
}
struct
{
char* clientid;
char* delimiter;
int maxdatalen;
int qos;
int retained;
char* username;
char* password;
char* host;
char* port;
int verbose;
} opts =
{
"publisher", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0
};
if (opts.MQTTVersion == MQTTVERSION_5)
{
MQTTProperties props = MQTTProperties_initializer;
MQTTProperties willProps = MQTTProperties_initializer;
MQTTResponse response = MQTTResponse_initializer;
conn_opts.cleanstart = 1;
response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
rc = response.reasonCode;
}
else
{
conn_opts.cleansession = 1;
rc = MQTTClient_connect(client, &conn_opts);
}
if (opts.verbose && rc == MQTTCLIENT_SUCCESS)
printf("Connected\n");
return rc;
}
void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
{
......@@ -114,27 +145,42 @@ int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_mess
return 1;
}
void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
{
printf("Trace : %d, %s\n", level, message);
}
int main(int argc, char** argv)
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
char* topic = NULL;
MQTTProperties pub_props = MQTTProperties_initializer;
char* buffer = NULL;
int rc = 0;
char url[100];
char* url;
if (argc < 2)
usage();
getopts(argc, argv);
if (getopts(argc, argv, &opts) != 0)
usage();
sprintf(url, "%s:%s", opts.host, opts.port);
if (opts.connection)
url = opts.connection;
else
{
url = malloc(100);
sprintf(url, "%s:%s", opts.host, opts.port);
}
if (opts.verbose)
printf("URL is %s\n", url);
topic = argv[1];
printf("Using topic %s\n", topic);
if (opts.tracelevel > 0)
{
MQTTClient_setTraceCallback(trace_callback);
MQTTClient_setTraceLevel(opts.tracelevel);
}
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
......@@ -143,18 +189,31 @@ int main(int argc, char** argv)
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
conn_opts.keepAliveInterval = 10;
conn_opts.reliable = 0;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
ssl_opts.enableServerCertAuth = 0;
conn_opts.ssl = &ssl_opts;
myconnect(&client, &conn_opts);
myconnect(client);
buffer = malloc(opts.maxdatalen);
if (opts.MQTTVersion >= MQTTVERSION_5)
{
MQTTProperty property;
if (opts.message_expiry > 0)
{
property.identifier = MESSAGE_EXPIRY_INTERVAL;
property.value.integer4 = opts.message_expiry;
MQTTProperties_add(&pub_props, &property);
}
if (opts.user_property.name)
{
property.identifier = USER_PROPERTY;
property.value.data.data = opts.user_property.name;
property.value.data.len = strlen(opts.user_property.name);
property.value.value.data = opts.user_property.value;
property.value.value.len = strlen(opts.user_property.value);
MQTTProperties_add(&pub_props, &property);
}
}
while (!toStop)
{
int data_len = 0;
......@@ -166,19 +225,34 @@ int main(int argc, char** argv)
buffer[data_len++] = getchar();
if (data_len > delim_len)
{
/* printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]); */
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
break;
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
break;
}
} while (data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
printf("Publishing data of length %d\n", data_len);
if (opts.MQTTVersion == MQTTVERSION_5)
{
MQTTResponse response = MQTTResponse_initializer;
response = MQTTClient_publish5(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &pub_props, NULL);
rc = response.reasonCode;
}
else
rc = MQTTClient_publish(client, opts.topic, data_len, buffer, opts.qos, opts.retained, NULL);
if (rc != 0)
{
myconnect(&client, &conn_opts);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
myconnect(client);
if (opts.MQTTVersion == MQTTVERSION_5)
{
MQTTResponse response = MQTTResponse_initializer;
response = MQTTClient_publish5(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &pub_props, NULL);
rc = response.reasonCode;
}
else
rc = MQTTClient_publish(client, opts.topic, data_len, buffer, opts.qos, opts.retained, NULL);
}
if (opts.qos > 0)
MQTTClient_yield();
......@@ -188,89 +262,12 @@ int main(int argc, char** argv)
free(buffer);
MQTTClient_disconnect(client, 0);
if (opts.MQTTVersion == MQTTVERSION_5)
rc = MQTTClient_disconnect5(client, 0, SUCCESS, NULL);
else
rc = MQTTClient_disconnect(client, 0);
MQTTClient_destroy(&client);
return EXIT_SUCCESS;
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--retained") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--maxdatalen") == 0)
{
if (++count < argc)
opts.maxdatalen = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts.delimiter = argv[count];
else
usage();
}
count++;
}
}
......@@ -20,26 +20,10 @@
stdout subscriber
compulsory parameters:
--topic topic to subscribe to
defaulted parameters:
--host localhost
--port 1883
--qos 2
--delimiter \n
--clientid stdout-subscriber
--showtopics off
--keepalive 10
--userid none
--password none
*/
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include "pubsub_opts.h"
#include <stdio.h>
#include <signal.h>
......@@ -57,21 +41,13 @@
volatile int toStop = 0;
struct opts_struct
{
char* clientid;
int nodelimiter;
char* delimiter;
int qos;
char* username;
char* password;
char* host;
char* port;
int showtopics;
int keepalive;
} opts =
struct pubsub_opts opts =
{
"stdout-subscriber", 0, "\n", 2, NULL, NULL, "localhost", "1883", 0, 10
MQTTVERSION_DEFAULT, 0,
NULL, "paho-cs-sub", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", NULL, 0, 10,
NULL, NULL, 0, 0, /* will options */
0, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
0, {NULL, NULL}, /* publish properties */
};
......@@ -109,7 +85,6 @@ void cfinish(int sig)
toStop = 1;
}
void getopts(int argc, char** argv);
int main(int argc, char** argv)
{
......@@ -125,11 +100,13 @@ int main(int argc, char** argv)
topic = argv[1];
if (strchr(topic, '#') || strchr(topic, '+'))
opts.showtopics = 1;
if (opts.showtopics)
opts.verbose = 1;
if (opts.verbose)
printf("topic is %s\n", topic);
getopts(argc, argv);
if (getopts(argc, argv, &opts) != 0)
usage();
sprintf(url, "%s:%s", opts.host, opts.port);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
......@@ -156,9 +133,9 @@ int main(int argc, char** argv)
rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000);
if (message)
{
if (opts.showtopics)
if (opts.verbose)
printf("%s\t", topicName);
if (opts.nodelimiter)
if (opts.delimiter == NULL)
printf("%.*s", message->payloadlen, (char*)message->payload);
else
printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);
......@@ -178,93 +155,3 @@ int main(int argc, char** argv)
return EXIT_SUCCESS;
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts.delimiter = argv[count];
else
opts.nodelimiter = 1;
}
else if (strcmp(argv[count], "--showtopics") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "on") == 0)
opts.showtopics = 1;
else if (strcmp(argv[count], "off") == 0)
opts.showtopics = 0;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--keepalive") == 0)
{
if (++count < argc)
opts.keepalive = atoi(argv[count]);
else
usage();
}
count++;
}
}
......@@ -24,7 +24,13 @@
int getopts(int argc, char** argv, struct pubsub_opts* opts)
{
int count = 2;
int count = 1;
if (argv[1][0] != '-')
{
opts->topic = argv[1];
count = 2;
}
while (count < argc)
{
......@@ -254,5 +260,9 @@ int getopts(int argc, char** argv, struct pubsub_opts* opts)
count++;
}
if (opts->topic == NULL)
return 1;
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment