Commit 00bdf989 authored by Ian Craggs's avatar Ian Craggs

Merge branch 'issue_2' into develop

Automatic reconnect and offline buffering
parents ad943e4c b62244d5
......@@ -5,12 +5,12 @@
<storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643" moduleId="org.eclipse.cdt.core.settings" name="debug-native">
<externalSettings/>
<extensions>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
<extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
</extensions>
</storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0">
......@@ -38,6 +38,17 @@
</tool>
</toolChain>
</folderInfo>
<fileInfo id="cdt.managedbuild.toolchain.gnu.base.2116872643.660980254" name="mqttclient_module.c" rcbsApplicability="disable" resourcePath="test/python/mqttclient_module.c" toolsToInvoke="cdt.managedbuild.tool.gnu.c.compiler.base.919295913.892752676">
<tool id="cdt.managedbuild.tool.gnu.c.compiler.base.919295913.892752676" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base.919295913">
<option id="gnu.c.compiler.option.include.paths.1043418401" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="/usr/include/python2.6"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.1471037566" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
</fileInfo>
<sourceEntries>
<entry flags="VALUE_WORKSPACE_PATH" kind="sourcePath" name="src"/>
</sourceEntries>
</configuration>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
......@@ -46,12 +57,12 @@
<storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804" moduleId="org.eclipse.cdt.core.settings" name="debug-x86-linux">
<externalSettings/>
<extensions>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
<extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
</extensions>
</storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0">
......@@ -79,6 +90,14 @@
</tool>
</toolChain>
</folderInfo>
<fileInfo id="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804.44392871" name="mqttclient_module.c" rcbsApplicability="disable" resourcePath="test/python/mqttclient_module.c" toolsToInvoke="cdt.managedbuild.tool.gnu.c.compiler.base.1542253543.1236301446">
<tool id="cdt.managedbuild.tool.gnu.c.compiler.base.1542253543.1236301446" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base.1542253543">
<option id="gnu.c.compiler.option.include.paths.387085861" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="/usr/include/python2.6/"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.912266645" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
</fileInfo>
</configuration>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
......@@ -87,12 +106,12 @@
<storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084" moduleId="org.eclipse.cdt.core.settings" name="debug-arm-linux-gnueabihf">
<externalSettings/>
<extensions>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
<extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.ELF" point="org.eclipse.cdt.core.BinaryParser"/>
</extensions>
</storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0">
......@@ -120,6 +139,14 @@
</tool>
</toolChain>
</folderInfo>
<fileInfo id="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084.1410045178" name="mqttclient_module.c" rcbsApplicability="disable" resourcePath="test/python/mqttclient_module.c" toolsToInvoke="cdt.managedbuild.tool.gnu.c.compiler.base.1805700752.404015131">
<tool id="cdt.managedbuild.tool.gnu.c.compiler.base.1805700752.404015131" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base.1805700752">
<option id="gnu.c.compiler.option.include.paths.253579949" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="/usr/include/python2.6/"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.987997893" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
</fileInfo>
</configuration>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
......@@ -128,9 +155,6 @@
<storageModule moduleId="cdtBuildSystem" version="4.0.0">
<project id="org.eclipse.paho.mqtt.c.null.1335713239" name="org.eclipse.paho.mqtt.c"/>
</storageModule>
<storageModule moduleId="scannerConfiguration">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.LanguageSettingsProviders"/>
<storageModule moduleId="refreshScope" versionNumber="2">
<configuration configurationName="Default">
......@@ -144,4 +168,25 @@
</configuration>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.make.core.buildtargets"/>
<storageModule moduleId="scannerConfiguration">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804;cdt.managedbuild.toolchain.gnu.base.2116872643.128299804.;cdt.managedbuild.tool.gnu.cpp.compiler.base.1478312340;cdt.managedbuild.tool.gnu.cpp.compiler.input.1918693642">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643.128299804;cdt.managedbuild.toolchain.gnu.base.2116872643.128299804.;cdt.managedbuild.tool.gnu.c.compiler.base.1542253543;cdt.managedbuild.tool.gnu.c.compiler.input.273102950">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084;cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084.;cdt.managedbuild.tool.gnu.c.compiler.base.1805700752;cdt.managedbuild.tool.gnu.c.compiler.input.892553453">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643;cdt.managedbuild.toolchain.gnu.base.2116872643.2105697674;cdt.managedbuild.tool.gnu.cpp.compiler.base.2093522624;cdt.managedbuild.tool.gnu.cpp.compiler.input.58852055">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084;cdt.managedbuild.toolchain.gnu.base.2116872643.1588218084.;cdt.managedbuild.tool.gnu.cpp.compiler.base.1005171182;cdt.managedbuild.tool.gnu.cpp.compiler.input.1367844459">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.2116872643;cdt.managedbuild.toolchain.gnu.base.2116872643.2105697674;cdt.managedbuild.tool.gnu.c.compiler.base.919295913;cdt.managedbuild.tool.gnu.c.compiler.input.873628110">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
</storageModule>
</cproject>
......@@ -5,6 +5,11 @@
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.cdt.managedbuilder.core.genmakebuilder</name>
<triggers>clean,full,incremental,</triggers>
......@@ -23,5 +28,6 @@
<nature>org.eclipse.cdt.core.ccnature</nature>
<nature>org.eclipse.cdt.managedbuilder.core.managedBuildNature</nature>
<nature>org.eclipse.cdt.managedbuilder.core.ScannerConfigNature</nature>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
</pydev_project>
......@@ -84,10 +84,10 @@ HEADERS = $(srcdir)/*.h
HEADERS_C = $(filter-out $(srcdir)/MQTTAsync.h, $(HEADERS))
HEADERS_A = $(HEADERS)
SAMPLE_FILES_C = stdinpub stdoutsub pubsync pubasync subasync
SAMPLE_FILES_C = paho_cs_pub paho_cs_sub MQTTClient_publish MQTTClient_publish_async MQTTClient_subscribe
SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
SAMPLE_FILES_A = stdinpuba stdoutsuba MQTTAsync_subscribe MQTTAsync_publish
SAMPLE_FILES_A = paho_c_pub paho_c_sub MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}
TEST_FILES_C = test1 test2 sync_client_test test_mqtt4sync
......
Issue 2 - offline buffering and automatic reconnect
if we don't have automatic reconnect and we don't successfully connect, then ...
offline buffering implies automatic reconnect?
is it worth having automatic reconnect without offline buffering?
should automatic reconnect even be offered separately?
\ No newline at end of file
/*******************************************************************************
* Copyright (c) 2009, 2015 IBM Corp.
* Copyright (c) 2009, 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -27,6 +27,7 @@
* Ian Craggs - fix for bug 465369 - longer latency than expected
* Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
* Ian Craggs - fix for bug 484363 - segfault in getReadySocket
* Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
*******************************************************************************/
/**
......@@ -60,6 +61,10 @@
char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
#if !defined(min)
#define min(a, b) (((a) < (b)) ? (a) : (b))
#endif
extern Sockets s;
static ClientStates ClientState =
......@@ -289,6 +294,9 @@ typedef struct MQTTAsync_struct
MQTTAsync_messageArrived* ma;
MQTTAsync_deliveryComplete* dc;
void* context; /* the context to be associated with the main callbacks*/
MQTTAsync_connected* connected;
void* connected_context; /* the context to be associated with the connected callback*/
MQTTAsync_command connect; /* Connect operation properties */
MQTTAsync_command disconnect; /* Disconnect operation properties */
......@@ -299,6 +307,20 @@ typedef struct MQTTAsync_struct
MQTTPacket* pack;
/* added for offline buffering */
MQTTAsync_createOptions* createOptions;
int shouldBeConnected;
/* added for automatic reconnect */
int automaticReconnect;
int minRetryInterval;
int maxRetryInterval;
int currentInterval;
START_TIME_TYPE lastConnectionFailedTime;
int retrying;
int reconnectNow;
} MQTTAsyncs;
......@@ -369,8 +391,8 @@ int MQTTAsync_checkConn(MQTTAsync_command* command, MQTTAsyncs* client)
}
int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context)
int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context, MQTTAsync_createOptions* options)
{
int rc = 0;
MQTTAsyncs *m = NULL;
......@@ -390,6 +412,12 @@ int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clien
goto exit;
}
if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
}
if (!initialized)
{
#if defined(HEAP_H)
......@@ -430,6 +458,13 @@ int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clien
m->c->messageQueue = ListInitialize();
m->c->clientID = MQTTStrdup(clientId);
m->shouldBeConnected = 0;
if (options)
{
m->createOptions = malloc(sizeof(MQTTAsync_createOptions));
memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
}
#if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
if (rc == 0)
......@@ -451,6 +486,14 @@ exit:
}
int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context)
{
return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
persistence_context, NULL);
}
void MQTTAsync_terminate(void)
{
FUNC_ENTRY;
......@@ -789,6 +832,47 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
}
void MQTTAsync_startConnectRetry(MQTTAsyncs* m)
{
if (m->automaticReconnect && m->shouldBeConnected)
{
m->lastConnectionFailedTime = MQTTAsync_start_clock();
if (m->retrying)
m->currentInterval = min(m->currentInterval * 2, m->maxRetryInterval);
else
{
m->currentInterval = m->minRetryInterval;
m->retrying = 1;
}
}
}
int MQTTAsync_reconnect(MQTTAsync handle)
{
int rc = MQTTASYNC_FAILURE;
MQTTAsyncs* m = handle;
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m->automaticReconnect && m->shouldBeConnected)
{
m->reconnectNow = 1;
if (m->retrying == 0)
{
m->currentInterval = m->minRetryInterval;
m->retrying = 1;
}
rc = MQTTASYNC_SUCCESS;
}
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
{
MQTTAsyncs* m = handle;
......@@ -799,12 +883,16 @@ void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
{
int was_connected = m->c->connected;
MQTTAsync_closeSession(m->c);
if (command->details.dis.internal && m->cl && was_connected)
if (command->details.dis.internal)
{
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
(*(m->cl))(m->context, NULL);
if (m->cl && was_connected)
{
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
(*(m->cl))(m->context, NULL);
}
MQTTAsync_startConnectRetry(m);
}
else if (!command->details.dis.internal && command->onSuccess)
else if (command->onSuccess)
{
Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, NULL);
......@@ -1237,6 +1325,7 @@ void MQTTAsync_checkTimeouts()
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, NULL);
}
MQTTAsync_startConnectRetry(m);
}
continue;
}
......@@ -1266,6 +1355,21 @@ void MQTTAsync_checkTimeouts()
}
for (i = 0; i < timed_out_count; ++i)
ListRemoveHead(m->responses); /* remove the first response in the list */
if (m->automaticReconnect && m->retrying)
{
if (m->reconnectNow || MQTTAsync_elapsed(m->lastConnectionFailedTime) > (m->currentInterval * 1000))
{
/* put the connect command to the head of the command queue, using the next serverURI */
MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Automatically attempting to reconnect");
MQTTAsync_addCommand(conn, sizeof(m->connect));
m->reconnectNow = 0;
}
}
}
MQTTAsync_unlock_mutex(mqttasync_mutex);
exit:
......@@ -1429,6 +1533,8 @@ void MQTTAsync_destroy(MQTTAsync* handle)
if (m->serverURI)
free(m->serverURI);
if (m->createOptions)
free(m->createOptions);
if (!ListRemove(handles, m))
Log(LOG_ERROR, -1, "free error");
*handle = NULL;
......@@ -1470,6 +1576,7 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
if ((rc = connack->rc) == MQTTASYNC_SUCCESS)
{
m->retrying = 0;
m->c->connected = 1;
m->c->good = 1;
m->c->connect_state = 0;
......@@ -1491,6 +1598,12 @@ int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
}
free(connack);
m->pack = NULL;
#if !defined(WIN32) && !defined(WIN64)
Thread_signal_cond(send_cond);
#else
if (!Thread_check_sem(send_sem))
Thread_post_sem(send_sem);
#endif
}
FUNC_EXIT_RC(rc);
return rc;
......@@ -1587,6 +1700,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
Log(TRACE_MIN, -1, "Connect succeeded to %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_freeConnect(m->connect);
int onSuccess = (m->connect.onSuccess != NULL); /* save setting of onSuccess callback */
if (m->connect.onSuccess)
{
MQTTAsync_successData data;
......@@ -1599,6 +1713,13 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
data.alt.connect.MQTTVersion = m->connect.details.conn.MQTTVersion;
data.alt.connect.sessionPresent = sessionPresent;
(*(m->connect.onSuccess))(m->connect.context, &data);
m->connect.onSuccess = NULL; /* don't accidentally call it again */
}
if (m->connected)
{
Log(TRACE_MIN, -1, "Calling connected for client %s", m->c->clientID);
char* reason = (onSuccess) ? "connect onSuccess called" : "automatic reconnect";
(*(m->connected))(m->connected_context, reason);
}
}
else
......@@ -1630,6 +1751,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, &data);
}
MQTTAsync_startConnectRetry(m);
}
}
}
......@@ -1803,6 +1925,28 @@ int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
}
int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* connected)
{
int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle;
FUNC_ENTRY;
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || m->c->connect_state != 0)
rc = MQTTASYNC_FAILURE;
else
{
m->connected_context = context;
m->connected = connected;
}
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_closeOnly(Clients* client)
{
FUNC_ENTRY;
......@@ -1880,9 +2024,6 @@ int MQTTAsync_cleanSession(Clients* client)
}
int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, size_t topicLen, MQTTAsync_message* mm)
{
int rc;
......@@ -1971,9 +2112,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
goto exit;
}
if (strncmp(options->struct_id, "MQTC", 4) != 0 ||
(options->struct_version != 0 && options->struct_version != 1 && options->struct_version != 2 &&
options->struct_version != 3))
if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 4)
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
......@@ -2029,10 +2168,16 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->c->keepAliveInterval = options->keepAliveInterval;
m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = options->maxInflight;
if (options->struct_version == 3)
if (options->struct_version >= 3)
m->c->MQTTVersion = options->MQTTVersion;
else
m->c->MQTTVersion = 0;
if (options->struct_version >= 4)
{
m->automaticReconnect = options->automaticReconnect;
m->minRetryInterval = options->minRetryInterval;
m->maxRetryInterval = options->maxRetryInterval;
}
if (m->c->will)
{
......@@ -2089,6 +2234,7 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
m->c->username = options->username;
m->c->password = options->password;
m->c->retryInterval = options->retryInterval;
m->shouldBeConnected = 1;
/* Add connect request to operation queue */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
......@@ -2133,6 +2279,8 @@ int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* o
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (!internal)
m->shouldBeConnected = 0;
if (m->c->connected == 0)
{
rc = MQTTASYNC_DISCONNECTED;
......@@ -2404,7 +2552,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
FUNC_ENTRY;
if (m == NULL || m->c == NULL)
rc = MQTTASYNC_FAILURE;
else if (m->c->connected == 0)
else if (m->c->connected == 0 && (m->createOptions == NULL ||
m->createOptions->sendWhileDisconnected == 0 || m->shouldBeConnected == 0))
rc = MQTTASYNC_DISCONNECTED;
else if (!UTF8_validateString(destinationName))
rc = MQTTASYNC_BAD_UTF8_STRING;
......@@ -2593,6 +2742,7 @@ exit:
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, NULL);
}
MQTTAsync_startConnectRetry(m);
}
}
FUNC_EXIT_RC(rc);
......@@ -2605,7 +2755,6 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
struct timeval tp = {0L, 0L};
static Ack ack;
MQTTPacket* pack = NULL;
static int nosockets_count = 0;
FUNC_ENTRY;
if (timeout > 0L)
......@@ -2623,18 +2772,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
*sock = Socket_getReadySocket(0, &tp);
Thread_unlock_mutex(socket_mutex);
if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
{
MQTTAsync_sleep(100L);
#if 0
if (s.clientsds->count == 0)
{
if (++nosockets_count == 50) /* 5 seconds with no sockets */
tostop = 1;
}
#endif
}
else
nosockets_count = 0;
#if defined(OPENSSL)
}
#endif
......@@ -2675,6 +2813,7 @@ MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
(*(m->connect.onFailure))(m->connect.context, NULL);
}
MQTTAsync_startConnectRetry(m);
}
}
}
......
/*******************************************************************************
* Copyright (c) 2009, 2015 IBM Corp.
* Copyright (c) 2009, 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -16,6 +16,7 @@
* Ian Craggs - multiple server connection support
* Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
* Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
*******************************************************************************/
/********************************************************************/
......@@ -24,7 +25,7 @@
* @cond MQTTAsync_main
* @mainpage Asynchronous MQTT client library for C
*
* &copy; Copyright IBM Corp. 2009, 2015
* &copy; Copyright IBM Corp. 2009, 2016
*
* @brief An Asynchronous MQTT client library for C.
*
......@@ -317,6 +318,23 @@ typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);
*/
typedef void MQTTAsync_connectionLost(void* context, char* cause);
/**
* This is a callback function, which will be called when the client
* library successfully connects. This is superfluous when the connection
* is made in response to a MQTTAsync_connect call, because the onSuccess
* callback can be used. It is intended for use when automatic reconnect
* is enabled, so that when a reconnection attempt succeeds in the background,
* the application is notified and can take any required actions.
* @param context A pointer to the <i>context</i> value originally passed to
* MQTTAsync_setCallbacks(), which contains any application-specific context.
* @param cause The reason for the disconnection.
* Currently, <i>cause</i> is always set to NULL.
*/
typedef void MQTTAsync_connected(void* context, char* cause);
/** The data returned on completion of an unsuccessful API call in the response callback onFailure. */
typedef struct
{
......@@ -440,6 +458,14 @@ typedef struct
*/
DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
DLLExport int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* co);
/**
......@@ -488,6 +514,24 @@ DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_
DLLExport int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context);
typedef struct
{
/** The eyecatcher for this structure. must be MQCO. */
const char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** Whether to allow messages to be sent when the client library is not connected. */
int sendWhileDisconnected;
/** the maximum number of messages allowed to be buffered while not connected. */
int maxBufferedMessages;
} MQTTAsync_createOptions;
#define MQTTAsync_createOptions_initializer { {'M', 'Q', 'C', 'O'}, 0, 0, 100 }
DLLExport int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context, MQTTAsync_createOptions* options);
/**
* MQTTAsync_willOptions defines the MQTT "Last Will and Testament" (LWT) settings for
* the client. In the event that a client unexpectedly loses its connection to
......@@ -583,10 +627,11 @@ typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
const char struct_id[4];
/** The version number of this structure. Must be 0, 1 or 2.
/** The version number of this structure. Must be 0, 1, 2, 3 or 4.
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
* 2 signifies no MQTTVersion
* 3 signifies no automatic reconnect options
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
......@@ -695,10 +740,23 @@ typedef struct
* MQTTVERSION_3_1_1 (4) = only try version 3.1.1
*/
int MQTTVersion;
/**
* Reconnect automatically in the case of a connection being lost?
*/
int automaticReconnect;
/**
* Minimum retry interval in seconds. Doubled on each failed retry.
*/
int minRetryInterval;
/**
* Maximum retry interval in seconds. The doubling stops here on failed retries.
*/
int maxRetryInterval;
} MQTTAsync_connectOptions;
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 3, 60, 1, 10, NULL, NULL, NULL, 30, 0, NULL, NULL, NULL, NULL, 0, NULL, 0}
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 4, 60, 1, 10, NULL, NULL, NULL, 30, 0, NULL, NULL, NULL, NULL, 0, NULL, 0, \
0, 1, 60}
/**
* This function attempts to connect a previously-created client (see
......
#*******************************************************************************
# Copyright (c) 2015 logi.cals GmbH
# Copyright (c) 2015, 2016 logi.cals GmbH
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
......@@ -12,6 +12,7 @@
#
# Contributors:
# Rainer Poisel - initial version
# Ian Craggs - update sample names
#*******************************************************************************/
# Note: on OS X you should install XCode and the associated command-line tools
......@@ -24,37 +25,37 @@ INCLUDE_DIRECTORIES(
)
# sample files c
ADD_EXECUTABLE(stdinpub stdinpub.c)
ADD_EXECUTABLE(stdoutsub stdoutsub.c)
ADD_EXECUTABLE(pubsync pubsync.c)
ADD_EXECUTABLE(pubasync pubasync.c)
ADD_EXECUTABLE(subasync subasync.c)
TARGET_LINK_LIBRARIES(stdinpub paho-mqtt3c)
TARGET_LINK_LIBRARIES(stdoutsub paho-mqtt3c)
TARGET_LINK_LIBRARIES(pubsync paho-mqtt3c)
TARGET_LINK_LIBRARIES(pubasync paho-mqtt3c)
TARGET_LINK_LIBRARIES(subasync paho-mqtt3c)
ADD_EXECUTABLE(stdinpuba stdinpuba.c)
ADD_EXECUTABLE(stdoutsuba stdoutsuba.c)
ADD_EXECUTABLE(paho_c_pub paho_c_pub.c)
ADD_EXECUTABLE(paho_c_sub paho_c_sub.c)
ADD_EXECUTABLE(paho_cs_pub paho_cs_pub.c)
ADD_EXECUTABLE(paho_cs_sub paho_cs_sub.c)
TARGET_LINK_LIBRARIES(paho_c_pub paho-mqtt3a)
TARGET_LINK_LIBRARIES(paho_c_sub paho-mqtt3a)
TARGET_LINK_LIBRARIES(paho_cs_pub paho-mqtt3c)
TARGET_LINK_LIBRARIES(paho_cs_sub paho-mqtt3c)
ADD_EXECUTABLE(MQTTAsync_subscribe MQTTAsync_subscribe.c)
ADD_EXECUTABLE(MQTTAsync_publish MQTTAsync_publish.c)
ADD_EXECUTABLE(MQTTClient_subscribe MQTTClient_subscribe.c)
ADD_EXECUTABLE(MQTTClient_publish MQTTClient_publish.c)
ADD_EXECUTABLE(MQTTClient_publish_async MQTTClient_publish_async.c)
TARGET_LINK_LIBRARIES(stdinpuba paho-mqtt3a)
TARGET_LINK_LIBRARIES(stdoutsuba paho-mqtt3a)
TARGET_LINK_LIBRARIES(MQTTAsync_subscribe paho-mqtt3a)
TARGET_LINK_LIBRARIES(MQTTAsync_publish paho-mqtt3a)
INSTALL(TARGETS stdinpub
stdoutsub
pubsync
pubasync
subasync
stdinpuba
stdoutsuba
TARGET_LINK_LIBRARIES(MQTTClient_subscribe paho-mqtt3c)
TARGET_LINK_LIBRARIES(MQTTClient_publish paho-mqtt3c)
TARGET_LINK_LIBRARIES(MQTTClient_publish_async paho-mqtt3c)
INSTALL(TARGETS paho_c_sub
paho_c_pub
paho_cs_sub
paho_cs_pub
MQTTAsync_subscribe
MQTTAsync_publish
MQTTClient_subscribe
MQTTClient_publish
MQTTClient_publish_async
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib)
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
* Copyright (c) 2012, 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
......@@ -118,11 +118,15 @@ void onDisconnect(void* context, MQTTAsync_successData* response)
static int connected = 0;
void myconnect(MQTTAsync* client);
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? -1 : response->code);
printf("Connect failed, rc %d\n", response ? response->code : -1);
connected = -1;
MQTTAsync client = (MQTTAsync)context;
myconnect(client);
}
......@@ -135,7 +139,6 @@ void onConnect(void* context, MQTTAsync_successData* response)
connected = 1;
}
void myconnect(MQTTAsync* client)
{
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
......@@ -152,18 +155,13 @@ void myconnect(MQTTAsync* client)
conn_opts.context = client;
ssl_opts.enableServerCertAuth = 0;
conn_opts.ssl = &ssl_opts;
conn_opts.automaticReconnect = 1;
connected = 0;
if ((rc = MQTTAsync_connect(*client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while (connected == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
}
......@@ -209,120 +207,12 @@ void connectionLost(void* context, char* cause)
}
}
#if !defined(_WINDOWS)
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINPROGRESS WSAEINPROGRESS
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define setenv(a, b, c) _putenv_s(a, b)
#endif
#if !defined(SOCKET_ERROR)
#define SOCKET_ERROR -1
#endif
typedef struct
{
int socket;
time_t lastContact;
#if defined(OPENSSL)
SSL* ssl;
SSL_CTX* ctx;
#endif
} networkHandles;
typedef struct
{
char* clientID; /**< the string id of the client */
char* username; /**< MQTT v3.1 user name */
char* password; /**< MQTT v3.1 password */
unsigned int cleansession : 1; /**< MQTT clean session flag */
unsigned int connected : 1; /**< whether it is currently connected */
unsigned int good : 1; /**< if we have an error on the socket we turn this off */
unsigned int ping_outstanding : 1;
int connect_state : 4;
networkHandles net;
/* ... */
} Clients;
typedef struct MQTTAsync_struct
{
char* serverURI;
int ssl;
Clients* c;
/* "Global", to the client, callback definitions */
MQTTAsync_connectionLost* cl;
MQTTAsync_messageArrived* ma;
MQTTAsync_deliveryComplete* dc;
void* context; /* the context to be associated with the main callbacks*/
#if 0
MQTTAsync_command connect; /* Connect operation properties */
MQTTAsync_command disconnect; /* Disconnect operation properties */
MQTTAsync_command* pending_write; /* Is there a socket write pending? */
List* responses;
unsigned int command_seqno;
MQTTPacket* pack;
#endif
} MQTTAsyncs;
int test6_socket_error(char* aString, int sock)
{
#if defined(WIN32)
int errno;
#endif
#if defined(WIN32)
errno = WSAGetLastError();
#endif
if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
{
if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
printf("Socket error %d in %s for socket %d", errno, aString, sock);
}
return errno;
}
int test6_socket_close(int socket)
{
int rc;
#if defined(WIN32)
if (shutdown(socket, SD_BOTH) == SOCKET_ERROR)
test6_socket_error("shutdown", socket);
if ((rc = closesocket(socket)) == SOCKET_ERROR)
test6_socket_error("close", socket);
#else
if (shutdown(socket, SHUT_RDWR) == SOCKET_ERROR)
test6_socket_error("shutdown", socket);
if ((rc = close(socket)) == SOCKET_ERROR)
test6_socket_error("close", socket);
#endif
return rc;
}
int main(int argc, char** argv)
{
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
MQTTAsync client;
char* topic = NULL;
char* buffer = NULL;
......@@ -341,7 +231,8 @@ int main(int argc, char** argv)
topic = argv[1];
printf("Using topic %s\n", topic);
rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
create_opts.sendWhileDisconnected = 1;
rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
......@@ -375,19 +266,9 @@ int main(int argc, char** argv)
pub_opts.onFailure = onPublishFailure;
do
{
published = 0;
rc = MQTTAsync_send(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts);
while (published == 0)
#if defined(WIN32)
Sleep(100);
#else
usleep(1000L);
#endif
if (published == -1)
myconnect(&client);
test6_socket_close(((MQTTAsyncs*)client)->c->net.socket);
}
while (published != 1);
while (rc != MQTTASYNC_SUCCESS);
}
printf("Stopping\n");
......
/*******************************************************************************
* Copyright (c) 2012, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
* Guilherme Maciel Ferreira - add keep alive option
*******************************************************************************/
/*
stdin publisher
compulsory parameters:
--topic topic to publish on
defaulted parameters:
--host localhost
--port 1883
--qos 0
--delimiters \n
--clientid stdin-publisher
--maxdatalen 100
--keepalive 10
--userid none
--password none
*/
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#include <stdio.h>
#include <signal.h>
#include <memory.h>
#if defined(WIN32)
#include <windows.h>
#define sleep Sleep
#else
#include <sys/time.h>
#include <stdlib.h>
#endif
volatile int toStop = 0;
struct
{
char* clientid;
char* delimiter;
int maxdatalen;
int qos;
int retained;
char* username;
char* password;
char* host;
char* port;
int verbose;
int keepalive;
} opts =
{
"stdin-publisher", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0, 10
};
void usage(void)
{
printf("MQTT stdin publisher\n");
printf("Usage: stdinpub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is %s)\n", opts.host);
printf(" --port <port> (default is %s)\n", opts.port);
printf(" --qos <qos> (default is %d)\n", opts.qos);
printf(" --retained (default is %s)\n", opts.retained ? "on" : "off");
printf(" --delimiter <delim> (default is \\n)\n");
printf(" --clientid <clientid> (default is %s)\n", opts.clientid);
printf(" --maxdatalen <bytes> (default is %d)\n", opts.maxdatalen);
printf(" --username none\n");
printf(" --password none\n");
printf(" --keepalive <seconds> (default is %d seconds)\n", opts.keepalive);
exit(EXIT_FAILURE);
}
void myconnect(MQTTClient* client, MQTTClient_connectOptions* opts)
{
printf("Connecting\n");
if (MQTTClient_connect(*client, opts) != 0)
{
printf("Failed to connect\n");
exit(EXIT_FAILURE);
}
}
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
{
/* not expecting any messages */
return 1;
}
int main(int argc, char** argv)
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
char* topic = NULL;
char* buffer = NULL;
int rc = 0;
char url[100];
if (argc < 2)
usage();
getopts(argc, argv);
sprintf(url, "%s:%s", opts.host, opts.port);
if (opts.verbose)
printf("URL is %s\n", url);
topic = argv[1];
printf("Using topic %s\n", topic);
rc = MQTTClient_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
conn_opts.keepAliveInterval = opts.keepalive;
conn_opts.reliable = 0;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
myconnect(&client, &conn_opts);
buffer = malloc(opts.maxdatalen);
while (!toStop)
{
int data_len = 0;
int delim_len = 0;
delim_len = strlen(opts.delimiter);
do
{
buffer[data_len++] = getchar();
if (data_len > delim_len)
{
//printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]);
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
break;
}
} while (data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
if (rc != 0)
{
myconnect(&client, &conn_opts);
rc = MQTTClient_publish(client, topic, data_len, buffer, opts.qos, opts.retained, NULL);
}
if (opts.qos > 0)
MQTTClient_yield();
}
printf("Stopping\n");
free(buffer);
MQTTClient_disconnect(client, 0);
MQTTClient_destroy(&client);
return EXIT_SUCCESS;
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--retained") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--maxdatalen") == 0)
{
if (++count < argc)
opts.maxdatalen = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts.delimiter = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--keepalive") == 0)
{
if (++count < argc)
opts.keepalive = atoi(argv[count]);
else
usage();
}
count++;
}
}
#include <Python.h>
#include "MQTTAsync.h"
#include "LinkedList.h"
static PyObject *MqttV3Error;
static PyObject* mqttv3_create(PyObject* self, PyObject *args)
{
MQTTAsync c;
char* serverURI;
char* clientId;
int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT;
PyObject *pyoptions = NULL, *temp;
MQTTAsync_createOptions options = MQTTAsync_createOptions_initializer;
int rc;
if (!PyArg_ParseTuple(args, "ss|iO", &serverURI, &clientId,
&persistence_option, &pyoptions))
return NULL;
if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT
&& persistence_option != MQTTCLIENT_PERSISTENCE_NONE)
{
PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE");
return NULL;
}
if (pyoptions)
{
if (!PyDict_Check(pyoptions))
{
PyErr_SetString(PyExc_TypeError,
"Create options parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(pyoptions, "sendWhileDisconnected"))
!= NULL)
{
if (PyInt_Check(temp))
options.sendWhileDisconnected = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "sendWhileDisconnected value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "maxBufferedMessages"))
!= NULL)
{
if (PyInt_Check(temp))
options.maxBufferedMessages = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "maxBufferedMessages value must be int");
return NULL;
}
}
rc = MQTTAsync_createWithOptions(&c, serverURI, clientId, persistence_option, NULL, &options);
}
else
rc = MQTTAsync_create(&c, serverURI, clientId, persistence_option, NULL);
return Py_BuildValue("ik", rc, c);
}
static List* callbacks = NULL;
static List* connected_callbacks = NULL;
enum msgTypes
{
CONNECT, PUBLISH, SUBSCRIBE, SUBSCRIBE_MANY, UNSUBSCRIBE
};
typedef struct
{
MQTTAsync c;
PyObject *context;
PyObject *cl, *ma, *dc;
} CallbackEntry;
typedef struct
{
MQTTAsync c;
PyObject *context;
PyObject *co;
} ConnectedEntry;
int clientCompare(void* a, void* b)
{
CallbackEntry* e = (CallbackEntry*) a;
return e->c == (MQTTAsync) b;
}
int connectedCompare(void* a, void* b)
{
ConnectedEntry* e = (ConnectedEntry*) a;
return e->c == (MQTTAsync) b;
}
void connected(void* context, char* cause)
{
/* call the right Python function, using the context */
PyObject *arglist;
PyObject *result;
ConnectedEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Os", e->context, cause);
result = PyEval_CallObject(e->co, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
void connectionLost(void* context, char* cause)
{
/* call the right Python function, using the context */
PyObject *arglist;
PyObject *result;
CallbackEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Os", e->context, cause);
result = PyEval_CallObject(e->cl, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
int messageArrived(void* context, char* topicName, int topicLen,
MQTTAsync_message* message)
{
PyObject *result = NULL;
CallbackEntry* e = context;
int rc = -99;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
if (topicLen == 0)
result = PyObject_CallFunction(e->ma, "Os{ss#sisisisi}", e->context,
topicName, "payload", message->payload, message->payloadlen,
"qos", message->qos, "retained", message->retained, "dup",
message->dup, "msgid", message->msgid);
else
result = PyObject_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context,
topicName, topicLen, "payload", message->payload,
message->payloadlen, "qos", message->qos, "retained",
message->retained, "dup", message->dup, "msgid",
message->msgid);
if (result)
{
if (PyInt_Check(result))
rc = (int) PyInt_AsLong(result);
Py_DECREF(result);
}
PyGILState_Release(gstate);
MQTTAsync_free(topicName);
MQTTAsync_freeMessage(&message);
return rc;
}
void deliveryComplete(void* context, MQTTAsync_token dt)
{
PyObject *arglist;
PyObject *result;
CallbackEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Oi", e->context, dt);
result = PyEval_CallObject(e->dc, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args)
{
MQTTAsync c;
CallbackEntry* e = NULL;
int rc;
e = malloc(sizeof(CallbackEntry));
if (!PyArg_ParseTuple(args, "kOOOO", &c, (PyObject**) &e->context, &e->cl,
&e->ma, &e->dc))
return NULL;
e->c = c;
if ((e->cl != Py_None && !PyCallable_Check(e->cl))
|| (e->ma != Py_None && !PyCallable_Check(e->ma))
|| (e->dc != Py_None && !PyCallable_Check(e->dc)))
{
PyErr_SetString(PyExc_TypeError,
"3rd, 4th and 5th parameters must be callable or None");
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_setCallbacks(c, e, connectionLost, messageArrived, deliveryComplete);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS)
{
ListElement* temp = NULL;
if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
{
ListDetach(callbacks, temp->content);
free(temp->content);
}
ListAppend(callbacks, e, sizeof(e));
Py_XINCREF(e->cl);
Py_XINCREF(e->ma);
Py_XINCREF(e->dc);
Py_XINCREF(e->context);
}
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_setconnected(PyObject* self, PyObject *args)
{
MQTTAsync c;
ConnectedEntry* e = NULL;
int rc;
e = malloc(sizeof(ConnectedEntry));
if (!PyArg_ParseTuple(args, "kOO", &c, (PyObject**) &e->context, &e->co))
return NULL;
e->c = c;
if (e->co != Py_None && !PyCallable_Check(e->co))
{
PyErr_SetString(PyExc_TypeError,
"3rd parameter must be callable or None");
return NULL;
}
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_setConnected(c, e, connected);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS)
{
ListElement* temp = NULL;
if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
{
ListDetach(connected_callbacks, temp->content);
free(temp->content);
}
ListAppend(connected_callbacks, e, sizeof(e));
Py_XINCREF(e->co);
Py_XINCREF(e->context);
}
return Py_BuildValue("i", rc);
}
typedef struct
{
MQTTAsync c;
PyObject *context;
PyObject *onSuccess, *onFailure;
enum msgTypes msgType;
} ResponseEntry;
void onSuccess(void* context, MQTTAsync_successData* response)
{
PyObject *result = NULL;
ResponseEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
switch (e->msgType)
{
case CONNECT:
result = PyObject_CallFunction(e->onSuccess, "O{sisiss}", (e->context) ? e->context : Py_None,
"MQTTVersion", response->alt.connect.MQTTVersion,
"sessionPresent", response->alt.connect.sessionPresent,
"serverURI", response->alt.connect.serverURI);
break;
case PUBLISH:
result = PyObject_CallFunction(e->onSuccess, "O{si ss s{ss# sisi}}", (e->context) ? e->context : Py_None,
"token", response->token,
"destinationName", response->alt.pub.destinationName,
"message",
"payload", response->alt.pub.message.payload,
response->alt.pub.message.payloadlen,
"qos", response->alt.pub.message.qos,
"retained", response->alt.pub.message.retained);
break;
case SUBSCRIBE:
result = PyObject_CallFunction(e->onSuccess, "O{sisi}", (e->context) ? e->context : Py_None,
"token", response->token,
"qos", response->alt.qos);
break;
case SUBSCRIBE_MANY:
result = PyObject_CallFunction(e->onSuccess, "O{sis[i]}", (e->context) ? e->context : Py_None,
"token", response->token,
"qosList", response->alt.qosList[0]);
break;
case UNSUBSCRIBE:
result = PyObject_CallFunction(e->onSuccess, "O{si}", (e->context) ? e->context : Py_None,
"token", response->token);
break;
}
if (result)
{
Py_DECREF(result);
printf("decrementing reference count for result\n");
}
PyGILState_Release(gstate);
free(e);
}
void onFailure(void* context, MQTTAsync_failureData* response)
{
PyObject *result = NULL;
PyObject *arglist = NULL;
ResponseEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
// TODO: convert response into Python structure
if (e->context)
arglist = Py_BuildValue("OO", e->context, response);
else
arglist = Py_BuildValue("OO", Py_None, response);
result = PyEval_CallObject(e->onFailure, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
free(e);
}
/* return true if ok, false otherwise */
int getResponseOptions(MQTTAsync c, PyObject *pyoptions, MQTTAsync_responseOptions* responseOptions,
enum msgTypes msgType)
{
PyObject *temp = NULL;
if (!pyoptions)
return 1;
if (!PyDict_Check(pyoptions))
{
PyErr_SetString(PyExc_TypeError, "Response options must be a dictionary");
return 0;
}
if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
{
if (PyCallable_Check(temp)) /* temp points to Python function */
responseOptions->onSuccess = (MQTTAsync_onSuccess*)temp;
else
{
PyErr_SetString(PyExc_TypeError,
"onSuccess value must be callable");
return 0;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
{
if (PyCallable_Check(temp))
responseOptions->onFailure = (MQTTAsync_onFailure*)temp;
else
{
PyErr_SetString(PyExc_TypeError,
"onFailure value must be callable");
return 0;
}
}
responseOptions->context = PyDict_GetItemString(pyoptions, "context");
if (responseOptions->onFailure || responseOptions->onSuccess)
{
ResponseEntry* r = malloc(sizeof(ResponseEntry));
r->c = c;
r->context = responseOptions->context;
responseOptions->context = r;
r->onSuccess = (PyObject*)responseOptions->onSuccess;
responseOptions->onSuccess = onSuccess;
r->onFailure = (PyObject*)responseOptions->onFailure;
responseOptions->onFailure = onFailure;
r->msgType = msgType;
}
return 1; /* not an error, if we get here */
}
static PyObject* mqttv3_connect(PyObject* self, PyObject *args)
{
MQTTAsync c;
PyObject *pyoptions = NULL, *temp;
MQTTAsync_connectOptions connectOptions = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer;
int rc;
if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions))
return NULL;
if (!pyoptions)
goto skip;
if (!PyDict_Check(pyoptions))
{
PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
{
if (PyCallable_Check(temp)) /* temp points to Python function */
connectOptions.onSuccess = (MQTTAsync_onSuccess*)temp;
else
{
PyErr_SetString(PyExc_TypeError,
"onSuccess value must be callable");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
{
if (PyCallable_Check(temp))
connectOptions.onFailure = (MQTTAsync_onFailure*)temp;
else
{
PyErr_SetString(PyExc_TypeError,
"onFailure value must be callable");
return NULL;
}
}
connectOptions.context = PyDict_GetItemString(pyoptions, "context");
if (connectOptions.onFailure || connectOptions.onSuccess)
{
ResponseEntry* r = malloc(sizeof(ResponseEntry));
r->c = c;
r->context = connectOptions.context;
connectOptions.context = r;
r->onSuccess = (PyObject*)connectOptions.onSuccess;
connectOptions.onSuccess = onSuccess;
r->onFailure = (PyObject*)connectOptions.onFailure;
connectOptions.onFailure = onFailure;
r->msgType = CONNECT;
}
if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.keepAliveInterval = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError,
"keepAliveLiveInterval value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.cleansession = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "cleansession value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL)
{
if (PyDict_Check(temp))
{
PyObject *wtemp = NULL;
if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL)
{
PyErr_SetString(PyExc_TypeError,
"will topicName value must be set");
return NULL;
}
else
{
if (PyString_Check(wtemp))
willOptions.topicName = PyString_AsString(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will topicName value must be string");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL)
{
PyErr_SetString(PyExc_TypeError,
"will message value must be set");
return NULL;
}
else
{
if (PyString_Check(wtemp))
willOptions.message = PyString_AsString(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will message value must be string");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL)
{
if (PyInt_Check(wtemp))
willOptions.retained = (int) PyInt_AsLong(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will retained value must be int");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL)
{
if (PyInt_Check(wtemp))
willOptions.qos = (int) PyInt_AsLong(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will qos value must be int");
return NULL;
}
}
connectOptions.will = &willOptions;
}
else
{
PyErr_SetString(PyExc_TypeError, "will value must be dictionary");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL)
{
if (PyString_Check(temp))
connectOptions.username = PyString_AsString(temp);
else
{
PyErr_SetString(PyExc_TypeError, "username value must be string");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL)
{
if (PyString_Check(temp))
connectOptions.username = PyString_AsString(temp);
else
{
PyErr_SetString(PyExc_TypeError, "password value must be string");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "automaticReconnect")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.automaticReconnect = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "automatic reconnect value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "minRetryInterval")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.minRetryInterval = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "minRetryInterval value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "maxRetryInterval")) != NULL)
{
if (PyInt_Check(temp))
connectOptions.maxRetryInterval = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "maxRetryInterval value must be int");
return NULL;
}
}
skip:
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_connect(c, &connectOptions);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
int rc;
if (!PyArg_ParseTuple(args, "k|i", &c, &options.timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_disconnect(c, &options);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args)
{
MQTTAsync c;
int rc;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_isConnected(c);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject *pyoptions = NULL;
char* topic;
int qos = 2;
int rc;
if (!PyArg_ParseTuple(args, "ks|iO", &c, &topic, &qos, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
return NULL;
Py_BEGIN_ALLOW_THREADS;
rc = MQTTAsync_subscribe(c, topic, qos, &response);
Py_END_ALLOW_THREADS;
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject* topicList;
PyObject* qosList;
PyObject *pyoptions = NULL;
int count;
char** topics;
int* qoss;
int i, rc = 0;
if (!PyArg_ParseTuple(args, "kOO|O", &c, &topicList, &qosList, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
return NULL;
if (!PySequence_Check(topicList) || !PySequence_Check(qosList))
{
PyErr_SetString(PyExc_TypeError,
"3rd and 4th parameters must be sequences");
return NULL;
}
if ((count = PySequence_Length(topicList)) != PySequence_Length(qosList))
{
PyErr_SetString(PyExc_TypeError,
"3rd and 4th parameters must be sequences of the same length");
return NULL;
}
topics = malloc(count * sizeof(char*));
for (i = 0; i < count; ++i)
topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
qoss = malloc(count * sizeof(int));
for (i = 0; i < count; ++i)
qoss[i] = (int) PyInt_AsLong(PySequence_GetItem(qosList, i));
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_subscribeMany(c, count, topics, qoss, &response);
Py_END_ALLOW_THREADS
for (i = 0; i < count; ++i)
PySequence_SetItem(qosList, i, PyInt_FromLong((long) qoss[i]));
free(topics);
free(qoss);
if (rc == MQTTASYNC_SUCCESS)
return Py_BuildValue("iO", rc, qosList);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject *pyoptions = NULL;
char* topic;
int rc;
if (!PyArg_ParseTuple(args, "ks|O", &c, &topic, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_unsubscribe(c, topic, &response);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject* topicList;
PyObject *pyoptions = NULL;
int count;
char** topics;
int i, rc = 0;
if (!PyArg_ParseTuple(args, "kO|O", &c, &topicList, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
return NULL;
if (!PySequence_Check(topicList))
{
PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences");
return NULL;
}
count = PySequence_Length(topicList);
topics = malloc(count * sizeof(char*));
for (i = 0; i < count; ++i)
topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_unsubscribeMany(c, count, topics, &response);
Py_END_ALLOW_THREADS
free(topics);
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_send(PyObject* self, PyObject *args)
{
MQTTAsync c;
char* destinationName;
int payloadlen;
void* payload;
int qos = 0;
int retained = 0;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject *pyoptions = NULL;
int rc;
if (!PyArg_ParseTuple(args, "kss#|iiO", &c, &destinationName, &payload,
&payloadlen, &qos, &retained, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_send(c, destinationName, payloadlen, payload, qos, retained, &response);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS && qos > 0)
return Py_BuildValue("ii", rc, response);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_sendMessage(PyObject* self, PyObject *args)
{
MQTTAsync c;
char* destinationName;
PyObject *message, *temp;
MQTTAsync_message msg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
PyObject *pyoptions = NULL;
int rc;
if (!PyArg_ParseTuple(args, "ksO|O", &c, &destinationName, &message, &pyoptions))
return NULL;
if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
return NULL;
if (!PyDict_Check(message))
{
PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(message, "payload")) == NULL)
{
PyErr_SetString(PyExc_TypeError, "dictionary must have payload key");
return NULL;
}
if (PyString_Check(temp))
PyString_AsStringAndSize(temp, (char**) &msg.payload,
(Py_ssize_t*) &msg.payloadlen);
else
{
PyErr_SetString(PyExc_TypeError, "payload value must be string");
return NULL;
}
if ((temp = PyDict_GetItemString(message, "qos")) == NULL)
msg.qos = (int) PyInt_AsLong(temp);
if ((temp = PyDict_GetItemString(message, "retained")) == NULL)
msg.retained = (int) PyInt_AsLong(temp);
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_sendMessage(c, destinationName, &msg, &response);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS && msg.qos > 0)
return Py_BuildValue("ii", rc, response);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args)
{
MQTTAsync c;
unsigned long timeout = 1000L;
MQTTAsync_token dt;
int rc;
if (!PyArg_ParseTuple(args, "ki|i", &c, &dt, &timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_waitForCompletion(c, dt, timeout);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_getPendingTokens(PyObject* self, PyObject *args)
{
MQTTAsync c;
MQTTAsync_token* tokens;
int rc;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
Py_BEGIN_ALLOW_THREADS
rc = MQTTAsync_getPendingTokens(c, &tokens);
Py_END_ALLOW_THREADS
if (rc == MQTTASYNC_SUCCESS)
{
int i = 0;
PyObject* dts = PyList_New(0);
while (tokens[i] != -1)
PyList_Append(dts, PyInt_FromLong((long) tokens[i]));
return Py_BuildValue("iO", rc, dts);
}
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_destroy(PyObject* self, PyObject *args)
{
MQTTAsync c;
ListElement* temp = NULL;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
{
ListDetach(callbacks, temp->content);
free(temp->content);
}
if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
{
ListDetach(connected_callbacks, temp->content);
free(temp->content);
}
MQTTAsync_destroy(&c);
Py_INCREF(Py_None);
return Py_None;
}
static PyMethodDef MqttV3Methods[] =
{
{ "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." },
{ "setcallbacks", mqttv3_setcallbacks, METH_VARARGS,
"Sets the callback functions for a particular client." },
{ "setconnected", mqttv3_setconnected, METH_VARARGS,
"Sets the connected callback function for a particular client." },
{ "connect", mqttv3_connect, METH_VARARGS,
"Connects to a server using the specified options." },
{ "disconnect", mqttv3_disconnect, METH_VARARGS,
"Disconnects from a server." },
{ "isConnected", mqttv3_isConnected, METH_VARARGS,
"Determines if this client is currently connected to the server." },
{ "subscribe", mqttv3_subscribe, METH_VARARGS,
"Subscribe to the given topic." },
{ "subscribeMany", mqttv3_subscribeMany, METH_VARARGS,
"Subscribe to the given topics." },
{ "unsubscribe", mqttv3_unsubscribe, METH_VARARGS,
"Unsubscribe from the given topic." },
{ "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS,
"Unsubscribe from the given topics." },
{ "send", mqttv3_send, METH_VARARGS,
"Publish a message to the given topic." },
{ "sendMessage", mqttv3_sendMessage, METH_VARARGS,
"Publish a message to the given topic." },
{ "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS,
"Waits for the completion of the delivery of the message represented by a delivery token." },
{ "getPendingTokens", mqttv3_getPendingTokens, METH_VARARGS,
"Returns the tokens pending of completion." },
{ "destroy", mqttv3_destroy, METH_VARARGS,
"Free memory allocated to a MQTT client. It is the opposite to create." },
{ NULL, NULL, 0, NULL } /* Sentinel */
};
PyMODINIT_FUNC initpaho_mqtt3a(void)
{
PyObject *m;
PyEval_InitThreads();
callbacks = ListInitialize();
connected_callbacks = ListInitialize();
m = Py_InitModule("paho_mqtt3a", MqttV3Methods);
if (m == NULL)
return;
MqttV3Error = PyErr_NewException("paho_mqtt3a.error", NULL, NULL);
Py_INCREF(MqttV3Error);
PyModule_AddObject(m, "error", MqttV3Error);
PyModule_AddIntConstant(m, "SUCCESS", MQTTASYNC_SUCCESS);
PyModule_AddIntConstant(m, "FAILURE", MQTTASYNC_FAILURE);
PyModule_AddIntConstant(m, "DISCONNECTED", MQTTASYNC_DISCONNECTED);
PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTASYNC_MAX_MESSAGES_INFLIGHT);
PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTASYNC_BAD_UTF8_STRING);
PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTASYNC_NULL_PARAMETER);
PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTASYNC_TOPICNAME_TRUNCATED);
PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT);
PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE);
PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER);
PyModule_AddIntConstant(m, "PERSISTENCE_ERROR",
MQTTCLIENT_PERSISTENCE_ERROR);
}
#include <Python.h>
#include "MQTTClient.h"
#include "LinkedList.h"
static PyObject *MqttV3Error;
static PyObject* mqttv3_create(PyObject* self, PyObject *args)
{
MQTTClient c;
char* serverURI;
char* clientId;
int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT;
int rc;
if (!PyArg_ParseTuple(args, "ss|i", &serverURI, &clientId,
&persistence_option))
return NULL;
if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT
&& persistence_option != MQTTCLIENT_PERSISTENCE_NONE)
{
PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE");
return NULL;
}
rc = MQTTClient_create(&c, serverURI, clientId, persistence_option, NULL);
printf("create MQTTClient pointer %p\n", c);
return Py_BuildValue("ik", rc, c);
}
static List* callbacks = NULL;
typedef struct
{
MQTTClient c;
PyObject *context;
PyObject *cl, *ma, *dc;
} CallbackEntry;
int clientCompare(void* a, void* b)
{
CallbackEntry* e = (CallbackEntry*) a;
return e->c == (MQTTClient) b;
}
void connectionLost(void* context, char* cause)
{
/* call the right Python function, using the context */
PyObject *arglist;
PyObject *result;
CallbackEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Os", e->context, cause);
result = PyEval_CallObject(e->cl, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
int messageArrived(void* context, char* topicName, int topicLen,
MQTTClient_message* message)
{
PyObject *result = NULL;
CallbackEntry* e = context;
int rc = -99;
PyGILState_STATE gstate;
//printf("messageArrived %s %s %d %.*s\n", PyString_AsString(e->context), topicName, topicLen,
// message->payloadlen, (char*)message->payload);
gstate = PyGILState_Ensure();
if (topicLen == 0)
result = PyEval_CallFunction(e->ma, "Os{ss#sisisisi}", e->context,
topicName, "payload", message->payload, message->payloadlen,
"qos", message->qos, "retained", message->retained, "dup",
message->dup, "msgid", message->msgid);
else
result = PyEval_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context,
topicName, topicLen, "payload", message->payload,
message->payloadlen, "qos", message->qos, "retained",
message->retained, "dup", message->dup, "msgid",
message->msgid);
if (result)
{
if (PyInt_Check(result))
rc = (int) PyInt_AsLong(result);
Py_DECREF(result);
}
PyGILState_Release(gstate);
MQTTClient_free(topicName);
MQTTClient_freeMessage(&message);
return rc;
}
void deliveryComplete(void* context, MQTTClient_deliveryToken dt)
{
PyObject *arglist;
PyObject *result;
CallbackEntry* e = context;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
arglist = Py_BuildValue("Oi", e->context, dt);
result = PyEval_CallObject(e->dc, arglist);
Py_DECREF(arglist);
PyGILState_Release(gstate);
}
static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args)
{
MQTTClient c;
CallbackEntry* e = NULL;
int rc;
e = malloc(sizeof(CallbackEntry));
if (!PyArg_ParseTuple(args, "kOOOO", &c, (PyObject**) &e->context, &e->cl,
&e->ma, &e->dc))
return NULL;
e->c = c;
printf("setCallbacks MQTTClient pointer %p\n", c);
if ((e->cl != Py_None && !PyCallable_Check(e->cl))
|| (e->ma != Py_None && !PyCallable_Check(e->ma))
|| (e->dc != Py_None && !PyCallable_Check(e->dc)))
{
PyErr_SetString(PyExc_TypeError,
"3rd, 4th and 5th parameters must be callable or None");
return NULL;
}
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_setCallbacks(c, e, connectionLost,
messageArrived, deliveryComplete);
Py_END_ALLOW_THREADS
if (rc == MQTTCLIENT_SUCCESS)
{
ListElement* temp = NULL;
if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
{
ListDetach(callbacks, temp->content);
free(temp->content);
}
ListAppend(callbacks, e, sizeof(e));
Py_XINCREF(e->cl);
Py_XINCREF(e->ma);
Py_XINCREF(e->dc);
Py_XINCREF(e->context);
}
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_connect(PyObject* self, PyObject *args)
{
MQTTClient c;
PyObject *pyoptions = NULL, *temp;
MQTTClient_connectOptions options = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions woptions = MQTTClient_willOptions_initializer;
int rc;
if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions))
return NULL;
printf("connect MQTTClient pointer %p\n", c);
if (!pyoptions)
goto skip;
if (!PyDict_Check(pyoptions))
{
PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL)
{
if (PyInt_Check(temp))
options.keepAliveInterval = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError,
"keepAliveLiveInterval value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL)
{
if (PyInt_Check(temp))
options.cleansession = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "cleansession value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "reliable")) != NULL)
{
if (PyInt_Check(temp))
options.reliable = (int) PyInt_AsLong(temp);
else
{
PyErr_SetString(PyExc_TypeError, "reliable value must be int");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL)
{
if (PyDict_Check(temp))
{
PyObject *wtemp = NULL;
if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL)
{
PyErr_SetString(PyExc_TypeError,
"will topicName value must be set");
return NULL;
}
else
{
if (PyString_Check(wtemp))
woptions.topicName = PyString_AsString(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will topicName value must be string");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL)
{
PyErr_SetString(PyExc_TypeError,
"will message value must be set");
return NULL;
}
else
{
if (PyString_Check(wtemp))
woptions.message = PyString_AsString(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will message value must be string");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL)
{
if (PyInt_Check(wtemp))
woptions.retained = (int) PyInt_AsLong(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will retained value must be int");
return NULL;
}
}
if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL)
{
if (PyInt_Check(wtemp))
woptions.qos = (int) PyInt_AsLong(wtemp);
else
{
PyErr_SetString(PyExc_TypeError,
"will qos value must be int");
return NULL;
}
}
options.will = &woptions;
}
else
{
PyErr_SetString(PyExc_TypeError, "will value must be dictionary");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL)
{
if (PyString_Check(temp))
options.username = PyString_AsString(temp);
else
{
PyErr_SetString(PyExc_TypeError, "username value must be string");
return NULL;
}
}
if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL)
{
if (PyString_Check(temp))
options.username = PyString_AsString(temp);
else
{
PyErr_SetString(PyExc_TypeError, "password value must be string");
return NULL;
}
}
skip: Py_BEGIN_ALLOW_THREADS rc = MQTTClient_connect(c, &options);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args)
{
MQTTClient c;
int timeout = 0;
int rc;
if (!PyArg_ParseTuple(args, "k|i", &c, &timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_disconnect(c, timeout);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args)
{
MQTTClient c;
int rc;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_isConnected(c);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args)
{
MQTTClient c;
char* topic;
int qos = 2;
int rc;
if (!PyArg_ParseTuple(args, "ks|i", &c, &topic, &qos))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_subscribe(c, topic, qos);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args)
{
MQTTClient c;
PyObject* topicList;
PyObject* qosList;
int count;
char** topics;
int* qoss;
int i, rc = 0;
if (!PyArg_ParseTuple(args, "kOO", &c, &topicList, &qosList))
return NULL;
if (!PySequence_Check(topicList) || !PySequence_Check(qosList))
{
PyErr_SetString(PyExc_TypeError,
"3rd and 4th parameters must be sequences");
return NULL;
}
if ((count = PySequence_Length(topicList)) != PySequence_Length(qosList))
{
PyErr_SetString(PyExc_TypeError,
"3rd and 4th parameters must be sequences of the same length");
return NULL;
}
topics = malloc(count * sizeof(char*));
for (i = 0; i < count; ++i)
topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
qoss = malloc(count * sizeof(int));
for (i = 0; i < count; ++i)
qoss[i] = (int) PyInt_AsLong(PySequence_GetItem(qosList, i));
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_subscribeMany(c, count, topics,
qoss);
Py_END_ALLOW_THREADS
for (i = 0; i < count; ++i)
PySequence_SetItem(qosList, i, PyInt_FromLong((long) qoss[i]));
free(topics);
free(qoss);
if (rc == MQTTCLIENT_SUCCESS)
return Py_BuildValue("iO", rc, qosList);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args)
{
MQTTClient c;
char* topic;
int rc;
if (!PyArg_ParseTuple(args, "ks", &c, &topic))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_unsubscribe(c, topic);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args)
{
MQTTClient c;
PyObject* topicList;
int count;
char** topics;
int i, rc = 0;
if (!PyArg_ParseTuple(args, "kOO", &c, &topicList))
return NULL;
if (!PySequence_Check(topicList))
{
PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences");
return NULL;
}
count = PySequence_Length(topicList);
topics = malloc(count * sizeof(char*));
for (i = 0; i < count; ++i)
topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_unsubscribeMany(c, count, topics);
Py_END_ALLOW_THREADS
free( topics);
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_publish(PyObject* self, PyObject *args)
{
MQTTClient c;
char* topicName;
int payloadlen;
void* payload;
int qos = 0;
int retained = 0;
MQTTClient_deliveryToken dt;
int rc;
if (!PyArg_ParseTuple(args, "kss#|ii", &c, &topicName, &payload,
&payloadlen, &qos, &retained))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_publish(c, topicName, payloadlen,
payload, qos, retained, &dt);
Py_END_ALLOW_THREADS
if (rc == MQTTCLIENT_SUCCESS && qos > 0)
return Py_BuildValue("ii", rc, dt);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_publishMessage(PyObject* self, PyObject *args)
{
MQTTClient c;
char* topicName;
PyObject *message, *temp;
MQTTClient_message msg = MQTTClient_message_initializer;
MQTTClient_deliveryToken dt;
int rc;
if (!PyArg_ParseTuple(args, "ksO", &c, &topicName, &message))
return NULL;
if (!PyDict_Check(message))
{
PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary");
return NULL;
}
if ((temp = PyDict_GetItemString(message, "payload")) == NULL)
{
PyErr_SetString(PyExc_TypeError, "dictionary must have payload key");
return NULL;
}
if (PyString_Check(temp))
PyString_AsStringAndSize(temp, (char**) &msg.payload,
(Py_ssize_t*) &msg.payloadlen);
else
{
PyErr_SetString(PyExc_TypeError, "payload value must be string");
return NULL;
}
if ((temp = PyDict_GetItemString(message, "qos")) == NULL)
msg.qos = (int) PyInt_AsLong(temp);
if ((temp = PyDict_GetItemString(message, "retained")) == NULL)
msg.retained = (int) PyInt_AsLong(temp);
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_publishMessage(c, topicName, &msg,
&dt);
Py_END_ALLOW_THREADS
if (rc == MQTTCLIENT_SUCCESS && msg.qos > 0)
return Py_BuildValue("ii", rc, dt);
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args)
{
MQTTClient c;
unsigned long timeout = 1000L;
MQTTClient_deliveryToken dt;
int rc;
if (!PyArg_ParseTuple(args, "ki|i", &c, &dt, &timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_waitForCompletion(c, dt, timeout);
Py_END_ALLOW_THREADS
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_getPendingDeliveryTokens(PyObject* self, PyObject *args)
{
MQTTClient c;
MQTTClient_deliveryToken* tokens;
int rc;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_getPendingDeliveryTokens(c, &tokens);
Py_END_ALLOW_THREADS
if (rc == MQTTCLIENT_SUCCESS)
{
int i = 0;
PyObject* dts = PyList_New(0);
while (tokens[i] != -1)
PyList_Append(dts, PyInt_FromLong((long) tokens[i]));
return Py_BuildValue("iO", rc, dts);
}
else
return Py_BuildValue("i", rc);
}
static PyObject* mqttv3_yield(PyObject* self, PyObject *args)
{
if (!PyArg_ParseTuple(args, ""))
return NULL;
Py_BEGIN_ALLOW_THREADS
MQTTClient_yield();
Py_END_ALLOW_THREADS
Py_INCREF( Py_None);
return Py_None;
}
static PyObject* mqttv3_receive(PyObject* self, PyObject *args)
{
MQTTClient c;
unsigned long timeout = 1000L;
int rc;
PyObject* temp = NULL;
char* topicName;
int topicLen;
MQTTClient_message* message;
if (!PyArg_ParseTuple(args, "k|k", &c, &timeout))
return NULL;
Py_BEGIN_ALLOW_THREADS rc = MQTTClient_receive(c, &topicName, &topicLen,
&message, timeout);
Py_END_ALLOW_THREADS
if (message)
{
temp = Py_BuildValue("is#{ss#sisisisi}", rc, topicName, topicLen,
"payload", message->payload, message->payloadlen, "qos",
message->qos, "retained", message->retained, "dup",
message->dup, "msgid", message->msgid);
free(topicName);
MQTTClient_freeMessage(&message);
}
else
temp = Py_BuildValue("iz", rc, NULL);
return temp;
}
static PyObject* mqttv3_destroy(PyObject* self, PyObject *args)
{
MQTTClient c;
ListElement* temp = NULL;
if (!PyArg_ParseTuple(args, "k", &c))
return NULL;
if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
{
ListDetach(callbacks, temp->content);
free(temp->content);
}
MQTTClient_destroy(&c);
Py_INCREF(Py_None);
return Py_None;
}
static PyMethodDef MqttV3Methods[] =
{
{ "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." },
{ "setcallbacks", mqttv3_setcallbacks, METH_VARARGS,
"Sets the callback functions for a particular client." },
{ "connect", mqttv3_connect, METH_VARARGS,
"Connects to a server using the specified options." },
{ "disconnect", mqttv3_disconnect, METH_VARARGS,
"Disconnects from a server." },
{ "isConnected", mqttv3_isConnected, METH_VARARGS,
"Determines if this client is currently connected to the server." },
{ "subscribe", mqttv3_subscribe, METH_VARARGS,
"Subscribe to the given topic." },
{ "subscribeMany", mqttv3_subscribeMany, METH_VARARGS,
"Subscribe to the given topics." },
{ "unsubscribe", mqttv3_unsubscribe, METH_VARARGS,
"Unsubscribe from the given topic." },
{ "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS,
"Unsubscribe from the given topics." },
{ "publish", mqttv3_publish, METH_VARARGS,
"Publish a message to the given topic." },
{ "publishMessage", mqttv3_publishMessage, METH_VARARGS,
"Publish a message to the given topic." },
{ "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS,
"Waits for the completion of the delivery of the message represented by a delivery token." },
{ "getPendingDeliveryTokens", mqttv3_getPendingDeliveryTokens,
METH_VARARGS,
"Returns the delivery tokens pending of completion." },
{ "yield", mqttv3_yield, METH_VARARGS,
"Single-thread keep alive but don't receive message." },
{ "receive", mqttv3_receive, METH_VARARGS,
"Single-thread receive message if available." },
{ "destroy", mqttv3_destroy, METH_VARARGS,
"Free memory allocated to a MQTT client. It is the opposite to create." },
{ NULL, NULL, 0, NULL } /* Sentinel */
};
PyMODINIT_FUNC initpaho_mqtt3c(void)
{
PyObject *m;
PyEval_InitThreads();
callbacks = ListInitialize();
m = Py_InitModule("paho_mqtt3c", MqttV3Methods);
if (m == NULL)
return;
MqttV3Error = PyErr_NewException("paho_mqtt3c.error", NULL, NULL);
Py_INCREF(MqttV3Error);
PyModule_AddObject(m, "error", MqttV3Error);
PyModule_AddIntConstant(m, "SUCCESS", MQTTCLIENT_SUCCESS);
PyModule_AddIntConstant(m, "FAILURE", MQTTCLIENT_FAILURE);
PyModule_AddIntConstant(m, "DISCONNECTED", MQTTCLIENT_DISCONNECTED);
PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTCLIENT_MAX_MESSAGES_INFLIGHT);
PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTCLIENT_BAD_UTF8_STRING);
PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTCLIENT_NULL_PARAMETER);
PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTCLIENT_TOPICNAME_TRUNCATED);
PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT);
PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE);
PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER);
PyModule_AddIntConstant(m, "PERSISTENCE_ERROR",
MQTTCLIENT_PERSISTENCE_ERROR);
}
from distutils.core import setup, Extension
paho_mqtt3c = Extension('paho_mqtt3c',
define_macros = [('NO_HEAP_TRACKING', '1')],
sources = ['mqttclient_module.c', '../../src/LinkedList.c'],
libraries = ['paho-mqtt3c'],
library_dirs = ['../../build/output'],
include_dirs = ['../../src'])
paho_mqtt3a = Extension('paho_mqtt3a',
define_macros = [('NO_HEAP_TRACKING', '1')],
sources = ['mqttasync_module.c', '../../src/LinkedList.c'],
libraries = ['paho-mqtt3a'],
library_dirs = ['../../build/output'],
include_dirs = ['../../src'])
setup (name = 'EclipsePahoMQTTClient',
version = '1.0',
description = 'Binding to the Eclipse Paho C clients',
ext_modules = [paho_mqtt3c, paho_mqtt3a])
import paho_mqtt3c as mqttv3, time, random
print dir(mqttv3)
host = "localhost"
clientid = "myclientid"
noclients = 4
def deliveryComplete(context, msgid):
print "deliveryComplete", msgid
def connectionLost(context, cause):
print "connectionLost"
print "rc from reconnect is", mqttv3.connect(self.client)
def messageArrived(context, topicName, message):
print "clientid", context
print "topicName", topicName
print "message", message
return 1
print messageArrived
myclientid = None
clients = []
for i in range(noclients):
myclientid = clientid+str(i)
rc, client = mqttv3.create("tcp://"+host+":1883", myclientid)
print "client is", hex(client)
print "rc from create is", rc
print "rc from setcallbacks is", mqttv3.setcallbacks(client, client, connectionLost, messageArrived, deliveryComplete)
print "client is", hex(client)
print "rc from connect is", mqttv3.connect(client, {})
clients.append(client)
for client in clients:
print "rc from subscribe is", mqttv3.subscribe(client, "$SYS/#")
for client in clients:
print "rc from publish is", mqttv3.publish(client, "a topic", "a message")
print "rc from publish is", mqttv3.publish(client, "a topic", "a message", 1)
print "rc from publish is", mqttv3.publish(client, "a topic", "a message", 2)
print "about to sleep"
time.sleep(10)
print "finished sleeping"
for client in clients:
print "rc from isConnected is", mqttv3.isConnected(client)
print "rc from disconnect is", mqttv3.disconnect(client)
mqttv3.destroy(client)
import paho_mqtt3a as mqttv3, time, random
import contextlib
print dir(mqttv3)
hostname = "localhost"
clientid = "myclientid"
topic = "test2_topic"
def deliveryComplete(context, msgid):
print "deliveryComplete", msgid
def connectionLost(context, cause):
print "connectionLost"
print "rc from reconnect is", mqttv3.connect(self.client)
def messageArrived(context, topicName, message):
print "messageArrived", message
#print "clientid", context
#print "topicName", topicName
return 1
def onSuccess(context, successData):
print "onSuccess for", context["clientid"], context["state"], successData
responseOptions = {"context": context, "onSuccess": onSuccess, "onFailure" : onFailure}
#responseOptions = {"context": context}
if context["state"] == "connecting":
context["state"] = "subscribing"
print "rc from subscribe is", mqttv3.subscribe(client, topic, 2, responseOptions)
elif context["state"] == "subscribing":
context["state"] = "publishing qos 0"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 0 message", 0, 0, responseOptions)
elif context["state"] == "publishing qos 0":
context["state"] = "publishing qos 1"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 1 message", 1, 0, responseOptions)
elif context["state"] == "publishing qos 1":
context["state"] = "publishing qos 2"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 2 message", 2, 0, responseOptions)
elif context["state"] == "publishing qos 2":
context["state"] = "finished"
print "leaving onSuccess"
def onFailure(context, failureData):
print "onFailure for", context["clientid"]
context["state"] = "finished"
noclients = 1
myclientid = None
clients = []
for i in range(noclients):
myclientid = clientid+str(i)
rc, client = mqttv3.create("tcp://"+hostname+":1883", myclientid)
#print "client is", hex(client)
print "rc from create is", rc
print "rc from setcallbacks is", mqttv3.setcallbacks(client, client, connectionLost, messageArrived, deliveryComplete)
context = {"client" : client, "clientid" : clientid, "state" : "connecting"}
print "rc from connect is", mqttv3.connect(client, {"context": context, "onSuccess": onSuccess, "onFailure": onFailure})
clients.append(context)
while [x for x in clients if x["state"] != "finished"]:
print [x for x in clients if x["state"] != "finished"]
time.sleep(1)
for client in clients:
if mqttv3.isConnected(client["client"]):
print "rc from disconnect is", mqttv3.disconnect(client["client"], 1000)
time.sleep(1)
mqttv3.destroy(client["client"])
print "after destroy"
import paho_mqtt3a as mqttv3, time, random
import contextlib
print dir(mqttv3)
hostname = "localhost"
clientid = "myclientid"
topic = "test2_topic"
def deliveryComplete(context, msgid):
print "deliveryComplete", msgid
def connectionLost(context, cause):
print "connectionLost", cause
client = context
responseOptions = {"context": context, "onSuccess": onSuccess, "onFailure" : onFailure}
print "rc from publish while disconnected is", mqttv3.send(client, topic, "message while disconnected", 2, 0, responseOptions)
def messageArrived(context, topicName, message):
print "messageArrived", message
#print "clientid", context
#print "topicName", topicName
return 1
def connected(context, cause):
print "connected", cause
def onSuccess(context, successData):
print "onSuccess for", context["clientid"], context["state"], successData
responseOptions = {"context": context, "onSuccess": onSuccess, "onFailure" : onFailure}
if context["state"] == "connecting":
context["state"] = "subscribing"
print "rc from subscribe is", mqttv3.subscribe(client, topic, 2, responseOptions)
elif context["state"] == "subscribing":
context["state"] = "publishing qos 0"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 0 message", 0, 0, responseOptions)
elif context["state"] == "publishing qos 0":
context["state"] = "publishing qos 1"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 1 message", 1, 0, responseOptions)
elif context["state"] == "publishing qos 1":
context["state"] = "publishing qos 2"
print "rc from publish is", mqttv3.send(client, topic, "a QoS 2 message", 2, 0, responseOptions)
elif context["state"] == "publishing qos 2":
context["state"] = "finished"
print "leaving onSuccess"
def onFailure(context, failureData):
print "onFailure for", context["clientid"]
context["state"] = "finished"
noclients = 1
myclientid = None
clients = []
for i in range(noclients):
myclientid = clientid+str(i)
rc, client = mqttv3.create("tcp://"+hostname+":1883", myclientid, mqttv3.PERSISTENCE_DEFAULT, {"sendWhileDisconnected" : 1})
#print "client is", hex(client)
print "rc from create is", rc
print "rc from setcallbacks is", mqttv3.setcallbacks(client, client, connectionLost, messageArrived, deliveryComplete)
print "rc from setconnected is", mqttv3.setconnected(client, client, connected)
context = {"client" : client, "clientid" : clientid, "state" : "connecting"}
print "rc from connect is", mqttv3.connect(client, {"cleansession" : 0, "automaticReconnect": 1, "context": context, "onSuccess": onSuccess, "onFailure": onFailure})
clients.append(context)
while [x for x in clients if x["state"] != "finished"]:
print [x for x in clients if x["state"] != "finished"]
time.sleep(1)
print "waiting for 60 seconds"
time.sleep(60)
for client in clients:
if mqttv3.isConnected(client["client"]):
print "rc from disconnect is", mqttv3.disconnect(client["client"], 1000)
time.sleep(1)
mqttv3.destroy(client["client"])
print "after destroy"
/*******************************************************************************
* Copyright (c) 2012, 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
/**
* @file
* Offline buffering and automatic reconnect tests for the MQ Telemetry Asynchronous MQTT C client
*
*/
/*
#if !defined(_RTSHEADER)
#include <rts.h>
#endif
*/
#include "MQTTAsync.h"
#include <string.h>
#include <stdlib.h>
#include "Thread.h"
#if !defined(_WINDOWS)
#include <sys/time.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINPROGRESS WSAEINPROGRESS
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define snprintf _snprintf
#endif
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
void usage()
{
printf("Options:\n");
printf("\t--test_no <test_no> - Run test number <test_no>\n");
printf("\t--server <hostname> - Connect to <hostname> for tests\n");
printf("\t--client_key <key_file> - Use <key_file> as the client certificate for SSL authentication\n");
printf("\t--client_key_pass <password> - Use <password> to access the private key in the client certificate\n");
printf("\t--server_key <key_file> - Use <key_file> as the trusted certificate for server\n");
printf("\t--verbose - Enable verbose output \n");
printf("\t--help - This help output\n");
exit(-1);
}
struct Options
{
char connection[100]; /**< connection to system under test. */
char mutual_auth_connection[100]; /**< connection to system under test. */
char nocert_mutual_auth_connection[100];
char server_auth_connection[100];
char anon_connection[100];
char* client_key_file;
char* client_key_pass;
char* server_key_file;
char* client_private_key_file;
int verbose;
int test_no;
int size;
} options =
{
"ssl://m2m.eclipse.org:18883",
"ssl://m2m.eclipse.org:18884",
"ssl://m2m.eclipse.org:18887",
"ssl://m2m.eclipse.org:18885",
"ssl://m2m.eclipse.org:18886",
"../../../test/ssl/client.pem",
NULL,
"../../../test/ssl/test-root-ca.crt",
NULL,
0,
0,
5000000
};
typedef struct
{
MQTTAsync client;
char clientid[24];
char topic[100];
int maxmsgs;
int rcvdmsgs[3];
int sentmsgs[3];
int testFinished;
int subscribed;
} AsyncTestClient;
#define AsyncTestClient_initializer {NULL, "\0", "\0", 0, {0, 0, 0}, {0, 0, 0}, 0, 0}
void getopts(int argc, char** argv)
{
int count = 1;
while (count < argc)
{
if (strcmp(argv[count], "--help") == 0)
{
usage();
}
else if (strcmp(argv[count], "--test_no") == 0)
{
if (++count < argc)
options.test_no = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--client_key") == 0)
{
if (++count < argc)
options.client_key_file = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--client_key_pass") == 0)
{
if (++count < argc)
options.client_key_pass = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--server_key") == 0)
{
if (++count < argc)
options.server_key_file = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--verbose") == 0)
{
options.verbose = 1;
printf("\nSetting verbose on\n");
}
else if (strcmp(argv[count], "--hostname") == 0)
{
if (++count < argc)
{
sprintf(options.connection, "ssl://%s:18883", argv[count]);
printf("Setting connection to %s\n", options.connection);
sprintf(options.mutual_auth_connection, "ssl://%s:18884", argv[count]);
printf("Setting mutual_auth_connection to %s\n", options.mutual_auth_connection);
sprintf(options.nocert_mutual_auth_connection, "ssl://%s:18887", argv[count]);
printf("Setting nocert_mutual_auth_connection to %s\n",
options.nocert_mutual_auth_connection);
sprintf(options.server_auth_connection, "ssl://%s:18885", argv[count]);
printf("Setting server_auth_connection to %s\n", options.server_auth_connection);
sprintf(options.anon_connection, "ssl://%s:18886", argv[count]);
printf("Setting anon_connection to %s\n", options.anon_connection);
}
else
usage();
}
count++;
}
}
#if 0
#include <logaX.h> /* For general log messages */
#define MyLog logaLine
#else
#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>
void MyLog(int LOGA_level, char* format, ...)
{
static char msg_buf[256];
va_list args;
struct timeb ts;
struct tm *timeinfo;
if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
return;
ftime(&ts);
timeinfo = localtime(&ts.time);
strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
va_start(args, format);
vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf),
format, args);
va_end(args);
printf("%s\n", msg_buf);
fflush(stdout);
}
#endif
void MySleep(long milliseconds)
{
#if defined(WIN32) || defined(WIN64)
Sleep(milliseconds);
#else
usleep(milliseconds*1000);
#endif
}
#if defined(WIN32) || defined(_WINDOWS)
#define START_TIME_TYPE DWORD
static DWORD start_time = 0;
START_TIME_TYPE start_clock(void)
{
return GetTickCount();
}
#elif defined(AIX)
#define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void)
{
static struct timespec start;
clock_gettime(CLOCK_REALTIME, &start);
return start;
}
#else
#define START_TIME_TYPE struct timeval
/* TODO - unused - remove? static struct timeval start_time; */
START_TIME_TYPE start_clock(void)
{
struct timeval start_time;
gettimeofday(&start_time, NULL);
return start_time;
}
#endif
#if defined(WIN32)
long elapsed(START_TIME_TYPE start_time)
{
return GetTickCount() - start_time;
}
#elif defined(AIX)
#define assert(a)
long elapsed(struct timespec start)
{
struct timespec now, res;
clock_gettime(CLOCK_REALTIME, &now);
ntimersub(now, start, res);
return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
long elapsed(START_TIME_TYPE start_time)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&now, &start_time, &res);
return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
}
#endif
#define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
#define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
#define MAXMSGS 30;
int tests = 0;
int failures = 0;
FILE* xml;
START_TIME_TYPE global_start_time;
char output[3000];
char* cur_output = output;
void write_test_result()
{
long duration = elapsed(global_start_time);
fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
if (cur_output != output)
{
fprintf(xml, "%s", output);
cur_output = output;
}
fprintf(xml, "</testcase>\n");
}
void myassert(char* filename, int lineno, char* description, int value,
char* format, ...)
{
++tests;
if (!value)
{
va_list args;
++failures;
printf("Assertion failed, file %s, line %d, description: %s", filename,
lineno, description);
va_start(args, format);
vprintf(format, args);
va_end(args);
cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
description, filename, lineno);
}
else
MyLog(LOGA_DEBUG,
"Assertion succeeded, file %s, line %d, description: %s",
filename, lineno, description);
}
/*********************************************************************
Tests: offline buffering - sending messages while disconnected
1. send some messages while disconnected, check that they are sent
2. check max-buffered
3. check auto-reconnect parms alter behaviour as expected
*********************************************************************/
/*********************************************************************
Test1: offline buffering - sending messages while disconnected
1. call connect
2. use proxy to disconnect the client
3. while the client is disconnected, send more messages
4. when the client reconnects, check that those messages are sent
*********************************************************************/
int test1Finished = 0;
int test1OnFailureCalled = 0;
void test1cOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test1OnFailureCalled++;
test1Finished = 1;
}
void test1cOnFailure(void* context, MQTTAsync_failureData* response)
{
MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
test1OnFailureCalled++;
test1Finished = 1;
}
void test1cOnConnect(void* context, MQTTAsync_successData* response)
{
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context);
}
int test1(struct Options options)
{
char* testname = "test1";
int subsqos = 2;
MQTTAsync c, d;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
int rc = 0;
char* test_topic = "C client offline buffering test";
int count = 0;
test1Finished = 0;
failures = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 1 - messages while disconnected");
fprintf(xml, "<testcase classname=\"test1\" name=\"%s\"", testname);
global_start_time = start_clock();
rc = MQTTAsync_create(&c, options.connection, "paho-test9-test1-c", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
rc = MQTTAsync_create(&d, options.connection, "paho-test9-test1-d", MQTTCLIENT_PERSISTENCE_DEFAULT,
NULL);
assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
if (rc != MQTTASYNC_SUCCESS)
{
MQTTAsync_destroy(&c);
goto exit;
}
opts.keepAliveInterval = 20;
opts.cleansession = 1;
opts.username = "testuser";
opts.password = "testpassword";
opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
opts.context = d;
opts.onSuccess = test1dOnConnect;
opts.onFailure = test1dOnFailure;
MyLog(LOGA_DEBUG, "Connecting client d");
rc = MQTTAsync_connect(d, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait until d is ready: connected and subscribed */
count = 0;
while (!test1dReady && ++count < 10000)
MySleep(100);
assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
/* let client c go: connect, publish some messages, and send disconnect command to proxy */
opts.will = &wopts;
opts.will->message = "will message";
opts.will->qos = 1;
opts.will->retained = 0;
opts.will->topicName = "will topic";
opts.onSuccess = test1cOnConnect;
opts.onFailure = test1cOnFailure;
opts.context = c;
MyLog(LOGA_DEBUG, "Connecting client c");
rc = MQTTAsync_connect(c, &opts);
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
if (rc != MQTTASYNC_SUCCESS)
{
failures++;
goto exit;
}
/* wait for success or failure callback */
while (!test1Finished && ++count < 10000)
MySleep(100);
exit: MQTTAsync_destroy(&c);
MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
(failures == 0) ? "passed" : "failed", testname, tests, failures);
write_test_result();
return failures;
}
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
printf("%s\n", message);
}
int main(int argc, char** argv)
{
int* numtests = &tests;
int rc = 0;
int (*tests[])() = { NULL, test1, };
xml = fopen("TEST-test9.xml", "w");
fprintf(xml, "<testsuite name=\"test9\" tests=\"%lu\">\n", ARRAY_SIZE(tests) - 1);
MQTTAsync_setTraceCallback(handleTrace);
getopts(argc, argv);
if (options.test_no == 0)
{ /* run all the tests */
for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
{
failures = 0;
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
}
}
else
{
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
rc = tests[options.test_no](options); /* run just the selected test */
}
MyLog(LOGA_INFO, "Total tests run: %d", *numtests);
if (rc == 0)
MyLog(LOGA_INFO, "verdict pass");
else
MyLog(LOGA_INFO, "verdict fail");
fprintf(xml, "</testsuite>\n");
fclose(xml);
return rc;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment