summaryrefslogtreecommitdiff
path: root/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c
diff options
context:
space:
mode:
Diffstat (limited to 'FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c')
-rw-r--r--FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c1069
1 files changed, 1069 insertions, 0 deletions
diff --git a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c
new file mode 100644
index 000000000..a93f1e604
--- /dev/null
+++ b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c
@@ -0,0 +1,1069 @@
+/*
+ * FreeRTOS V202111.00
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ * https://www.FreeRTOS.org
+ * https://github.com/FreeRTOS
+ *
+ */
+
+/**
+ * @file mqtt_operations.c
+ *
+ * @brief This file provides wrapper functions for MQTT operations on a mutually
+ * authenticated TLS connection.
+ *
+ * A mutually authenticated TLS connection is used to connect to the AWS IoT
+ * MQTT message broker in this example. Run the setup script
+ * (fleet_provisioning_demo_setup.py) and define democonfigROOT_CA_PEM
+ * in demo_config.h to achieve mutual authentication.
+ */
+
+/* Standard includes. */
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+/* Config include. */
+#include "demo_config.h"
+
+/* Interface include. */
+#include "mqtt_operations.h"
+
+/* MbedTLS transport include. */
+#include "using_mbedtls_pkcs11.h"
+
+/*Include backoff algorithm header for retry logic.*/
+#include "backoff_algorithm.h"
+
+/**
+ * These configurations are required. Throw compilation error if the below
+ * configs are not defined.
+ */
+#ifndef democonfigMQTT_BROKER_ENDPOINT
+ #error "Please define AWS IoT MQTT broker endpoint(democonfigMQTT_BROKER_ENDPOINT) in demo_config.h."
+#endif
+#ifndef democonfigROOT_CA_PEM
+ #error "Please define the PEM-encoded Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h."
+#endif
+#ifndef democonfigCLIENT_IDENTIFIER
+ #error "Please define a unique democonfigCLIENT_IDENTIFIER."
+#endif
+
+/**
+ * Provide default values for undefined configuration settings.
+ */
+#ifndef democonfigMQTT_BROKER_PORT
+ #define democonfigMQTT_BROKER_PORT ( 8883 )
+#endif
+
+#ifndef democonfigNETWORK_BUFFER_SIZE
+ #define democonfigNETWORK_BUFFER_SIZE ( 1024U )
+#endif
+
+/**
+ * @brief Length of the AWS IoT endpoint.
+ */
+#define democonfigMQTT_BROKER_ENDPOINT_LENGTH ( ( uint16_t ) ( sizeof( democonfigMQTT_BROKER_ENDPOINT ) - 1 ) )
+
+/**
+ * @brief Length of the client identifier.
+ */
+#define mqttopCLIENT_IDENTIFIER_LENGTH ( ( uint16_t ) ( sizeof( democonfigCLIENT_IDENTIFIER ) - 1 ) )
+
+/**
+ * @brief ALPN protocol name for AWS IoT MQTT.
+ *
+ * This will be used if the democonfigMQTT_BROKER_PORT is configured as 443 for AWS IoT MQTT
+ * broker. Please see more details about the ALPN protocol for AWS IoT MQTT
+ * endpoint in the link below.
+ * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/
+ *
+ * @note OpenSSL requires that the protocol string passed to it for configuration be encoded
+ * with the prefix of 8-bit length information of the string. Thus, the 14 byte (0x0e) length
+ * information is prefixed to the string.
+ */
+#define mqttopALPN_PROTOCOL_NAME "\x0ex-amzn-mqtt-ca"
+
+/**
+ * @brief Length of ALPN protocol name.
+ */
+#define mqttopALPN_PROTOCOL_NAME_LENGTH ( ( uint16_t ) ( sizeof( mqttopALPN_PROTOCOL_NAME ) - 1 ) )
+
+/**
+ * @brief The maximum number of retries for connecting to server.
+ */
+#define mqttopCONNECTION_RETRY_MAX_ATTEMPTS ( 5U )
+
+/**
+ * @brief The maximum back-off delay (in milliseconds) for retrying connection to server.
+ */
+#define mqttopCONNECTION_RETRY_MAX_BACKOFF_DELAY_MS ( 5000U )
+
+/**
+ * @brief The base back-off delay (in milliseconds) to use for connection retry attempts.
+ */
+#define mqttopCONNECTION_RETRY_BACKOFF_BASE_MS ( 500U )
+
+/**
+ * @brief Timeout for receiving CONNACK packet in milliseconds.
+ */
+#define mqttopCONNACK_RECV_TIMEOUT_MS ( 1000U )
+
+/**
+ * @brief Maximum number of outgoing publishes maintained in the application
+ * until an ack is received from the broker.
+ */
+#define mqttopMAX_OUTGOING_PUBLISHES ( 5U )
+
+/**
+ * @brief Invalid packet identifier for the MQTT packets. Zero is always an
+ * invalid packet identifier as per MQTT 3.1.1 spec.
+ */
+#define mqttopMQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U )
+
+/**
+ * @brief Timeout for MQTT_ProcessLoop function in milliseconds.
+ */
+#define mqttopMQTT_PROCESS_LOOP_TIMEOUT_MS ( 1000U )
+
+/**
+ * @brief The maximum time interval in seconds which is allowed to elapse
+ * between two Control Packets.
+ *
+ * It is the responsibility of the client to ensure that the interval between
+ * control packets being sent does not exceed the this keep-alive value. In the
+ * absence of sending any other control packets, the client MUST send a
+ * PINGREQ packet.
+ */
+#define mqttopMQTT_KEEP_ALIVE_INTERVAL_SECONDS ( 60U )
+
+/**
+ * @brief Timeout in milliseconds for transport send and receive.
+ */
+#define mqttopTRANSPORT_SEND_RECV_TIMEOUT_MS ( 100U )
+
+/**
+ * @brief Milliseconds per second.
+ */
+#define mqttopMILLISECONDS_PER_SECOND ( 1000U )
+
+/**
+ * @brief Milliseconds per FreeRTOS tick.
+ */
+#define mqttopMILLISECONDS_PER_TICK ( mqttopMILLISECONDS_PER_SECOND / configTICK_RATE_HZ )
+
+/**
+ * @brief The MQTT metrics string expected by AWS IoT MQTT Broker.
+ */
+#define mqttopMETRICS_STRING "?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION "&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB
+
+/**
+ * @brief The length of the MQTT metrics string.
+ */
+#define mqttopMETRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( mqttopMETRICS_STRING ) - 1 ) )
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Structure to keep the MQTT publish packets until an ack is received
+ * for QoS1 publishes.
+ */
+typedef struct PublishPackets
+{
+ /**
+ * @brief Packet identifier of the publish packet.
+ */
+ uint16_t usPacketId;
+
+ /**
+ * @brief Publish info of the publish packet.
+ */
+ MQTTPublishInfo_t xPubInfo;
+} PublishPackets_t;
+
+/* Each compilation unit must define the NetworkContext struct. */
+struct NetworkContext
+{
+ SSLContext_t * pxParams;
+};
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Global entry time into the application to use as a reference timestamp
+ * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference
+ * between the current time and the global entry time. This will reduce the chances
+ * of overflow for the 32 bit unsigned integer used for holding the timestamp.
+ */
+static uint32_t ulGlobalEntryTimeMs;
+
+/**
+ * @brief Packet Identifier generated when Subscribe request was sent to the broker.
+ *
+ * It is used to match received Subscribe ACK to the transmitted subscribe
+ * request.
+ */
+static uint16_t usGlobalSubscribePacketIdentifier = 0U;
+
+/**
+ * @brief Packet Identifier generated when Unsubscribe request was sent to the broker.
+ *
+ * It is used to match received Unsubscribe ACK to the transmitted unsubscribe
+ * request.
+ */
+static uint16_t usGlobalUnsubscribePacketIdentifier = 0U;
+
+/**
+ * @brief Array to keep the outgoing publish messages.
+ *
+ * These stored outgoing publish messages are kept until a successful ack
+ * is received.
+ */
+static PublishPackets_t pxOutgoingPublishPackets[ mqttopMAX_OUTGOING_PUBLISHES ] = { 0 };
+
+/**
+ * @brief The network buffer must remain valid for the lifetime of the MQTT context.
+ */
+static uint8_t pucBuffer[ democonfigNETWORK_BUFFER_SIZE ];
+
+/**
+ * @brief The MQTT context used for MQTT operation.
+ */
+static MQTTContext_t xMqttContext = { 0 };
+
+/**
+ * @brief The network context used for MbedTLS operation.
+ */
+static NetworkContext_t xNetworkContext = { 0 };
+
+/**
+ * @brief The parameters for MbedTLS operation.
+ */
+static SSLContext_t xTlsContext = { 0 };
+
+/**
+ * @brief The flag to indicate that the mqtt session is established.
+ */
+static bool xMqttSessionEstablished = false;
+
+/**
+ * @brief Callback registered when calling xEstablishMqttSession to get incoming
+ * publish messages.
+ */
+static MQTTPublishCallback_t xAppPublishCallback = NULL;
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief The random number generator to use for exponential backoff with
+ * jitter retry logic.
+ *
+ * @return The generated random number.
+ */
+static uint32_t prvGenerateRandomNumber( void );
+
+/**
+ * @brief Connect to the MQTT broker with reconnection retries.
+ *
+ * If connection fails, retry is attempted after a timeout. Timeout value
+ * exponentially increases until maximum timeout value is reached or the number
+ * of attempts are exhausted.
+ *
+ * @param[out] pxNetworkContext The created network context.
+ * @param[in] pcClientCertLabel The client certificate PKCS #11 label to use.
+ * @param[in] pcPrivateKeyLabel The private key PKCS #11 label for the client certificate.
+ *
+ * @return false on failure; true on successful connection.
+ */
+static bool prvConnectToBrokerWithBackoffRetries( NetworkContext_t * pxNetworkContext,
+ char * pcClientCertLabel,
+ char * pcPrivateKeyLabel );
+
+/**
+ * @brief Get the free index in the #pxOutgoingPublishPackets array at which an
+ * outgoing publish can be stored.
+ *
+ * @param[out] pucIndex The index at which an outgoing publish can be stored.
+ *
+ * @return false if no more publishes can be stored;
+ * true if an index to store the next outgoing publish is obtained.
+ */
+static bool prvGetNextFreeIndexForOutgoingPublishes( uint8_t * pucIndex );
+
+/**
+ * @brief Clean up the outgoing publish at given index from the
+ * #pxOutgoingPublishPackets array.
+ *
+ * @param[in] ucIndex The ucIndex at which a publish message has to be cleaned up.
+ */
+static void prvCleanupOutgoingPublishAt( uint8_t ucIndex );
+
+/**
+ * @brief Clean up all the outgoing publishes in the #pxOutgoingPublishPackets array.
+ */
+static void prvCleanupOutgoingPublishes( void );
+
+/**
+ * @brief Clean up the publish packet with the given packet id in the
+ * #pxOutgoingPublishPackets array.
+ *
+ * @param[in] usPacketId Packet id of the packet to be clean.
+ */
+static void prvCleanupOutgoingPublishWithPacketID( uint16_t usPacketId );
+
+/**
+ * @brief Callback registered with the MQTT library.
+ *
+ * @param[in] pxMqttContext MQTT context pointer.
+ * @param[in] pxPacketInfo Packet Info pointer for the incoming packet.
+ * @param[in] pxDeserializedInfo Deserialized information from the incoming packet.
+ */
+static void prvMqttCallback( MQTTContext_t * pxMqttContext,
+ MQTTPacketInfo_t * pxPacketInfo,
+ MQTTDeserializedInfo_t * pxDeserializedInfo );
+
+/**
+ * @brief Resend the publishes if a session is re-established with the broker.
+ *
+ * This function handles the resending of the QoS1 publish packets, which are
+ * maintained locally.
+ *
+ * @param[in] pxMqttContext The MQTT context pointer.
+ *
+ * @return true if all the unacknowledged QoS1 publishes are re-sent successfully;
+ * false otherwise.
+ */
+static bool prvHandlePublishResend( MQTTContext_t * pxMqttContext );
+
+/**
+ * @brief The timer query function provided to the MQTT context.
+ *
+ * @return Time in milliseconds.
+ */
+static uint32_t prvGetTimeMs( void );
+
+/*-----------------------------------------------------------*/
+
+static uint32_t prvGenerateRandomNumber()
+{
+ return( ( uint32_t ) rand() );
+}
+
+/*-----------------------------------------------------------*/
+
+static bool prvConnectToBrokerWithBackoffRetries( NetworkContext_t * pxNetworkContext,
+ char * pcClientCertLabel,
+ char * pcPrivateKeyLabel )
+{
+ bool xReturnStatus = false;
+ BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess;
+ TlsTransportStatus_t xTlsStatus = TLS_TRANSPORT_SUCCESS;
+ BackoffAlgorithmContext_t xReconnectParams;
+ NetworkCredentials_t xTlsCredentials = { 0 };
+ uint16_t usNextRetryBackOff = 0U;
+ const char * pcAlpn[] = { mqttopALPN_PROTOCOL_NAME, NULL };
+
+ /* Set the pParams member of the network context with desired transport. */
+ pxNetworkContext->pxParams = &xTlsContext;
+
+ /* Initialize credentials for establishing TLS session. */
+ xTlsCredentials.pRootCa = democonfigROOT_CA_PEM;
+ xTlsCredentials.rootCaSize = sizeof( democonfigROOT_CA_PEM );
+ xTlsCredentials.pClientCertLabel = pcClientCertLabel;
+ xTlsCredentials.pPrivateKeyLabel = pcPrivateKeyLabel;
+
+ /* AWS IoT requires devices to send the Server Name Indication (SNI)
+ * extension to the Transport Layer Security (TLS) protocol and provide
+ * the complete endpoint address in the host_name field. Details about
+ * SNI for AWS IoT can be found in the link below.
+ * https://docs.aws.amazon.com/iot/latest/developerguide/transport-security.html
+ */
+ xTlsCredentials.disableSni = false;
+
+ if( democonfigMQTT_BROKER_PORT == 443 )
+ {
+ /* Pass the ALPN protocol name depending on the port being used.
+ * Please see more details about the ALPN protocol for AWS IoT MQTT endpoint
+ * in the link below.
+ * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/
+ */
+ xTlsCredentials.pAlpnProtos = pcAlpn;
+ }
+
+ /* Initialize reconnect attempts and interval */
+ BackoffAlgorithm_InitializeParams( &xReconnectParams,
+ mqttopCONNECTION_RETRY_BACKOFF_BASE_MS,
+ mqttopCONNECTION_RETRY_MAX_BACKOFF_DELAY_MS,
+ mqttopCONNECTION_RETRY_MAX_ATTEMPTS );
+
+ do
+ {
+ /* Establish a TLS session with the MQTT broker. This example connects
+ * to the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and democonfigMQTT_BROKER_PORT
+ * at the demo config header. */
+ LogDebug( ( "Establishing a TLS session to %.*s:%d.",
+ democonfigMQTT_BROKER_ENDPOINT_LENGTH,
+ democonfigMQTT_BROKER_ENDPOINT,
+ democonfigMQTT_BROKER_PORT ) );
+
+ xTlsStatus = TLS_FreeRTOS_Connect( pxNetworkContext,
+ democonfigMQTT_BROKER_ENDPOINT,
+ democonfigMQTT_BROKER_PORT,
+ &xTlsCredentials,
+ mqttopTRANSPORT_SEND_RECV_TIMEOUT_MS, mqttopTRANSPORT_SEND_RECV_TIMEOUT_MS );
+
+ if( xTlsStatus == TLS_TRANSPORT_SUCCESS )
+ {
+ /* Connection successful. */
+ xReturnStatus = true;
+ }
+ else
+ {
+ /* Generate a random number and get back-off value (in milliseconds) for the next connection retry. */
+ xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, prvGenerateRandomNumber(), &usNextRetryBackOff );
+
+ if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted )
+ {
+ LogError( ( "Connection to the broker failed, all attempts exhausted." ) );
+ }
+ else if( xBackoffAlgStatus == BackoffAlgorithmSuccess )
+ {
+ LogWarn( ( "Connection to the broker failed. Retrying connection "
+ "after %hu ms backoff.",
+ ( unsigned short ) usNextRetryBackOff ) );
+ vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) );
+ }
+ }
+ } while( ( xTlsStatus != TLS_TRANSPORT_SUCCESS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) );
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+static bool prvGetNextFreeIndexForOutgoingPublishes( uint8_t * pucIndex )
+{
+ bool xReturnStatus = false;
+ uint8_t ucIndex = 0;
+
+ configASSERT( pxOutgoingPublishPackets != NULL );
+ configASSERT( pucIndex != NULL );
+
+ for( ucIndex = 0; ucIndex < mqttopMAX_OUTGOING_PUBLISHES; ucIndex++ )
+ {
+ /* A free index is marked by invalid packet id. Check if the the index
+ * has a free slot. */
+ if( pxOutgoingPublishPackets[ ucIndex ].usPacketId == mqttopMQTT_PACKET_ID_INVALID )
+ {
+ xReturnStatus = true;
+ break;
+ }
+ }
+
+ /* Copy the available index into the output param. */
+ if( xReturnStatus == true )
+ {
+ *pucIndex = ucIndex;
+ }
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+static void prvCleanupOutgoingPublishAt( uint8_t ucIndex )
+{
+ configASSERT( pxOutgoingPublishPackets != NULL );
+ configASSERT( ucIndex < mqttopMAX_OUTGOING_PUBLISHES );
+
+ /* Clear the outgoing publish packet. */
+ ( void ) memset( &( pxOutgoingPublishPackets[ ucIndex ] ),
+ 0x00,
+ sizeof( pxOutgoingPublishPackets[ ucIndex ] ) );
+}
+/*-----------------------------------------------------------*/
+
+static void prvCleanupOutgoingPublishes( void )
+{
+ configASSERT( pxOutgoingPublishPackets != NULL );
+
+ /* Clean up all the outgoing publish packets. */
+ ( void ) memset( pxOutgoingPublishPackets, 0x00, sizeof( pxOutgoingPublishPackets ) );
+}
+/*-----------------------------------------------------------*/
+
+static void prvCleanupOutgoingPublishWithPacketID( uint16_t usPacketId )
+{
+ uint8_t ucIndex = 0;
+
+ configASSERT( pxOutgoingPublishPackets != NULL );
+ configASSERT( usPacketId != mqttopMQTT_PACKET_ID_INVALID );
+
+ /* Clean up the saved outgoing publish with packet Id equal to usPacketId. */
+ for( ucIndex = 0; ucIndex < mqttopMAX_OUTGOING_PUBLISHES; ucIndex++ )
+ {
+ if( pxOutgoingPublishPackets[ ucIndex ].usPacketId == usPacketId )
+ {
+ prvCleanupOutgoingPublishAt( ucIndex );
+
+ LogDebug( ( "Cleaned up outgoing publish packet with packet id %u.",
+ usPacketId ) );
+
+ break;
+ }
+ }
+}
+/*-----------------------------------------------------------*/
+
+static void prvMqttCallback( MQTTContext_t * pxMqttContext,
+ MQTTPacketInfo_t * pxPacketInfo,
+ MQTTDeserializedInfo_t * pxDeserializedInfo )
+{
+ uint16_t usPacketIdentifier;
+
+ configASSERT( pxMqttContext != NULL );
+ configASSERT( pxPacketInfo != NULL );
+ configASSERT( pxDeserializedInfo != NULL );
+
+ /* Suppress the unused parameter warning when asserts are disabled in
+ * build. */
+ ( void ) pxMqttContext;
+
+ usPacketIdentifier = pxDeserializedInfo->packetIdentifier;
+
+ /* Handle an incoming publish. The lower 4 bits of the publish packet
+ * type is used for the dup, QoS, and retain flags. Hence masking
+ * out the lower bits to check if the packet is publish. */
+ if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
+ {
+ configASSERT( pxDeserializedInfo->pPublishInfo != NULL );
+
+ /* Invoke the application callback for incoming publishes. */
+ if( xAppPublishCallback != NULL )
+ {
+ xAppPublishCallback( pxDeserializedInfo->pPublishInfo, usPacketIdentifier );
+ }
+ }
+ else
+ {
+ /* Handle other packets. */
+ switch( pxPacketInfo->type )
+ {
+ case MQTT_PACKET_TYPE_SUBACK:
+ LogDebug( ( "MQTT Packet type SUBACK received." ) );
+
+ /* Make sure the ACK packet identifier matches with the request
+ * packet identifier. */
+ configASSERT( usGlobalSubscribePacketIdentifier == usPacketIdentifier );
+ break;
+
+ case MQTT_PACKET_TYPE_UNSUBACK:
+ LogDebug( ( "MQTT Packet type UNSUBACK received." ) );
+
+ /* Make sure the ACK packet identifier matches with the request
+ * packet identifier. */
+ configASSERT( usGlobalUnsubscribePacketIdentifier == usPacketIdentifier );
+ break;
+
+ case MQTT_PACKET_TYPE_PINGRESP:
+
+ /* We do not expect to receive PINGRESP as we are using
+ * MQTT_ProcessLoop. */
+ LogWarn( ( "PINGRESP should not be received by the application "
+ "callback when using MQTT_ProcessLoop." ) );
+ break;
+
+ case MQTT_PACKET_TYPE_PUBACK:
+ LogDebug( ( "PUBACK received for packet id %u.",
+ usPacketIdentifier ) );
+
+ /* Cleanup the publish packet from the #pxOutgoingPublishPackets
+ * array when a PUBACK is received. */
+ prvCleanupOutgoingPublishWithPacketID( usPacketIdentifier );
+ break;
+
+ /* Any other packet type is invalid. */
+ default:
+ LogError( ( "Unknown packet type received:(%02x).",
+ pxPacketInfo->type ) );
+ }
+ }
+}
+/*-----------------------------------------------------------*/
+
+static bool prvHandlePublishResend( MQTTContext_t * pxMqttContext )
+{
+ bool xReturnStatus = false;
+ MQTTStatus_t xMqttStatus = MQTTSuccess;
+ uint8_t ucIndex = 0U;
+
+ configASSERT( pxOutgoingPublishPackets != NULL );
+
+ /* Resend all the QoS1 publishes still in the #pxOutgoingPublishPackets array.
+ * These are the publishes that haven't received a PUBACK yet. When a PUBACK
+ * is received, the corresponding publish is removed from the array. */
+ for( ucIndex = 0U; ucIndex < mqttopMAX_OUTGOING_PUBLISHES; ucIndex++ )
+ {
+ if( pxOutgoingPublishPackets[ ucIndex ].usPacketId != mqttopMQTT_PACKET_ID_INVALID )
+ {
+ pxOutgoingPublishPackets[ ucIndex ].xPubInfo.dup = true;
+
+ LogDebug( ( "Sending duplicate PUBLISH with packet id %u.",
+ pxOutgoingPublishPackets[ ucIndex ].usPacketId ) );
+ xMqttStatus = MQTT_Publish( pxMqttContext,
+ &pxOutgoingPublishPackets[ ucIndex ].xPubInfo,
+ pxOutgoingPublishPackets[ ucIndex ].usPacketId );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ LogError( ( "Sending duplicate PUBLISH for packet id %u "
+ " failed with status %s.",
+ pxOutgoingPublishPackets[ ucIndex ].usPacketId,
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ break;
+ }
+ else
+ {
+ LogDebug( ( "Sent duplicate PUBLISH successfully for packet id %u.",
+ pxOutgoingPublishPackets[ ucIndex ].usPacketId ) );
+ }
+ }
+ }
+
+ /* Were all the unacknowledged QoS1 publishes successfully re-sent? */
+ if( ucIndex == mqttopMAX_OUTGOING_PUBLISHES )
+ {
+ xReturnStatus = true;
+ }
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+bool xEstablishMqttSession( MQTTPublishCallback_t xPublishCallback,
+ char * pcClientCertLabel,
+ char * pcPrivateKeyLabel )
+{
+ bool xReturnStatus = false;
+ MQTTStatus_t xMqttStatus;
+ MQTTConnectInfo_t xConnectInfo;
+ MQTTFixedBuffer_t xNetworkBuffer;
+ TransportInterface_t xTransport;
+ bool xCreateCleanSession = false;
+ MQTTContext_t * pxMqttContext = &xMqttContext;
+ NetworkContext_t * pxNetworkContext = &xNetworkContext;
+ bool xSessionPresent = false;
+
+ configASSERT( pxMqttContext != NULL );
+ configASSERT( pxNetworkContext != NULL );
+
+ /* Initialize the mqtt context and network context. */
+ ( void ) memset( pxMqttContext, 0U, sizeof( MQTTContext_t ) );
+ ( void ) memset( pxNetworkContext, 0U, sizeof( NetworkContext_t ) );
+
+ xReturnStatus = prvConnectToBrokerWithBackoffRetries( pxNetworkContext,
+ pcClientCertLabel,
+ pcPrivateKeyLabel );
+
+ if( xReturnStatus != true )
+ {
+ /* Log an error to indicate connection failure after all
+ * reconnect attempts are over. */
+ LogError( ( "Failed to connect to MQTT broker %.*s.",
+ democonfigMQTT_BROKER_ENDPOINT_LENGTH,
+ democonfigMQTT_BROKER_ENDPOINT ) );
+ }
+ else
+ {
+ /* Fill in TransportInterface send and receive function pointers.
+ * For this demo, TCP sockets are used to send and receive data
+ * from the network. pxNetworkContext is an SSL context for OpenSSL.*/
+ xTransport.pNetworkContext = pxNetworkContext;
+ xTransport.send = TLS_FreeRTOS_send;
+ xTransport.recv = TLS_FreeRTOS_recv;
+
+ /* Fill the values for network buffer. */
+ xNetworkBuffer.pBuffer = pucBuffer;
+ xNetworkBuffer.size = democonfigNETWORK_BUFFER_SIZE;
+
+ /* Remember the publish callback supplied. */
+ xAppPublishCallback = xPublishCallback;
+
+ /* Initialize the MQTT library. */
+ xMqttStatus = MQTT_Init( pxMqttContext,
+ &xTransport,
+ prvGetTimeMs,
+ prvMqttCallback,
+ &xNetworkBuffer );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ xReturnStatus = false;
+ LogError( ( "MQTT init failed with status %s.",
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ }
+ else
+ {
+ /* Establish an MQTT session by sending a CONNECT packet. */
+
+ /* If #xCreateCleanSession is true, start with a clean session
+ * i.e. direct the MQTT broker to discard any previous session data.
+ * If #xCreateCleanSession is false, direct the broker to attempt to
+ * reestablish a session which was already present. */
+ xConnectInfo.cleanSession = xCreateCleanSession;
+
+ /* The client identifier is used to uniquely identify this MQTT client to
+ * the MQTT broker. In a production device the identifier can be something
+ * unique, such as a device serial number. */
+ xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER;
+ xConnectInfo.clientIdentifierLength = mqttopCLIENT_IDENTIFIER_LENGTH;
+
+ /* The maximum time interval in seconds which is allowed to elapse
+ * between two Control Packets.
+ * It is the responsibility of the client to ensure that the interval between
+ * control packets being sent does not exceed the this keep-alive value. In the
+ * absence of sending any other control packets, the client MUST send a
+ * PINGREQ packet. */
+ xConnectInfo.keepAliveSeconds = mqttopMQTT_KEEP_ALIVE_INTERVAL_SECONDS;
+
+ /* Username and password for authentication. Not used in this demo. */
+ xConnectInfo.pUserName = mqttopMETRICS_STRING;
+ xConnectInfo.userNameLength = mqttopMETRICS_STRING_LENGTH;
+ xConnectInfo.pPassword = NULL;
+ xConnectInfo.passwordLength = 0U;
+
+ /* Send an MQTT CONNECT packet to the broker. */
+ xMqttStatus = MQTT_Connect( pxMqttContext,
+ &xConnectInfo,
+ NULL,
+ mqttopCONNACK_RECV_TIMEOUT_MS,
+ &xSessionPresent );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ xReturnStatus = false;
+ LogError( ( "Connection with MQTT broker failed with status %s.",
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ }
+ else
+ {
+ LogDebug( ( "MQTT connection successfully established with broker." ) );
+ }
+ }
+
+ if( xReturnStatus == true )
+ {
+ /* Keep a flag for indicating if MQTT session is established. This
+ * flag will mark that an MQTT DISCONNECT has to be sent at the end
+ * of the demo even if there are intermediate failures. */
+ xMqttSessionEstablished = true;
+ }
+
+ if( xReturnStatus == true )
+ {
+ /* Check if a session is present and if there are any outgoing
+ * publishes that need to be resent. Resending unacknowledged
+ * publishes is needed only if the broker is re-establishing a
+ * session that was already present. */
+ if( xSessionPresent == true )
+ {
+ LogDebug( ( "An MQTT session with broker is re-established. "
+ "Resending unacked publishes." ) );
+
+ /* Handle all the resend of publish messages. */
+ xReturnStatus = prvHandlePublishResend( &xMqttContext );
+ }
+ else
+ {
+ LogDebug( ( "A clean MQTT connection is established."
+ " Cleaning up all the stored outgoing publishes." ) );
+
+ /* Clean up the outgoing publishes waiting for ack as this new
+ * connection doesn't re-establish an existing session. */
+ prvCleanupOutgoingPublishes();
+ }
+ }
+ }
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+bool xDisconnectMqttSession( void )
+{
+ MQTTStatus_t xMqttStatus = MQTTSuccess;
+ bool xReturnStatus = false;
+ MQTTContext_t * pxMqttContext = &xMqttContext;
+ NetworkContext_t * pxNetworkContext = &xNetworkContext;
+
+ configASSERT( pxMqttContext != NULL );
+ configASSERT( pxNetworkContext != NULL );
+
+ if( xMqttSessionEstablished == true )
+ {
+ /* Send DISCONNECT. */
+ xMqttStatus = MQTT_Disconnect( pxMqttContext );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ LogError( ( "Sending MQTT DISCONNECT failed with status=%u.",
+ xMqttStatus ) );
+ }
+ else
+ {
+ /* MQTT DISCONNECT sent successfully. */
+ xReturnStatus = true;
+ }
+ }
+
+ /* End TLS session, then close TCP connection. */
+ ( void ) TLS_FreeRTOS_Disconnect( pxNetworkContext );
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+bool xSubscribeToTopic( const char * pcTopicFilter,
+ uint16_t usTopicFilterLength )
+{
+ bool xReturnStatus = false;
+ MQTTStatus_t xMqttStatus;
+ MQTTContext_t * pxMqttContext = &xMqttContext;
+ MQTTSubscribeInfo_t pxSubscriptionList[ 1 ];
+
+ configASSERT( pxMqttContext != NULL );
+ configASSERT( pcTopicFilter != NULL );
+ configASSERT( usTopicFilterLength > 0 );
+
+ /* Start with everything at 0. */
+ ( void ) memset( ( void * ) pxSubscriptionList, 0x00, sizeof( pxSubscriptionList ) );
+
+ /* This example subscribes to only one topic and uses QOS1. */
+ pxSubscriptionList[ 0 ].qos = MQTTQoS1;
+ pxSubscriptionList[ 0 ].pTopicFilter = pcTopicFilter;
+ pxSubscriptionList[ 0 ].topicFilterLength = usTopicFilterLength;
+
+ /* Generate packet identifier for the SUBSCRIBE packet. */
+ usGlobalSubscribePacketIdentifier = MQTT_GetPacketId( pxMqttContext );
+
+ /* Send SUBSCRIBE packet. */
+ xMqttStatus = MQTT_Subscribe( pxMqttContext,
+ pxSubscriptionList,
+ sizeof( pxSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
+ usGlobalSubscribePacketIdentifier );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ LogError( ( "Failed to send SUBSCRIBE packet to broker with error = %s.",
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ }
+ else
+ {
+ LogDebug( ( "SUBSCRIBE topic %.*s to broker.",
+ usTopicFilterLength,
+ pcTopicFilter ) );
+
+ /* Process incoming packet from the broker. Acknowledgment for subscription
+ * ( SUBACK ) will be received here. However after sending the subscribe, the
+ * client may receive a publish before it receives a subscribe ack. Since this
+ * demo is subscribing to the topic to which no one is publishing, probability
+ * of receiving publish message before subscribe ack is zero; but application
+ * must be ready to receive any packet. This demo uses MQTT_ProcessLoop to
+ * receive packet from network. */
+ xMqttStatus = MQTT_ProcessLoop( pxMqttContext, mqttopMQTT_PROCESS_LOOP_TIMEOUT_MS );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ LogError( ( "MQTT_ProcessLoop returned with status = %s.",
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ }
+ else
+ {
+ xReturnStatus = true;
+ }
+ }
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+bool xUnsubscribeFromTopic( const char * pcTopicFilter,
+ uint16_t usTopicFilterLength )
+{
+ bool xReturnStatus = false;
+ MQTTStatus_t xMqttStatus;
+ MQTTContext_t * pxMqttContext = &xMqttContext;
+ MQTTSubscribeInfo_t pxSubscriptionList[ 1 ];
+
+ configASSERT( pxMqttContext != NULL );
+ configASSERT( pcTopicFilter != NULL );
+ configASSERT( usTopicFilterLength > 0 );
+
+ /* Start with everything at 0. */
+ ( void ) memset( ( void * ) pxSubscriptionList, 0x00, sizeof( pxSubscriptionList ) );
+
+ /* This example subscribes to only one topic and uses QOS1. */
+ pxSubscriptionList[ 0 ].qos = MQTTQoS1;
+ pxSubscriptionList[ 0 ].pTopicFilter = pcTopicFilter;
+ pxSubscriptionList[ 0 ].topicFilterLength = usTopicFilterLength;
+
+ /* Generate packet identifier for the UNSUBSCRIBE packet. */
+ usGlobalUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMqttContext );
+
+ /* Send UNSUBSCRIBE packet. */
+ xMqttStatus = MQTT_Unsubscribe( pxMqttContext,
+ pxSubscriptionList,
+ sizeof( pxSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
+ usGlobalUnsubscribePacketIdentifier );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ LogError( ( "Failed to send UNSUBSCRIBE packet to broker with error = %s.",
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ }
+ else
+ {
+ LogDebug( ( "UNSUBSCRIBE sent topic %.*s to broker.",
+ usTopicFilterLength,
+ pcTopicFilter ) );
+
+ /* Process incoming packet from the broker. Acknowledgment for unsubscribe
+ * operation ( UNSUBACK ) will be received here. This demo uses
+ * MQTT_ProcessLoop to receive packet from network. */
+ xMqttStatus = MQTT_ProcessLoop( pxMqttContext, mqttopMQTT_PROCESS_LOOP_TIMEOUT_MS );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ LogError( ( "MQTT_ProcessLoop returned with status = %s.",
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ }
+ else
+ {
+ xReturnStatus = true;
+ }
+ }
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+bool xPublishToTopic( const char * pcTopicFilter,
+ uint16_t usTopicFilterLength,
+ const char * pcPayload,
+ size_t xPayloadLength )
+{
+ bool xReturnStatus = false;
+ MQTTStatus_t xMqttStatus = MQTTSuccess;
+ uint8_t ucPublishIndex = mqttopMAX_OUTGOING_PUBLISHES;
+ MQTTContext_t * pxMqttContext = &xMqttContext;
+
+ configASSERT( pxMqttContext != NULL );
+ configASSERT( pcTopicFilter != NULL );
+ configASSERT( usTopicFilterLength > 0 );
+
+ /* Get the next free index for the outgoing publish. All QoS1 outgoing
+ * publishes are stored until a PUBACK is received. These messages are
+ * stored for supporting a resend if a network connection is broken before
+ * receiving a PUBACK. */
+ xReturnStatus = prvGetNextFreeIndexForOutgoingPublishes( &ucPublishIndex );
+
+ if( xReturnStatus == false )
+ {
+ LogError( ( "Unable to find a free spot for outgoing PUBLISH message." ) );
+ }
+ else
+ {
+ LogDebug( ( "Published payload: %.*s",
+ ( int ) xPayloadLength,
+ ( const char * ) pcPayload ) );
+
+ /* This example publishes to only one topic and uses QOS1. */
+ pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.qos = MQTTQoS1;
+ pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.pTopicName = pcTopicFilter;
+ pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.topicNameLength = usTopicFilterLength;
+ pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.pPayload = pcPayload;
+ pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.payloadLength = xPayloadLength;
+
+ /* Get a new packet id. */
+ pxOutgoingPublishPackets[ ucPublishIndex ].usPacketId = MQTT_GetPacketId( pxMqttContext );
+
+ /* Send PUBLISH packet. */
+ xMqttStatus = MQTT_Publish( pxMqttContext,
+ &pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo,
+ pxOutgoingPublishPackets[ ucPublishIndex ].usPacketId );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ LogError( ( "Failed to send PUBLISH packet to broker with error = %s.",
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ prvCleanupOutgoingPublishAt( ucPublishIndex );
+ xReturnStatus = false;
+ }
+ else
+ {
+ LogDebug( ( "PUBLISH sent for topic %.*s to broker with packet ID %u.",
+ usTopicFilterLength,
+ pcTopicFilter,
+ pxOutgoingPublishPackets[ ucPublishIndex ].usPacketId ) );
+ }
+ }
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+bool xProcessLoop( void )
+{
+ bool xReturnStatus = false;
+ MQTTStatus_t xMqttStatus = MQTTSuccess;
+
+ xMqttStatus = MQTT_ProcessLoop( &xMqttContext, mqttopMQTT_PROCESS_LOOP_TIMEOUT_MS );
+
+ if( xMqttStatus != MQTTSuccess )
+ {
+ LogError( ( "MQTT_ProcessLoop returned with status = %s.",
+ MQTT_Status_strerror( xMqttStatus ) ) );
+ }
+ else
+ {
+ LogDebug( ( "MQTT_ProcessLoop successful." ) );
+ xReturnStatus = true;
+ }
+
+ return xReturnStatus;
+}
+/*-----------------------------------------------------------*/
+
+static uint32_t prvGetTimeMs( void )
+{
+ TickType_t xTickCount = 0;
+ uint32_t ulTimeMs = 0UL;
+
+ /* Get the current tick count. */
+ xTickCount = xTaskGetTickCount();
+
+ /* Convert the ticks to milliseconds. */
+ ulTimeMs = ( uint32_t ) xTickCount * mqttopMILLISECONDS_PER_TICK;
+
+ /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the
+ * elapsed time in the application. */
+ ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs );
+
+ return ulTimeMs;
+}
+
+/*-----------------------------------------------------------*/