Commit 2d1c61d5 authored by Ian Craggs's avatar Ian Craggs

Update stdinpub and stdoutsub samples to use string delimiters

parent 5c2ebb84
...@@ -63,7 +63,7 @@ void usage() ...@@ -63,7 +63,7 @@ void usage()
printf(" --port <port> (default is 1883)\n"); printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 0)\n"); printf(" --qos <qos> (default is 0)\n");
printf(" --retained (default is off)\n"); printf(" --retained (default is off)\n");
printf(" --delimiter <delim> (default is \n)"); printf(" --delimiter <delim> (default is \\n)");
printf(" --clientid <clientid> (default is hostname+timestamp)"); printf(" --clientid <clientid> (default is hostname+timestamp)");
printf(" --maxdatalen 100\n"); printf(" --maxdatalen 100\n");
printf(" --username none\n"); printf(" --username none\n");
...@@ -93,7 +93,7 @@ void cfinish(int sig) ...@@ -93,7 +93,7 @@ void cfinish(int sig)
struct struct
{ {
char* clientid; char* clientid;
char delimiter; char* delimiter;
int maxdatalen; int maxdatalen;
int qos; int qos;
int retained; int retained;
...@@ -104,7 +104,7 @@ struct ...@@ -104,7 +104,7 @@ struct
int verbose; int verbose;
} opts = } opts =
{ {
"publisher", '\n', 100, 0, 0, NULL, NULL, "localhost", "1883", 0 "publisher", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0
}; };
void getopts(int argc, char** argv); void getopts(int argc, char** argv);
...@@ -156,11 +156,19 @@ int main(int argc, char** argv) ...@@ -156,11 +156,19 @@ int main(int argc, char** argv)
while (!toStop) while (!toStop)
{ {
int data_len = 0; int data_len = 0;
int delim_len = 0;
delim_len = strlen(opts.delimiter);
do do
{ {
buffer[data_len++] = getchar(); buffer[data_len++] = getchar();
} while (buffer[data_len-1] != opts.delimiter && data_len < opts.maxdatalen); if (data_len > delim_len)
{
//printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]);
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
break;
}
} while (data_len < opts.maxdatalen);
if (opts.verbose) if (opts.verbose)
printf("Publishing data of length %d\n", data_len); printf("Publishing data of length %d\n", data_len);
...@@ -256,7 +264,7 @@ void getopts(int argc, char** argv) ...@@ -256,7 +264,7 @@ void getopts(int argc, char** argv)
else if (strcmp(argv[count], "--delimiter") == 0) else if (strcmp(argv[count], "--delimiter") == 0)
{ {
if (++count < argc) if (++count < argc)
opts.delimiter = argv[count][0]; opts.delimiter = argv[count];
else else
usage(); usage();
} }
......
...@@ -13,27 +13,28 @@ ...@@ -13,27 +13,28 @@
* Contributors: * Contributors:
* Ian Craggs - initial contribution * Ian Craggs - initial contribution
*******************************************************************************/ *******************************************************************************/
/*
stdout subscriber /*
stdin publisher
compulsory parameters: compulsory parameters:
--topic topic to subscribe to --topic topic to publish on
defaulted parameters: defaulted parameters:
--host localhost --host localhost
--port 1883 --port 1883
--qos 2 --qos 0
--delimiter \n --delimiters \n
--clientid stdout_subscriber --clientid stdin_publisher
--maxdatalen 100
--userid none --userid none
--password none --password none
*/ */
#include "MQTTClient.h" #include "MQTTClient.h"
#include "MQTTClientPersistence.h" #include "MQTTClientPersistence.h"
...@@ -56,26 +57,27 @@ volatile int toStop = 0; ...@@ -56,26 +57,27 @@ volatile int toStop = 0;
void usage() void usage()
{ {
printf("MQTT stdout subscriber\n"); printf("MQTT stdin publisher\n");
printf("Usage: stdoutsub topicname <options>, where options are:\n"); printf("Usage: stdinpub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is localhost)\n"); printf(" --host <hostname> (default is localhost)\n");
printf(" --port <port> (default is 1883)\n"); printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 2)\n"); printf(" --qos <qos> (default is 0)\n");
printf(" --delimiter <delim> (default is no delimiter)\n"); printf(" --retained (default is off)\n");
printf(" --clientid <clientid> (default is hostname+timestamp)\n"); printf(" --delimiter <delim> (default is \\n)");
printf(" --clientid <clientid> (default is hostname+timestamp)");
printf(" --maxdatalen 100\n");
printf(" --username none\n"); printf(" --username none\n");
printf(" --password none\n"); printf(" --password none\n");
printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n");
exit(-1); exit(-1);
} }
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts) void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{ {
int rc = 0; printf("Connecting\n");
if ((rc = MQTTClient_connect(*client, opts)) != 0) if (MQTTClient_connect(*client, opts) != 0)
{ {
printf("Failed to connect, return code %d\n", rc); printf("Failed to connect\n");
exit(-1); exit(-1);
} }
} }
...@@ -88,50 +90,59 @@ void cfinish(int sig) ...@@ -88,50 +90,59 @@ void cfinish(int sig)
} }
struct opts_struct struct
{ {
char* clientid; char* clientid;
int nodelimiter; char* delimiter;
char delimiter; int maxdatalen;
int qos; int qos;
int retained;
char* username; char* username;
char* password; char* password;
char* host; char* host;
char* port; char* port;
int showtopics; int verbose;
} opts = } opts =
{ {
"stdout-subscriber", 1, '\n', 2, NULL, NULL, "localhost", "1883", 0 "publisher", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0
}; };
void getopts(int argc, char** argv); void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
{
/* not expecting any messages */
return 1;
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
MQTTClient client; MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
char* topic = NULL; char* topic = NULL;
char* buffer = NULL;
int rc = 0; int rc = 0;
char url[100]; char url[100];
if (argc < 2) if (argc < 2)
usage(); usage();
topic = argv[1]; getopts(argc, argv);
if (strchr(topic, '#') || strchr(topic, '+'))
opts.showtopics = 1;
if (opts.showtopics)
printf("topic is %s\n", topic);
getopts(argc, argv);
sprintf(url, "%s:%s", opts.host, opts.port); sprintf(url, "%s:%s", opts.host, opts.port);
if (opts.verbose)
printf("URL is %s\n", url);
topic = argv[1];
printf("Using topic %s\n", topic);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
signal(SIGINT, cfinish); signal(SIGINT, cfinish);
signal(SIGTERM, cfinish); signal(SIGTERM, cfinish);
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
conn_opts.keepAliveInterval = 10; conn_opts.keepAliveInterval = 10;
conn_opts.reliable = 0; conn_opts.reliable = 0;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
...@@ -139,33 +150,41 @@ int main(int argc, char** argv) ...@@ -139,33 +150,41 @@ int main(int argc, char** argv)
conn_opts.password = opts.password; conn_opts.password = opts.password;
myconnect(&client, &conn_opts); myconnect(&client, &conn_opts);
rc = MQTTClient_subscribe(client, topic, opts.qos);
buffer = malloc(opts.maxdatalen);
while (!toStop) while (!toStop)
{ {
char* topicName = NULL; int data_len = 0;
int topicLen; int delim_len = 0;
MQTTClient_message* message = NULL;
rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000); delim_len = strlen(opts.delimiter);
if (message) do
{ {
if (opts.showtopics) buffer[data_len++] = getchar();
printf("%s\t", topicName); if (data_len > delim_len)
if (opts.nodelimiter) {
printf("%.*s", message->payloadlen, (char*)message->payload); //printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]);
else if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
printf("%.*s%c", message->payloadlen, (char*)message->payload, opts.delimiter); break;
fflush(stdout); }
MQTTClient_freeMessage(&message); } while (data_len < opts.maxdatalen);
MQTTClient_free(topicName);
} if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
if (rc != 0) if (rc != 0)
{
myconnect(&client, &conn_opts); myconnect(&client, &conn_opts);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
}
if (opts.qos > 0)
MQTTClient_yield();
} }
printf("Stopping\n"); printf("Stopping\n");
free(buffer);
MQTTClient_disconnect(client, 0); MQTTClient_disconnect(client, 0);
...@@ -180,7 +199,11 @@ void getopts(int argc, char** argv) ...@@ -180,7 +199,11 @@ void getopts(int argc, char** argv)
while (count < argc) while (count < argc)
{ {
if (strcmp(argv[count], "--qos") == 0) if (strcmp(argv[count], "--retained") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{ {
if (++count < argc) if (++count < argc)
{ {
...@@ -231,30 +254,17 @@ void getopts(int argc, char** argv) ...@@ -231,30 +254,17 @@ void getopts(int argc, char** argv)
else else
usage(); usage();
} }
else if (strcmp(argv[count], "--delimiter") == 0) else if (strcmp(argv[count], "--maxdatalen") == 0)
{ {
if (++count < argc) if (++count < argc)
{ opts.maxdatalen = atoi(argv[count]);
if (strcmp("newline", argv[count]) == 0)
opts.delimiter = '\n';
else
opts.delimiter = argv[count][0];
opts.nodelimiter = 0;
}
else else
usage(); usage();
} }
else if (strcmp(argv[count], "--showtopics") == 0) else if (strcmp(argv[count], "--delimiter") == 0)
{ {
if (++count < argc) if (++count < argc)
{ opts.delimiter = argv[count];
if (strcmp(argv[count], "on") == 0)
opts.showtopics = 1;
else if (strcmp(argv[count], "off") == 0)
opts.showtopics = 0;
else
usage();
}
else else
usage(); usage();
} }
...@@ -262,3 +272,4 @@ void getopts(int argc, char** argv) ...@@ -262,3 +272,4 @@ void getopts(int argc, char** argv)
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment