diff options
Diffstat (limited to 'FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/MQTT_Mutual_Auth_Demo_with_BG96/DemoTasks/MutualAuthMQTTExample.c')
-rw-r--r-- | FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/MQTT_Mutual_Auth_Demo_with_BG96/DemoTasks/MutualAuthMQTTExample.c | 1074 |
1 files changed, 1074 insertions, 0 deletions
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/MQTT_Mutual_Auth_Demo_with_BG96/DemoTasks/MutualAuthMQTTExample.c b/FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/MQTT_Mutual_Auth_Demo_with_BG96/DemoTasks/MutualAuthMQTTExample.c new file mode 100644 index 000000000..849eb874b --- /dev/null +++ b/FreeRTOS-Plus/Demo/FreeRTOS_Cellular_Interface_Windows_Simulator/MQTT_Mutual_Auth_Demo_with_BG96/DemoTasks/MutualAuthMQTTExample.c @@ -0,0 +1,1074 @@ +/* + * FreeRTOS V202107.00 + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * https://www.FreeRTOS.org + * https://github.com/FreeRTOS + * + */ + +/* + * Demo for showing use of the MQTT API using a mutually authenticated + * network connection. + * + * The Example shown below uses MQTT APIs to create MQTT messages and send them + * over the mutually authenticated network connection established with the + * MQTT broker. This example is single threaded and uses statically allocated + * memory. It uses QoS1 for sending to and receiving messages from the broker. + * + * A mutually authenticated TLS connection is used to connect to the + * MQTT message broker in this example. Define democonfigMQTT_BROKER_ENDPOINT, + * democonfigROOT_CA_PEM, democonfigCLIENT_CERTIFICATE_PEM, + * and democonfigCLIENT_PRIVATE_KEY_PEM in demo_config.h to establish a + * mutually authenticated connection. + * + * Also see https://www.freertos.org/mqtt/mqtt-agent-demo.html? for an + * alternative run time model whereby coreMQTT runs in an autonomous + * background agent task. Executing the MQTT protocol in an agent task + * removes the need for the application writer to explicitly manage any MQTT + * state or call the MQTT_ProcessLoop() API function. Using an agent task + * also enables multiple application tasks to more easily share a single + * MQTT connection. + */ + +/* Standard includes. */ +#include <string.h> +#include <stdio.h> + +/* Kernel includes. */ +#include "FreeRTOS.h" +#include "task.h" + +/* Demo Specific configs. */ +#include "demo_config.h" + +/* MQTT library includes. */ +#include "core_mqtt.h" + +/* Exponential backoff retry include. */ +#include "backoff_algorithm.h" + +/* Transport interface implementation include header for TLS. */ +#include "using_mbedtls.h" + +/*-----------------------------------------------------------*/ + +/* Compile time error for undefined configs. */ +#ifndef democonfigMQTT_BROKER_ENDPOINT + #error "Define the config democonfigMQTT_BROKER_ENDPOINT by following the instructions in file demo_config.h." +#endif +#ifndef democonfigROOT_CA_PEM + #error "Please define Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h." +#endif + +/* If no username is defined, then a client certificate/key is required. */ +#ifndef democonfigCLIENT_USERNAME + +/* + *!!! Please note democonfigCLIENT_PRIVATE_KEY_PEM in used for + *!!! convenience of demonstration only. Production devices should + *!!! store keys securely, such as within a secure element. + */ + + #ifndef democonfigCLIENT_CERTIFICATE_PEM + #error "Please define client certificate(democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h." + #endif + #ifndef democonfigCLIENT_PRIVATE_KEY_PEM + #error "Please define client private key(democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h." + #endif +#else + +/* If a username is defined, a client password also would need to be defined for + * client authentication. */ + #ifndef democonfigCLIENT_PASSWORD + #error "Please define client password(democonfigCLIENT_PASSWORD) in demo_config.h for client authentication based on username/password." + #endif + +/* AWS IoT MQTT broker port needs to be 443 for client authentication based on + * username/password. */ + #if defined( democonfigUSE_AWS_IOT_CORE_BROKER ) && democonfigMQTT_BROKER_PORT != 443 + #error "Broker port(democonfigMQTT_BROKER_PORT) should be defined as 443 in demo_config.h for client authentication based on username/password in AWS IoT Core." + #endif +#endif /* ifndef democonfigCLIENT_USERNAME */ + +/*-----------------------------------------------------------*/ + +/* Default values for configs. */ +#ifndef democonfigCLIENT_IDENTIFIER + +/** + * @brief The MQTT client identifier used in this example. Each client identifier + * must be unique so edit as required to ensure no two clients connecting to the + * same broker use the same client identifier. + * + * @note Appending __TIME__ to the client id string will help to create a unique + * client id every time an application binary is built. Only a single instance of + * this application's compiled binary may be used at a time, since the client ID + * will always be the same. + */ + #define democonfigCLIENT_IDENTIFIER "testClient"__TIME__ +#endif + +#ifndef democonfigMQTT_BROKER_PORT + +/** + * @brief The port to use for the demo. + */ + #define democonfigMQTT_BROKER_PORT ( 8883 ) +#endif + +/*-----------------------------------------------------------*/ + +/** + * @brief The maximum number of retries for network operation with server. + */ +#define mqttexampleRETRY_MAX_ATTEMPTS ( 5U ) + +/** + * @brief The maximum back-off delay (in milliseconds) for retrying failed operation + * with server. + */ +#define mqttexampleRETRY_MAX_BACKOFF_DELAY_MS ( 5000U ) + +/** + * @brief The base back-off delay (in milliseconds) to use for network operation retry + * attempts. + */ +#define mqttexampleRETRY_BACKOFF_BASE_MS ( 500U ) + +/** + * @brief Timeout for receiving CONNACK packet in milliseconds. + */ +#define mqttexampleCONNACK_RECV_TIMEOUT_MS ( 5000U ) + +/** + * @brief The topic to subscribe and publish to in the example. + * + * The topic name starts with the client identifier to ensure that each demo + * interacts with a unique topic name. + */ +#define mqttexampleTOPIC democonfigCLIENT_IDENTIFIER "/example/topic" + +/** + * @brief The number of topic filters to subscribe. + */ +#define mqttexampleTOPIC_COUNT ( 1 ) + +/** + * @brief The MQTT message published in this example. + */ +#define mqttexampleMESSAGE "Hello World!" + +/** + * @brief Time in ticks to wait between each cycle of the demo implemented + * by prvMQTTDemoTask(). + */ +#define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) ) + +/** + * @brief Timeout for MQTT_ProcessLoop in milliseconds. + */ +#define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 5000U ) + +/** + * @brief Keep alive time reported to the broker while establishing + * an MQTT connection. + * + * It is the responsibility of the Client to ensure that the interval between + * Control Packets being sent does not exceed the this Keep Alive value. In the + * absence of sending any other Control Packets, the Client MUST send a + * PINGREQ Packet. + */ +#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U ) + +/** + * @brief Delay (in ticks) between consecutive cycles of MQTT publish operations in a + * demo iteration. + * + * Note that the process loop also has a timeout, so the total time between + * publishes is the sum of the two delays. + */ +#define mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ( pdMS_TO_TICKS( 2000U ) ) + +/** + * @brief Transport timeout in milliseconds for transport send and receive. + */ +#define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 10000U ) + +/** + * @brief ALPN (Application-Layer Protocol Negotiation) protocol name for AWS IoT MQTT. + * + * This will be used if democonfigMQTT_BROKER_PORT is configured as 443 for the AWS IoT MQTT broker. + * Please see more details about the ALPN protocol for AWS IoT MQTT endpoint + * in the link below. + * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/ + */ +#define AWS_IOT_MQTT_ALPN "\x0ex-amzn-mqtt-ca" + +/** + * @brief This is the ALPN (Application-Layer Protocol Negotiation) string + * required by AWS IoT for password-based authentication using TCP port 443. + */ +#define AWS_IOT_CUSTOM_AUTH_ALPN "\x04mqtt" + +/** + * Provide default values for undefined configuration settings. + */ +#ifndef democonfigOS_NAME + #define democonfigOS_NAME "FreeRTOS" +#endif + +#ifndef democonfigOS_VERSION + #define democonfigOS_VERSION tskKERNEL_VERSION_NUMBER +#endif + +#ifndef democonfigHARDWARE_PLATFORM_NAME + #define democonfigHARDWARE_PLATFORM_NAME "WinSim" +#endif + +#ifndef democonfigMQTT_LIB + #define democonfigMQTT_LIB "core-mqtt@"MQTT_LIBRARY_VERSION +#endif + +/** + * @brief The MQTT metrics string expected by AWS IoT. + */ +#define AWS_IOT_METRICS_STRING \ + "?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION \ + "&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB + +/** + * @brief The length of the MQTT metrics string expected by AWS IoT. + */ +#define AWS_IOT_METRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( AWS_IOT_METRICS_STRING ) - 1 ) ) + +#ifdef democonfigCLIENT_USERNAME + +/** + * @brief Append the username with the metrics string if #democonfigCLIENT_USERNAME is defined. + * + * This is to support both metrics reporting and username/password based client + * authentication by AWS IoT. + */ + #define CLIENT_USERNAME_WITH_METRICS democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING +#endif + +/** + * @brief Milliseconds per second. + */ +#define MILLISECONDS_PER_SECOND ( 1000U ) + +/** + * @brief Milliseconds per FreeRTOS tick. + */ +#define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) + +/*-----------------------------------------------------------*/ + +/** + * @brief Each compilation unit that consumes the NetworkContext must define it. + * It should contain a single pointer to the type of your desired transport. + * When using multiple transports in the same compilation unit, define this pointer as void *. + * + * @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport. + */ +struct NetworkContext +{ + TlsTransportParams_t * pParams; +}; + +/*-----------------------------------------------------------*/ + +/** + * @brief The task used to demonstrate the MQTT API. + * + * @param[in] pvParameters Parameters as passed at the time of task creation. Not + * used in this example. + */ +static void prvMQTTDemoTask( void * pvParameters ); + + +/** + * @brief Connect to MQTT broker with reconnection retries. + * + * If connection fails, retry is attempted after a timeout. + * Timeout value will exponentially increase until maximum + * timeout value is reached or the number of attempts are exhausted. + * + * @param[out] pxNetworkContext The parameter to return the created network context. + * + * @return The status of the final connection attempt. + */ +static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkCredentials_t * pxNetworkCredentials, + NetworkContext_t * pNetworkContext ); + +/** + * @brief Sends an MQTT Connect packet over the already connected TLS over TCP connection. + * + * @param[in, out] pxMQTTContext MQTT context pointer. + * @param[in] xNetworkContext Network context. + */ +static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, + NetworkContext_t * pxNetworkContext ); + +/** + * @brief Function to update variable #xTopicFilterContext with status + * information from Subscribe ACK. Called by the event callback after processing + * an incoming SUBACK packet. + * + * @param[in] Server response to the subscription request. + */ +static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo ); + +/** + * @brief Subscribes to the topic as specified in mqttexampleTOPIC at the top of + * this file. In the case of a Subscribe ACK failure, then subscription is + * retried using an exponential backoff strategy with jitter. + * + * @param[in] pxMQTTContext MQTT context pointer. + */ +static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ); + +/** + * @brief Publishes a message mqttexampleMESSAGE on mqttexampleTOPIC topic. + * + * @param[in] pxMQTTContext MQTT context pointer. + */ +static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ); + +/** + * @brief Unsubscribes from the previously subscribed topic as specified + * in mqttexampleTOPIC. + * + * @param[in] pxMQTTContext MQTT context pointer. + */ +static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ); + +/** + * @brief The timer query function provided to the MQTT context. + * + * @return Time in milliseconds. + */ +static uint32_t prvGetTimeMs( void ); + +/** + * @brief Process a response or ack to an MQTT request (PING, PUBLISH, + * SUBSCRIBE or UNSUBSCRIBE). This function processes PINGRESP, PUBACK, + * SUBACK, and UNSUBACK. + * + * @param[in] pxIncomingPacket is a pointer to structure containing deserialized + * MQTT response. + * @param[in] usPacketId is the packet identifier from the ack received. + */ +static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, + uint16_t usPacketId ); + +/** + * @brief Process incoming Publish message. + * + * @param[in] pxPublishInfo is a pointer to structure containing deserialized + * Publish message. + */ +static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); + +/** + * @brief The application callback function for getting the incoming publishes, + * incoming acks, and ping responses reported from the MQTT library. + * + * @param[in] pxMQTTContext MQTT context pointer. + * @param[in] pxPacketInfo Packet Info pointer for the incoming packet. + * @param[in] pxDeserializedInfo Deserialized information from the incoming packet. + */ +static void prvEventCallback( MQTTContext_t * pxMQTTContext, + MQTTPacketInfo_t * pxPacketInfo, + MQTTDeserializedInfo_t * pxDeserializedInfo ); + +/*-----------------------------------------------------------*/ + +/** + * @brief Static buffer used to hold MQTT messages being sent and received. + */ +static uint8_t ucSharedBuffer[ democonfigNETWORK_BUFFER_SIZE ]; + +/** + * @brief Global entry time into the application to use as a reference timestamp + * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference + * between the current time and the global entry time. This will reduce the chances + * of overflow for the 32 bit unsigned integer used for holding the timestamp. + */ +static uint32_t ulGlobalEntryTimeMs; + +/** + * @brief Packet Identifier generated when Publish request was sent to the broker; + * it is used to match received Publish ACK to the transmitted Publish packet. + */ +static uint16_t usPublishPacketIdentifier; + +/** + * @brief Packet Identifier generated when Subscribe request was sent to the broker; + * it is used to match received Subscribe ACK to the transmitted Subscribe packet. + */ +static uint16_t usSubscribePacketIdentifier; + +/** + * @brief Packet Identifier generated when Unsubscribe request was sent to the broker; + * it is used to match received Unsubscribe response to the transmitted Unsubscribe + * request. + */ +static uint16_t usUnsubscribePacketIdentifier; + +/** + * @brief A pair containing a topic filter and its SUBACK status. + */ +typedef struct topicFilterContext +{ + const char * pcTopicFilter; + MQTTSubAckStatus_t xSubAckStatus; +} topicFilterContext_t; + +/** + * @brief An array containing the context of a SUBACK; the SUBACK status + * of a filter is updated when the event callback processes a SUBACK. + */ +static topicFilterContext_t xTopicFilterContext[ mqttexampleTOPIC_COUNT ] = +{ + { mqttexampleTOPIC, MQTTSubAckFailure } +}; + + +/** @brief Static buffer used to hold MQTT messages being sent and received. */ +static MQTTFixedBuffer_t xBuffer = +{ + ucSharedBuffer, + democonfigNETWORK_BUFFER_SIZE +}; + +/*-----------------------------------------------------------*/ + +/* + * @brief Create the task that demonstrates the MQTT API Demo over a + * mutually authenticated network connection with MQTT broker. + */ +void vStartSimpleMQTTDemo( void ) +{ + /* This example uses a single application task, which in turn is used to + * connect, subscribe, publish, unsubscribe and disconnect from the MQTT + * broker. + * + * Also see https://www.freertos.org/mqtt/mqtt-agent-demo.html? for an + * alternative run time model whereby coreMQTT runs in an autonomous + * background agent task. Executing the MQTT protocol in an agent task + * removes the need for the application writer to explicitly manage any MQTT + * state or call the MQTT_ProcessLoop() API function. Using an agent task + * also enables multiple application tasks to more easily share a single + * MQTT connection. */ + prvMQTTDemoTask( NULL ); +} +/*-----------------------------------------------------------*/ + +/* + * @brief The Example shown below uses MQTT APIs to create MQTT messages and + * send them over the mutually authenticated network connection established with the + * MQTT broker. This example is single threaded and uses statically allocated + * memory. It uses QoS1 for sending to and receiving messages from the broker. + * + * This MQTT client subscribes to the topic as specified in mqttexampleTOPIC at the + * top of this file by sending a subscribe packet and then waiting for a subscribe + * acknowledgment (SUBACK).This client will then publish to the same topic it + * subscribed to, so it will expect all the messages it sends to the broker to be + * sent back to it from the broker. + */ +static void prvMQTTDemoTask( void * pvParameters ) +{ + uint32_t ulPublishCount = 0U, ulTopicCount = 0U; + const uint32_t ulMaxPublishCount = 5UL; + NetworkContext_t xNetworkContext = { 0 }; + TlsTransportParams_t xTlsTransportParams = { 0 }; + NetworkCredentials_t xNetworkCredentials = { 0 }; + MQTTContext_t xMQTTContext = { 0 }; + MQTTStatus_t xMQTTStatus; + TlsTransportStatus_t xNetworkStatus; + + /* Remove compiler warnings about unused parameters. */ + ( void ) pvParameters; + + /* Set the entry time of the demo application. This entry time will be used + * to calculate relative time elapsed in the execution of the demo application, + * by the timer utility function that is provided to the MQTT library. + */ + ulGlobalEntryTimeMs = prvGetTimeMs(); + + /* Set the pParams member of the network context with desired transport. */ + xNetworkContext.pParams = &xTlsTransportParams; + + for( ; ; ) + { + /****************************** Connect. ******************************/ + + /* Attempt to establish TLS session with MQTT broker. If connection fails, + * retry after a timeout. Timeout value will be exponentially increased + * until the maximum number of attempts are reached or the maximum timeout + * value is reached. The function returns a failure status if the TCP + * connection cannot be established to the broker after the configured + * number of attempts. */ + xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkCredentials, + &xNetworkContext ); + configASSERT( xNetworkStatus == TLS_TRANSPORT_SUCCESS ); + + /* Sends an MQTT Connect packet over the already established TLS connection, + * and waits for connection acknowledgment (CONNACK) packet. */ + LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); + prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext ); + + /**************************** Subscribe. ******************************/ + + /* If server rejected the subscription request, attempt to resubscribe to + * topic. Attempts are made according to the exponential backoff retry + * strategy implemented in BackoffAlgorithm. */ + prvMQTTSubscribeWithBackoffRetries( &xMQTTContext ); + + /****************** Publish and Keep Alive Loop. **********************/ + /* Publish messages with QoS1, send and process Keep alive messages. */ + for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) + { + LogInfo( ( "Publish to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); + prvMQTTPublishToTopic( &xMQTTContext ); + + /* Process incoming publish echo, since application subscribed to the + * same topic, the broker will send publish message back to the + * application. */ + LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) ); + xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); + configASSERT( xMQTTStatus == MQTTSuccess ); + + /* Leave Connection Idle for some time. */ + LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) ); + vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ); + } + + /******************** Unsubscribe from the topic. *********************/ + LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); + prvMQTTUnsubscribeFromTopic( &xMQTTContext ); + + /* Process incoming UNSUBACK packet from the broker. */ + xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); + configASSERT( xMQTTStatus == MQTTSuccess ); + + /**************************** Disconnect. *****************************/ + + /* Send an MQTT Disconnect packet over the already connected TLS over + * TCP connection. There is no corresponding response for the disconnect + * packet. After sending disconnect, client must close the network + * connection. */ + LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", + democonfigMQTT_BROKER_ENDPOINT ) ); + xMQTTStatus = MQTT_Disconnect( &xMQTTContext ); + configASSERT( xMQTTStatus == MQTTSuccess ); + + /* Close the network connection. */ + TLS_FreeRTOS_Disconnect( &xNetworkContext ); + + /* Reset SUBACK status for each topic filter after completion of + * subscription request cycle. */ + for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) + { + xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure; + } + + /* Wait for some time between two iterations to ensure that we do not + * bombard the broker. */ + LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. " + "Total free heap is %u.\r\n", + xPortGetFreeHeapSize() ) ); + LogInfo( ( "Demo completed successfully.\r\n" ) ); + LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) ); + vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ); + } +} +/*-----------------------------------------------------------*/ + +static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkCredentials_t * pxNetworkCredentials, + NetworkContext_t * pxNetworkContext ) +{ + TlsTransportStatus_t xNetworkStatus; + BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; + BackoffAlgorithmContext_t xReconnectParams; + uint16_t usNextRetryBackOff = 0U; + + #ifdef democonfigUSE_AWS_IOT_CORE_BROKER + + /* ALPN protocols must be a NULL-terminated list of strings. Therefore, + * the first entry will contain the actual ALPN protocol string while the + * second entry must remain NULL. */ + char * pcAlpnProtocols[] = { NULL, NULL }; + + /* The ALPN string changes depending on whether username/password authentication is used. */ + #ifdef democonfigCLIENT_USERNAME + pcAlpnProtocols[ 0 ] = AWS_IOT_CUSTOM_AUTH_ALPN; + #else + pcAlpnProtocols[ 0 ] = AWS_IOT_MQTT_ALPN; + #endif + pxNetworkCredentials->pAlpnProtos = pcAlpnProtocols; + #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ + + pxNetworkCredentials->disableSni = democonfigDISABLE_SNI; + /* Set the credentials for establishing a TLS connection. */ + pxNetworkCredentials->pRootCa = ( const unsigned char * ) democonfigROOT_CA_PEM; + pxNetworkCredentials->rootCaSize = sizeof( democonfigROOT_CA_PEM ); + #ifdef democonfigCLIENT_CERTIFICATE_PEM + pxNetworkCredentials->pClientCert = ( const unsigned char * ) democonfigCLIENT_CERTIFICATE_PEM; + pxNetworkCredentials->clientCertSize = sizeof( democonfigCLIENT_CERTIFICATE_PEM ); + pxNetworkCredentials->pPrivateKey = ( const unsigned char * ) democonfigCLIENT_PRIVATE_KEY_PEM; + pxNetworkCredentials->privateKeySize = sizeof( democonfigCLIENT_PRIVATE_KEY_PEM ); + #endif + + /* Initialize reconnect attempts and interval. */ + BackoffAlgorithm_InitializeParams( &xReconnectParams, + mqttexampleRETRY_BACKOFF_BASE_MS, + mqttexampleRETRY_MAX_BACKOFF_DELAY_MS, + mqttexampleRETRY_MAX_ATTEMPTS ); + + /* Attempt to connect to MQTT broker. If connection fails, retry after + * a timeout. Timeout value will exponentially increase till maximum + * attempts are reached. + */ + do + { + /* Establish a TLS session with the MQTT broker. This example connects to + * the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and + * democonfigMQTT_BROKER_PORT at the top of this file. */ + LogInfo( ( "Creating a TLS connection to %s:%u.\r\n", + democonfigMQTT_BROKER_ENDPOINT, + democonfigMQTT_BROKER_PORT ) ); + /* Attempt to create a mutually authenticated TLS connection. */ + xNetworkStatus = TLS_FreeRTOS_Connect( pxNetworkContext, + democonfigMQTT_BROKER_ENDPOINT, + democonfigMQTT_BROKER_PORT, + pxNetworkCredentials, + mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS, + mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ); + + if( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) + { + /* Generate a random number and calculate backoff value (in milliseconds) for + * the next connection retry. + * Note: It is recommended to seed the random number generator with a device-specific + * entropy source so that possibility of multiple devices retrying failed network operations + * at similar intervals can be avoided. */ + xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, uxRand(), &usNextRetryBackOff ); + + if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) + { + LogError( ( "Connection to the broker failed, all attempts exhausted." ) ); + } + else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) + { + LogWarn( ( "Connection to the broker failed. " + "Retrying connection with backoff and jitter." ) ); + vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); + } + } + } while( ( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) ); + + return xNetworkStatus; +} +/*-----------------------------------------------------------*/ + +static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, + NetworkContext_t * pxNetworkContext ) +{ + MQTTStatus_t xResult; + MQTTConnectInfo_t xConnectInfo; + bool xSessionPresent; + TransportInterface_t xTransport; + + /*** + * For readability, error handling in this function is restricted to the use of + * asserts(). + ***/ + + /* Fill in Transport Interface send and receive function pointers. */ + xTransport.pNetworkContext = pxNetworkContext; + xTransport.send = TLS_FreeRTOS_send; + xTransport.recv = TLS_FreeRTOS_recv; + + /* Initialize MQTT library. */ + xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer ); + configASSERT( xResult == MQTTSuccess ); + + /* Some fields are not used in this demo so start with everything at 0. */ + ( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); + + /* Start with a clean session i.e. direct the MQTT broker to discard any + * previous session data. Also, establishing a connection with clean session + * will ensure that the broker does not store any data when this client + * gets disconnected. */ + xConnectInfo.cleanSession = true; + + /* The client identifier is used to uniquely identify this MQTT client to + * the MQTT broker. In a production device the identifier can be something + * unique, such as a device serial number. */ + xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; + xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); + + /* Set MQTT keep-alive period. If the application does not send packets at an interval less than + * the keep-alive period, the MQTT library will send PINGREQ packets. */ + xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; + + /* Append metrics when connecting to the AWS IoT Core broker. */ + #ifdef democonfigUSE_AWS_IOT_CORE_BROKER + #ifdef democonfigCLIENT_USERNAME + xConnectInfo.pUserName = CLIENT_USERNAME_WITH_METRICS; + xConnectInfo.userNameLength = ( uint16_t ) strlen( CLIENT_USERNAME_WITH_METRICS ); + xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; + xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); + #else + xConnectInfo.pUserName = AWS_IOT_METRICS_STRING; + xConnectInfo.userNameLength = AWS_IOT_METRICS_STRING_LENGTH; + /* Password for authentication is not used. */ + xConnectInfo.pPassword = NULL; + xConnectInfo.passwordLength = 0U; + #endif + #else /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ + #ifdef democonfigCLIENT_USERNAME + xConnectInfo.pUserName = democonfigCLIENT_USERNAME; + xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME ); + xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; + xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); + #endif /* ifdef democonfigCLIENT_USERNAME */ + #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ + + /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it + * is passed as NULL. */ + xResult = MQTT_Connect( pxMQTTContext, + &xConnectInfo, + NULL, + mqttexampleCONNACK_RECV_TIMEOUT_MS, + &xSessionPresent ); + configASSERT( xResult == MQTTSuccess ); + + /* Successfully established and MQTT connection with the broker. */ + LogInfo( ( "An MQTT connection is established with %s.", democonfigMQTT_BROKER_ENDPOINT ) ); +} +/*-----------------------------------------------------------*/ + +static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo ) +{ + MQTTStatus_t xResult = MQTTSuccess; + uint8_t * pucPayload = NULL; + size_t ulSize = 0; + uint32_t ulTopicCount = 0U; + + xResult = MQTT_GetSubAckStatusCodes( pxPacketInfo, &pucPayload, &ulSize ); + + /* MQTT_GetSubAckStatusCodes always returns success if called with packet info + * from the event callback and non-NULL parameters. */ + configASSERT( xResult == MQTTSuccess ); + + for( ulTopicCount = 0; ulTopicCount < ulSize; ulTopicCount++ ) + { + xTopicFilterContext[ ulTopicCount ].xSubAckStatus = pucPayload[ ulTopicCount ]; + } +} +/*-----------------------------------------------------------*/ + +static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) +{ + MQTTStatus_t xResult = MQTTSuccess; + BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; + BackoffAlgorithmContext_t xRetryParams; + uint16_t usNextRetryBackOff = 0U; + MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; + bool xFailedSubscribeToTopic = false; + uint32_t ulTopicCount = 0U; + + /* Some fields not used by this demo so start with everything at 0. */ + ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); + + /* Get a unique packet id. */ + usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); + + /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to + * only one topic and uses QoS1. */ + xMQTTSubscription[ 0 ].qos = MQTTQoS1; + xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; + xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); + + /* Initialize context for backoff retry attempts if SUBSCRIBE request fails. */ + BackoffAlgorithm_InitializeParams( &xRetryParams, + mqttexampleRETRY_BACKOFF_BASE_MS, + mqttexampleRETRY_MAX_BACKOFF_DELAY_MS, + mqttexampleRETRY_MAX_ATTEMPTS ); + + do + { + /* The client is now connected to the broker. Subscribe to the topic + * as specified in mqttexampleTOPIC at the top of this file by sending a + * subscribe packet then waiting for a subscribe acknowledgment (SUBACK). + * This client will then publish to the same topic it subscribed to, so it + * will expect all the messages it sends to the broker to be sent back to it + * from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish + * messages received from the broker will have QOS0. */ + LogInfo( ( "Attempt to subscribe to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); + xResult = MQTT_Subscribe( pxMQTTContext, + xMQTTSubscription, + sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), + usSubscribePacketIdentifier ); + configASSERT( xResult == MQTTSuccess ); + + LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) ); + + /* Process incoming packet from the broker. After sending the subscribe, the + * client may receive a publish before it receives a subscribe ack. Therefore, + * call generic incoming packet processing function. Since this demo is + * subscribing to the topic to which no one is publishing, probability of + * receiving Publish message before subscribe ack is zero; but application + * must be ready to receive any packet. This demo uses the generic packet + * processing function everywhere to highlight this fact. */ + xResult = MQTT_ProcessLoop( pxMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); + configASSERT( xResult == MQTTSuccess ); + + /* Reset flag before checking suback responses. */ + xFailedSubscribeToTopic = false; + + /* Check if recent subscription request has been rejected. #xTopicFilterContext is updated + * in the event callback to reflect the status of the SUBACK sent by the broker. It represents + * either the QoS level granted by the server upon subscription, or acknowledgement of + * server rejection of the subscription request. */ + for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) + { + if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure ) + { + xFailedSubscribeToTopic = true; + + /* Generate a random number and calculate backoff value (in milliseconds) for + * the next connection retry. + * Note: It is recommended to seed the random number generator with a device-specific + * entropy source so that possibility of multiple devices retrying failed network operations + * at similar intervals can be avoided. */ + xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xRetryParams, uxRand(), &usNextRetryBackOff ); + + if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) + { + LogError( ( "Server rejected subscription request. All retry attempts have exhausted. Topic=%s", + xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) ); + } + else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) + { + LogWarn( ( "Server rejected subscription request. Attempting to re-subscribe to topic %s.", + xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) ); + /* Backoff before the next re-subscribe attempt. */ + vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); + } + + break; + } + } + + configASSERT( xBackoffAlgStatus != BackoffAlgorithmRetriesExhausted ); + } while( ( xFailedSubscribeToTopic == true ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) ); +} +/*-----------------------------------------------------------*/ + +static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ) +{ + MQTTStatus_t xResult; + MQTTPublishInfo_t xMQTTPublishInfo; + + /*** + * For readability, error handling in this function is restricted to the use of + * asserts(). + ***/ + + /* Some fields are not used by this demo so start with everything at 0. */ + ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); + + /* This demo uses QoS1. */ + xMQTTPublishInfo.qos = MQTTQoS1; + xMQTTPublishInfo.retain = false; + xMQTTPublishInfo.pTopicName = mqttexampleTOPIC; + xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC ); + xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; + xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); + + /* Get a unique packet id. */ + usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); + + /* Send PUBLISH packet. Packet ID is not used for a QoS1 publish. */ + xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier ); + + configASSERT( xResult == MQTTSuccess ); +} +/*-----------------------------------------------------------*/ + +static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ) +{ + MQTTStatus_t xResult; + MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; + + /* Some fields not used by this demo so start with everything at 0. */ + ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); + + /* Get a unique packet id. */ + usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); + + /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to + * only one topic and uses QoS1. */ + xMQTTSubscription[ 0 ].qos = MQTTQoS1; + xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; + xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); + + /* Get next unique packet identifier. */ + usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); + + /* Send UNSUBSCRIBE packet. */ + xResult = MQTT_Unsubscribe( pxMQTTContext, + xMQTTSubscription, + sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), + usUnsubscribePacketIdentifier ); + + configASSERT( xResult == MQTTSuccess ); +} +/*-----------------------------------------------------------*/ + +static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, + uint16_t usPacketId ) +{ + uint32_t ulTopicCount = 0U; + + switch( pxIncomingPacket->type ) + { + case MQTT_PACKET_TYPE_PUBACK: + LogInfo( ( "PUBACK received for packet Id %u.\r\n", usPacketId ) ); + /* Make sure ACK packet identifier matches with Request packet identifier. */ + configASSERT( usPublishPacketIdentifier == usPacketId ); + break; + + case MQTT_PACKET_TYPE_SUBACK: + + /* A SUBACK from the broker, containing the server response to our subscription request, has been received. + * It contains the status code indicating server approval/rejection for the subscription to the single topic + * requested. The SUBACK will be parsed to obtain the status code, and this status code will be stored in global + * variable #xTopicFilterContext. */ + prvUpdateSubAckStatus( pxIncomingPacket ); + + for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) + { + if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus != MQTTSubAckFailure ) + { + LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.\r\n", + xTopicFilterContext[ ulTopicCount ].pcTopicFilter, + xTopicFilterContext[ ulTopicCount ].xSubAckStatus ) ); + } + } + + /* Make sure ACK packet identifier matches with Request packet identifier. */ + configASSERT( usSubscribePacketIdentifier == usPacketId ); + break; + + case MQTT_PACKET_TYPE_UNSUBACK: + LogInfo( ( "Unsubscribed from the topic %s.\r\n", mqttexampleTOPIC ) ); + /* Make sure ACK packet identifier matches with Request packet identifier. */ + configASSERT( usUnsubscribePacketIdentifier == usPacketId ); + break; + + case MQTT_PACKET_TYPE_PINGRESP: + + /* Nothing to be done from application as library handles + * PINGRESP with the use of MQTT_ProcessLoop API function. */ + LogWarn( ( "PINGRESP should not be handled by the application " + "callback when using MQTT_ProcessLoop.\n" ) ); + break; + + /* Any other packet type is invalid. */ + default: + LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).\r\n", + pxIncomingPacket->type ) ); + } +} + +/*-----------------------------------------------------------*/ + +static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) +{ + configASSERT( pxPublishInfo != NULL ); + + /* Process incoming Publish. */ + LogInfo( ( "Incoming QoS : %d\n", pxPublishInfo->qos ) ); + + /* Verify the received publish is for the we have subscribed to. */ + if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) && + ( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) ) + { + LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n" + "Incoming Publish Message : %.*s\r\n", + pxPublishInfo->topicNameLength, + pxPublishInfo->pTopicName, + pxPublishInfo->payloadLength, + pxPublishInfo->pPayload ) ); + } + else + { + LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n", + pxPublishInfo->topicNameLength, + pxPublishInfo->pTopicName ) ); + } +} + +/*-----------------------------------------------------------*/ + +static void prvEventCallback( MQTTContext_t * pxMQTTContext, + MQTTPacketInfo_t * pxPacketInfo, + MQTTDeserializedInfo_t * pxDeserializedInfo ) +{ + /* The MQTT context is not used for this demo. */ + ( void ) pxMQTTContext; + + if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) + { + prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo ); + } + else + { + prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier ); + } +} + +/*-----------------------------------------------------------*/ + +static uint32_t prvGetTimeMs( void ) +{ + TickType_t xTickCount = 0; + uint32_t ulTimeMs = 0UL; + + /* Get the current tick count. */ + xTickCount = xTaskGetTickCount(); + + /* Convert the ticks to milliseconds. */ + ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK; + + /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the + * elapsed time in the application. */ + ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs ); + + return ulTimeMs; +} + +/*-----------------------------------------------------------*/ |