Commit 68e27256 authored by Ian Craggs's avatar Ian Craggs

Bug #402253 - add sample code

parent e01edbfb
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompanies this distribution.
*
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTAsync.h"
#define ADDRESS "tcp://swtest.hursley.ibm.com:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
int finished = 0;
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
void onDisconnect(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
finished = 1;
}
void onSend(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
printf("Message with token value %d delivery confirmed\n", response->token);
opts.onSuccess = onDisconnect;
opts.context = client;
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(-1);
}
}
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
printf("Successful connection\n");
opts.onSuccess = onSend;
opts.context = client;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(-1);
}
}
int main(int argc, char* argv[])
{
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_token token;
int rc;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(-1);
}
printf("Waiting for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
PAYLOAD, TOPIC, CLIENTID);
while (!finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&client);
return rc;
}
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompanies this distribution.
*
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
int disc_finished = 0;
int subscribed = 0;
int finished = 0;
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void onDisconnect(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
disc_finished = 1;
}
void onSubscribe(void* context, MQTTAsync_successData* response)
{
printf("Subscribe succeeded\n");
subscribed = 1;
}
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
printf("Subscribe failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
printf("Successful connection\n");
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
opts.onSuccess = onSubscribe;
opts.onFailure = onSubscribeFailure;
opts.context = client;
deliveredtoken = 0;
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
exit(-1);
}
}
int main(int argc, char* argv[])
{
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_token token;
int rc;
int ch;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, connlost, msgarrvd, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(-1);
}
while (!subscribed)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (finished)
goto exit;
do
{
ch = getchar();
} while (ch!='Q' && ch != 'q');
disc_opts.onSuccess = onDisconnect;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(-1);
}
while (!disc_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
exit:
MQTTAsync_destroy(&client);
return rc;
}
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompanies this distribution.
*
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTClient_deliveryToken deliveredtoken;
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
int main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(-1);
}
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
printf("Waiting for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
PAYLOAD, TOPIC, CLIENTID);
while(deliveredtoken != token);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompanies this distribution.
*
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
int main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(-1);
}
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
printf("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompanies this distribution.
*
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
/*
stdin publisher
compulsory parameters:
--topic topic to publish on
defaulted parameters:
--host localhost
--port 1883
--qos 0
--delimiters \n
--clientid stdin_publisher
--maxdatalen 100
--userid none
--password none
*/
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <signal.h>
#include <memory.h>
#if defined(WIN32)
#include <Windows.h>
#define sleep Sleep
#else
#include <sys/time.h>
#include <stdlib.h>
#endif
volatile int toStop = 0;
void usage()
{
printf("MQTT stdin publisher\n");
printf("Usage: stdinpub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is localhost)\n");
printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 0)\n");
printf(" --retained (default is off)\n");
printf(" --delimiter <delim> (default is \n)");
printf(" --clientid <clientid> (default is hostname+timestamp)");
printf(" --maxdatalen 100\n");
printf(" --username none\n");
printf(" --password none\n");
exit(-1);
}
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{
printf("Connecting\n");
if (MQTTClient_connect(*client, opts) != 0)
{
printf("Failed to connect\n");
exit(-1);
}
}
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
struct
{
char* clientid;
char delimiter;
int maxdatalen;
int qos;
int retained;
char* username;
char* password;
char* host;
char* port;
int verbose;
} opts =
{
"publisher", '\n', 100, 0, 0, NULL, NULL, "localhost", "1883", 0
};
void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
{
/* not expecting any messages */
return 1;
}
int main(int argc, char** argv)
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
char* topic = NULL;
char* buffer = NULL;
int rc = 0;
char url[100];
if (argc < 2)
usage();
getopts(argc, argv);
sprintf(url, "%s:%s", opts.host, opts.port);
if (opts.verbose)
printf("URL is %s\n", url);
topic = argv[1];
printf("Using topic %s\n", topic);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
conn_opts.keepAliveInterval = 10;
conn_opts.reliable = 0;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
myconnect(&client, &conn_opts);
buffer = malloc(opts.maxdatalen);
while (!toStop)
{
int data_len = 0;
do
{
buffer[data_len++] = getchar();
} while (buffer[data_len-1] != opts.delimiter && data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
if (rc != 0)
{
myconnect(&client, &conn_opts);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
}
if (opts.qos > 0)
MQTTClient_yield();
}
printf("Stopping\n");
free(buffer);
MQTTClient_disconnect(client, 0);
MQTTClient_destroy(&client);
return 0;
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--retained") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--maxdatalen") == 0)
{
if (++count < argc)
opts.maxdatalen = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts.delimiter = argv[count][0];
else
usage();
}
count++;
}
}
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompanies this distribution.
*
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
/*
stdout subscriber
compulsory parameters:
--topic topic to subscribe to
defaulted parameters:
--host localhost
--port 1883
--qos 2
--delimiter \n
--clientid stdout_subscriber
--userid none
--password none
*/
extern "C" {
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
}
#include <stdio.h>
#include <signal.h>
#include <memory.h>
#if defined(WIN32)
#include <Windows.h>
#define sleep Sleep
#else
#include <sys/time.h>
#include <stdlib.h>
#endif
volatile int toStop = 0;
void usage()
{
printf("MQTT stdout subscriber\n");
printf("Usage: stdoutsub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is localhost)\n");
printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 2)\n");
printf(" --delimiter <delim> (default is no delimiter)\n");
printf(" --clientid <clientid> (default is hostname+timestamp)\n");
printf(" --username none\n");
printf(" --password none\n");
printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n");
exit(-1);
}
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{
int rc = 0;
if ((rc = MQTTClient_connect(*client, opts)) != 0)
{
printf("Failed to connect, return code %d\n", rc);
exit(-1);
}
}
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
struct opts_struct
{
char* clientid;
int nodelimiter;
char delimiter;
int qos;
char* username;
char* password;
char* host;
char* port;
int showtopics;
} opts =
{
"stdout-subscriber", 1, '\n', 2, NULL, NULL, "localhost", "1883", 0
};
void getopts(int argc, char** argv);
int main(int argc, char** argv)
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
char* topic = NULL;
int rc = 0;
char url[100];
if (argc < 2)
usage();
topic = argv[1];
if (strchr(topic, '#') || strchr(topic, '+'))
opts.showtopics = 1;
if (opts.showtopics)
printf("topic is %s\n", topic);
getopts(argc, argv);
sprintf(url, "%s:%s", opts.host, opts.port);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
conn_opts.keepAliveInterval = 10;
conn_opts.reliable = 0;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
myconnect(&client, &conn_opts);
rc = MQTTClient_subscribe(client, topic, opts.qos);
while (!toStop)
{
char* topicName = NULL;
int topicLen;
MQTTClient_message* message = NULL;
rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000);
if (message)
{
if (opts.showtopics)
printf("%s\t", topicName);
if (opts.nodelimiter)
printf("%.*s", message->payloadlen, (char*)message->payload);
else
printf("%.*s%c", message->payloadlen, (char*)message->payload, opts.delimiter);
fflush(stdout);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
}
if (rc != 0)
myconnect(&client, &conn_opts);
}
printf("Stopping\n");
MQTTClient_disconnect(client, 0);
MQTTClient_destroy(&client);
return 0;
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
{
if (strcmp("newline", argv[count]) == 0)
opts.delimiter = '\n';
else
opts.delimiter = argv[count][0];
opts.nodelimiter = 0;
}
else
usage();
}
else if (strcmp(argv[count], "--showtopics") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "on") == 0)
opts.showtopics = 1;
else if (strcmp(argv[count], "off") == 0)
opts.showtopics = 0;
else
usage();
}
else
usage();
}
count++;
}
}
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompanies this distribution.
*
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
/*
stdout subscriber for the asynchronous client
compulsory parameters:
--topic topic to subscribe to
defaulted parameters:
--host localhost
--port 1883
--qos 2
--delimiter \n
--clientid stdout_subscriber
--userid none
--password none
*/
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <signal.h>
#include <memory.h>
#if defined(WIN32)
#include <Windows.h>
#define sleep Sleep
#else
#include <sys/time.h>
#include <stdlib.h>
#endif
volatile int finished = 0;
char* topic = NULL;
int subscribed = 0;
int disconnected = 0;
void cfinish(int sig)
{
signal(SIGINT, NULL);
finished = 1;
}
struct
{
char* clientid;
int nodelimiter;
char delimiter;
int qos;
char* username;
char* password;
char* host;
char* port;
int showtopics;
} opts =
{
"stdout-subscriber", 1, '\n', 2, NULL, NULL, "localhost", "1883", 0
};
void usage()
{
printf("MQTT stdout subscriber\n");
printf("Usage: stdoutsub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is localhost)\n");
printf(" --port <port> (default is 1883)\n");
printf(" --qos <qos> (default is 2)\n");
printf(" --delimiter <delim> (default is no delimiter)\n");
printf(" --clientid <clientid> (default is hostname+timestamp)\n");
printf(" --username none\n");
printf(" --password none\n");
printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n");
exit(-1);
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
{
if (strcmp("newline", argv[count]) == 0)
opts.delimiter = '\n';
else
opts.delimiter = argv[count][0];
opts.nodelimiter = 0;
}
else
usage();
}
else if (strcmp(argv[count], "--showtopics") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "on") == 0)
opts.showtopics = 1;
else if (strcmp(argv[count], "off") == 0)
opts.showtopics = 0;
else
usage();
}
else
usage();
}
count++;
}
}
void connectionLost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start reconnect, return code %d\n", rc);
finished = 1;
}
}
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
if (opts.showtopics)
printf("%s\t", topicName);
if (opts.nodelimiter)
printf("%.*s", message->payloadlen, (char*)message->payload);
else
printf("%.*s%c", message->payloadlen, (char*)message->payload, opts.delimiter);
fflush(stdout);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void onDisconnect(void* context, MQTTAsync_successData* response)
{
disconnected = 1;
}
void onSubscribe(void* context, MQTTAsync_successData* response)
{
subscribed = 1;
}
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
printf("Subscribe failed, rc %d\n", response->code);
finished = 1;
}
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response->code);
finished = 1;
}
void onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
if (opts.showtopics)
printf("Subscribing to topic %s\n", topic, opts.clientid, opts.qos);
ropts.onSuccess = onSubscribe;
ropts.onFailure = onSubscribeFailure;
ropts.context = client;
if ((rc = MQTTAsync_subscribe(client, topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
finished = 1;
}
}
int main(int argc, char** argv)
{
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc = 0;
char url[100];
if (argc < 2)
usage();
topic = argv[1];
if (strchr(topic, '#') || strchr(topic, '+'))
opts.showtopics = 1;
if (opts.showtopics)
printf("topic is %s\n", topic);
getopts(argc, argv);
sprintf(url, "%s:%s", opts.host, opts.port);
rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
conn_opts.keepAliveInterval = 10;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(-1);
}
while (!subscribed)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (finished)
goto exit;
while (!finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
disc_opts.onSuccess = onDisconnect;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(-1);
}
while (!disconnected)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
exit:
MQTTAsync_destroy(&client);
return 0;
}
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompanies this distribution.
*
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTClient_deliveryToken deliveredtoken;
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
int main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
int ch;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(-1);
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
MQTTClient_subscribe(client, TOPIC, QOS);
do
{
ch = getchar();
} while(ch!='Q' && ch != 'q');
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment