Commit 2bf25bc5 authored by Ian Craggs's avatar Ian Craggs

Correct stdoutsub.c source - bug #422540

parent 253842ce
...@@ -12,29 +12,29 @@ ...@@ -12,29 +12,29 @@
* *
* Contributors: * Contributors:
* Ian Craggs - initial contribution * Ian Craggs - initial contribution
* Ian Craggs - change delimiter option from char to string
*******************************************************************************/ *******************************************************************************/
/* /*
stdin publisher
stdout subscriber
compulsory parameters: compulsory parameters:
--topic topic to publish on --topic topic to subscribe to
defaulted parameters: defaulted parameters:
--host localhost --host localhost
--port 1883 --port 1883
--qos 0 --qos 2
--delimiters \n --delimiter \n
--clientid stdin_publisher --clientid stdout_subscriber
--maxdatalen 100
--userid none --userid none
--password none --password none
*/ */
#include "MQTTClient.h" #include "MQTTClient.h"
#include "MQTTClientPersistence.h" #include "MQTTClientPersistence.h"
...@@ -57,27 +57,26 @@ volatile int toStop = 0; ...@@ -57,27 +57,26 @@ volatile int toStop = 0;
void usage() void usage()
{ {
printf("MQTT stdin publisher\n"); printf("MQTT stdout subscriber\n");
printf("Usage: stdinpub topicname <options>, where options are:\n"); printf("Usage: stdoutsub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is localhost)\n"); printf(" --host <hostname> (default is localhost)\n");
printf(" --port <port> (default is 1883)\n"); printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 0)\n"); printf(" --qos <qos> (default is 2)\n");
printf(" --retained (default is off)\n"); printf(" --delimiter <delim> (default is \\n)\n");
printf(" --delimiter <delim> (default is \\n)"); printf(" --clientid <clientid> (default is hostname+timestamp)\n");
printf(" --clientid <clientid> (default is hostname+timestamp)");
printf(" --maxdatalen 100\n");
printf(" --username none\n"); printf(" --username none\n");
printf(" --password none\n"); printf(" --password none\n");
printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n");
exit(-1); exit(-1);
} }
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts) void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{ {
printf("Connecting\n"); int rc = 0;
if (MQTTClient_connect(*client, opts) != 0) if ((rc = MQTTClient_connect(*client, opts)) != 0)
{ {
printf("Failed to connect\n"); printf("Failed to connect, return code %d\n", rc);
exit(-1); exit(-1);
} }
} }
...@@ -90,59 +89,50 @@ void cfinish(int sig) ...@@ -90,59 +89,50 @@ void cfinish(int sig)
} }
struct struct opts_struct
{ {
char* clientid; char* clientid;
int nodelimiter;
char* delimiter; char* delimiter;
int maxdatalen;
int qos; int qos;
int retained;
char* username; char* username;
char* password; char* password;
char* host; char* host;
char* port; char* port;
int verbose; int showtopics;
} opts = } opts =
{ {
"publisher", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0 "stdout-subscriber", 0, "\n", 2, NULL, NULL, "localhost", "1883", 0
}; };
void getopts(int argc, char** argv); void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
{
/* not expecting any messages */
return 1;
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
MQTTClient client; MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
char* topic = NULL; char* topic = NULL;
char* buffer = NULL;
int rc = 0; int rc = 0;
char url[100]; char url[100];
if (argc < 2) if (argc < 2)
usage(); usage();
getopts(argc, argv); topic = argv[1];
sprintf(url, "%s:%s", opts.host, opts.port); if (strchr(topic, '#') || strchr(topic, '+'))
if (opts.verbose) opts.showtopics = 1;
printf("URL is %s\n", url); if (opts.showtopics)
printf("topic is %s\n", topic);
topic = argv[1]; getopts(argc, argv);
printf("Using topic %s\n", topic); sprintf(url, "%s:%s", opts.host, opts.port);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
signal(SIGINT, cfinish); signal(SIGINT, cfinish);
signal(SIGTERM, cfinish); signal(SIGTERM, cfinish);
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
conn_opts.keepAliveInterval = 10; conn_opts.keepAliveInterval = 10;
conn_opts.reliable = 0; conn_opts.reliable = 0;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
...@@ -151,41 +141,33 @@ int main(int argc, char** argv) ...@@ -151,41 +141,33 @@ int main(int argc, char** argv)
myconnect(&client, &conn_opts); myconnect(&client, &conn_opts);
buffer = malloc(opts.maxdatalen); rc = MQTTClient_subscribe(client, topic, opts.qos);
while (!toStop) while (!toStop)
{ {
int data_len = 0; char* topicName = NULL;
int delim_len = 0; int topicLen;
MQTTClient_message* message = NULL;
delim_len = strlen(opts.delimiter); rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000);
do if (message)
{
buffer[data_len++] = getchar();
if (data_len > delim_len)
{ {
//printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]); if (opts.showtopics)
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0) printf("%s\t", topicName);
break; if (opts.nodelimiter)
printf("%.*s", message->payloadlen, (char*)message->payload);
else
printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);
fflush(stdout);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
} }
} while (data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
if (rc != 0) if (rc != 0)
{
myconnect(&client, &conn_opts); myconnect(&client, &conn_opts);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
}
if (opts.qos > 0)
MQTTClient_yield();
} }
printf("Stopping\n"); printf("Stopping\n");
free(buffer);
MQTTClient_disconnect(client, 0); MQTTClient_disconnect(client, 0);
MQTTClient_destroy(&client); MQTTClient_destroy(&client);
...@@ -199,11 +181,7 @@ void getopts(int argc, char** argv) ...@@ -199,11 +181,7 @@ void getopts(int argc, char** argv)
while (count < argc) while (count < argc)
{ {
if (strcmp(argv[count], "--retained") == 0) if (strcmp(argv[count], "--qos") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{ {
if (++count < argc) if (++count < argc)
{ {
...@@ -254,17 +232,24 @@ void getopts(int argc, char** argv) ...@@ -254,17 +232,24 @@ void getopts(int argc, char** argv)
else else
usage(); usage();
} }
else if (strcmp(argv[count], "--maxdatalen") == 0) else if (strcmp(argv[count], "--delimiter") == 0)
{ {
if (++count < argc) if (++count < argc)
opts.maxdatalen = atoi(argv[count]); opts.delimiter = argv[count];
else else
usage(); opts.nodelimiter = 1;
} }
else if (strcmp(argv[count], "--delimiter") == 0) else if (strcmp(argv[count], "--showtopics") == 0)
{ {
if (++count < argc) if (++count < argc)
opts.delimiter = argv[count]; {
if (strcmp(argv[count], "on") == 0)
opts.showtopics = 1;
else if (strcmp(argv[count], "off") == 0)
opts.showtopics = 0;
else
usage();
}
else else
usage(); usage();
} }
...@@ -272,4 +257,3 @@ void getopts(int argc, char** argv) ...@@ -272,4 +257,3 @@ void getopts(int argc, char** argv)
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment