summaryrefslogtreecommitdiff
path: root/FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/BG96_MQTT_Mutual_Auth_Demo/DemoTasks/MutualAuthMQTTExample.c
diff options
context:
space:
mode:
Diffstat (limited to 'FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/BG96_MQTT_Mutual_Auth_Demo/DemoTasks/MutualAuthMQTTExample.c')
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/BG96_MQTT_Mutual_Auth_Demo/DemoTasks/MutualAuthMQTTExample.c1074
1 files changed, 1074 insertions, 0 deletions
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/BG96_MQTT_Mutual_Auth_Demo/DemoTasks/MutualAuthMQTTExample.c b/FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/BG96_MQTT_Mutual_Auth_Demo/DemoTasks/MutualAuthMQTTExample.c
new file mode 100644
index 000000000..849eb874b
--- /dev/null
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/BG96_MQTT_Mutual_Auth_Demo/DemoTasks/MutualAuthMQTTExample.c
@@ -0,0 +1,1074 @@
+/*
+ * FreeRTOS V202107.00
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ * https://www.FreeRTOS.org
+ * https://github.com/FreeRTOS
+ *
+ */
+
+/*
+ * Demo for showing use of the MQTT API using a mutually authenticated
+ * network connection.
+ *
+ * The Example shown below uses MQTT APIs to create MQTT messages and send them
+ * over the mutually authenticated network connection established with the
+ * MQTT broker. This example is single threaded and uses statically allocated
+ * memory. It uses QoS1 for sending to and receiving messages from the broker.
+ *
+ * A mutually authenticated TLS connection is used to connect to the
+ * MQTT message broker in this example. Define democonfigMQTT_BROKER_ENDPOINT,
+ * democonfigROOT_CA_PEM, democonfigCLIENT_CERTIFICATE_PEM,
+ * and democonfigCLIENT_PRIVATE_KEY_PEM in demo_config.h to establish a
+ * mutually authenticated connection.
+ *
+ * Also see https://www.freertos.org/mqtt/mqtt-agent-demo.html? for an
+ * alternative run time model whereby coreMQTT runs in an autonomous
+ * background agent task. Executing the MQTT protocol in an agent task
+ * removes the need for the application writer to explicitly manage any MQTT
+ * state or call the MQTT_ProcessLoop() API function. Using an agent task
+ * also enables multiple application tasks to more easily share a single
+ * MQTT connection.
+ */
+
+/* Standard includes. */
+#include <string.h>
+#include <stdio.h>
+
+/* Kernel includes. */
+#include "FreeRTOS.h"
+#include "task.h"
+
+/* Demo Specific configs. */
+#include "demo_config.h"
+
+/* MQTT library includes. */
+#include "core_mqtt.h"
+
+/* Exponential backoff retry include. */
+#include "backoff_algorithm.h"
+
+/* Transport interface implementation include header for TLS. */
+#include "using_mbedtls.h"
+
+/*-----------------------------------------------------------*/
+
+/* Compile time error for undefined configs. */
+#ifndef democonfigMQTT_BROKER_ENDPOINT
+ #error "Define the config democonfigMQTT_BROKER_ENDPOINT by following the instructions in file demo_config.h."
+#endif
+#ifndef democonfigROOT_CA_PEM
+ #error "Please define Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h."
+#endif
+
+/* If no username is defined, then a client certificate/key is required. */
+#ifndef democonfigCLIENT_USERNAME
+
+/*
+ *!!! Please note democonfigCLIENT_PRIVATE_KEY_PEM in used for
+ *!!! convenience of demonstration only. Production devices should
+ *!!! store keys securely, such as within a secure element.
+ */
+
+ #ifndef democonfigCLIENT_CERTIFICATE_PEM
+ #error "Please define client certificate(democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h."
+ #endif
+ #ifndef democonfigCLIENT_PRIVATE_KEY_PEM
+ #error "Please define client private key(democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h."
+ #endif
+#else
+
+/* If a username is defined, a client password also would need to be defined for
+ * client authentication. */
+ #ifndef democonfigCLIENT_PASSWORD
+ #error "Please define client password(democonfigCLIENT_PASSWORD) in demo_config.h for client authentication based on username/password."
+ #endif
+
+/* AWS IoT MQTT broker port needs to be 443 for client authentication based on
+ * username/password. */
+ #if defined( democonfigUSE_AWS_IOT_CORE_BROKER ) && democonfigMQTT_BROKER_PORT != 443
+ #error "Broker port(democonfigMQTT_BROKER_PORT) should be defined as 443 in demo_config.h for client authentication based on username/password in AWS IoT Core."
+ #endif
+#endif /* ifndef democonfigCLIENT_USERNAME */
+
+/*-----------------------------------------------------------*/
+
+/* Default values for configs. */
+#ifndef democonfigCLIENT_IDENTIFIER
+
+/**
+ * @brief The MQTT client identifier used in this example. Each client identifier
+ * must be unique so edit as required to ensure no two clients connecting to the
+ * same broker use the same client identifier.
+ *
+ * @note Appending __TIME__ to the client id string will help to create a unique
+ * client id every time an application binary is built. Only a single instance of
+ * this application's compiled binary may be used at a time, since the client ID
+ * will always be the same.
+ */
+ #define democonfigCLIENT_IDENTIFIER "testClient"__TIME__
+#endif
+
+#ifndef democonfigMQTT_BROKER_PORT
+
+/**
+ * @brief The port to use for the demo.
+ */
+ #define democonfigMQTT_BROKER_PORT ( 8883 )
+#endif
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief The maximum number of retries for network operation with server.
+ */
+#define mqttexampleRETRY_MAX_ATTEMPTS ( 5U )
+
+/**
+ * @brief The maximum back-off delay (in milliseconds) for retrying failed operation
+ * with server.
+ */
+#define mqttexampleRETRY_MAX_BACKOFF_DELAY_MS ( 5000U )
+
+/**
+ * @brief The base back-off delay (in milliseconds) to use for network operation retry
+ * attempts.
+ */
+#define mqttexampleRETRY_BACKOFF_BASE_MS ( 500U )
+
+/**
+ * @brief Timeout for receiving CONNACK packet in milliseconds.
+ */
+#define mqttexampleCONNACK_RECV_TIMEOUT_MS ( 5000U )
+
+/**
+ * @brief The topic to subscribe and publish to in the example.
+ *
+ * The topic name starts with the client identifier to ensure that each demo
+ * interacts with a unique topic name.
+ */
+#define mqttexampleTOPIC democonfigCLIENT_IDENTIFIER "/example/topic"
+
+/**
+ * @brief The number of topic filters to subscribe.
+ */
+#define mqttexampleTOPIC_COUNT ( 1 )
+
+/**
+ * @brief The MQTT message published in this example.
+ */
+#define mqttexampleMESSAGE "Hello World!"
+
+/**
+ * @brief Time in ticks to wait between each cycle of the demo implemented
+ * by prvMQTTDemoTask().
+ */
+#define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) )
+
+/**
+ * @brief Timeout for MQTT_ProcessLoop in milliseconds.
+ */
+#define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 5000U )
+
+/**
+ * @brief Keep alive time reported to the broker while establishing
+ * an MQTT connection.
+ *
+ * It is the responsibility of the Client to ensure that the interval between
+ * Control Packets being sent does not exceed the this Keep Alive value. In the
+ * absence of sending any other Control Packets, the Client MUST send a
+ * PINGREQ Packet.
+ */
+#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U )
+
+/**
+ * @brief Delay (in ticks) between consecutive cycles of MQTT publish operations in a
+ * demo iteration.
+ *
+ * Note that the process loop also has a timeout, so the total time between
+ * publishes is the sum of the two delays.
+ */
+#define mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ( pdMS_TO_TICKS( 2000U ) )
+
+/**
+ * @brief Transport timeout in milliseconds for transport send and receive.
+ */
+#define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 10000U )
+
+/**
+ * @brief ALPN (Application-Layer Protocol Negotiation) protocol name for AWS IoT MQTT.
+ *
+ * This will be used if democonfigMQTT_BROKER_PORT is configured as 443 for the AWS IoT MQTT broker.
+ * Please see more details about the ALPN protocol for AWS IoT MQTT endpoint
+ * in the link below.
+ * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/
+ */
+#define AWS_IOT_MQTT_ALPN "\x0ex-amzn-mqtt-ca"
+
+/**
+ * @brief This is the ALPN (Application-Layer Protocol Negotiation) string
+ * required by AWS IoT for password-based authentication using TCP port 443.
+ */
+#define AWS_IOT_CUSTOM_AUTH_ALPN "\x04mqtt"
+
+/**
+ * Provide default values for undefined configuration settings.
+ */
+#ifndef democonfigOS_NAME
+ #define democonfigOS_NAME "FreeRTOS"
+#endif
+
+#ifndef democonfigOS_VERSION
+ #define democonfigOS_VERSION tskKERNEL_VERSION_NUMBER
+#endif
+
+#ifndef democonfigHARDWARE_PLATFORM_NAME
+ #define democonfigHARDWARE_PLATFORM_NAME "WinSim"
+#endif
+
+#ifndef democonfigMQTT_LIB
+ #define democonfigMQTT_LIB "core-mqtt@"MQTT_LIBRARY_VERSION
+#endif
+
+/**
+ * @brief The MQTT metrics string expected by AWS IoT.
+ */
+#define AWS_IOT_METRICS_STRING \
+ "?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION \
+ "&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB
+
+/**
+ * @brief The length of the MQTT metrics string expected by AWS IoT.
+ */
+#define AWS_IOT_METRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( AWS_IOT_METRICS_STRING ) - 1 ) )
+
+#ifdef democonfigCLIENT_USERNAME
+
+/**
+ * @brief Append the username with the metrics string if #democonfigCLIENT_USERNAME is defined.
+ *
+ * This is to support both metrics reporting and username/password based client
+ * authentication by AWS IoT.
+ */
+ #define CLIENT_USERNAME_WITH_METRICS democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING
+#endif
+
+/**
+ * @brief Milliseconds per second.
+ */
+#define MILLISECONDS_PER_SECOND ( 1000U )
+
+/**
+ * @brief Milliseconds per FreeRTOS tick.
+ */
+#define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ )
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Each compilation unit that consumes the NetworkContext must define it.
+ * It should contain a single pointer to the type of your desired transport.
+ * When using multiple transports in the same compilation unit, define this pointer as void *.
+ *
+ * @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport.
+ */
+struct NetworkContext
+{
+ TlsTransportParams_t * pParams;
+};
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief The task used to demonstrate the MQTT API.
+ *
+ * @param[in] pvParameters Parameters as passed at the time of task creation. Not
+ * used in this example.
+ */
+static void prvMQTTDemoTask( void * pvParameters );
+
+
+/**
+ * @brief Connect to MQTT broker with reconnection retries.
+ *
+ * If connection fails, retry is attempted after a timeout.
+ * Timeout value will exponentially increase until maximum
+ * timeout value is reached or the number of attempts are exhausted.
+ *
+ * @param[out] pxNetworkContext The parameter to return the created network context.
+ *
+ * @return The status of the final connection attempt.
+ */
+static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkCredentials_t * pxNetworkCredentials,
+ NetworkContext_t * pNetworkContext );
+
+/**
+ * @brief Sends an MQTT Connect packet over the already connected TLS over TCP connection.
+ *
+ * @param[in, out] pxMQTTContext MQTT context pointer.
+ * @param[in] xNetworkContext Network context.
+ */
+static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
+ NetworkContext_t * pxNetworkContext );
+
+/**
+ * @brief Function to update variable #xTopicFilterContext with status
+ * information from Subscribe ACK. Called by the event callback after processing
+ * an incoming SUBACK packet.
+ *
+ * @param[in] Server response to the subscription request.
+ */
+static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo );
+
+/**
+ * @brief Subscribes to the topic as specified in mqttexampleTOPIC at the top of
+ * this file. In the case of a Subscribe ACK failure, then subscription is
+ * retried using an exponential backoff strategy with jitter.
+ *
+ * @param[in] pxMQTTContext MQTT context pointer.
+ */
+static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext );
+
+/**
+ * @brief Publishes a message mqttexampleMESSAGE on mqttexampleTOPIC topic.
+ *
+ * @param[in] pxMQTTContext MQTT context pointer.
+ */
+static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext );
+
+/**
+ * @brief Unsubscribes from the previously subscribed topic as specified
+ * in mqttexampleTOPIC.
+ *
+ * @param[in] pxMQTTContext MQTT context pointer.
+ */
+static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext );
+
+/**
+ * @brief The timer query function provided to the MQTT context.
+ *
+ * @return Time in milliseconds.
+ */
+static uint32_t prvGetTimeMs( void );
+
+/**
+ * @brief Process a response or ack to an MQTT request (PING, PUBLISH,
+ * SUBSCRIBE or UNSUBSCRIBE). This function processes PINGRESP, PUBACK,
+ * SUBACK, and UNSUBACK.
+ *
+ * @param[in] pxIncomingPacket is a pointer to structure containing deserialized
+ * MQTT response.
+ * @param[in] usPacketId is the packet identifier from the ack received.
+ */
+static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
+ uint16_t usPacketId );
+
+/**
+ * @brief Process incoming Publish message.
+ *
+ * @param[in] pxPublishInfo is a pointer to structure containing deserialized
+ * Publish message.
+ */
+static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo );
+
+/**
+ * @brief The application callback function for getting the incoming publishes,
+ * incoming acks, and ping responses reported from the MQTT library.
+ *
+ * @param[in] pxMQTTContext MQTT context pointer.
+ * @param[in] pxPacketInfo Packet Info pointer for the incoming packet.
+ * @param[in] pxDeserializedInfo Deserialized information from the incoming packet.
+ */
+static void prvEventCallback( MQTTContext_t * pxMQTTContext,
+ MQTTPacketInfo_t * pxPacketInfo,
+ MQTTDeserializedInfo_t * pxDeserializedInfo );
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Static buffer used to hold MQTT messages being sent and received.
+ */
+static uint8_t ucSharedBuffer[ democonfigNETWORK_BUFFER_SIZE ];
+
+/**
+ * @brief Global entry time into the application to use as a reference timestamp
+ * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference
+ * between the current time and the global entry time. This will reduce the chances
+ * of overflow for the 32 bit unsigned integer used for holding the timestamp.
+ */
+static uint32_t ulGlobalEntryTimeMs;
+
+/**
+ * @brief Packet Identifier generated when Publish request was sent to the broker;
+ * it is used to match received Publish ACK to the transmitted Publish packet.
+ */
+static uint16_t usPublishPacketIdentifier;
+
+/**
+ * @brief Packet Identifier generated when Subscribe request was sent to the broker;
+ * it is used to match received Subscribe ACK to the transmitted Subscribe packet.
+ */
+static uint16_t usSubscribePacketIdentifier;
+
+/**
+ * @brief Packet Identifier generated when Unsubscribe request was sent to the broker;
+ * it is used to match received Unsubscribe response to the transmitted Unsubscribe
+ * request.
+ */
+static uint16_t usUnsubscribePacketIdentifier;
+
+/**
+ * @brief A pair containing a topic filter and its SUBACK status.
+ */
+typedef struct topicFilterContext
+{
+ const char * pcTopicFilter;
+ MQTTSubAckStatus_t xSubAckStatus;
+} topicFilterContext_t;
+
+/**
+ * @brief An array containing the context of a SUBACK; the SUBACK status
+ * of a filter is updated when the event callback processes a SUBACK.
+ */
+static topicFilterContext_t xTopicFilterContext[ mqttexampleTOPIC_COUNT ] =
+{
+ { mqttexampleTOPIC, MQTTSubAckFailure }
+};
+
+
+/** @brief Static buffer used to hold MQTT messages being sent and received. */
+static MQTTFixedBuffer_t xBuffer =
+{
+ ucSharedBuffer,
+ democonfigNETWORK_BUFFER_SIZE
+};
+
+/*-----------------------------------------------------------*/
+
+/*
+ * @brief Create the task that demonstrates the MQTT API Demo over a
+ * mutually authenticated network connection with MQTT broker.
+ */
+void vStartSimpleMQTTDemo( void )
+{
+ /* This example uses a single application task, which in turn is used to
+ * connect, subscribe, publish, unsubscribe and disconnect from the MQTT
+ * broker.
+ *
+ * Also see https://www.freertos.org/mqtt/mqtt-agent-demo.html? for an
+ * alternative run time model whereby coreMQTT runs in an autonomous
+ * background agent task. Executing the MQTT protocol in an agent task
+ * removes the need for the application writer to explicitly manage any MQTT
+ * state or call the MQTT_ProcessLoop() API function. Using an agent task
+ * also enables multiple application tasks to more easily share a single
+ * MQTT connection. */
+ prvMQTTDemoTask( NULL );
+}
+/*-----------------------------------------------------------*/
+
+/*
+ * @brief The Example shown below uses MQTT APIs to create MQTT messages and
+ * send them over the mutually authenticated network connection established with the
+ * MQTT broker. This example is single threaded and uses statically allocated
+ * memory. It uses QoS1 for sending to and receiving messages from the broker.
+ *
+ * This MQTT client subscribes to the topic as specified in mqttexampleTOPIC at the
+ * top of this file by sending a subscribe packet and then waiting for a subscribe
+ * acknowledgment (SUBACK).This client will then publish to the same topic it
+ * subscribed to, so it will expect all the messages it sends to the broker to be
+ * sent back to it from the broker.
+ */
+static void prvMQTTDemoTask( void * pvParameters )
+{
+ uint32_t ulPublishCount = 0U, ulTopicCount = 0U;
+ const uint32_t ulMaxPublishCount = 5UL;
+ NetworkContext_t xNetworkContext = { 0 };
+ TlsTransportParams_t xTlsTransportParams = { 0 };
+ NetworkCredentials_t xNetworkCredentials = { 0 };
+ MQTTContext_t xMQTTContext = { 0 };
+ MQTTStatus_t xMQTTStatus;
+ TlsTransportStatus_t xNetworkStatus;
+
+ /* Remove compiler warnings about unused parameters. */
+ ( void ) pvParameters;
+
+ /* Set the entry time of the demo application. This entry time will be used
+ * to calculate relative time elapsed in the execution of the demo application,
+ * by the timer utility function that is provided to the MQTT library.
+ */
+ ulGlobalEntryTimeMs = prvGetTimeMs();
+
+ /* Set the pParams member of the network context with desired transport. */
+ xNetworkContext.pParams = &xTlsTransportParams;
+
+ for( ; ; )
+ {
+ /****************************** Connect. ******************************/
+
+ /* Attempt to establish TLS session with MQTT broker. If connection fails,
+ * retry after a timeout. Timeout value will be exponentially increased
+ * until the maximum number of attempts are reached or the maximum timeout
+ * value is reached. The function returns a failure status if the TCP
+ * connection cannot be established to the broker after the configured
+ * number of attempts. */
+ xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkCredentials,
+ &xNetworkContext );
+ configASSERT( xNetworkStatus == TLS_TRANSPORT_SUCCESS );
+
+ /* Sends an MQTT Connect packet over the already established TLS connection,
+ * and waits for connection acknowledgment (CONNACK) packet. */
+ LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) );
+ prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );
+
+ /**************************** Subscribe. ******************************/
+
+ /* If server rejected the subscription request, attempt to resubscribe to
+ * topic. Attempts are made according to the exponential backoff retry
+ * strategy implemented in BackoffAlgorithm. */
+ prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );
+
+ /****************** Publish and Keep Alive Loop. **********************/
+ /* Publish messages with QoS1, send and process Keep alive messages. */
+ for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
+ {
+ LogInfo( ( "Publish to the MQTT topic %s.\r\n", mqttexampleTOPIC ) );
+ prvMQTTPublishToTopic( &xMQTTContext );
+
+ /* Process incoming publish echo, since application subscribed to the
+ * same topic, the broker will send publish message back to the
+ * application. */
+ LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) );
+ xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
+ configASSERT( xMQTTStatus == MQTTSuccess );
+
+ /* Leave Connection Idle for some time. */
+ LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) );
+ vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS );
+ }
+
+ /******************** Unsubscribe from the topic. *********************/
+ LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", mqttexampleTOPIC ) );
+ prvMQTTUnsubscribeFromTopic( &xMQTTContext );
+
+ /* Process incoming UNSUBACK packet from the broker. */
+ xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
+ configASSERT( xMQTTStatus == MQTTSuccess );
+
+ /**************************** Disconnect. *****************************/
+
+ /* Send an MQTT Disconnect packet over the already connected TLS over
+ * TCP connection. There is no corresponding response for the disconnect
+ * packet. After sending disconnect, client must close the network
+ * connection. */
+ LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n",
+ democonfigMQTT_BROKER_ENDPOINT ) );
+ xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
+ configASSERT( xMQTTStatus == MQTTSuccess );
+
+ /* Close the network connection. */
+ TLS_FreeRTOS_Disconnect( &xNetworkContext );
+
+ /* Reset SUBACK status for each topic filter after completion of
+ * subscription request cycle. */
+ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
+ {
+ xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure;
+ }
+
+ /* Wait for some time between two iterations to ensure that we do not
+ * bombard the broker. */
+ LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. "
+ "Total free heap is %u.\r\n",
+ xPortGetFreeHeapSize() ) );
+ LogInfo( ( "Demo completed successfully.\r\n" ) );
+ LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) );
+ vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS );
+ }
+}
+/*-----------------------------------------------------------*/
+
+static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkCredentials_t * pxNetworkCredentials,
+ NetworkContext_t * pxNetworkContext )
+{
+ TlsTransportStatus_t xNetworkStatus;
+ BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess;
+ BackoffAlgorithmContext_t xReconnectParams;
+ uint16_t usNextRetryBackOff = 0U;
+
+ #ifdef democonfigUSE_AWS_IOT_CORE_BROKER
+
+ /* ALPN protocols must be a NULL-terminated list of strings. Therefore,
+ * the first entry will contain the actual ALPN protocol string while the
+ * second entry must remain NULL. */
+ char * pcAlpnProtocols[] = { NULL, NULL };
+
+ /* The ALPN string changes depending on whether username/password authentication is used. */
+ #ifdef democonfigCLIENT_USERNAME
+ pcAlpnProtocols[ 0 ] = AWS_IOT_CUSTOM_AUTH_ALPN;
+ #else
+ pcAlpnProtocols[ 0 ] = AWS_IOT_MQTT_ALPN;
+ #endif
+ pxNetworkCredentials->pAlpnProtos = pcAlpnProtocols;
+ #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */
+
+ pxNetworkCredentials->disableSni = democonfigDISABLE_SNI;
+ /* Set the credentials for establishing a TLS connection. */
+ pxNetworkCredentials->pRootCa = ( const unsigned char * ) democonfigROOT_CA_PEM;
+ pxNetworkCredentials->rootCaSize = sizeof( democonfigROOT_CA_PEM );
+ #ifdef democonfigCLIENT_CERTIFICATE_PEM
+ pxNetworkCredentials->pClientCert = ( const unsigned char * ) democonfigCLIENT_CERTIFICATE_PEM;
+ pxNetworkCredentials->clientCertSize = sizeof( democonfigCLIENT_CERTIFICATE_PEM );
+ pxNetworkCredentials->pPrivateKey = ( const unsigned char * ) democonfigCLIENT_PRIVATE_KEY_PEM;
+ pxNetworkCredentials->privateKeySize = sizeof( democonfigCLIENT_PRIVATE_KEY_PEM );
+ #endif
+
+ /* Initialize reconnect attempts and interval. */
+ BackoffAlgorithm_InitializeParams( &xReconnectParams,
+ mqttexampleRETRY_BACKOFF_BASE_MS,
+ mqttexampleRETRY_MAX_BACKOFF_DELAY_MS,
+ mqttexampleRETRY_MAX_ATTEMPTS );
+
+ /* Attempt to connect to MQTT broker. If connection fails, retry after
+ * a timeout. Timeout value will exponentially increase till maximum
+ * attempts are reached.
+ */
+ do
+ {
+ /* Establish a TLS session with the MQTT broker. This example connects to
+ * the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and
+ * democonfigMQTT_BROKER_PORT at the top of this file. */
+ LogInfo( ( "Creating a TLS connection to %s:%u.\r\n",
+ democonfigMQTT_BROKER_ENDPOINT,
+ democonfigMQTT_BROKER_PORT ) );
+ /* Attempt to create a mutually authenticated TLS connection. */
+ xNetworkStatus = TLS_FreeRTOS_Connect( pxNetworkContext,
+ democonfigMQTT_BROKER_ENDPOINT,
+ democonfigMQTT_BROKER_PORT,
+ pxNetworkCredentials,
+ mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS,
+ mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS );
+
+ if( xNetworkStatus != TLS_TRANSPORT_SUCCESS )
+ {
+ /* Generate a random number and calculate backoff value (in milliseconds) for
+ * the next connection retry.
+ * Note: It is recommended to seed the random number generator with a device-specific
+ * entropy source so that possibility of multiple devices retrying failed network operations
+ * at similar intervals can be avoided. */
+ xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, uxRand(), &usNextRetryBackOff );
+
+ if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted )
+ {
+ LogError( ( "Connection to the broker failed, all attempts exhausted." ) );
+ }
+ else if( xBackoffAlgStatus == BackoffAlgorithmSuccess )
+ {
+ LogWarn( ( "Connection to the broker failed. "
+ "Retrying connection with backoff and jitter." ) );
+ vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) );
+ }
+ }
+ } while( ( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) );
+
+ return xNetworkStatus;
+}
+/*-----------------------------------------------------------*/
+
+static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
+ NetworkContext_t * pxNetworkContext )
+{
+ MQTTStatus_t xResult;
+ MQTTConnectInfo_t xConnectInfo;
+ bool xSessionPresent;
+ TransportInterface_t xTransport;
+
+ /***
+ * For readability, error handling in this function is restricted to the use of
+ * asserts().
+ ***/
+
+ /* Fill in Transport Interface send and receive function pointers. */
+ xTransport.pNetworkContext = pxNetworkContext;
+ xTransport.send = TLS_FreeRTOS_send;
+ xTransport.recv = TLS_FreeRTOS_recv;
+
+ /* Initialize MQTT library. */
+ xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer );
+ configASSERT( xResult == MQTTSuccess );
+
+ /* Some fields are not used in this demo so start with everything at 0. */
+ ( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
+
+ /* Start with a clean session i.e. direct the MQTT broker to discard any
+ * previous session data. Also, establishing a connection with clean session
+ * will ensure that the broker does not store any data when this client
+ * gets disconnected. */
+ xConnectInfo.cleanSession = true;
+
+ /* The client identifier is used to uniquely identify this MQTT client to
+ * the MQTT broker. In a production device the identifier can be something
+ * unique, such as a device serial number. */
+ xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER;
+ xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER );
+
+ /* Set MQTT keep-alive period. If the application does not send packets at an interval less than
+ * the keep-alive period, the MQTT library will send PINGREQ packets. */
+ xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS;
+
+ /* Append metrics when connecting to the AWS IoT Core broker. */
+ #ifdef democonfigUSE_AWS_IOT_CORE_BROKER
+ #ifdef democonfigCLIENT_USERNAME
+ xConnectInfo.pUserName = CLIENT_USERNAME_WITH_METRICS;
+ xConnectInfo.userNameLength = ( uint16_t ) strlen( CLIENT_USERNAME_WITH_METRICS );
+ xConnectInfo.pPassword = democonfigCLIENT_PASSWORD;
+ xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD );
+ #else
+ xConnectInfo.pUserName = AWS_IOT_METRICS_STRING;
+ xConnectInfo.userNameLength = AWS_IOT_METRICS_STRING_LENGTH;
+ /* Password for authentication is not used. */
+ xConnectInfo.pPassword = NULL;
+ xConnectInfo.passwordLength = 0U;
+ #endif
+ #else /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */
+ #ifdef democonfigCLIENT_USERNAME
+ xConnectInfo.pUserName = democonfigCLIENT_USERNAME;
+ xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME );
+ xConnectInfo.pPassword = democonfigCLIENT_PASSWORD;
+ xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD );
+ #endif /* ifdef democonfigCLIENT_USERNAME */
+ #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */
+
+ /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it
+ * is passed as NULL. */
+ xResult = MQTT_Connect( pxMQTTContext,
+ &xConnectInfo,
+ NULL,
+ mqttexampleCONNACK_RECV_TIMEOUT_MS,
+ &xSessionPresent );
+ configASSERT( xResult == MQTTSuccess );
+
+ /* Successfully established and MQTT connection with the broker. */
+ LogInfo( ( "An MQTT connection is established with %s.", democonfigMQTT_BROKER_ENDPOINT ) );
+}
+/*-----------------------------------------------------------*/
+
+static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo )
+{
+ MQTTStatus_t xResult = MQTTSuccess;
+ uint8_t * pucPayload = NULL;
+ size_t ulSize = 0;
+ uint32_t ulTopicCount = 0U;
+
+ xResult = MQTT_GetSubAckStatusCodes( pxPacketInfo, &pucPayload, &ulSize );
+
+ /* MQTT_GetSubAckStatusCodes always returns success if called with packet info
+ * from the event callback and non-NULL parameters. */
+ configASSERT( xResult == MQTTSuccess );
+
+ for( ulTopicCount = 0; ulTopicCount < ulSize; ulTopicCount++ )
+ {
+ xTopicFilterContext[ ulTopicCount ].xSubAckStatus = pucPayload[ ulTopicCount ];
+ }
+}
+/*-----------------------------------------------------------*/
+
+static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
+{
+ MQTTStatus_t xResult = MQTTSuccess;
+ BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess;
+ BackoffAlgorithmContext_t xRetryParams;
+ uint16_t usNextRetryBackOff = 0U;
+ MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
+ bool xFailedSubscribeToTopic = false;
+ uint32_t ulTopicCount = 0U;
+
+ /* Some fields not used by this demo so start with everything at 0. */
+ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
+
+ /* Get a unique packet id. */
+ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
+
+ /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to
+ * only one topic and uses QoS1. */
+ xMQTTSubscription[ 0 ].qos = MQTTQoS1;
+ xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
+ xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
+
+ /* Initialize context for backoff retry attempts if SUBSCRIBE request fails. */
+ BackoffAlgorithm_InitializeParams( &xRetryParams,
+ mqttexampleRETRY_BACKOFF_BASE_MS,
+ mqttexampleRETRY_MAX_BACKOFF_DELAY_MS,
+ mqttexampleRETRY_MAX_ATTEMPTS );
+
+ do
+ {
+ /* The client is now connected to the broker. Subscribe to the topic
+ * as specified in mqttexampleTOPIC at the top of this file by sending a
+ * subscribe packet then waiting for a subscribe acknowledgment (SUBACK).
+ * This client will then publish to the same topic it subscribed to, so it
+ * will expect all the messages it sends to the broker to be sent back to it
+ * from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish
+ * messages received from the broker will have QOS0. */
+ LogInfo( ( "Attempt to subscribe to the MQTT topic %s.\r\n", mqttexampleTOPIC ) );
+ xResult = MQTT_Subscribe( pxMQTTContext,
+ xMQTTSubscription,
+ sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
+ usSubscribePacketIdentifier );
+ configASSERT( xResult == MQTTSuccess );
+
+ LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) );
+
+ /* Process incoming packet from the broker. After sending the subscribe, the
+ * client may receive a publish before it receives a subscribe ack. Therefore,
+ * call generic incoming packet processing function. Since this demo is
+ * subscribing to the topic to which no one is publishing, probability of
+ * receiving Publish message before subscribe ack is zero; but application
+ * must be ready to receive any packet. This demo uses the generic packet
+ * processing function everywhere to highlight this fact. */
+ xResult = MQTT_ProcessLoop( pxMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
+ configASSERT( xResult == MQTTSuccess );
+
+ /* Reset flag before checking suback responses. */
+ xFailedSubscribeToTopic = false;
+
+ /* Check if recent subscription request has been rejected. #xTopicFilterContext is updated
+ * in the event callback to reflect the status of the SUBACK sent by the broker. It represents
+ * either the QoS level granted by the server upon subscription, or acknowledgement of
+ * server rejection of the subscription request. */
+ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
+ {
+ if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure )
+ {
+ xFailedSubscribeToTopic = true;
+
+ /* Generate a random number and calculate backoff value (in milliseconds) for
+ * the next connection retry.
+ * Note: It is recommended to seed the random number generator with a device-specific
+ * entropy source so that possibility of multiple devices retrying failed network operations
+ * at similar intervals can be avoided. */
+ xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xRetryParams, uxRand(), &usNextRetryBackOff );
+
+ if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted )
+ {
+ LogError( ( "Server rejected subscription request. All retry attempts have exhausted. Topic=%s",
+ xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) );
+ }
+ else if( xBackoffAlgStatus == BackoffAlgorithmSuccess )
+ {
+ LogWarn( ( "Server rejected subscription request. Attempting to re-subscribe to topic %s.",
+ xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) );
+ /* Backoff before the next re-subscribe attempt. */
+ vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) );
+ }
+
+ break;
+ }
+ }
+
+ configASSERT( xBackoffAlgStatus != BackoffAlgorithmRetriesExhausted );
+ } while( ( xFailedSubscribeToTopic == true ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) );
+}
+/*-----------------------------------------------------------*/
+
+static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext )
+{
+ MQTTStatus_t xResult;
+ MQTTPublishInfo_t xMQTTPublishInfo;
+
+ /***
+ * For readability, error handling in this function is restricted to the use of
+ * asserts().
+ ***/
+
+ /* Some fields are not used by this demo so start with everything at 0. */
+ ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );
+
+ /* This demo uses QoS1. */
+ xMQTTPublishInfo.qos = MQTTQoS1;
+ xMQTTPublishInfo.retain = false;
+ xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;
+ xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );
+ xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
+ xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );
+
+ /* Get a unique packet id. */
+ usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
+
+ /* Send PUBLISH packet. Packet ID is not used for a QoS1 publish. */
+ xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier );
+
+ configASSERT( xResult == MQTTSuccess );
+}
+/*-----------------------------------------------------------*/
+
+static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
+{
+ MQTTStatus_t xResult;
+ MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
+
+ /* Some fields not used by this demo so start with everything at 0. */
+ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
+
+ /* Get a unique packet id. */
+ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
+
+ /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to
+ * only one topic and uses QoS1. */
+ xMQTTSubscription[ 0 ].qos = MQTTQoS1;
+ xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
+ xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
+
+ /* Get next unique packet identifier. */
+ usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
+
+ /* Send UNSUBSCRIBE packet. */
+ xResult = MQTT_Unsubscribe( pxMQTTContext,
+ xMQTTSubscription,
+ sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
+ usUnsubscribePacketIdentifier );
+
+ configASSERT( xResult == MQTTSuccess );
+}
+/*-----------------------------------------------------------*/
+
+static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
+ uint16_t usPacketId )
+{
+ uint32_t ulTopicCount = 0U;
+
+ switch( pxIncomingPacket->type )
+ {
+ case MQTT_PACKET_TYPE_PUBACK:
+ LogInfo( ( "PUBACK received for packet Id %u.\r\n", usPacketId ) );
+ /* Make sure ACK packet identifier matches with Request packet identifier. */
+ configASSERT( usPublishPacketIdentifier == usPacketId );
+ break;
+
+ case MQTT_PACKET_TYPE_SUBACK:
+
+ /* A SUBACK from the broker, containing the server response to our subscription request, has been received.
+ * It contains the status code indicating server approval/rejection for the subscription to the single topic
+ * requested. The SUBACK will be parsed to obtain the status code, and this status code will be stored in global
+ * variable #xTopicFilterContext. */
+ prvUpdateSubAckStatus( pxIncomingPacket );
+
+ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
+ {
+ if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus != MQTTSubAckFailure )
+ {
+ LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.\r\n",
+ xTopicFilterContext[ ulTopicCount ].pcTopicFilter,
+ xTopicFilterContext[ ulTopicCount ].xSubAckStatus ) );
+ }
+ }
+
+ /* Make sure ACK packet identifier matches with Request packet identifier. */
+ configASSERT( usSubscribePacketIdentifier == usPacketId );
+ break;
+
+ case MQTT_PACKET_TYPE_UNSUBACK:
+ LogInfo( ( "Unsubscribed from the topic %s.\r\n", mqttexampleTOPIC ) );
+ /* Make sure ACK packet identifier matches with Request packet identifier. */
+ configASSERT( usUnsubscribePacketIdentifier == usPacketId );
+ break;
+
+ case MQTT_PACKET_TYPE_PINGRESP:
+
+ /* Nothing to be done from application as library handles
+ * PINGRESP with the use of MQTT_ProcessLoop API function. */
+ LogWarn( ( "PINGRESP should not be handled by the application "
+ "callback when using MQTT_ProcessLoop.\n" ) );
+ break;
+
+ /* Any other packet type is invalid. */
+ default:
+ LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).\r\n",
+ pxIncomingPacket->type ) );
+ }
+}
+
+/*-----------------------------------------------------------*/
+
+static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
+{
+ configASSERT( pxPublishInfo != NULL );
+
+ /* Process incoming Publish. */
+ LogInfo( ( "Incoming QoS : %d\n", pxPublishInfo->qos ) );
+
+ /* Verify the received publish is for the we have subscribed to. */
+ if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) &&
+ ( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) )
+ {
+ LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n"
+ "Incoming Publish Message : %.*s\r\n",
+ pxPublishInfo->topicNameLength,
+ pxPublishInfo->pTopicName,
+ pxPublishInfo->payloadLength,
+ pxPublishInfo->pPayload ) );
+ }
+ else
+ {
+ LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n",
+ pxPublishInfo->topicNameLength,
+ pxPublishInfo->pTopicName ) );
+ }
+}
+
+/*-----------------------------------------------------------*/
+
+static void prvEventCallback( MQTTContext_t * pxMQTTContext,
+ MQTTPacketInfo_t * pxPacketInfo,
+ MQTTDeserializedInfo_t * pxDeserializedInfo )
+{
+ /* The MQTT context is not used for this demo. */
+ ( void ) pxMQTTContext;
+
+ if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
+ {
+ prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo );
+ }
+ else
+ {
+ prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier );
+ }
+}
+
+/*-----------------------------------------------------------*/
+
+static uint32_t prvGetTimeMs( void )
+{
+ TickType_t xTickCount = 0;
+ uint32_t ulTimeMs = 0UL;
+
+ /* Get the current tick count. */
+ xTickCount = xTaskGetTickCount();
+
+ /* Convert the ticks to milliseconds. */
+ ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK;
+
+ /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the
+ * elapsed time in the application. */
+ ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs );
+
+ return ulTimeMs;
+}
+
+/*-----------------------------------------------------------*/