/* * FreeRTOS Kernel V10.3.0 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * * http://www.FreeRTOS.org * http://aws.amazon.com/freertos * * 1 tab == 4 spaces! */ /* * Proof of Concept for use of MQTT light weight serializer API. * Light weight serializer API lets user to serialize and * deserialize MQTT messages into user provided buffer. * This API allows use of statically allocated buffer. * * Example shown below uses this API to create MQTT messages and * and send them over connection established using FreeRTOS sockets. * The example is single threaded and uses statically allocated memory. * * !!! NOTE !!! * This is work in progress to show how light weight serializer * API can be used. This is not a complete demo, and should not * be treated as production ready code. */ /* Standard includes. */ #include #include /* Kernel includes. 
*/ #include "FreeRTOS.h" #include "task.h" /* FreeRTOS+TCP includes. */ #include "FreeRTOS_IP.h" #include "FreeRTOS_Sockets.h" /* IoT SDK includes. */ #include "iot_mqtt.h" #include "iot_mqtt_serialize.h" #include "platform/iot_network_freertos.h" /* Demo Specific configs. */ #include "mqtt_demo_profile.h" /** * @brief Time to wait between each cycle of the demo implemented by prvMQTTDemoTask(). */ #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000 ) ) /** * @brief Time to wait before sending ping request to keep MQTT connection alive. */ #define mqttexampleKEEP_ALIVE_DELAY ( pdMS_TO_TICKS( 1000 ) ) /** * @brief The MQTT client identifier used in this example. Each client identifier * must be unique so edit as required to ensure no two clients connecting to the * same broker use the same client identifier. */ #define mqttexampleCLIENT_IDENTIFIER mqttdemoprofileCLIENT_IDENTIFIER /** * @brief Details of the MQTT broker to connect to. */ #define mqttexampleMQTT_BROKER_ENDPOINT mqttdemoprofileBROKER_ENDPOINT /** * @brief The port to use for the demo. */ #define mqttexampleMQTT_BROKER_PORT mqttdemoprofileBROKER_PORT /** * @brief The topic to subscribe and publish to in the example. * * The topic starts with the client identifier to ensure that each demo interacts * with a unique topic. */ #define mqttexampleTOPIC mqttexampleCLIENT_IDENTIFIER "/example/topic" /** * @brief The MQTT message published in this example. */ #define mqttexampleMESSAGE "Hello Light Weight MQTT World!" /** * @brief Dimensions a file scope buffer currently used to send and receive MQTT data from a * socket */ #define mqttexampleSHARED_BUFFER_SIZE 500 /*-----------------------------------------------------------*/ /** * @brief MQTT Protocol constants used by this demo. * These types are defined in internal MQTT include files. * For light-weight demo application, only a few are needed, therefore * they are redefined here so that internal files need not be included. 
*/

/* MQTT Control Packet Types.*/
#define MQTT_PACKET_TYPE_CONNACK       ( ( uint8_t ) 0x20U ) /**< @brief CONNACK (server-to-client). */
#define MQTT_PACKET_TYPE_PUBLISH       ( ( uint8_t ) 0x30U ) /**< @brief PUBLISH (bi-directional). */
#define MQTT_PACKET_TYPE_SUBACK        ( ( uint8_t ) 0x90U ) /**< @brief SUBACK (server-to-client). */
#define MQTT_PACKET_TYPE_UNSUBACK      ( ( uint8_t ) 0xb0U ) /**< @brief UNSUBACK (server-to-client). */
#define MQTT_PACKET_TYPE_PINGRESP      ( ( uint8_t ) 0xd0U ) /**< @brief PINGRESP (server-to-client). */

/* MQTT Fixed Packet Sizes */
#define MQTT_PACKET_DISCONNECT_SIZE    ( ( uint8_t ) 2 ) /**< @brief Size of DISCONNECT packet. */
#define MQTT_PACKET_PINGREQ_SIZE       ( ( uint8_t ) 2 ) /**< @brief Size of PINGREQ packet. */

/*-----------------------------------------------------------*/

/**
 * @brief The task used to demonstrate the MQTT API.
 *
 * @param[in] pvParameters Parameters as passed at the time of task creation.  Not
 * used in this example.
 */
static void prvMQTTDemoTask( void * pvParameters );

/**
 * @brief Creates a TCP connection to the MQTT broker as specified in
 * mqttexampleMQTT_BROKER_ENDPOINT and mqttexampleMQTT_BROKER_PORT.
 *
 * @return On success the socket connected to the MQTT broker is returned.  Otherwise
 * FREERTOS_INVALID_SOCKET is returned.
 *
 */
static Socket_t prvCreateTCPConnectionToBroker( void );

/**
 * @brief Sends an MQTT Connect packet over the already connected TCP socket.
 *
 * @param xMQTTSocket is a TCP socket that is connected to the MQTT broker with
 * which the MQTT connection is to be established.
 *
 * @return IOT_MQTT_SUCCESS is returned if the reply is a valid connection
 * acknowledgment (CONNACK) packet, otherwise an error code is returned.
 */
static IotMqttError_t prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket );

/**
 * @brief Performs a graceful shutdown and close of the socket passed in as its
 * parameter.
 *
 * @param xSocket is a TCP socket that is connected to an MQTT broker to which
 * an MQTT connection has been established.
 */
static void prvGracefulShutDown( Socket_t xSocket );

/**
 * @brief Subscribes to the topic as specified in mqttexampleTOPIC.
 *
 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
 * an MQTT connection has been established.
 *
 * @return IOT_MQTT_SUCCESS is returned if the
 * subscription is successful, otherwise an error code is returned.
 */
static IotMqttError_t prvMQTTSubscribeToTopic( Socket_t xMQTTSocket );

/**
 * @brief Publishes the message mqttexampleMESSAGE on the mqttexampleTOPIC topic.
 *
 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
 * an MQTT connection has been established.
 *
 * @return IOT_MQTT_SUCCESS is returned if Publish is successful,
 * otherwise an error code is returned.
 */
static IotMqttError_t prvMQTTPublishToTopic( Socket_t xMQTTSocket );

/**
 * @brief Process Incoming Publish.
 *
 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
 * an MQTT connection has been established.
 *
 * @return #IOT_MQTT_SUCCESS is returned if the processing is successful,
 * otherwise an error code is returned.
 */
static IotMqttError_t prvMQTTProcessIncomingPublish( Socket_t xMQTTSocket );

/**
 * @brief Unsubscribes from the previously subscribed topic as specified
 * in mqttexampleTOPIC.
 *
 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
 * an MQTT connection has been established.
 *
 * @return IOT_MQTT_SUCCESS is returned if the
 * unsubscribe is successful, otherwise an error code is returned.
 */
static IotMqttError_t prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket );

/**
 * @brief Send MQTT Ping Request to broker and receive response.
 * Ping request is used to keep connection to broker alive.
 *
 * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which
 * an MQTT connection has been established.
* * @return IOT_MQTT_SUCCESS is returned if the successful Ping Response is received. * otherwise an error code is returned. */ static IotMqttError_t prvMQTTKeepAlive( Socket_t xMQTTSocket ); /** * @brief Disconnect From MQTT Broker. * * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which * an MQTT connection has been established. * * @return IOT_MQTT_SUCCESS is returned if the disconnect is successful, * otherwise an error code is returned. */ static IotMqttError_t prvMQTTDisconnect( Socket_t xMQTTSocket ); /*-----------------------------------------------------------*/ /** * @brief Function to receive next byte from network, * The declaration must match IotMqttGetNextByte_t. * * @param[in] pvContext Network Connection context. Implementation in this * file uses FreeRTOS socket. * @param[in, out] pNextBye Pointer to buffer where the byte will be stored. * * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_TIMEOUT */ IotMqttError_t getNextByte( void * pvContext, uint8_t * pNextByte ); /*-----------------------------------------------------------*/ /* @brief Static memory buffer used for sending and receiving MQTT messages */ static uint8_t ucSharedBuffer[ mqttexampleSHARED_BUFFER_SIZE ]; /*-----------------------------------------------------------*/ /* * @brief Task for Light Weight MQTT Serializer API Proof of Concept. * To run the proof of concept example, in main.c, in function vApplicationIPNetworkEventHook(), * replace vStartSimpleMQTTDemo() with vApplicationIPNetworkEventHook(). */ void vStartLightWeightMQTTDemo( void ) { TickType_t xShortDelay = ( TickType_t ) pdMS_TO_TICKS( ( TickType_t ) 500 ); /* Wait a short time to allow receipt of the ARP replies. */ vTaskDelay( xShortDelay ); /* This example uses a single application task, which in turn is used to * connect, subscribe, publish, unsubscribe and disconnect from the MQTT * broker. */ xTaskCreate( prvMQTTDemoTask, /* Function that implements the task. 
*/ "MQTTLWDemo", /* Text name for the task - only used for debugging. */ democonfigDEMO_STACKSIZE, /* Size of stack (in words, not bytes) to allocate for the task. */ NULL, /* Task parameter - not used in this case. */ tskIDLE_PRIORITY, /* Task priority, must be between 0 and configMAX_PRIORITIES - 1. */ NULL ); /* Used to pass out a handle to the created task - not used in this case. */ } /*-----------------------------------------------------------*/ static void prvGracefulShutDown( Socket_t xSocket ) { uint8_t ucDummy[ 20 ]; const TickType_t xShortDelay = pdMS_TO_MIN_TICKS( 250 ); if( xSocket != ( Socket_t ) 0 ) { if( xSocket != FREERTOS_INVALID_SOCKET ) { /* Initiate graceful shutdown. */ FreeRTOS_shutdown( xSocket, FREERTOS_SHUT_RDWR ); /* Wait for the socket to disconnect gracefully (indicated by FreeRTOS_recv() * returning a FREERTOS_EINVAL error) before closing the socket. */ while( FreeRTOS_recv( xSocket, ucDummy, sizeof( ucDummy ), 0 ) >= 0 ) { /* Wait for shutdown to complete. If a receive block time is used then * this delay will not be necessary as FreeRTOS_recv() will place the RTOS task * into the Blocked state anyway. */ vTaskDelay( xShortDelay ); /* Note ? real applications should implement a timeout here, not just * loop forever. */ } /* The socket has shut down and is safe to close. 
*/ FreeRTOS_closesocket( xSocket ); } } } /*-----------------------------------------------------------*/ IotMqttError_t getNextByte( void * pvContext, uint8_t * pNextByte ) { Socket_t xMQTTSocket = ( Socket_t ) pvContext; BaseType_t receivedBytes; IotMqttError_t result; /* Receive one byte from network */ receivedBytes = FreeRTOS_recv( xMQTTSocket, ( void * ) pNextByte, sizeof( uint8_t ), 0 ); if( receivedBytes == sizeof( uint8_t ) ) { result = IOT_MQTT_SUCCESS; } else { result = IOT_MQTT_TIMEOUT; } return result; } /*-----------------------------------------------------------*/ static void prvMQTTDemoTask( void * pvParameters ) { const TickType_t xNoDelay = ( TickType_t ) 0; Socket_t xMQTTSocket; IotMqttError_t xReturned; uint32_t ulPublishCount = 0; const uint32_t ulMaxPublishCount = 5UL; /* Remove compiler warnings about unused parameters. */ ( void ) pvParameters; for( ; ; ) { /* Don't expect any notifications to be pending yet. */ configASSERT( ulTaskNotifyTake( pdTRUE, xNoDelay ) == 0 ); /****************************** Connect. ******************************/ /* Establish a TCP connection with the MQTT broker. This example connects to * the MQTT broker as specified in mqttexampleMQTT_BROKER_ENDPOINT and * mqttexampleMQTT_BROKER_PORT at the top of this file. */ configPRINTF( ( "Create a TCP connection to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) ); xMQTTSocket = prvCreateTCPConnectionToBroker(); configASSERT( xMQTTSocket != FREERTOS_INVALID_SOCKET ); configPRINTF( ( "Connected to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) ); /* Sends an MQTT Connect packet over the already connected TCP socket * xMQTTSocket, then waits for and interprets the reply. IOT_MQTT_SUCCESS is * returned if the reply is a valid connection acknowledgeable (CONNACK) packet, * otherwise an error code is returned. 
*/ configPRINTF( ( "Creating an MQTT connection with %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) ); xReturned = prvCreateMQTTConnectionWithBroker( xMQTTSocket ); configASSERT( xReturned == IOT_MQTT_SUCCESS ); configPRINTF( ( "Established an MQTT connection.\r\n" ) ); /**************************** Subscribe. ******************************/ /* The client is now connected to the broker. Subscribe to the topic * as specified in mqttexampleTOPIC at the top of this file by sending a * subscribe packet then waiting for a subscribe acknowledgment (SUBACK). * This client will then publish to the same topic it subscribed to, so will * expect all the messages it sends to the broker to be sent back to it * from the broker. */ configPRINTF( ( "Attempt to subscribed to the MQTT topic %s\r\n", mqttexampleTOPIC ) ); xReturned = prvMQTTSubscribeToTopic( xMQTTSocket ); configPRINTF( ( "Subscribed to the topic %s\r\n", mqttexampleTOPIC ) ); /**************************** Publish. ******************************/ /* Send publish for with QOS0, Process Keep alive */ for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) { configPRINTF( ( "Attempt to publish to the MQTT topic %s\r\n", mqttexampleTOPIC ) ); xReturned = prvMQTTPublishToTopic( xMQTTSocket ); configASSERT( xReturned == IOT_MQTT_SUCCESS ); configPRINTF( ( "Publish successful to the topic %s\r\n", mqttexampleTOPIC ) ); /* Process incoming publish echo, since application subscribed to the same topic * broker will send publish message back to the application */ configPRINTF( ( "Attempt to receive publish message from broker\r\n" ) ); xReturned = prvMQTTProcessIncomingPublish( xMQTTSocket ); configASSERT( xReturned == IOT_MQTT_SUCCESS ); configPRINTF( ( "Successfully Received Publish message from broker\r\n" ) ); /* Leave Connection Idle for some time */ configPRINTF( ( "Keeping Connection Idle\r\n" ) ); vTaskDelay( pdMS_TO_TICKS( mqttexampleKEEP_ALIVE_DELAY ) ); /* Send Ping request to broker and receive 
ping response */ configPRINTF( ( "Sending Ping Request to the broker\r\n" ) ); xReturned = prvMQTTKeepAlive( xMQTTSocket ); configASSERT( xReturned == IOT_MQTT_SUCCESS ); configPRINTF( ( "Ping Response successfully received\r\n" ) ); } /************************ Unsubscribe from the topic. **************************/ configPRINTF( ( "Attempt to unsubscribe from the MQTT topic %s\r\n", mqttexampleTOPIC ) ); xReturned = prvMQTTUnsubscribeFromTopic( xMQTTSocket ); configASSERT( xReturned == IOT_MQTT_SUCCESS ); configPRINTF( ( "Unsubscribe from the topic %s\r\n", mqttexampleTOPIC ) ); /**************************** Disconnect. ******************************/ /* Sends an MQTT Disconnect packet over the already connected TCP socket * xMQTTSocket, then waits for and interprets the reply. IOT_MQTT_SUCCESS is * returned if the reply is a valid connection acknowledgeable (CONNACK) packet, * otherwise an error code is returned. */ configPRINTF( ( "Creating an MQTT connection with %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) ); xReturned = prvMQTTDisconnect( xMQTTSocket ); configASSERT( xReturned == IOT_MQTT_SUCCESS ); configPRINTF( ( "Established an MQTT connection.\r\n" ) ); /* Disconnect from broker. */ prvGracefulShutDown( xMQTTSocket ); /* Wait for some time between two iterations to ensure that we do not * bombard the public test mosquitto broker. */ configPRINTF( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u\r\n", xPortGetFreeHeapSize() ) ); configPRINTF( ( "Short delay before starting the next iteration.... \r\n\r\n" ) ); vTaskDelay( pdMS_TO_TICKS( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ) ); } } /*-----------------------------------------------------------*/ Socket_t prvCreateTCPConnectionToBroker( void ) { Socket_t xMQTTSocket; struct freertos_sockaddr xBrokerAddress; uint32_t ulBrokerIPAddress; /* This is the socket used to connect to the MQTT broker. 
*/ xMQTTSocket = FreeRTOS_socket( FREERTOS_AF_INET, FREERTOS_SOCK_STREAM, FREERTOS_IPPROTO_TCP ); configASSERT( xMQTTSocket != FREERTOS_INVALID_SOCKET ); /* Locate then connect to the MQTT broker. */ ulBrokerIPAddress = FreeRTOS_gethostbyname( mqttexampleMQTT_BROKER_ENDPOINT ); if( ulBrokerIPAddress != 0 ) { xBrokerAddress.sin_port = FreeRTOS_htons( mqttexampleMQTT_BROKER_PORT ); xBrokerAddress.sin_addr = ulBrokerIPAddress; if( FreeRTOS_connect( xMQTTSocket, &xBrokerAddress, sizeof( xBrokerAddress ) ) != 0 ) { /* Could not connect so delete socket and return an error. */ FreeRTOS_closesocket( xMQTTSocket ); xMQTTSocket = FREERTOS_INVALID_SOCKET; } } return xMQTTSocket; } /*-----------------------------------------------------------*/ static IotMqttError_t prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket ) { IotMqttConnectInfo_t xConnectInfo; size_t xRemainingLength = 0; size_t xPacketSize = 0; IotMqttError_t xResult; IotMqttPacketInfo_t xIncomingPacket; /* Many fields not used in this demo so start with everything at 0. */ memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); memset( ( void * ) &xIncomingPacket, 0x00, sizeof( xIncomingPacket ) ); /* Start with a clean session i.e. direct the MQTT broker to discard any * previous session data. Also, establishing a connection with clean session * will ensure that the broker does not store any data when this client * gets disconnected. */ xConnectInfo.cleanSession = true; /* The client identifier is used to uniquely identify this MQTT client to * the MQTT broker. In a production device the identifier can be something * unique, such as a device serial number. 
*/ xConnectInfo.pClientIdentifier = mqttexampleCLIENT_IDENTIFIER; xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( mqttexampleCLIENT_IDENTIFIER ); /* Get size requirement for the connect packet */ xResult = IotMqtt_GetConnectPacketSize( &xConnectInfo, &xRemainingLength, &xPacketSize ); configASSERT( xResult == IOT_MQTT_SUCCESS ); /* Make sure the packet size is less than static buffer size */ configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE ); /* Serialize MQTT connect packet into provided buffer */ xResult = IotMqtt_SerializeConnect( &xConnectInfo, xRemainingLength, ucSharedBuffer, xPacketSize ); configASSERT( xResult == IOT_MQTT_SUCCESS ); if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize ) { /* Wait for the connection ack. TODO check the receive timeout value. */ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) ); /* Get packet type and remaining length of the received packet * We cannot assume received data is the connection acknowledgment. * Therefore this function reads type and remaining length of the * received packet, before processing entire packet. 
*/ xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket ); configASSERT( xResult == IOT_MQTT_SUCCESS ); configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_CONNACK ); configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE ); if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength ) { xIncomingPacket.pRemainingData = ucSharedBuffer; if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS ) { xResult = IOT_MQTT_SERVER_REFUSED; } } else { configPRINTF( ( "Receive Failed while receiving MQTT ConnAck\n" ) ); xResult = IOT_MQTT_NETWORK_ERROR; } } else { configPRINTF( ( "Send Failed while connecting to MQTT broker\n" ) ); xResult = IOT_MQTT_NETWORK_ERROR; } return xResult; } /*-----------------------------------------------------------*/ static IotMqttError_t prvMQTTSubscribeToTopic( Socket_t xMQTTSocket ) { IotMqttError_t xResult; IotMqttSubscription_t xMQTTSubscription[ 1 ]; size_t xRemainingLength = 0; size_t xPacketSize = 0; uint16_t usPacketIdentifier; IotMqttPacketInfo_t xIncomingPacket; /* Some fields not used by this demo so start with everything at 0. */ memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Subscribe to the mqttexampleTOPIC topic filter. 
This example subscribes to only one topic */ xMQTTSubscription[ 0 ].qos = IOT_MQTT_QOS_0; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_SUBSCRIBE, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ), &xRemainingLength, &xPacketSize ); configASSERT( xResult == IOT_MQTT_SUCCESS ); /* Make sure the packet size is less than static buffer size */ configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE ); /* Serialize subscribe into statically allocated ucSharedBuffer */ xResult = IotMqtt_SerializeSubscribe( xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ), xRemainingLength, &usPacketIdentifier, ucSharedBuffer, xPacketSize ); configASSERT( xResult == IOT_MQTT_SUCCESS ); if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize ) { /* Wait for the subscription ack. The socket is already connected to the MQTT broker, so * publishes to this client can occur at any time and we cannot assume received * data is the subscription acknowledgment. Therefore this function is, at this * time, doing what would otherwise be done wherever incoming packets are * interpreted (in a callback, or whatever). */ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) ); xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket ); configASSERT( xResult == IOT_MQTT_SUCCESS ); configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_SUBACK ); configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE ); /* Receive the remaining bytes. 
*/ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength ) { xIncomingPacket.pRemainingData = ucSharedBuffer; if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS ) { xResult = IOT_MQTT_BAD_RESPONSE; } } else { xResult = IOT_MQTT_NETWORK_ERROR; } } else { xResult = IOT_MQTT_NETWORK_ERROR; } return xResult; } /*-----------------------------------------------------------*/ static IotMqttError_t prvMQTTPublishToTopic( Socket_t xMQTTSocket ) { IotMqttError_t xResult; IotMqttPublishInfo_t xMQTTPublishInfo; size_t xRemainingLength = 0; size_t xPacketSize = 0; uint16_t usPacketIdentifier; uint8_t * pusPacketIdentifierHigh; /* Some fields not used by this demo so start with everything at 0. */ memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); xMQTTPublishInfo.qos = IOT_MQTT_QOS_0; xMQTTPublishInfo.retain = false; xMQTTPublishInfo.pTopicName = mqttexampleTOPIC; xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); /* Find out length of Publish packet size. */ xResult = IotMqtt_GetPublishPacketSize( &xMQTTPublishInfo, &xRemainingLength, &xPacketSize ); configASSERT( xResult == IOT_MQTT_SUCCESS ); /* Make sure the packet size is less than static buffer size */ configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE ); xResult = IotMqtt_SerializePublish( &xMQTTPublishInfo, xRemainingLength, &usPacketIdentifier, &pusPacketIdentifierHigh, ucSharedBuffer, xPacketSize ); configASSERT( xResult == IOT_MQTT_SUCCESS ); if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) != ( BaseType_t ) xPacketSize ) { xResult = IOT_MQTT_NETWORK_ERROR; } else { /* Send success. Since in this case, we are using IOT_MQTT_QOS_0, * there will not be any PubAck. 
Publish will be echoed back, which is processed * in prvMQTTProcessIncomingPublish() */ xResult = IOT_MQTT_SUCCESS; } return xResult; } /*-----------------------------------------------------------*/ static IotMqttError_t prvMQTTProcessIncomingPublish( Socket_t xMQTTSocket ) { IotMqttError_t xResult; IotMqttPacketInfo_t xIncomingPacket; memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) ); xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket ); configASSERT( xResult == IOT_MQTT_SUCCESS ); configASSERT( ( xIncomingPacket.type & 0xf0 ) == MQTT_PACKET_TYPE_PUBLISH ); configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE ); /* Receive the remaining bytes. */ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength ) { xIncomingPacket.pRemainingData = ucSharedBuffer; if( IotMqtt_DeserializePublish( &xIncomingPacket ) != IOT_MQTT_SUCCESS ) { xResult = IOT_MQTT_BAD_RESPONSE; } else { /* Process incoming Publish */ configPRINTF( ( "Incoming QOS : %d\n", xIncomingPacket.pubInfo.qos ) ); configPRINTF( ( "Incoming Publish Topic Name: %.*s\n", xIncomingPacket.pubInfo.topicNameLength, xIncomingPacket.pubInfo.pTopicName ) ); configPRINTF( ( "Incoming Publish Message : %.*s\n", xIncomingPacket.pubInfo.payloadLength, xIncomingPacket.pubInfo.pPayload ) ); } } else { xResult = IOT_MQTT_NETWORK_ERROR; } return xResult; } /*-----------------------------------------------------------*/ static IotMqttError_t prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket ) { IotMqttError_t xResult; IotMqttSubscription_t xMQTTSubscription[ 1 ]; size_t xRemainingLength; size_t xPacketSize; uint16_t usPacketIdentifier; IotMqttPacketInfo_t xIncomingPacket; /* Some fields not used by this demo so start with everything at 0. 
*/ memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Unsubscribe to the mqttexampleTOPIC topic filter. The task handle is passed * as the callback context which is used by the callback to send a task * notification to this task.*/ xMQTTSubscription[ 0 ].qos = IOT_MQTT_QOS_0; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_UNSUBSCRIBE, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ), &xRemainingLength, &xPacketSize ); configASSERT( xResult == IOT_MQTT_SUCCESS ); /* Make sure the packet size is less than static buffer size */ configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE ); xResult = IotMqtt_SerializeUnsubscribe( xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ), xRemainingLength, &usPacketIdentifier, ucSharedBuffer, xPacketSize ); configASSERT( xResult == IOT_MQTT_SUCCESS ); if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 ) == ( BaseType_t ) xPacketSize ) { /* Wait for the subscription ack. The socket is already connected to the MQTT broker, so * publishes to this client can occur at any time and we cannot assume received * data is the subscription acknowledgment. Therefore this function is, at this * time, doing what would otherwise be done wherever incoming packets are * interpreted (in a callback, or whatever). */ memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) ); xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket ); configASSERT( xResult == IOT_MQTT_SUCCESS ); configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_UNSUBACK ); configASSERT( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) ); /* Receive the remaining bytes. 
*/ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength ) { xIncomingPacket.pRemainingData = ucSharedBuffer; if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS ) { xResult = IOT_MQTT_BAD_RESPONSE; } } else { xResult = IOT_MQTT_NETWORK_ERROR; } } else { xResult = IOT_MQTT_NETWORK_ERROR; } return xResult; } /*-----------------------------------------------------------*/ static IotMqttError_t prvMQTTKeepAlive( Socket_t xMQTTSocket ) { IotMqttError_t xResult; IotMqttPacketInfo_t xIncomingPacket; /* PingReq is fixed length packet, therefore there is no need to calculate the size, * just makes sure static buffer can accommodate ping request */ configASSERT( MQTT_PACKET_PINGREQ_SIZE <= mqttexampleSHARED_BUFFER_SIZE ); xResult = IotMqtt_SerializePingreq( ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE ); configASSERT( xResult == IOT_MQTT_SUCCESS ); if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE, 0 ) == ( BaseType_t ) MQTT_PACKET_PINGREQ_SIZE ) { memset( ( void * ) &xIncomingPacket, 0x00, sizeof( IotMqttPacketInfo_t ) ); xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket ); configASSERT( xResult == IOT_MQTT_SUCCESS ); configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_PINGRESP ); configASSERT( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) ); /* Receive the remaining bytes. 
*/ if( FreeRTOS_recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == ( BaseType_t ) xIncomingPacket.remainingLength ) { xIncomingPacket.pRemainingData = ucSharedBuffer; if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS ) { xResult = IOT_MQTT_BAD_RESPONSE; } } else { xResult = IOT_MQTT_NETWORK_ERROR; } } else { xResult = IOT_MQTT_NETWORK_ERROR; } return xResult; } /*-----------------------------------------------------------*/ static IotMqttError_t prvMQTTDisconnect( Socket_t xMQTTSocket ) { IotMqttError_t xResult; /* Disconnect is fixed length packet, therefore there is no need to calculate the size, * just makes sure static buffer can accommodate disconnect request */ configASSERT( MQTT_PACKET_DISCONNECT_SIZE <= mqttexampleSHARED_BUFFER_SIZE ); xResult = IotMqtt_SerializeDisconnect( ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE ); configASSERT( xResult == IOT_MQTT_SUCCESS ); if( FreeRTOS_send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE, 0 ) == ( BaseType_t ) MQTT_PACKET_DISCONNECT_SIZE ) { xResult = IOT_MQTT_SUCCESS; } else { xResult = IOT_MQTT_NETWORK_ERROR; } return xResult; }