summaryrefslogtreecommitdiff
path: root/FreeRTOS-Labs/Demo/FreeRTOS_IoT_Libraries/mqtt/common/DemoTasks/LightWeightMQTTExample.c
diff options
context:
space:
mode:
Diffstat (limited to 'FreeRTOS-Labs/Demo/FreeRTOS_IoT_Libraries/mqtt/common/DemoTasks/LightWeightMQTTExample.c')
-rw-r--r--FreeRTOS-Labs/Demo/FreeRTOS_IoT_Libraries/mqtt/common/DemoTasks/LightWeightMQTTExample.c853
1 files changed, 853 insertions, 0 deletions
diff --git a/FreeRTOS-Labs/Demo/FreeRTOS_IoT_Libraries/mqtt/common/DemoTasks/LightWeightMQTTExample.c b/FreeRTOS-Labs/Demo/FreeRTOS_IoT_Libraries/mqtt/common/DemoTasks/LightWeightMQTTExample.c
new file mode 100644
index 000000000..aabb3a6f3
--- /dev/null
+++ b/FreeRTOS-Labs/Demo/FreeRTOS_IoT_Libraries/mqtt/common/DemoTasks/LightWeightMQTTExample.c
@@ -0,0 +1,853 @@
+/*
+ * FreeRTOS Kernel V10.2.1
+ * Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ * http://www.FreeRTOS.org
+ * http://aws.amazon.com/freertos
+ *
+ * 1 tab == 4 spaces!
+ */
+
+/*
+ * Proof of Concept for use of MQTT light weight serializer API.
+ * Light weight serializer API lets user to serialize and
+ * deserialize MQTT messages into user provided buffer.
+ * This API allows use of statically allocated buffer.
+ *
+ * Example shown below uses this API to create MQTT messages and
+ * and send them over connection established using FreeRTOS sockets.
+ * The example is single threaded and uses statically allocated memory.
+ *
+ * !!! NOTE !!!
+ * This is work in progress to show how light weight serializer
+ * API can be used. This is not a complete demo, and should not
+ * be treated as production ready code.
+ */
+
+/* Standard includes. */
+#include <string.h>
+#include <stdio.h>
+
+/* Kernel includes. */
+#include "FreeRTOS.h"
+#include "task.h"
+
+/* FreeRTOS+TCP includes. */
+#include "FreeRTOS_IP.h"
+#include "FreeRTOS_Sockets.h"
+
+/* IoT SDK includes. */
+#include "iot_mqtt.h"
+#include "iot_mqtt_serialize.h"
+#include "platform/iot_network_freertos.h"
+
+/* Demo Specific configs. */
+#include "mqtt_demo_profile.h"
+
+
+/**
+ * @brief Time to wait between each cycle of the demo implemented by prvMQTTDemoTask().
+ */
+#define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000 ) )
+
+/**
+ * @brief Time to wait before sending ping request to keep MQTT connection alive.
+ */
+#define mqttexampleKEEP_ALIVE_DELAY ( pdMS_TO_TICKS( 1000 ) )
+
+
+/**
+ * @brief The MQTT client identifier used in this example. Each client identifier
+ * must be unique so edit as required to ensure no two clients connecting to the
+ * same broker use the same client identifier.
+ */
+#define mqttexampleCLIENT_IDENTIFIER mqttdemoprofileCLIENT_IDENTIFIER
+
+/**
+ * @brief Details of the MQTT broker to connect to.
+ */
+#define mqttexampleMQTT_BROKER_ENDPOINT mqttdemoprofileBROKER_ENDPOINT
+
+/**
+ * @brief The port to use for the demo.
+ */
+#define mqttexampleMQTT_BROKER_PORT mqttdemoprofileBROKER_PORT
+
+/**
+ * @brief The topic to subscribe and publish to in the example.
+ *
+ * The topic starts with the client identifier to ensure that each demo interacts
+ * with a unique topic.
+ */
+#define mqttexampleTOPIC mqttexampleCLIENT_IDENTIFIER "/example/topic"
+
+/**
+ * @brief The MQTT message published in this example.
+ */
+#define mqttexampleMESSAGE "Hello Light Weight MQTT World!"
+
+/**
+ * @brief Dimensions a file scope buffer currently used to send and receive MQTT data from a
+ * socket
+ */
+#define mqttexampleSHARED_BUFFER_SIZE 500
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief MQTT Protocol constants used by this demo.
+ * These types are defined in internal MQTT include files.
+ * For light-weight demo application, only a few are needed, therefore
+ * they are redefined here so that internal files need not be included.
+ */
+
+/* MQTT Control Packet Types.*/
+#define MQTT_PACKET_TYPE_CONNACK ( ( uint8_t ) 0x20U ) /**< @brief CONNACK (server-to-client). */
+#define MQTT_PACKET_TYPE_PUBLISH ( ( uint8_t ) 0x30U ) /**< @brief PUBLISH (bi-directional). */
+#define MQTT_PACKET_TYPE_SUBACK ( ( uint8_t ) 0x90U ) /**< @brief SUBACK (server-to-client). */
+#define MQTT_PACKET_TYPE_UNSUBACK ( ( uint8_t ) 0xb0U ) /**< @brief UNSUBACK (server-to-client). */
+#define MQTT_PACKET_TYPE_PINGRESP ( ( uint8_t ) 0xd0U ) /**< @brief PINGRESP (server-to-client). */
+/* MQTT Fixed Packet Sizes */
+#define MQTT_PACKET_DISCONNECT_SIZE ( ( uint8_t ) 2 ) /**< @brief Size of DISCONNECT packet. */
+#define MQTT_PACKET_PINGREQ_SIZE ( ( uint8_t ) 2 ) /**< @brief Size of PINGREQ packet. */
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief The task used to demonstrate the MQTT API.
+ *
+ * @param[in] pvParameters Parameters as passed at the time of task creation. Not
+ * used in this example.
+ */
+static void prvMQTTDemoTask( void * pvParameters );
+
+/**
+ * @brief Creates a TCP connection to the MQTT broker as specified in
+ * mqttexampleMQTT_BROKER_ENDPOINT and mqttexampleMQTT_BROKER_PORT.
+ *
+ * @return On success the socket connected to the MQTT broker is returned. Otherwise
+ * FREERTOS_INVALID_SOCKET is returned.
+ *
+ */
+static Socket_t prvCreateTCPConnectionToBroker( void );
+
+/**
+ * @brief Sends an MQTT Connect packet over the already connected TCP socket.
+ *
+ * @param xMQTTSocketis a TCP socket that is connected to an MQTT broker to which
+ * an MQTT connection has been established.
+ *
+ * @return IOT_MQTT_SUCCESS is returned if the reply is a valid connection
+ * acknowledgeable (CONNACK) packet, otherwise an error code is returned.
+ */
+static IotMqttError_t prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket );
+
+/**
+ * @brief Performs a graceful shutdown and close of the socket passed in as its
+ * parameter.
+ *
+ * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
+ * an MQTT connection has been established.
+ */
+static void prvGracefulShutDown( Socket_t xSocket );
+
+/**
+ * @brief Subscribes to the topic as specified in mqttexampleTOPIC.
+ *
+ * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
+ * an MQTT connection has been established.
+ *
+ * @return IOT_MQTT_SUCCESS is returned if the
+ * subscription is successful, otherwise an error code is returned.
+ */
+static IotMqttError_t prvMQTTSubscribeToTopic( Socket_t xMQTTSocket );
+
+/**
+ * @brief Publishes a messages mqttexampleMESSAGE on mqttexampleTOPIC topic.
+ *
+ * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
+ * an MQTT connection has been established.
+ *
+ * @return IOT_MQTT_SUCCESS is returned if Publish is successful,
+ * otherwise an error code is returned.
+ */
+static IotMqttError_t prvMQTTPublishToTopic( Socket_t xMQTTSocket );
+
+/**
+ * @brief Process Incoming Publish.
+ *
+ * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
+ * an MQTT connection has been established.
+ *
+ * @return #IOT_MQTT_SUCCESS is returned if the processing is successful,
+ * otherwise an error code is returned.
+ */
+static IotMqttError_t prvMQTTProcessIncomingPublish( Socket_t xMQTTSocket );
+
+/**
+ * @brief Unsubscribes from the previously subscribed topic as specified
+ * in mqttexampleTOPIC.
+ *
+ * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
+ * an MQTT connection has been established.
+ *
+ * @return IOT_MQTT_SUCCESS is returned if the
+ * unsubscribe is successful, otherwise an error code is returned.
+ */
+static IotMqttError_t prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket );
+
+/**
+ * @brief Send MQTT Ping Request to broker and receive response.
+ * Ping request is used to keep connection to broker alive.
+ *
+ * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
+ * an MQTT connection has been established.
+ *
+ * @return IOT_MQTT_SUCCESS is returned if the successful Ping Response is received.
+ * otherwise an error code is returned.
+ */
+static IotMqttError_t prvMQTTKeepAlive( Socket_t xMQTTSocket );
+
+/**
+ * @brief Disconnect From MQTT Broker.
+ *
+ * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
+ * an MQTT connection has been established.
+ *
+ * @return IOT_MQTT_SUCCESS is returned if the disconnect is successful,
+ * otherwise an error code is returned.
+ */
+static IotMqttError_t prvMQTTDisconnect( Socket_t xMQTTSocket );
+
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Function to receive next byte from network,
+ * The declaration must match IotMqttGetNextByte_t.
+ *
+ * @param[in] pvContext Network Connection context. Implementation in this
+ * file uses FreeRTOS socket.
+ * @param[in, out] pNextBye Pointer to buffer where the byte will be stored.
+ *
+ * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_TIMEOUT
+ */
+
+IotMqttError_t getNextByte( void * pvContext,
+ uint8_t * pNextByte );
+
+/*-----------------------------------------------------------*/
+
+/* @brief Static memory buffer used for sending and receiving MQTT messages */
+static uint8_t ucSharedBuffer[ mqttexampleSHARED_BUFFER_SIZE ];
+
+/*-----------------------------------------------------------*/
+
+/*
+ * @brief Task for Light Weight MQTT Serializer API Proof of Concept.
+ * To run the proof of concept example, in main.c, in function vApplicationIPNetworkEventHook(),
+ * replace vStartSimpleMQTTDemo() with vApplicationIPNetworkEventHook().
+ */
+void vStartLightWeightMQTTDemo( void )
+{
+TickType_t xShortDelay = ( TickType_t ) pdMS_TO_TICKS( ( TickType_t ) 500 );
+
+ /* Wait a short time to allow receipt of the ARP replies. */
+ vTaskDelay( xShortDelay );
+
+ /* This example uses a single application task, which in turn is used to
+ * connect, subscribe, publish, unsubscribe and disconnect from the MQTT
+ * broker. */
+ xTaskCreate( prvMQTTDemoTask, /* Function that implements the task. */
+ "MQTTLWDemo", /* Text name for the task - only used for debugging. */
+ democonfigDEMO_STACKSIZE, /* Size of stack (in words, not bytes) to allocate for the task. */
+ NULL, /* Task parameter - not used in this case. */
+ tskIDLE_PRIORITY, /* Task priority, must be between 0 and configMAX_PRIORITIES - 1. */
+ NULL ); /* Used to pass out a handle to the created task - not used in this case. */
+}
+/*-----------------------------------------------------------*/
+
+static void prvGracefulShutDown( Socket_t xSocket )
+{
+uint8_t ucDummy[ 20 ];
+const TickType_t xShortDelay = pdMS_TO_MIN_TICKS( 250 );
+
+ if( xSocket != ( Socket_t ) 0 )
+ {
+ if( xSocket != FREERTOS_INVALID_SOCKET )
+ {
+ /* Initiate graceful shutdown. */
+ FreeRTOS_shutdown( xSocket, FREERTOS_SHUT_RDWR );
+
+ /* Wait for the socket to disconnect gracefully (indicated by FreeRTOS_recv()
+ * returning a FREERTOS_EINVAL error) before closing the socket. */
+ while( FreeRTOS_recv( xSocket, ucDummy, sizeof( ucDummy ), 0 ) >= 0 )
+ {
+ /* Wait for shutdown to complete. If a receive block time is used then
+ * this delay will not be necessary as FreeRTOS_recv() will place the RTOS task
+ * into the Blocked state anyway. */
+ vTaskDelay( xShortDelay );
+
+ /* Note ? real applications should implement a timeout here, not just
+ * loop forever. */
+ }
+
+ /* The socket has shut down and is safe to close. */
+ FreeRTOS_closesocket( xSocket );
+ }
+ }
+}
+/*-----------------------------------------------------------*/
+
+IotMqttError_t getNextByte( void * pvContext,
+ uint8_t * pNextByte )
+{
+Socket_t xMQTTSocket = ( Socket_t ) pvContext;
+BaseType_t receivedBytes;
+IotMqttError_t result;
+
+ /* Receive one byte from network */
+ receivedBytes = FreeRTOS_recv( xMQTTSocket, ( void * ) pNextByte, sizeof( uint8_t ), 0 );
+
+ if( receivedBytes == sizeof( uint8_t ) )
+ {
+ result = IOT_MQTT_SUCCESS;
+ }
+ else
+ {
+ result = IOT_MQTT_TIMEOUT;
+ }
+
+ return result;
+}
+
+/*-----------------------------------------------------------*/
+
+static void prvMQTTDemoTask( void * pvParameters )
+{
+const TickType_t xNoDelay = ( TickType_t ) 0;
+Socket_t xMQTTSocket;
+IotMqttError_t xReturned;
+uint32_t ulPublishCount = 0;
+const uint32_t ulMaxPublishCount = 5UL;
+
+ /* Remove compiler warnings about unused parameters. */
+ ( void ) pvParameters;
+
+ for( ; ; )
+ {
+ /* Don't expect any notifications to be pending yet. */
+ configASSERT( ulTaskNotifyTake( pdTRUE, xNoDelay ) == 0 );
+
+ /****************************** Connect. ******************************/
+
+ /* Establish a TCP connection with the MQTT broker. This example connects to
+ * the MQTT broker as specified in mqttexampleMQTT_BROKER_ENDPOINT and
+ * mqttexampleMQTT_BROKER_PORT at the top of this file. */
+ configPRINTF( ( "Create a TCP connection to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
+ xMQTTSocket = prvCreateTCPConnectionToBroker();
+ configASSERT( xMQTTSocket != FREERTOS_INVALID_SOCKET );
+ configPRINTF( ( "Connected to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
+
+ /* Sends an MQTT Connect packet over the already connected TCP socket
+ * xMQTTSocket, then waits for and interprets the reply. IOT_MQTT_SUCCESS is
+ * returned if the reply is a valid connection acknowledgeable (CONNACK) packet,
+ * otherwise an error code is returned. */
+ configPRINTF( ( "Creating an MQTT connection with %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
+ xReturned = prvCreateMQTTConnectionWithBroker( xMQTTSocket );
+ configASSERT( xReturned == IOT_MQTT_SUCCESS );
+ configPRINTF( ( "Established an MQTT connection.\r\n" ) );
+
+ /**************************** Subscribe. ******************************/
+
+ /* The client is now connected to the broker. Subscribe to the topic
+ * as specified in mqttexampleTOPIC at the top of this file by sending a
+ * subscribe packet then waiting for a subscribe acknowledgment (SUBACK).
+ * This client will then publish to the same topic it subscribed to, so will
+ * expect all the messages it sends to the broker to be sent back to it
+ * from the broker. */
+ configPRINTF( ( "Attempt to subscribed to the MQTT topic %s\r\n", mqttexampleTOPIC ) );
+ xReturned = prvMQTTSubscribeToTopic( xMQTTSocket );
+ configPRINTF( ( "Subscribed to the topic %s\r\n", mqttexampleTOPIC ) );
+
+ /**************************** Publish. ******************************/
+ /* Send publish for with QOS0, Process Keep alive */
+ for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
+ {
+ configPRINTF( ( "Attempt to publish to the MQTT topic %s\r\n", mqttexampleTOPIC ) );
+ xReturned = prvMQTTPublishToTopic( xMQTTSocket );
+ configASSERT( xReturned == IOT_MQTT_SUCCESS );
+ configPRINTF( ( "Publish successful to the topic %s\r\n", mqttexampleTOPIC ) );
+
+ /* Process incoming publish echo, since application subscribed to the same topic
+ * broker will send publish message back to the application */
+ configPRINTF( ( "Attempt to receive publish message from broker\r\n" ) );
+ xReturned = prvMQTTProcessIncomingPublish( xMQTTSocket );
+ configASSERT( xReturned == IOT_MQTT_SUCCESS );
+ configPRINTF( ( "Successfully Received Publish message from broker\r\n" ) );
+
+ /* Leave Connection Idle for some time */
+ configPRINTF( ( "Keeping Connection Idle\r\n" ) );
+ vTaskDelay( pdMS_TO_TICKS( mqttexampleKEEP_ALIVE_DELAY ) );
+ /* Send Ping request to broker and receive ping response */
+ configPRINTF( ( "Sending Ping Request to the broker\r\n" ) );
+ xReturned = prvMQTTKeepAlive( xMQTTSocket );
+ configASSERT( xReturned == IOT_MQTT_SUCCESS );
+ configPRINTF( ( "Ping Response successfully received\r\n" ) );
+ }
+
+ /************************ Unsubscribe from the topic. **************************/
+ configPRINTF( ( "Attempt to unsubscribe from the MQTT topic %s\r\n", mqttexampleTOPIC ) );
+ xReturned = prvMQTTUnsubscribeFromTopic( xMQTTSocket );
+ configASSERT( xReturned == IOT_MQTT_SUCCESS );
+ configPRINTF( ( "Unsubscribe from the topic %s\r\n", mqttexampleTOPIC ) );
+
+ /**************************** Disconnect. ******************************/
+
+ /* Sends an MQTT Disconnect packet over the already connected TCP socket
+ * xMQTTSocket, then waits for and interprets the reply. IOT_MQTT_SUCCESS is
+ * returned if the reply is a valid connection acknowledgeable (CONNACK) packet,
+ * otherwise an error code is returned. */
+
+ configPRINTF( ( "Creating an MQTT connection with %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
+ xReturned = prvMQTTDisconnect( xMQTTSocket );
+ configASSERT( xReturned == IOT_MQTT_SUCCESS );
+ configPRINTF( ( "Established an MQTT connection.\r\n" ) );
+ /* Disconnect from broker. */
+ prvGracefulShutDown( xMQTTSocket );
+
+ /* Wait for some time between two iterations to ensure that we do not
+ * bombard the public test mosquitto broker. */
+ configPRINTF( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u\r\n", xPortGetFreeHeapSize() ) );
+ configPRINTF( ( "Short delay before starting the next iteration.... \r\n\r\n" ) );
+ vTaskDelay( pdMS_TO_TICKS( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ) );
+ }
+}
+/*-----------------------------------------------------------*/
+
+Socket_t prvCreateTCPConnectionToBroker( void )
+{
+Socket_t xMQTTSocket;
+ struct freertos_sockaddr xBrokerAddress;
+ uint32_t ulBrokerIPAddress;
+
+ /* This is the socket used to connect to the MQTT broker. */
+ xMQTTSocket = FreeRTOS_socket( FREERTOS_AF_INET,
+ FREERTOS_SOCK_STREAM,
+ FREERTOS_IPPROTO_TCP );
+
+ configASSERT( xMQTTSocket != FREERTOS_INVALID_SOCKET );
+
+ /* Locate then connect to the MQTT broker. */
+ ulBrokerIPAddress = FreeRTOS_gethostbyname( mqttexampleMQTT_BROKER_ENDPOINT );
+
+ if( ulBrokerIPAddress != 0 )
+ {
+ xBrokerAddress.sin_port = FreeRTOS_htons( mqttexampleMQTT_BROKER_PORT );
+ xBrokerAddress.sin_addr = ulBrokerIPAddress;
+
+ if( FreeRTOS_connect( xMQTTSocket, &xBrokerAddress, sizeof( xBrokerAddress ) ) != 0 )
+ {
+ /* Could not connect so delete socket and return an error. */
+ FreeRTOS_closesocket( xMQTTSocket );
+ xMQTTSocket = FREERTOS_INVALID_SOCKET;
+ }
+ }
+
+ return xMQTTSocket;
+}
+/*-----------------------------------------------------------*/
+
+static IotMqttError_t prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket )
+{
+IotMqttConnectInfo_t xConnectInfo;
+size_t xRemainingLength = 0;
+size_t xPacketSize = 0;
+IotMqttError_t xResult;
+IotMqttPacketInfo_t xIncomingPacket;
+
+ /* Many fields not used in this demo so start with everything at 0. */
+ memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
+ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( xIncomingPacket ) );
+
+ /* Start with a clean session i.e. direct the MQTT broker to discard any
+ * previous session data. Also, establishing a connection with clean session
+ * will ensure that the broker does not store any data when this client
+ * gets disconnected. */
+ xConnectInfo.cleanSession = true;
+
+ /* The client identifier is used to uniquely identify this MQTT client to
+ * the MQTT broker. In a production device the identifier can be something
+ * unique, such as a device serial number. */
+ xConnectInfo.pClientIdentifier = mqttexampleCLIENT_IDENTIFIER;
+ xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( mqttexampleCLIENT_IDENTIFIER );
+
+ /* Get size requirement for the connect packet */
+ xResult = IotMqtt_GetConnectPacketSize( &xConnectInfo, &xRemainingLength, &xPacketSize );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ /* Make sure the packet size is less than static buffer size */
+ configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
+ /* Serialize MQTT connect packet into provided buffer */
+ xResult = IotMqtt_SerializeConnect( &xConnectInfo, xRemainingLength, ucSharedBuffer, xPacketSize );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+
+ if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )
+ {
+ /* Wait for the connection ack. TODO check the receive timeout value. */
+
+ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
+
+ /* Get packet type and remaining length of the received packet
+ * We cannot assume received data is the connection acknowledgment.
+ * Therefore this function reads type and remaining length of the
+ * received packet, before processing entire packet.
+ */
+ xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_CONNACK );
+ configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );
+
+ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 )
+ == ( BaseType_t ) xIncomingPacket.remainingLength )
+ {
+ xIncomingPacket.pRemainingData = ucSharedBuffer;
+
+ if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
+ {
+ xResult = IOT_MQTT_SERVER_REFUSED;
+ }
+ }
+ else
+ {
+ configPRINTF( ( "Receive Failed while receiving MQTT ConnAck\n" ) );
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+ }
+ else
+ {
+ configPRINTF( ( "Send Failed while connecting to MQTT broker\n" ) );
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+
+ return xResult;
+}
+/*-----------------------------------------------------------*/
+
+static IotMqttError_t prvMQTTSubscribeToTopic( Socket_t xMQTTSocket )
+{
+IotMqttError_t xResult;
+IotMqttSubscription_t xMQTTSubscription[ 1 ];
+size_t xRemainingLength = 0;
+size_t xPacketSize = 0;
+uint16_t usPacketIdentifier;
+IotMqttPacketInfo_t xIncomingPacket;
+
+ /* Some fields not used by this demo so start with everything at 0. */
+ memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
+
+ /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to only one topic */
+ xMQTTSubscription[ 0 ].qos = IOT_MQTT_QOS_0;
+ xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
+ xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
+
+ xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_SUBSCRIBE,
+ xMQTTSubscription,
+ sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
+ &xRemainingLength, &xPacketSize );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ /* Make sure the packet size is less than static buffer size */
+ configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
+
+ /* Serialize subscribe into statically allocated ucSharedBuffer */
+ xResult = IotMqtt_SerializeSubscribe( xMQTTSubscription,
+ sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
+ xRemainingLength,
+ &usPacketIdentifier,
+ ucSharedBuffer,
+ xPacketSize );
+
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+
+ if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )
+ {
+ /* Wait for the subscription ack. The socket is already connected to the MQTT broker, so
+ * publishes to this client can occur at any time and we cannot assume received
+ * data is the subscription acknowledgment. Therefore this function is, at this
+ * time, doing what would otherwise be done wherever incoming packets are
+ * interpreted (in a callback, or whatever). */
+ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
+ xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_SUBACK );
+ configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );
+
+ /* Receive the remaining bytes. */
+ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )
+ {
+ xIncomingPacket.pRemainingData = ucSharedBuffer;
+
+ if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
+ {
+ xResult = IOT_MQTT_BAD_RESPONSE;
+ }
+ }
+ else
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+ }
+ else
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+
+ return xResult;
+}
+/*-----------------------------------------------------------*/
+
+static IotMqttError_t prvMQTTPublishToTopic( Socket_t xMQTTSocket )
+{
+IotMqttError_t xResult;
+IotMqttPublishInfo_t xMQTTPublishInfo;
+size_t xRemainingLength = 0;
+size_t xPacketSize = 0;
+uint16_t usPacketIdentifier;
+uint8_t * pusPacketIdentifierHigh;
+
+ /* Some fields not used by this demo so start with everything at 0. */
+ memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );
+ xMQTTPublishInfo.qos = IOT_MQTT_QOS_0;
+ xMQTTPublishInfo.retain = false;
+ xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;
+ xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );
+ xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
+ xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );
+
+ /* Find out length of Publish packet size. */
+ xResult = IotMqtt_GetPublishPacketSize( &xMQTTPublishInfo, &xRemainingLength, &xPacketSize );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ /* Make sure the packet size is less than static buffer size */
+ configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
+
+ xResult = IotMqtt_SerializePublish( &xMQTTPublishInfo,
+ xRemainingLength,
+ &usPacketIdentifier,
+ &pusPacketIdentifierHigh,
+ ucSharedBuffer,
+ xPacketSize );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+
+ if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) != ( BaseType_t ) xPacketSize )
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+ else
+ {
+ /* Send success. Since in this case, we are using IOT_MQTT_QOS_0,
+ * there will not be any PubAck. Publish will be echoed back, which is processed
+ * in prvMQTTProcessIncomingPublish() */
+ xResult = IOT_MQTT_SUCCESS;
+ }
+
+ return xResult;
+}
+/*-----------------------------------------------------------*/
+
+static IotMqttError_t prvMQTTProcessIncomingPublish( Socket_t xMQTTSocket )
+{
+IotMqttError_t xResult;
+IotMqttPacketInfo_t xIncomingPacket;
+
+ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
+ xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ configASSERT( ( xIncomingPacket.type & 0xf0 ) == MQTT_PACKET_TYPE_PUBLISH );
+ configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );
+
+ /* Receive the remaining bytes. */
+ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )
+ {
+ xIncomingPacket.pRemainingData = ucSharedBuffer;
+
+ if( IotMqtt_DeserializePublish( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
+ {
+ xResult = IOT_MQTT_BAD_RESPONSE;
+ }
+ else
+ {
+ /* Process incoming Publish */
+ configPRINTF( ( "Incoming QOS : %d\n", xIncomingPacket.pubInfo.qos ) );
+ configPRINTF( ( "Incoming Publish Topic Name: %.*s\n", xIncomingPacket.pubInfo.topicNameLength, xIncomingPacket.pubInfo.pTopicName ) );
+ configPRINTF( ( "Incoming Publish Message : %.*s\n", xIncomingPacket.pubInfo.payloadLength, xIncomingPacket.pubInfo.pPayload ) );
+ }
+ }
+ else
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+
+ return xResult;
+}
+
+/*-----------------------------------------------------------*/
+
+static IotMqttError_t prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket )
+{
+IotMqttError_t xResult;
+IotMqttSubscription_t xMQTTSubscription[ 1 ];
+size_t xRemainingLength;
+size_t xPacketSize;
+uint16_t usPacketIdentifier;
+IotMqttPacketInfo_t xIncomingPacket;
+
+ /* Some fields not used by this demo so start with everything at 0. */
+ memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
+
+ /* Unsubscribe to the mqttexampleTOPIC topic filter. The task handle is passed
+ * as the callback context which is used by the callback to send a task
+ * notification to this task.*/
+ xMQTTSubscription[ 0 ].qos = IOT_MQTT_QOS_0;
+ xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
+ xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
+
+ xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_UNSUBSCRIBE,
+ xMQTTSubscription,
+ sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
+ &xRemainingLength,
+ &xPacketSize );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ /* Make sure the packet size is less than static buffer size */
+ configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
+
+ xResult = IotMqtt_SerializeUnsubscribe( xMQTTSubscription,
+ sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
+ xRemainingLength,
+ &usPacketIdentifier,
+ ucSharedBuffer,
+ xPacketSize );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+
+ if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize )
+ {
+ /* Wait for the subscription ack. The socket is already connected to the MQTT broker, so
+ * publishes to this client can occur at any time and we cannot assume received
+ * data is the subscription acknowledgment. Therefore this function is, at this
+ * time, doing what would otherwise be done wherever incoming packets are
+ * interpreted (in a callback, or whatever). */
+ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
+ xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_UNSUBACK );
+ configASSERT( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) );
+
+ /* Receive the remaining bytes. */
+ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength )
+ {
+ xIncomingPacket.pRemainingData = ucSharedBuffer;
+
+ if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
+ {
+ xResult = IOT_MQTT_BAD_RESPONSE;
+ }
+ }
+ else
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+ }
+ else
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+
+ return xResult;
+}
+/*-----------------------------------------------------------*/
+
+static IotMqttError_t prvMQTTKeepAlive( Socket_t xMQTTSocket )
+{
+IotMqttError_t xResult;
+IotMqttPacketInfo_t xIncomingPacket;
+
+ /* PingReq is fixed length packet, therefore there is no need to calculate the size,
+ * just makes sure static buffer can accommodate ping request */
+
+ configASSERT( MQTT_PACKET_PINGREQ_SIZE <= mqttexampleSHARED_BUFFER_SIZE );
+
+ xResult = IotMqtt_SerializePingreq( ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+
+ if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE, 0 ) == ( BaseType_t ) MQTT_PACKET_PINGREQ_SIZE )
+ {
+ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) );
+ xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+ configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_PINGRESP );
+ configASSERT( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) );
+
+ /* Receive the remaining bytes. */
+ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 )
+ == ( BaseType_t ) xIncomingPacket.remainingLength )
+ {
+ xIncomingPacket.pRemainingData = ucSharedBuffer;
+
+ if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
+ {
+ xResult = IOT_MQTT_BAD_RESPONSE;
+ }
+ }
+ else
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+ }
+ else
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+
+ return xResult;
+}
+
+/*-----------------------------------------------------------*/
+
+static IotMqttError_t prvMQTTDisconnect( Socket_t xMQTTSocket )
+{
+IotMqttError_t xResult;
+
+ /* Disconnect is fixed length packet, therefore there is no need to calculate the size,
+ * just makes sure static buffer can accommodate disconnect request */
+
+ configASSERT( MQTT_PACKET_DISCONNECT_SIZE <= mqttexampleSHARED_BUFFER_SIZE );
+
+ xResult = IotMqtt_SerializeDisconnect( ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE );
+ configASSERT( xResult == IOT_MQTT_SUCCESS );
+
+ if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE, 0 ) == ( BaseType_t ) MQTT_PACKET_DISCONNECT_SIZE )
+ {
+ xResult = IOT_MQTT_SUCCESS;
+ }
+ else
+ {
+ xResult = IOT_MQTT_NETWORK_ERROR;
+ }
+
+ return xResult;
+}