summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMuneeb Ahmed <54290492+muneebahmed10@users.noreply.github.com>2020-06-30 16:57:50 -0700
committerGitHub <noreply@github.com>2020-06-30 16:57:50 -0700
commit57df8b82109bc1e20a7a1816e985feb55975ce90 (patch)
treece87879f4d05803b02a0cdf2f032e339e8d0d0c1
parent0bcbf43fee889df413137678094a8bb314591ed7 (diff)
downloadfreertos-git-57df8b82109bc1e20a7a1816e985feb55975ce90.tar.gz
Copy MQTT files from 10174c4d of CSDK development (#111)
* Copy MQTT files from 6d4e47f3 of CSDK development * Change case * Update MQTT to commit a4ad8baf * Update MQTT to commit 10174c4d
-rwxr-xr-xFreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/directories.txt3
-rwxr-xr-xFreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/directories.txt2
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h353
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h527
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h154
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c1939
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c2094
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c1100
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h23
9 files changed, 6195 insertions, 0 deletions
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/directories.txt b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/directories.txt
new file mode 100755
index 000000000..ad96185c7
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/directories.txt
@@ -0,0 +1,3 @@
++ standard
+Contains the implementation of IoT libraries that implement standard protocols
+and interfaces, such as MQTT.
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/directories.txt b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/directories.txt
new file mode 100755
index 000000000..721866f72
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/directories.txt
@@ -0,0 +1,2 @@
++ mqtt
+Contains the implementation of the MQTT library.
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h
new file mode 100644
index 000000000..7dfef1285
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h
@@ -0,0 +1,353 @@
+/*
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef MQTT_H
+#define MQTT_H
+
+/* Include config file before other headers. */
+#include "mqtt_config.h"
+#include "mqtt_lightweight.h"
+
+/**
+ * @brief Invalid packet identifier.
+ *
+ * Zero is an invalid packet identifier as per MQTT v3.1.1 spec.
+ */
+#define MQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U )
+
+struct MQTTApplicationCallbacks;
+typedef struct MQTTApplicationCallbacks MQTTApplicationCallbacks_t;
+
+struct MQTTPubAckInfo;
+typedef struct MQTTPubAckInfo MQTTPubAckInfo_t;
+
+struct MQTTContext;
+typedef struct MQTTContext MQTTContext_t;
+
+struct MQTTTransportInterface;
+typedef struct MQTTTransportInterface MQTTTransportInterface_t;
+
+typedef int32_t (* MQTTTransportSendFunc_t )( NetworkContext_t context,
+ const void * pMessage,
+ size_t bytesToSend );
+
+typedef uint32_t (* MQTTGetCurrentTimeFunc_t )( void );
+
+typedef void (* MQTTEventCallback_t )( MQTTContext_t * pContext,
+ MQTTPacketInfo_t * pPacketInfo,
+ uint16_t packetIdentifier,
+ MQTTPublishInfo_t * pPublishInfo );
+
+typedef enum MQTTConnectionStatus
+{
+ MQTTNotConnected,
+ MQTTConnected
+} MQTTConnectionStatus_t;
+
+typedef enum MQTTPublishState
+{
+ MQTTStateNull = 0,
+ MQTTPublishSend,
+ MQTTPubAckSend,
+ MQTTPubRecSend,
+ MQTTPubRelSend,
+ MQTTPubCompSend,
+ MQTTPubAckPending,
+ MQTTPubRelPending,
+ MQTTPubRecPending,
+ MQTTPubCompPending,
+ MQTTPublishDone
+} MQTTPublishState_t;
+
+typedef enum MQTTPubAckType
+{
+ MQTTPuback,
+ MQTTPubrec,
+ MQTTPubrel,
+ MQTTPubcomp
+} MQTTPubAckType_t;
+
+struct MQTTTransportInterface
+{
+ MQTTTransportSendFunc_t send;
+ MQTTTransportRecvFunc_t recv;
+ NetworkContext_t networkContext;
+};
+
+struct MQTTApplicationCallbacks
+{
+ MQTTGetCurrentTimeFunc_t getTime;
+ MQTTEventCallback_t appCallback;
+};
+
+struct MQTTPubAckInfo
+{
+ uint16_t packetId;
+ MQTTQoS_t qos;
+ MQTTPublishState_t publishState;
+};
+
+struct MQTTContext
+{
+ MQTTPubAckInfo_t outgoingPublishRecords[ MQTT_STATE_ARRAY_MAX_COUNT ];
+ size_t outgoingPublishCount;
+ MQTTPubAckInfo_t incomingPublishRecords[ MQTT_STATE_ARRAY_MAX_COUNT ];
+ size_t incomingPublishCount;
+
+ MQTTTransportInterface_t transportInterface;
+ MQTTFixedBuffer_t networkBuffer;
+
+ uint16_t nextPacketId;
+ MQTTConnectionStatus_t connectStatus;
+ MQTTApplicationCallbacks_t callbacks;
+ uint32_t lastPacketTime;
+ bool controlPacketSent;
+
+ /* Keep alive members. */
+ uint16_t keepAliveIntervalSec;
+ uint32_t pingReqSendTimeMs;
+ uint32_t pingRespTimeoutMs;
+ bool waitingForPingResp;
+};
+
+/**
+ * @brief Initialize an MQTT context.
+ *
+ * This function must be called on an MQTT context before any other function.
+ *
+ * @note The getTime callback function must be defined. If there is no time
+ * implementation, it is the responsibility of the application to provide a
+ * dummy function to always return 0, and provide 0 timeouts for functions. This
+ * will ensure all time based functions will run for a single iteration.
+ *
+ * @brief param[in] pContext The context to initialize.
+ * @brief param[in] pTransportInterface The transport interface to use with the context.
+ * @brief param[in] pCallbacks Callbacks to use with the context.
+ * @brief param[in] pNetworkBuffer Network buffer provided for the context.
+ *
+ * @return #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
+ const MQTTTransportInterface_t * pTransportInterface,
+ const MQTTApplicationCallbacks_t * pCallbacks,
+ const MQTTFixedBuffer_t * pNetworkBuffer );
+
+/**
+ * @brief Establish an MQTT session.
+ *
+ * This function will send MQTT CONNECT packet and receive a CONNACK packet. The
+ * send and receive from the network is done through the transport interface.
+ *
+ * The maximum time this function waits for a CONNACK is decided in one of the
+ * following ways:
+ * 1. If #timeoutMs is greater than 0:
+ * #getTime is used to ensure that the function does not wait more than #timeoutMs
+ * for CONNACK.
+ * 2. If #timeoutMs is 0:
+ * The network receive for CONNACK is retried up to the number of times configured
+ * by #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] pConnectInfo MQTT CONNECT packet information.
+ * @param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and
+ * Testament is not used.
+ * @param[in] timeoutMs Maximum time in milliseconds to wait for a CONNACK packet.
+ * A zero timeout makes use of the retries for receiving CONNACK as configured with
+ * #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT .
+ * @param[out] pSessionPresent Whether a previous session was present.
+ * Only relevant if not establishing a clean session.
+ *
+ * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to
+ * hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSendFailed if transport send failed;
+ * #MQTTRecvFailed if transport receive failed for CONNACK;
+ * #MQTTNoDataAvailable if no data available to receive in transport until
+ * the #timeoutMs for CONNACK;
+ * #MQTTSuccess otherwise.
+ *
+ * @note This API may spend more time than provided in the timeoutMS parameters in
+ * certain conditions as listed below:
+ *
+ * 1. Timeouts are incorrectly configured - If the timeoutMS is less than the
+ * transport receive timeout and if a CONNACK packet is not received within
+ * the transport receive timeout, the API will spend the transport receive
+ * timeout (which is more time than the timeoutMs). It is the case of incorrect
+ * timeout configuration as the timeoutMs parameter passed to this API must be
+ * greater than the transport receive timeout. Please refer to the transport
+ * interface documentation for more details about timeout configurations.
+ *
+ * 2. Partial CONNACK packet is received right before the expiry of the timeout - It
+ * is possible that first two bytes of CONNACK packet (packet type and remaining
+ * length) are received right before the expiry of the timeoutMS. In that case,
+ * the API makes one more network receive call in an attempt to receive the remaining
+ * 2 bytes. In the worst case, it can happen that the remaining 2 bytes are never
+ * received and this API will end up spending timeoutMs + transport receive timeout.
+ */
+MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
+ const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ uint32_t timeoutMs,
+ bool * pSessionPresent );
+
+/**
+ * @brief Sends MQTT SUBSCRIBE for the given list of topic filters to
+ * the broker.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[in] packetId packet ID generated by #MQTT_GetPacketId.
+ *
+ * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to
+ * hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSendFailed if transport write failed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId );
+
+/**
+ * @brief Publishes a message to the given topic name.
+ *
+ * @brief param[in] pContext Initialized MQTT context.
+ * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @brief param[in] packetId packet ID generated by #MQTT_GetPacketId.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSendFailed if transport write failed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId );
+
+/**
+ * @brief Sends an MQTT PINGREQ to broker.
+ *
+ * @param[in] pContext Initialized and connected MQTT context.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSendFailed if transport write failed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext );
+
+/**
+ * @brief Sends MQTT UNSUBSCRIBE for the given list of topic filters to
+ * the broker.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[in] packetId packet ID generated by #MQTT_GetPacketId.
+ *
+ * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to
+ * hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSendFailed if transport write failed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId );
+
+/**
+ * @brief Disconnect an MQTT session.
+ *
+ * @param[in] pContext Initialized and connected MQTT context.
+ *
+ * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to
+ * hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSendFailed if transport send failed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext );
+
+/**
+ * @brief Loop to receive packets from the transport interface. Handles keep
+ * alive.
+ *
+ * @param[in] pContext Initialized and connected MQTT context.
+ * @param[in] timeoutMs Minimum time in milliseconds that the receive loop will
+ * run, unless an error occurs.
+ *
+ * @return #MQTTBadParameter if context is NULL;
+ * #MQTTRecvFailed if a network error occurs during reception;
+ * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
+ * #MQTTBadResponse if an invalid packet is received;
+ * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before
+ * pContext->pingRespTimeoutMs milliseconds;
+ * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an
+ * invalid transition for the internal state machine;
+ * #MQTTSuccess on success.
+ */
+MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext,
+ uint32_t timeoutMs );
+
+/**
+ * @brief Loop to receive packets from the transport interface. Does not handle
+ * keep alive.
+ *
+ * @note Passing a timeout value of 0 will run the loop for a single iteration.
+ *
+ * @param[in] pContext Initialized and connected MQTT context.
+ * @param[in] timeoutMs Minimum time in milliseconds that the receive loop will
+ * run, unless an error occurs.
+ *
+ * @return #MQTTBadParameter if context is NULL;
+ * #MQTTRecvFailed if a network error occurs during reception;
+ * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
+ * #MQTTBadResponse if an invalid packet is received;
+ * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an
+ * invalid transition for the internal state machine;
+ * #MQTTSuccess on success.
+ */
+MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext,
+ uint32_t timeoutMs );
+
+/**
+ * @brief Get a packet ID that is valid according to the MQTT 3.1.1 spec.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ *
+ * @return A non-zero number.
+ */
+uint16_t MQTT_GetPacketId( MQTTContext_t * pContext );
+
+/**
+ * @brief Error code to string conversion for MQTT statuses.
+ *
+ * @param[in] status The status to convert to a string.
+ *
+ * @return The string representation of the status.
+ */
+const char * MQTT_Status_strerror( MQTTStatus_t status );
+
+#endif /* ifndef MQTT_H */
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h
new file mode 100644
index 000000000..e52b0790f
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h
@@ -0,0 +1,527 @@
+/*
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef MQTT_LIGHTWEIGHT_H
+#define MQTT_LIGHTWEIGHT_H
+
+#include <stddef.h>
+#include <stdint.h>
+
+/* bools are only defined in C99+ */
+#if defined( __cplusplus ) || ( defined( __STDC_VERSION__ ) && ( __STDC_VERSION__ >= 199901L ) )
+ #include <stdbool.h>
+#elif !defined( bool )
+ #define bool int8_t
+ #define false ( int8_t ) 0
+ #define true ( int8_t ) 1
+#endif
+
+/* Include config file before other headers. */
+#include "mqtt_config.h"
+
+/* MQTT packet types. */
+#define MQTT_PACKET_TYPE_CONNECT ( ( uint8_t ) 0x10U ) /**< @brief CONNECT (client-to-server). */
+#define MQTT_PACKET_TYPE_CONNACK ( ( uint8_t ) 0x20U ) /**< @brief CONNACK (server-to-client). */
+#define MQTT_PACKET_TYPE_PUBLISH ( ( uint8_t ) 0x30U ) /**< @brief PUBLISH (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBACK ( ( uint8_t ) 0x40U ) /**< @brief PUBACK (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBREC ( ( uint8_t ) 0x50U ) /**< @brief PUBREC (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBREL ( ( uint8_t ) 0x62U ) /**< @brief PUBREL (bidirectional). */
+#define MQTT_PACKET_TYPE_PUBCOMP ( ( uint8_t ) 0x70U ) /**< @brief PUBCOMP (bidirectional). */
+#define MQTT_PACKET_TYPE_SUBSCRIBE ( ( uint8_t ) 0x82U ) /**< @brief SUBSCRIBE (client-to-server). */
+#define MQTT_PACKET_TYPE_SUBACK ( ( uint8_t ) 0x90U ) /**< @brief SUBACK (server-to-client). */
+#define MQTT_PACKET_TYPE_UNSUBSCRIBE ( ( uint8_t ) 0xA2U ) /**< @brief UNSUBSCRIBE (client-to-server). */
+#define MQTT_PACKET_TYPE_UNSUBACK ( ( uint8_t ) 0xB0U ) /**< @brief UNSUBACK (server-to-client). */
+#define MQTT_PACKET_TYPE_PINGREQ ( ( uint8_t ) 0xC0U ) /**< @brief PINGREQ (client-to-server). */
+#define MQTT_PACKET_TYPE_PINGRESP ( ( uint8_t ) 0xD0U ) /**< @brief PINGRESP (server-to-client). */
+#define MQTT_PACKET_TYPE_DISCONNECT ( ( uint8_t ) 0xE0U ) /**< @brief DISCONNECT (client-to-server). */
+
+
+/**
+ * @brief The size of MQTT PUBACK, PUBREC, PUBREL, and PUBCOMP packets, per MQTT spec.
+ */
+#define MQTT_PUBLISH_ACK_PACKET_SIZE ( 4UL )
+
+struct MQTTFixedBuffer;
+typedef struct MQTTFixedBuffer MQTTFixedBuffer_t;
+
+struct MQTTConnectInfo;
+typedef struct MQTTConnectInfo MQTTConnectInfo_t;
+
+struct MQTTSubscribeInfo;
+typedef struct MQTTSubscribeInfo MQTTSubscribeInfo_t;
+
+struct MqttPublishInfo;
+typedef struct MqttPublishInfo MQTTPublishInfo_t;
+
+struct MQTTPacketInfo;
+typedef struct MQTTPacketInfo MQTTPacketInfo_t;
+
+/**
+ * @brief Signature of the transport interface receive function.
+ *
+ * A function with this signature must be provided to the MQTT library to read
+ * data off the network.
+ *
+ * @param[in] context The network context provided with this function.
+ * @param[out] pBuffer Buffer to receive network data.
+ * @param[in] bytesToRecv Bytes to receive from the network. pBuffer must be at
+ * least this size.
+ *
+ * @return The number of bytes received; negative value on failure.
+ */
+typedef int32_t (* MQTTTransportRecvFunc_t )( NetworkContext_t context,
+ void * pBuffer,
+ size_t bytesToRecv );
+
+/**
+ * @brief Return codes from MQTT functions.
+ */
+typedef enum MQTTStatus
+{
+ MQTTSuccess = 0, /**< Function completed successfully. */
+ MQTTBadParameter, /**< At least one parameter was invalid. */
+ MQTTNoMemory, /**< A provided buffer was too small. */
+ MQTTSendFailed, /**< The transport send function failed. */
+ MQTTRecvFailed, /**< The transport receive function failed. */
+ MQTTBadResponse, /**< An invalid packet was received from the server. */
+ MQTTServerRefused, /**< The server refused a CONNECT or SUBSCRIBE. */
+ MQTTNoDataAvailable, /**< No data available from the transport interface. */
+ MQTTIllegalState, /**< An illegal state in the state record. */
+ MQTTStateCollision, /**< A collision with an existing state record entry. */
+ MQTTKeepAliveTimeout /**< Timeout while waiting for PINGRESP. */
+} MQTTStatus_t;
+
+/**
+ * @brief MQTT Quality of Service values.
+ */
+typedef enum MQTTQoS
+{
+ MQTTQoS0 = 0, /**< Delivery at most once. */
+ MQTTQoS1 = 1, /**< Delivery at least once. */
+ MQTTQoS2 = 2 /**< Delivery exactly once. */
+} MQTTQoS_t;
+
+/**
+ * @brief Buffer passed to MQTT library.
+ *
+ * These buffers are not copied and must remain in scope for the duration of the
+ * MQTT operation.
+ */
+struct MQTTFixedBuffer
+{
+ uint8_t * pBuffer; /**< @brief Pointer to buffer. */
+ size_t size; /**< @brief Size of buffer. */
+};
+
+/**
+ * @brief MQTT CONNECT packet parameters.
+ */
+struct MQTTConnectInfo
+{
+ /**
+ * @brief Whether to establish a new, clean session or resume a previous session.
+ */
+ bool cleanSession;
+
+ /**
+ * @brief MQTT keep alive period.
+ */
+ uint16_t keepAliveSeconds;
+
+ /**
+ * @brief MQTT client identifier. Must be unique per client.
+ */
+ const char * pClientIdentifier;
+
+ /**
+ * @brief Length of the client identifier.
+ */
+ uint16_t clientIdentifierLength;
+
+ /**
+ * @brief MQTT user name. Set to NULL if not used.
+ */
+ const char * pUserName;
+
+ /**
+ * @brief Length of MQTT user name. Set to 0 if not used.
+ */
+ uint16_t userNameLength;
+
+ /**
+ * @brief MQTT password. Set to NULL if not used.
+ */
+ const char * pPassword;
+
+ /**
+ * @brief Length of MQTT password. Set to 0 if not used.
+ */
+ uint16_t passwordLength;
+};
+
+/**
+ * @brief MQTT SUBSCRIBE packet parameters.
+ */
+struct MQTTSubscribeInfo
+{
+ /**
+ * @brief Quality of Service for subscription.
+ */
+ MQTTQoS_t qos;
+
+ /**
+ * @brief Topic filter to subscribe to.
+ */
+ const char * pTopicFilter;
+
+ /**
+ * @brief Length of subscription topic filter.
+ */
+ uint16_t topicFilterLength;
+};
+
+/**
+ * @brief MQTT PUBLISH packet parameters.
+ */
+struct MqttPublishInfo
+{
+ /**
+ * @brief Quality of Service for message.
+ */
+ MQTTQoS_t qos;
+
+ /**
+ * @brief Whether this is a retained message.
+ */
+ bool retain;
+
+ /**
+ * @brief Whether this is a duplicate publish message.
+ */
+ bool dup;
+
+ /**
+ * @brief Topic name on which the message is published.
+ */
+ const char * pTopicName;
+
+ /**
+ * @brief Length of topic name.
+ */
+ uint16_t topicNameLength;
+
+ /**
+ * @brief Message payload.
+ */
+ const void * pPayload;
+
+ /**
+ * @brief Message payload length.
+ */
+ size_t payloadLength;
+};
+
+/**
+ * @brief MQTT incoming packet parameters.
+ */
+struct MQTTPacketInfo
+{
+ /**
+ * @brief Type of incoming MQTT packet.
+ */
+ uint8_t type;
+
+ /**
+ * @brief Remaining serialized data in the MQTT packet.
+ */
+ uint8_t * pRemainingData;
+
+ /**
+ * @brief Length of remaining serialized data.
+ */
+ size_t remainingLength;
+};
+
+/**
+ * @brief Get the size and Remaining Length of an MQTT CONNECT packet.
+ *
+ * @param[in] pConnectInfo MQTT CONNECT packet parameters.
+ * @param[in] pWillInfo Last Will and Testament. Pass NULL if not used.
+ * @param[out] pRemainingLength The Remaining Length of the MQTT CONNECT packet.
+ * @param[out] pPacketSize The total size of the MQTT CONNECT packet.
+ *
+ * @return #MQTTBadParameter if the packet would exceed the size allowed by the
+ * MQTT spec; #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t * pRemainingLength,
+ size_t * pPacketSize );
+
+/**
+ * @brief Serialize an MQTT CONNECT packet in the given buffer.
+ *
+ * @param[in] pConnectInfo MQTT CONNECT packet parameters.
+ * @param[in] pWillInfo Last Will and Testament. Pass NULL if not used.
+ * @param[in] remainingLength Remaining Length provided by #MQTT_GetConnectPacketSize.
+ * @param[out] pBuffer Buffer for packet serialization.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_SerializeConnect( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer );
+
+/**
+ * @brief Get packet size and Remaining Length of an MQTT SUBSCRIBE packet.
+ *
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[out] pRemainingLength The Remaining Length of the MQTT SUBSCRIBE packet.
+ * @param[out] pPacketSize The total size of the MQTT SUBSCRIBE packet.
+ *
+ * @return #MQTTBadParameter if the packet would exceed the size allowed by the
+ * MQTT spec; #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ size_t * pRemainingLength,
+ size_t * pPacketSize );
+
+/**
+ * @brief Serialize an MQTT SUBSCRIBE packet in the given buffer.
+ *
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[in] packetId packet ID generated by #MQTT_GetPacketId.
+ * @param[in] remainingLength Remaining Length provided by #MQTT_GetSubscribePacketSize.
+ * @param[out] pBuffer Buffer for packet serialization.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer );
+
+/**
+ * @brief Get packet size and Remaining Length of an MQTT UNSUBSCRIBE packet.
+ *
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[out] pRemainingLength The Remaining Length of the MQTT UNSUBSCRIBE packet.
+ * @param[out] pPacketSize The total size of the MQTT UNSUBSCRIBE packet.
+ *
+ * @return #MQTTBadParameter if the packet would exceed the size allowed by the
+ * MQTT spec; #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_GetUnsubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ size_t * pRemainingLength,
+ size_t * pPacketSize );
+
+/**
+ * @brief Serialize an MQTT UNSUBSCRIBE packet in the given buffer.
+ *
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[in] packetId packet ID generated by #MQTT_GetPacketId.
+ * @param[in] remainingLength Remaining Length provided by #MQTT_GetUnsubscribePacketSize.
+ * @param[out] pBuffer Buffer for packet serialization.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer );
+
+/**
+ * @brief Get the packet size and remaining length of an MQTT PUBLISH packet.
+ *
+ * @param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @param[out] pRemainingLength The Remaining Length of the MQTT PUBLISH packet.
+ * @param[out] pPacketSize The total size of the MQTT PUBLISH packet.
+ *
+ * @return #MQTTBadParameter if the packet would exceed the size allowed by the
+ * MQTT spec or if invalid parameters are passed; #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_GetPublishPacketSize( const MQTTPublishInfo_t * pPublishInfo,
+ size_t * pRemainingLength,
+ size_t * pPacketSize );
+
+/**
+ * @brief Serialize an MQTT PUBLISH packet in the given buffer.
+ *
+ * This function will serialize complete MQTT PUBLISH packet into
+ * the given buffer. If the PUBLISH payload can be sent separately,
+ * consider using #MQTT_SerializePublishHeader, which will serialize
+ * only the PUBLISH header into the buffer.
+ *
+ * @param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @param[in] packetId packet ID generated by #MQTT_GetPacketId.
+ * @param[in] remainingLength Remaining Length provided by #MQTT_GetConnectPacketSize.
+ * @param[out] pBuffer Buffer for packet serialization.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer );
+
+/**
+ * @brief Serialize an MQTT PUBLISH packet header in the given buffer.
+ *
+ * This function serializes PUBLISH header in to the given buffer. Payload
+ * for PUBLISH will not be copied over to the buffer. This will help reduce
+ * the memory needed for the buffer and avoid an unwanted copy operataion of
+ * PUBLISH payload into the buffer. If payload also would need to be part of
+ * the serialized buffer, consider using #MQTT_SerializePublish.
+ *
+ * @param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @param[in] packetId packet ID generated by #MQTT_GetPacketId.
+ * @param[in] remainingLength Remaining Length provided by #MQTT_GetConnectPacketSize.
+ * @param[out] pBuffer Buffer for packet serialization.
+ * @param[out] pHeaderSize Size of the serialized MQTT PUBLISH header.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer,
+ size_t * pHeaderSize );
+
+/**
+ * @brief Serialize an MQTT PUBACK, PUBREC, PUBREL, or PUBCOMP into the given
+ * buffer.
+ *
+ * @param[out] pBuffer Buffer for packet serialization.
+ * @param[in] packetType Byte of the corresponding packet fixed header per the
+ * MQTT spec.
+ * @param[in] packetId Packet ID of the publish.
+ *
+ * @return #MQTTBadParameter, #MQTTNoMemory, or #MQTTSuccess.
+ */
+MQTTStatus_t MQTT_SerializeAck( const MQTTFixedBuffer_t * pBuffer,
+ uint8_t packetType,
+ uint16_t packetId );
+
+/**
+ * @brief Get the size of an MQTT DISCONNECT packet.
+ *
+ * @param[out] pPacketSize The size of the MQTT DISCONNECT packet.
+ *
+ * @return Always returns #MQTTSuccess.
+ */
+MQTTStatus_t MQTT_GetDisconnectPacketSize( size_t * pPacketSize );
+
+/**
+ * @brief Serialize an MQTT DISCONNECT packet into the given buffer.
+ *
+ * @param[out] pBuffer Buffer for packet serialization.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_SerializeDisconnect( const MQTTFixedBuffer_t * pBuffer );
+
+/**
+ * @brief Get the size of an MQTT PINGREQ packet.
+ *
+ * @param[out] pPacketSize The size of the MQTT PINGREQ packet.
+ *
+ * @return #MQTTSuccess or #MQTTBadParameter if pPacketSize is NULL.
+ */
+MQTTStatus_t MQTT_GetPingreqPacketSize( size_t * pPacketSize );
+
+/**
+ * @brief Serialize an MQTT PINGREQ packet into the given buffer.
+ *
+ * @param[out] pBuffer Buffer for packet serialization.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+MQTTStatus_t MQTT_SerializePingreq( const MQTTFixedBuffer_t * pBuffer );
+
+/**
+ * @brief Deserialize an MQTT PUBLISH packet.
+ *
+ * @param[in] pIncomingPacket #MQTTPacketInfo_t containing the buffer.
+ * @param[out] pPacketId The packet ID obtained from the buffer.
+ * @param[out] pPublishInfo Struct containing information about the publish.
+ *
+ * @return #MQTTBadParameter, #MQTTBadResponse, or #MQTTSuccess.
+ */
+MQTTStatus_t MQTT_DeserializePublish( const MQTTPacketInfo_t * pIncomingPacket,
+ uint16_t * pPacketId,
+ MQTTPublishInfo_t * pPublishInfo );
+
+/**
+ * @brief Deserialize an MQTT CONNACK, SUBACK, UNSUBACK, PUBACK, PUBREC, PUBREL,
+ * PUBCOMP, or PINGRESP.
+ *
+ * @param[in] pIncomingPacket #MQTTPacketInfo_t containing the buffer.
+ * @param[out] pPacketId The packet ID of obtained from the buffer. Not used
+ * in CONNACK or PINGRESP.
+ * @param[out] pSessionPresent Boolean flag from a CONNACK indicating present session.
+ *
+ * @return #MQTTBadParameter, #MQTTBadResponse, or #MQTTSuccess.
+ */
+MQTTStatus_t MQTT_DeserializeAck( const MQTTPacketInfo_t * pIncomingPacket,
+ uint16_t * pPacketId,
+ bool * pSessionPresent );
+
+/**
+ * @brief Extract the MQTT packet type and length from incoming packet.
+ *
+ * @param[in] readFunc Transport layer read function pointer.
+ * @param[out] pIncomingPacket Pointer to MQTTPacketInfo_t structure. This is
+ * where type, remaining length and packet identifier are stored.
+ *
+ * @return #MQTTSuccess on successful extraction of type and length,
+ * #MQTTBadParameter if @p pIncomingPacket is invalid,
+ * #MQTTRecvFailed on transport receive failure,
+ * #MQTTBadResponse if an invalid packet is read, and
+ * #MQTTNoDataAvailable if there is nothing to read.
+ */
+MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( MQTTTransportRecvFunc_t readFunc,
+ NetworkContext_t networkContext,
+ MQTTPacketInfo_t * pIncomingPacket );
+
+#endif /* ifndef MQTT_LIGHTWEIGHT_H */
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h
new file mode 100644
index 000000000..75a54de7d
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h
@@ -0,0 +1,154 @@
+/*
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef MQTT_STATE_H
+#define MQTT_STATE_H
+
+#include "mqtt.h"
+
+#define MQTT_STATE_CURSOR_INITIALIZER ( size_t ) 0
+
+/**
+ * @brief Value indicating either send or receive.
+ */
+typedef enum MQTTStateOperation
+{
+ MQTT_SEND,
+ MQTT_RECEIVE
+} MQTTStateOperation_t;
+
+/**
+ * @brief Cursor for iterating through state records.
+ */
+typedef size_t MQTTStateCursor_t;
+
+/**
+ * @brief Reserve an entry for an outgoing QoS 1/2 publish.
+ *
+ * @param[in] pMqttContext Initialized MQTT context.
+ * @param[in] packetId The ID of the publish packet.
+ * @param[in] qos 1 or 2.
+ *
+ * @return MQTTSuccess, MQTTNoMemory, or MQTTStateCollision.
+ */
+MQTTStatus_t MQTT_ReserveState( MQTTContext_t * pMqttContext,
+ uint16_t packetId,
+ MQTTQoS_t qos );
+
+/**
+ * @brief Calculate the new state for a publish from its qos and operation type.
+ *
+ * @param[in] opType Send or Receive.
+ * @param[in] qos 0, 1, or 2.
+ *
+ * @return The calculated state.
+ */
+MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType,
+ MQTTQoS_t qos );
+
+/**
+ * @brief Update the state record for a PUBLISH packet.
+ *
+ * @param[in] pMqttContext Initialized MQTT context.
+ * @param[in] packetId ID of the PUBLISH packet.
+ * @param[in] opType Send or Receive.
+ * @param[in] qos 0, 1, or 2.
+ * @param[out] pNewState Updated state of the publish.
+ *
+ * @return #MQTTBadParameter, #MQTTIllegalState, #MQTTStateCollision or
+ * #MQTTSuccess.
+ */
+MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext,
+ uint16_t packetId,
+ MQTTStateOperation_t opType,
+ MQTTQoS_t qos,
+ MQTTPublishState_t * pNewState );
+
+/**
+ * @brief Calculate the state from a PUBACK, PUBREC, PUBREL, or PUBCOMP.
+ *
+ * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP.
+ * @param[in] opType Send or Receive.
+ * @param[in] qos 1 or 2.
+ *
+ * @return The calculated state.
+ */
+MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType,
+ MQTTStateOperation_t opType,
+ MQTTQoS_t qos );
+
+/**
+ * @brief Update the state record for an ACKed publish.
+ *
+ * @param[in] pMqttContext Initialized MQTT context.
+ * @param[in] packetId ID of the ack packet.
+ * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP.
+ * @param[in] opType Send or Receive.
+ * @param[out] pNewState Updated state of the publish.
+ *
+ * @return #MQTTBadParameter, #MQTTIllegalState, or #MQTTSuccess.
+ */
+MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext,
+ uint16_t packetId,
+ MQTTPubAckType_t packetType,
+ MQTTStateOperation_t opType,
+ MQTTPublishState_t * pNewState );
+
+/**
+ * @brief Get the packet ID of next pending PUBREL ack to be resent.
+ *
+ * This function will need to be called to get the packet for which a PUBREL
+ * need to be sent when a session is reestablished. Calling this function
+ * repeatedly until packet id is 0 will give all the packets for which
+ * a PUBREL need to be resent in the correct order.
+ *
+ * @param[in] pMqttContext Initialized MQTT context.
+ * @param[in,out] pCursor Index at which to start searching.
+ * @param[out] pState State indicating that PUBREL packet need to be sent.
+ */
+uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext,
+ MQTTStateCursor_t * pCursor,
+ MQTTPublishState_t * pState );
+
+/**
+ * @brief Get the packet ID of next pending publish to be resent.
+ *
+ * This function will need to be called to get the packet for which a publish
+ * need to be sent when a session is reestablished. Calling this function
+ * repeatedly until packet id is 0 will give all the packets for which
+ * a publish need to be resent in the correct order.
+ *
+ * @param[in] pMqttContext Initialized MQTT context.
+ * @param[in,out] pCursor Index at which to start searching.
+ */
+uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext,
+ MQTTStateCursor_t * pCursor );
+
+/**
+ * @brief State to string conversion for state engine.
+ *
+ * @param[in] state The state to convert to a string.
+ *
+ * @return The string representation of the state.
+ */
+const char * MQTT_State_strerror( MQTTPublishState_t state );
+
+#endif /* ifndef MQTT_STATE_H */
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c
new file mode 100644
index 000000000..9efe1b618
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c
@@ -0,0 +1,1939 @@
+/*
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "mqtt.h"
+#include "mqtt_state.h"
+#include "private/mqtt_internal.h"
+
+
+/**
+ * @brief The number of retries for receiving CONNACK.
+ *
+ * The MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT will be used only when the
+ * timeoutMs parameter of #MQTT_Connect() is passed as 0 . The transport
+ * receive for CONNACK will be retried MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT
+ * times before timing out. A value of 0 for this config will cause the
+ * transport receive for CONNACK to be invoked only once.
+ */
+#ifndef MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT
+ /* Default value for the CONNACK receive retries. */
+ #define MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT ( 5U )
+#endif
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Sends provided buffer to network using transport send.
+ *
+ * @brief param[in] pContext Initialized MQTT context.
+ * @brief param[in] pBufferToSend Buffer to be sent to network.
+ * @brief param[in] bytesToSend Number of bytes to be sent.
+ *
+ * @return Total number of bytes sent; -1 if there is an error.
+ */
+static int32_t sendPacket( MQTTContext_t * pContext,
+ const uint8_t * pBufferToSend,
+ size_t bytesToSend );
+
+/**
+ * @brief Calculate the interval between two millisecond timestamps, including
+ * when the later value has overflowed.
+ *
+ * @note In C, the operands are promoted to signed integers in subtraction.
+ * Using this function avoids the need to cast the result of subtractions back
+ * to uint32_t.
+ *
+ * @param[in] later The later time stamp, in milliseconds.
+ * @param[in] start The earlier time stamp, in milliseconds.
+ *
+ * @return later - start.
+ */
+static uint32_t calculateElapsedTime( uint32_t later,
+ uint32_t start );
+
+/**
+ * @brief Convert a byte indicating a publish ack type to an #MQTTPubAckType_t.
+ *
+ * @param[in] packetType First byte of fixed header.
+ *
+ * @return Type of ack.
+ */
+static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType );
+
+/**
+ * @brief Receive bytes into the network buffer, with a timeout.
+ *
+ * @param[in] pContext Initialized MQTT Context.
+ * @param[in] bytesToRecv Number of bytes to receive.
+ * @param[in] timeoutMs Time remaining to receive the packet.
+ *
+ * @return Number of bytes received, or negative number on network error.
+ */
+static int32_t recvExact( const MQTTContext_t * pContext,
+ size_t bytesToRecv,
+ uint32_t timeoutMs );
+
+/**
+ * @brief Discard a packet from the transport interface.
+ *
+ * @param[in] PContext MQTT Connection context.
+ * @param[in] remainingLength Remaining length of the packet to dump.
+ * @param[in] timeoutMs Time remaining to discard the packet.
+ *
+ * @return #MQTTRecvFailed or #MQTTNoDataAvailable.
+ */
+static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
+ size_t remainingLength,
+ uint32_t timeoutMs );
+
+/**
+ * @brief Receive a packet from the transport interface.
+ *
+ * @param[in] pContext MQTT Connection context.
+ * @param[in] incomingPacket packet struct with remaining length.
+ * @param[in] remainingTimeMs Time remaining to receive the packet.
+ *
+ * @return #MQTTSuccess or #MQTTRecvFailed.
+ */
+static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
+ MQTTPacketInfo_t incomingPacket,
+ uint32_t remainingTimeMs );
+
+/**
+ * @brief Get the correct ack type to send.
+ *
+ * @param[in] state Current state of publish.
+ *
+ * @return Packet Type byte of PUBACK, PUBREC, PUBREL, or PUBCOMP if one of
+ * those should be sent, else 0.
+ */
+static uint8_t getAckTypeToSend( MQTTPublishState_t state );
+
+/**
+ * @brief Send acks for received QoS 1/2 publishes.
+ *
+ * @param[in] pContext MQTT Connection context.
+ * @param[in] packetId packet ID of original PUBLISH.
+ * @param[in] publishState Current publish state in record.
+ *
+ * @return #MQTTSuccess, #MQTTIllegalState or #MQTTSendFailed.
+ */
+static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
+ uint16_t packetId,
+ MQTTPublishState_t publishState );
+
+/**
+ * @brief Send a keep alive PINGREQ if the keep alive interval has elapsed.
+ *
+ * @param[in] pContext Initialized MQTT Context.
+ *
+ * @return #MQTTKeepAliveTimeout if a PINGRESP is not received in time,
+ * #MQTTSendFailed if the PINGREQ cannot be sent, or #MQTTSuccess.
+ */
+static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext );
+
+/**
+ * @brief Handle received MQTT PUBLISH packet.
+ *
+ * @param[in] pContext MQTT Connection context.
+ * @param[in] pIncomingPacket Incoming packet.
+ *
+ * @return MQTTSuccess, MQTTIllegalState or deserialization error.
+ */
+static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
+ MQTTPacketInfo_t * pIncomingPacket );
+
+/**
+ * @brief Handle received MQTT publish acks.
+ *
+ * @param[in] pContext MQTT Connection context.
+ * @param[in] pIncomingPacket Incoming packet.
+ *
+ * @return MQTTSuccess, MQTTIllegalState, or deserialization error.
+ */
+static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
+ MQTTPacketInfo_t * pIncomingPacket );
+
+/**
+ * @brief Handle received MQTT ack.
+ *
+ * @param[in] pContext MQTT Connection context.
+ * @param[in] pIncomingPacket Incoming packet.
+ * @param[in] manageKeepAlive Flag indicating if PINGRESPs should not be given
+ * to the application
+ *
+ * @return MQTTSuccess, MQTTIllegalState, or deserialization error.
+ */
+static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
+ MQTTPacketInfo_t * pIncomingPacket,
+ bool manageKeepAlive );
+
+/**
+ * @brief Run a single iteration of the receive loop.
+ *
+ * @param[in] pContext MQTT Connection context.
+ * @param[in] remainingTimeMs Remaining time for the loop in milliseconds.
+ * @param[in] manageKeepAlive Flag indicating if keep alive should be handled.
+ *
+ * @return #MQTTRecvFailed if a network error occurs during reception;
+ * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
+ * #MQTTBadResponse if an invalid packet is received;
+ * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before
+ * pContext->pingRespTimeoutMs milliseconds;
+ * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an
+ * invalid transition for the internal state machine;
+ * #MQTTSuccess on success.
+ */
+static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
+ uint32_t remainingTimeMs,
+ bool manageKeepAlive );
+
+/**
+ * @brief Validates parameters of #MQTT_Subscribe or #MQTT_Unsubscribe.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[in] packetId Packet identifier.
+ *
+ * @return #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId );
+
+/**
+ * @brief Send serialized publish packet using transport send.
+ *
+ * @brief param[in] pContext Initialized MQTT context.
+ * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @brief param[in] headerSize Header size of the PUBLISH packet.
+ *
+ * @return #MQTTSendFailed if transport write failed;
+ * #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ size_t headerSize );
+
+/**
+ * @brief Receives a CONNACK MQTT packet.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] timeoutMs Timeout for waiting for CONNACK packet.
+ * @param[in] cleanSession Clean session flag set by application.
+ * @param[out] pIncomingPacket List of MQTT subscription info.
+ * @param[out] pSessionPresent Whether a previous session was present.
+ * Only relevant if not establishing a clean session.
+ *
+ * @return #MQTTBadResponse if a bad response is received;
+ * #MQTTNoDataAvailable if no data available for transport recv;
+ * ##MQTTRecvFailed if transport recv failed;
+ * #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
+ uint32_t timeoutMs,
+ bool cleanSession,
+ MQTTPacketInfo_t * pIncomingPacket,
+ bool * pSessionPresent );
+
+/**
+ * @brief Resends pending acks for a re-established MQTT session.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ *
+ * @return #MQTTSendFailed if transport send failed;
+ * #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t resendPendingAcks( MQTTContext_t * pContext );
+
+/**
+ * @brief Serializes a PUBLISH message.
+ *
+ * @brief param[in] pContext Initialized MQTT context.
+ * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @brief param[in] packetId Packet Id of the publish packet.
+ * @brief param[out] pHeaderSize Size of the serialized PUBLISH header.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId,
+ size_t * const pHeaderSize );
+
+/**
+ * @brief Function to validate #MQTT_Publish parameters.
+ *
+ * @brief param[in] pContext Initialized MQTT context.
+ * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @brief param[in] packetId Packet Id for the MQTT PUBLISH packet.
+ *
+ * @return #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId );
+
+/*-----------------------------------------------------------*/
+
+static int32_t sendPacket( MQTTContext_t * pContext,
+ const uint8_t * pBufferToSend,
+ size_t bytesToSend )
+{
+ const uint8_t * pIndex = pBufferToSend;
+ size_t bytesRemaining = bytesToSend;
+ int32_t totalBytesSent = 0, bytesSent;
+ uint32_t sendTime = 0U;
+
+ assert( pContext != NULL );
+ assert( pContext->callbacks.getTime != NULL );
+
+ bytesRemaining = bytesToSend;
+
+ /* Record the time of transmission. */
+ sendTime = pContext->callbacks.getTime();
+
+ /* Loop until the entire packet is sent. */
+ while( bytesRemaining > 0UL )
+ {
+ bytesSent = pContext->transportInterface.send( pContext->transportInterface.networkContext,
+ pIndex,
+ bytesRemaining );
+
+ if( bytesSent > 0 )
+ {
+ bytesRemaining -= ( size_t ) bytesSent;
+ totalBytesSent += bytesSent;
+ pIndex += bytesSent;
+ LogDebug( ( "Bytes sent=%d, bytes remaining=%ul,"
+ "total bytes sent=%d.",
+ bytesSent,
+ bytesRemaining,
+ totalBytesSent ) );
+ }
+ else
+ {
+ LogError( ( "Transport send failed." ) );
+ totalBytesSent = -1;
+ break;
+ }
+ }
+
+ /* Update time of last transmission if the entire packet is successfully sent. */
+ if( totalBytesSent > 0 )
+ {
+ pContext->lastPacketTime = sendTime;
+ LogDebug( ( "Successfully sent packet at time %u.",
+ sendTime ) );
+ }
+
+ return totalBytesSent;
+}
+
+/*-----------------------------------------------------------*/
+
+static uint32_t calculateElapsedTime( uint32_t later,
+ uint32_t start )
+{
+ return later - start;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType )
+{
+ MQTTPubAckType_t ackType = MQTTPuback;
+
+ switch( packetType )
+ {
+ case MQTT_PACKET_TYPE_PUBACK:
+ ackType = MQTTPuback;
+ break;
+
+ case MQTT_PACKET_TYPE_PUBREC:
+ ackType = MQTTPubrec;
+ break;
+
+ case MQTT_PACKET_TYPE_PUBREL:
+ ackType = MQTTPubrel;
+ break;
+
+ case MQTT_PACKET_TYPE_PUBCOMP:
+ default:
+
+ /* This function is only called after checking the type is one of
+ * the above four values, so packet type must be PUBCOMP here. */
+ assert( packetType == MQTT_PACKET_TYPE_PUBCOMP );
+ ackType = MQTTPubcomp;
+ break;
+ }
+
+ return ackType;
+}
+
+/*-----------------------------------------------------------*/
+
+static int32_t recvExact( const MQTTContext_t * pContext,
+ size_t bytesToRecv,
+ uint32_t timeoutMs )
+{
+ uint8_t * pIndex = NULL;
+ size_t bytesRemaining = bytesToRecv;
+ int32_t totalBytesRecvd = 0, bytesRecvd;
+ uint32_t entryTimeMs = 0U, elapsedTimeMs = 0U;
+ MQTTTransportRecvFunc_t recvFunc = NULL;
+ MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
+ bool receiveError = false;
+
+ assert( pContext != NULL );
+ assert( bytesToRecv <= pContext->networkBuffer.size );
+ assert( pContext->callbacks.getTime != NULL );
+ pIndex = pContext->networkBuffer.pBuffer;
+ recvFunc = pContext->transportInterface.recv;
+ getTimeStampMs = pContext->callbacks.getTime;
+
+ entryTimeMs = getTimeStampMs();
+
+ while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
+ {
+ bytesRecvd = recvFunc( pContext->transportInterface.networkContext,
+ pIndex,
+ bytesRemaining );
+
+ if( bytesRecvd >= 0 )
+ {
+ bytesRemaining -= ( size_t ) bytesRecvd;
+ totalBytesRecvd += ( int32_t ) bytesRecvd;
+ pIndex += bytesRecvd;
+ }
+ else
+ {
+ LogError( ( "Network error while receiving packet: ReturnCode=%d",
+ bytesRecvd ) );
+ totalBytesRecvd = bytesRecvd;
+ receiveError = true;
+ }
+
+ elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
+
+ if( ( bytesRemaining > 0U ) && ( elapsedTimeMs >= timeoutMs ) )
+ {
+ LogError( ( "Time expired while receiving packet." ) );
+ receiveError = true;
+ }
+ }
+
+ return totalBytesRecvd;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
+ size_t remainingLength,
+ uint32_t timeoutMs )
+{
+ MQTTStatus_t status = MQTTRecvFailed;
+ int32_t bytesReceived = 0;
+ size_t bytesToReceive = 0U;
+ uint32_t totalBytesReceived = 0U, entryTimeMs = 0U, elapsedTimeMs = 0U;
+ uint32_t remainingTimeMs = timeoutMs;
+ MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
+ bool receiveError = false;
+
+ assert( pContext != NULL );
+ assert( pContext->callbacks.getTime != NULL );
+ bytesToReceive = pContext->networkBuffer.size;
+ getTimeStampMs = pContext->callbacks.getTime;
+
+ entryTimeMs = getTimeStampMs();
+
+ while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) )
+ {
+ if( ( remainingLength - totalBytesReceived ) < bytesToReceive )
+ {
+ bytesToReceive = remainingLength - totalBytesReceived;
+ }
+
+ bytesReceived = recvExact( pContext, bytesToReceive, remainingTimeMs );
+
+ if( bytesReceived != ( int32_t ) bytesToReceive )
+ {
+ LogError( ( "Receive error while discarding packet."
+ "ReceivedBytes=%d, ExpectedBytes=%lu.",
+ bytesReceived,
+ bytesToReceive ) );
+ receiveError = true;
+ }
+ else
+ {
+ totalBytesReceived += ( uint32_t ) bytesReceived;
+
+ elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
+
+ /* Update remaining time and check for timeout. */
+ if( elapsedTimeMs < timeoutMs )
+ {
+ remainingTimeMs = timeoutMs - elapsedTimeMs;
+ }
+ else
+ {
+ LogError( ( "Time expired while discarding packet." ) );
+ receiveError = true;
+ }
+ }
+ }
+
+ if( totalBytesReceived == remainingLength )
+ {
+ LogError( ( "Dumped packet. DumpedBytes=%d.",
+ totalBytesReceived ) );
+ /* Packet dumped, so no data is available. */
+ status = MQTTNoDataAvailable;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
+ MQTTPacketInfo_t incomingPacket,
+ uint32_t remainingTimeMs )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ int32_t bytesReceived = 0;
+ size_t bytesToReceive = 0U;
+
+ assert( pContext != NULL );
+
+ if( incomingPacket.remainingLength > pContext->networkBuffer.size )
+ {
+ LogError( ( "Incoming packet will be dumped: "
+ "Packet length exceeds network buffer size."
+ "PacketSize=%lu, NetworkBufferSize=%lu",
+ incomingPacket.remainingLength,
+ pContext->networkBuffer.size ) );
+ status = discardPacket( pContext,
+ incomingPacket.remainingLength,
+ remainingTimeMs );
+ }
+ else
+ {
+ bytesToReceive = incomingPacket.remainingLength;
+ bytesReceived = recvExact( pContext, bytesToReceive, remainingTimeMs );
+
+ if( bytesReceived == ( int32_t ) bytesToReceive )
+ {
+ /* Receive successful, bytesReceived == bytesToReceive. */
+ LogInfo( ( "Packet received. ReceivedBytes=%d.",
+ bytesReceived ) );
+ }
+ else
+ {
+ LogError( ( "Packet reception failed. ReceivedBytes=%d, "
+ "ExpectedBytes=%lu.",
+ bytesReceived,
+ bytesToReceive ) );
+ status = MQTTRecvFailed;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static uint8_t getAckTypeToSend( MQTTPublishState_t state )
+{
+ uint8_t packetTypeByte = 0U;
+
+ switch( state )
+ {
+ case MQTTPubAckSend:
+ packetTypeByte = MQTT_PACKET_TYPE_PUBACK;
+ break;
+
+ case MQTTPubRecSend:
+ packetTypeByte = MQTT_PACKET_TYPE_PUBREC;
+ break;
+
+ case MQTTPubRelSend:
+ packetTypeByte = MQTT_PACKET_TYPE_PUBREL;
+ break;
+
+ case MQTTPubCompSend:
+ packetTypeByte = MQTT_PACKET_TYPE_PUBCOMP;
+ break;
+
+ default:
+ /* Take no action for states that do not require sending an ack. */
+ break;
+ }
+
+ return packetTypeByte;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
+ uint16_t packetId,
+ MQTTPublishState_t publishState )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ MQTTPublishState_t newState = MQTTStateNull;
+ int32_t bytesSent = 0;
+ uint8_t packetTypeByte = 0U;
+ MQTTPubAckType_t packetType;
+
+ assert( pContext != NULL );
+
+ packetTypeByte = getAckTypeToSend( publishState );
+
+ if( packetTypeByte != 0U )
+ {
+ packetType = getAckFromPacketType( packetTypeByte );
+
+ status = MQTT_SerializeAck( &( pContext->networkBuffer ),
+ packetTypeByte,
+ packetId );
+
+ if( status == MQTTSuccess )
+ {
+ bytesSent = sendPacket( pContext,
+ pContext->networkBuffer.pBuffer,
+ MQTT_PUBLISH_ACK_PACKET_SIZE );
+ }
+
+ if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE )
+ {
+ pContext->controlPacketSent = true;
+ status = MQTT_UpdateStateAck( pContext,
+ packetId,
+ packetType,
+ MQTT_SEND,
+ &newState );
+
+ if( status != MQTTSuccess )
+ {
+ LogError( ( "Failed to update state of publish %u.", packetId ) );
+ }
+ }
+ else
+ {
+ LogError( ( "Failed to send ACK packet: PacketType=%02x, "
+ "SentBytes=%d, "
+ "PacketSize=%lu.",
+ packetTypeByte,
+ bytesSent,
+ MQTT_PUBLISH_ACK_PACKET_SIZE ) );
+ status = MQTTSendFailed;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ uint32_t now = 0U, keepAliveMs = 0U;
+
+ assert( pContext != NULL );
+ now = pContext->callbacks.getTime();
+ keepAliveMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec;
+
+ /* If keep alive interval is 0, it is disabled. */
+ if( ( keepAliveMs != 0U ) &&
+ ( calculateElapsedTime( now, pContext->lastPacketTime ) > keepAliveMs ) )
+ {
+ if( pContext->waitingForPingResp == true )
+ {
+ /* Has time expired? */
+ if( calculateElapsedTime( now, pContext->pingReqSendTimeMs ) >
+ pContext->pingRespTimeoutMs )
+ {
+ status = MQTTKeepAliveTimeout;
+ }
+ }
+ else
+ {
+ status = MQTT_Ping( pContext );
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
+ MQTTPacketInfo_t * pIncomingPacket )
+{
+ MQTTStatus_t status = MQTTBadParameter;
+ MQTTPublishState_t publishRecordState = MQTTStateNull;
+ uint16_t packetIdentifier = 0U;
+ MQTTPublishInfo_t publishInfo;
+ bool duplicatePublish = false;
+
+ assert( pContext != NULL );
+ assert( pIncomingPacket != NULL );
+
+ status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo );
+ LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%d", status ) );
+
+ if( status == MQTTSuccess )
+ {
+ status = MQTT_UpdateStatePublish( pContext,
+ packetIdentifier,
+ MQTT_RECEIVE,
+ publishInfo.qos,
+ &publishRecordState );
+
+ if( status == MQTTSuccess )
+ {
+ LogInfo( ( "State record updated. New state=%s.",
+ MQTT_State_strerror( publishRecordState ) ) );
+ }
+
+ /* Different cases in which an incoming publish with duplicate flag is
+ * handled are as listed below.
+ * 1. No collision - This is the first instance of the incoming publish
+ * packet received or an earlier received packet state is lost. This
+ * will be handled as a new incoming publish for both QoS1 and QoS2
+ * publishes.
+ * 2. Collision - The incoming packet was received before and a state
+ * record is present in the state engine. For QoS1 and QoS2 publishes
+ * this case can happen at 2 different cases and handling is
+ * different.
+ * a. QoS1 - If a PUBACK is not successfully sent for the incoming
+ * publish due to a connection issue, it can result in broker
+ * sending out a duplicate publish with dup flag set, when a
+ * session is reestablished. It can result in a collision in
+ * state engine. This will be handled by processing the incoming
+ * publish as a new publish ignoring the
+ * #MQTTStateCollision status from the state engine. The publish
+ * data is not passed to the application.
+ * b. QoS2 - If a PUBREC is not successfully sent for the incoming
+ * publish or the PUBREC sent is not successfully received by the
+ * broker due to a connection issue, it can result in broker
+ * sending out a duplicate publish with dup flag set, when a
+ * session is reestablished. It can result in a collision in
+ * state engine. This will be handled by ignoring the
+ * #MQTTStateCollision status from the state engine. The publish
+ * data is not passed to the application. */
+ else if( ( status == MQTTStateCollision ) && ( publishInfo.dup == true ) )
+ {
+ status = MQTTSuccess;
+ duplicatePublish = true;
+
+ /* Calculate the state for the ack packet that needs to be sent out
+ * for the duplicate incoming publish. */
+ publishRecordState = MQTT_CalculateStatePublish( MQTT_RECEIVE,
+ publishInfo.qos );
+ LogDebug( ( "Incoming publish packet with packet id %u already exists.",
+ packetIdentifier ) );
+ }
+ else
+ {
+ LogError( ( "Error in updating publish state for incoming publish with packet id %u."
+ " Error is %s",
+ packetIdentifier,
+ MQTT_Status_strerror( status ) ) );
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Invoke application callback to hand the buffer over to application
+ * before sending acks.
+ * Application callback will be invoked for all publishes, except for
+ * duplicate incoming publishes. */
+ if( duplicatePublish == false )
+ {
+ pContext->callbacks.appCallback( pContext,
+ pIncomingPacket,
+ packetIdentifier,
+ &publishInfo );
+ }
+
+ /* Send PUBACK or PUBREC if necessary. */
+ status = sendPublishAcks( pContext,
+ packetIdentifier,
+ publishRecordState );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
+ MQTTPacketInfo_t * pIncomingPacket )
+{
+ MQTTStatus_t status = MQTTBadResponse;
+ MQTTPublishState_t publishRecordState = MQTTStateNull;
+ uint16_t packetIdentifier;
+ MQTTPubAckType_t ackType;
+ MQTTEventCallback_t appCallback;
+
+ assert( pContext != NULL );
+ assert( pIncomingPacket != NULL );
+ assert( pContext->callbacks.appCallback != NULL );
+
+ appCallback = pContext->callbacks.appCallback;
+
+ ackType = getAckFromPacketType( pIncomingPacket->type );
+ status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
+ LogInfo( ( "Ack packet deserialized with result: %s.",
+ MQTT_Status_strerror( status ) ) );
+
+ if( status == MQTTSuccess )
+ {
+ status = MQTT_UpdateStateAck( pContext,
+ packetIdentifier,
+ ackType,
+ MQTT_RECEIVE,
+ &publishRecordState );
+
+ if( status == MQTTSuccess )
+ {
+ LogInfo( ( "State record updated. New state=%s.",
+ MQTT_State_strerror( publishRecordState ) ) );
+ }
+ else
+ {
+ LogError( ( "Updating the state engine for packet id %u"
+ " failed with error %s.",
+ packetIdentifier,
+ MQTT_Status_strerror( status ) ) );
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Invoke application callback to hand the buffer over to application
+ * before sending acks. */
+ appCallback( pContext, pIncomingPacket, packetIdentifier, NULL );
+
+ /* Send PUBREL or PUBCOMP if necessary. */
+ status = sendPublishAcks( pContext,
+ packetIdentifier,
+ publishRecordState );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
+ MQTTPacketInfo_t * pIncomingPacket,
+ bool manageKeepAlive )
+{
+ MQTTStatus_t status = MQTTBadResponse;
+ uint16_t packetIdentifier;
+ /* Need a dummy variable for MQTT_DeserializeAck(). */
+ bool sessionPresent = false;
+
+ /* We should always invoke the app callback unless we receive a PINGRESP
+ * and are managing keep alive, or if we receive an unknown packet. We
+ * initialize this to false since the callback must be invoked before
+ * sending any PUBREL or PUBCOMP. However, for other cases, we invoke it
+ * at the end to reduce the complexity of this function. */
+ bool invokeAppCallback = false;
+ MQTTEventCallback_t appCallback = NULL;
+
+ assert( pContext != NULL );
+ assert( pIncomingPacket != NULL );
+
+ appCallback = pContext->callbacks.appCallback;
+
+ switch( pIncomingPacket->type )
+ {
+ case MQTT_PACKET_TYPE_PUBACK:
+ case MQTT_PACKET_TYPE_PUBREC:
+ case MQTT_PACKET_TYPE_PUBREL:
+ case MQTT_PACKET_TYPE_PUBCOMP:
+
+ /* Handle all the publish acks. */
+ status = handlePublishAcks( pContext, pIncomingPacket );
+
+ break;
+
+ case MQTT_PACKET_TYPE_PINGRESP:
+ status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, &sessionPresent );
+ invokeAppCallback = ( manageKeepAlive == true ) ? false : true;
+
+ if( ( status == MQTTSuccess ) && ( manageKeepAlive == true ) )
+ {
+ pContext->waitingForPingResp = false;
+ }
+
+ break;
+
+ case MQTT_PACKET_TYPE_SUBACK:
+ case MQTT_PACKET_TYPE_UNSUBACK:
+ /* Deserialize and give these to the app provided callback. */
+ status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, &sessionPresent );
+ invokeAppCallback = true;
+ break;
+
+ default:
+ /* Bad response from the server. */
+ LogError( ( "Unexpected packet type from server: PacketType=%02x.",
+ pIncomingPacket->type ) );
+ status = MQTTBadResponse;
+ break;
+ }
+
+ if( ( status == MQTTSuccess ) && ( invokeAppCallback == true ) )
+ {
+ appCallback( pContext, pIncomingPacket, packetIdentifier, NULL );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
+ uint32_t remainingTimeMs,
+ bool manageKeepAlive )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ MQTTPacketInfo_t incomingPacket;
+
+ assert( pContext != NULL );
+
+ status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv,
+ pContext->transportInterface.networkContext,
+ &incomingPacket );
+
+ if( status == MQTTNoDataAvailable )
+ {
+ if( manageKeepAlive == true )
+ {
+ /* Assign status so an error can be bubbled up to application,
+ * but reset it on success. */
+ status = handleKeepAlive( pContext );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Reset the status to indicate that we should not try to read
+ * a packet from the transport interface. */
+ status = MQTTNoDataAvailable;
+ }
+ }
+ else if( status != MQTTSuccess )
+ {
+ LogError( ( "Receiving incoming packet length failed. Status=%s",
+ MQTT_Status_strerror( status ) ) );
+ }
+ else
+ {
+ /* Receive packet. Remaining time is recalculated before calling this
+ * function. */
+ status = receivePacket( pContext, incomingPacket, remainingTimeMs );
+ }
+
+ /* Handle received packet. If no data was read then this will not execute. */
+ if( status == MQTTSuccess )
+ {
+ incomingPacket.pRemainingData = pContext->networkBuffer.pBuffer;
+
+ /* PUBLISH packets allow flags in the lower four bits. For other
+ * packet types, they are reserved. */
+ if( ( incomingPacket.type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
+ {
+ status = handleIncomingPublish( pContext, &incomingPacket );
+ }
+ else
+ {
+ status = handleIncomingAck( pContext, &incomingPacket, manageKeepAlive );
+ }
+ }
+
+ if( status == MQTTNoDataAvailable )
+ {
+ /* No data available is not an error. Reset to MQTTSuccess so the
+ * return code will indicate success. */
+ status = MQTTSuccess;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Validate all the parameters. */
+ if( ( pContext == NULL ) || ( pSubscriptionList == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pContext=%p, "
+ "pSubscriptionList=%p.",
+ pContext,
+ pSubscriptionList ) );
+ status = MQTTBadParameter;
+ }
+ else if( subscriptionCount == 0UL )
+ {
+ LogError( ( "Subscription count is 0." ) );
+ status = MQTTBadParameter;
+ }
+ else if( packetId == 0U )
+ {
+ LogError( ( "Packet Id for subscription packet is 0." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Empty else MISRA 15.7 */
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ size_t headerSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ int32_t bytesSent = 0;
+
+ assert( pContext != NULL );
+ assert( pPublishInfo != NULL );
+ assert( headerSize > 0 );
+
+ /* Send header first. */
+ bytesSent = sendPacket( pContext,
+ pContext->networkBuffer.pBuffer,
+ headerSize );
+
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed for PUBLISH header." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ LogDebug( ( "Sent %d bytes of PUBLISH header.",
+ bytesSent ) );
+
+ /* Send Payload. */
+ bytesSent = sendPacket( pContext,
+ pPublishInfo->pPayload,
+ pPublishInfo->payloadLength );
+
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed for PUBLISH payload." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ LogDebug( ( "Sent %d bytes of PUBLISH payload.",
+ bytesSent ) );
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
+ uint32_t timeoutMs,
+ bool cleanSession,
+ MQTTPacketInfo_t * pIncomingPacket,
+ bool * pSessionPresent )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ MQTTGetCurrentTimeFunc_t getTimeStamp = NULL;
+ uint32_t entryTimeMs = 0U, remainingTimeMs = 0U, timeTakenMs = 0U;
+ bool breakFromLoop = false;
+ uint16_t loopCount = 0U;
+
+ assert( pContext != NULL );
+ assert( pIncomingPacket != NULL );
+ assert( pContext->callbacks.getTime != NULL );
+
+ getTimeStamp = pContext->callbacks.getTime;
+
+ /* Get the entry time for the function. */
+ entryTimeMs = getTimeStamp();
+
+ do
+ {
+ /* Transport read for incoming CONNACK packet type and length.
+ * MQTT_GetIncomingPacketTypeAndLength is a blocking call and it is
+ * returned after a transport receive timeout, an error, or a successful
+ * receive of packet type and length. */
+ status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv,
+ pContext->transportInterface.networkContext,
+ pIncomingPacket );
+
+ /* The loop times out based on 2 conditions.
+ * 1. If timeoutMs is greater than 0:
+ * Loop times out based on the timeout calculated by getTime()
+ * function.
+ * 2. If timeoutMs is 0:
+ * Loop times out based on the maximum number of retries config
+ * MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. This config will control
+ * maximum the number of retry attempts to read the CONNACK packet.
+ * A value of 0 for the config will try once to read CONNACK. */
+ if( timeoutMs > 0U )
+ {
+ breakFromLoop = ( calculateElapsedTime( getTimeStamp(), entryTimeMs ) >= timeoutMs ) ? true : false;
+ }
+ else
+ {
+ breakFromLoop = ( loopCount >= MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT ) ? true : false;
+ loopCount++;
+ }
+
+ /* Loop until there is data to read or if we have exceeded the timeout/retries. */
+ } while( ( status == MQTTNoDataAvailable ) && ( breakFromLoop == false ) );
+
+ if( status == MQTTSuccess )
+ {
+ /* Time taken in this function so far. */
+ timeTakenMs = calculateElapsedTime( getTimeStamp(), entryTimeMs );
+
+ if( timeTakenMs < timeoutMs )
+ {
+ /* Calculate remaining time for receiving the remainder of
+ * the packet. */
+ remainingTimeMs = timeoutMs - timeTakenMs;
+ }
+
+ /* Reading the remainder of the packet by transport recv.
+ * Attempt to read once even if the timeout has expired.
+ * Invoking receivePacket with remainingTime as 0 would attempt to
+ * recv from network once. If using retries, the remainder of the
+ * CONNACK packet is tried to be read only once. Reading once would be
+ * good as the packet type and remaining length was already read. Hence,
+ * the probability of the remaining 2 bytes available to read is very high. */
+ if( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK )
+ {
+ status = receivePacket( pContext,
+ *pIncomingPacket,
+ remainingTimeMs );
+ }
+ else
+ {
+ LogError( ( "Incorrect packet type %X received while expecting"
+ " CONNACK(%X).",
+ pIncomingPacket->type,
+ MQTT_PACKET_TYPE_CONNACK ) );
+ status = MQTTBadResponse;
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Update the packet info pointer to the buffer read. */
+ pIncomingPacket->pRemainingData = pContext->networkBuffer.pBuffer;
+
+ /* Deserialize CONNACK. */
+ status = MQTT_DeserializeAck( pIncomingPacket, NULL, pSessionPresent );
+ }
+
+ /* If a clean session is requested, a session present should not be set by
+ * broker. */
+ if( status == MQTTSuccess )
+ {
+ if( ( cleanSession == true ) && ( *pSessionPresent == true ) )
+ {
+ LogError( ( "Unexpected session present flag in CONNACK response from broker."
+ " CONNECT request with clean session was made with broker." ) );
+ status = MQTTBadResponse;
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ LogInfo( ( "Received MQTT CONNACK successfully from broker." ) );
+ }
+ else
+ {
+ LogError( ( "CONNACK recv failed with status = %s.",
+ MQTT_Status_strerror( status ) ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t resendPendingAcks( MQTTContext_t * pContext )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
+ uint16_t packetId = MQTT_PACKET_ID_INVALID;
+ MQTTPublishState_t state = MQTTStateNull;
+
+ assert( pContext != NULL );
+
+ /* Get the next packet Id for which a PUBREL need to be resent. */
+ packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
+
+ /* Resend all the PUBREL acks after session is reestablished. */
+ while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
+ ( status == MQTTSuccess ) )
+ {
+ status = sendPublishAcks( pContext, packetId, state );
+
+ packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId,
+ size_t * const pHeaderSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ size_t remainingLength = 0UL, packetSize = 0UL;
+
+ assert( pContext != NULL );
+ assert( pPublishInfo != NULL );
+ assert( pHeaderSize != NULL );
+
+ /* Get the remaining length and packet size.*/
+ status = MQTT_GetPublishPacketSize( pPublishInfo,
+ &remainingLength,
+ &packetSize );
+ LogDebug( ( "PUBLISH packet size is %lu and remaining length is %lu.",
+ packetSize,
+ remainingLength ) );
+
+ if( status == MQTTSuccess )
+ {
+ status = MQTT_SerializePublishHeader( pPublishInfo,
+ packetId,
+ remainingLength,
+ &( pContext->networkBuffer ),
+ pHeaderSize );
+ LogDebug( ( "Serialized PUBLISH header size is %lu.",
+ *pHeaderSize ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Validate arguments. */
+ if( ( pContext == NULL ) || ( pPublishInfo == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pContext=%p, "
+ "pPublishInfo=%p.",
+ pContext,
+ pPublishInfo ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) )
+ {
+ LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.",
+ pPublishInfo->qos ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Empty else MISRA 15.7 */
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
+ const MQTTTransportInterface_t * pTransportInterface,
+ const MQTTApplicationCallbacks_t * pCallbacks,
+ const MQTTFixedBuffer_t * pNetworkBuffer )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Validate arguments. */
+ if( ( pContext == NULL ) || ( pTransportInterface == NULL ) ||
+ ( pCallbacks == NULL ) || ( pNetworkBuffer == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pContext=%p, "
+ "pTransportInterface=%p, "
+ "pCallbacks=%p, "
+ "pNetworkBuffer=%p.",
+ pContext,
+ pTransportInterface,
+ pCallbacks,
+ pNetworkBuffer ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pCallbacks->getTime == NULL ) || ( pCallbacks->appCallback == NULL ) ||
+ ( pTransportInterface->recv == NULL ) || ( pTransportInterface->send == NULL ) )
+ {
+ LogError( ( "Functions cannot be NULL: getTime=%p, appCallback=%p, recv=%p, send=%p.",
+ pCallbacks->getTime,
+ pCallbacks->appCallback,
+ pTransportInterface->recv,
+ pTransportInterface->send ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ ( void ) memset( pContext, 0x00, sizeof( MQTTContext_t ) );
+
+ pContext->connectStatus = MQTTNotConnected;
+ pContext->transportInterface = *pTransportInterface;
+ pContext->callbacks = *pCallbacks;
+ pContext->networkBuffer = *pNetworkBuffer;
+
+ /* Zero is not a valid packet ID per MQTT spec. Start from 1. */
+ pContext->nextPacketId = 1;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,
+ const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ uint32_t timeoutMs,
+ bool * pSessionPresent )
+{
+ size_t remainingLength = 0UL, packetSize = 0UL;
+ int32_t bytesSent;
+ MQTTStatus_t status = MQTTSuccess;
+ MQTTPacketInfo_t incomingPacket = { 0 };
+
+ incomingPacket.type = ( uint8_t ) 0;
+
+ if( ( pContext == NULL ) || ( pConnectInfo == NULL ) || ( pSessionPresent == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pContext=%p, "
+ "pConnectInfo=%p, pSessionPresent=%p.",
+ pContext,
+ pConnectInfo,
+ pSessionPresent ) );
+ status = MQTTBadParameter;
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Get MQTT connect packet size and remaining length. */
+ status = MQTT_GetConnectPacketSize( pConnectInfo,
+ pWillInfo,
+ &remainingLength,
+ &packetSize );
+ LogDebug( ( "CONNECT packet size is %lu and remaining length is %lu.",
+ packetSize,
+ remainingLength ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ status = MQTT_SerializeConnect( pConnectInfo,
+ pWillInfo,
+ remainingLength,
+ &( pContext->networkBuffer ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ bytesSent = sendPacket( pContext,
+ pContext->networkBuffer.pBuffer,
+ packetSize );
+
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed for CONNECT packet." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ LogDebug( ( "Sent %d bytes of CONNECT packet.",
+ bytesSent ) );
+ }
+ }
+
+ /* Read CONNACK from transport layer. */
+ if( status == MQTTSuccess )
+ {
+ status = receiveConnack( pContext,
+ timeoutMs,
+ pConnectInfo->cleanSession,
+ &incomingPacket,
+ pSessionPresent );
+ }
+
+ /* Resend all the PUBREL when reestablishing a session. */
+ if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) )
+ {
+ status = resendPendingAcks( pContext );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ LogInfo( ( "MQTT connection established with the broker." ) );
+ pContext->connectStatus = MQTTConnected;
+ }
+ else
+ {
+ LogError( ( "MQTT connection failed with status = %s.",
+ MQTT_Status_strerror( status ) ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId )
+{
+ size_t remainingLength = 0UL, packetSize = 0UL;
+ int32_t bytesSent = 0;
+
+ /* Validate arguments. */
+ MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
+ pSubscriptionList,
+ subscriptionCount,
+ packetId );
+
+ if( status == MQTTSuccess )
+ {
+ /* Get the remaining length and packet size.*/
+ status = MQTT_GetSubscribePacketSize( pSubscriptionList,
+ subscriptionCount,
+ &remainingLength,
+ &packetSize );
+ LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.",
+ packetSize,
+ remainingLength ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Serialize MQTT SUBSCRIBE packet. */
+ status = MQTT_SerializeSubscribe( pSubscriptionList,
+ subscriptionCount,
+ packetId,
+ remainingLength,
+ &( pContext->networkBuffer ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Send serialized MQTT SUBSCRIBE packet to transport layer. */
+ bytesSent = sendPacket( pContext,
+ pContext->networkBuffer.pBuffer,
+ packetSize );
+
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed for SUBSCRIBE packet." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ LogDebug( ( "Sent %d bytes of SUBSCRIBE packet.",
+ bytesSent ) );
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
+ const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId )
+{
+ size_t headerSize = 0UL;
+ MQTTPublishState_t publishStatus = MQTTStateNull;
+
+ /* Validate arguments. */
+ MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId );
+
+ if( status == MQTTSuccess )
+ {
+ /* Serialize PUBLISH packet. */
+ status = serializePublish( pContext,
+ pPublishInfo,
+ packetId,
+ &headerSize );
+ }
+
+ if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
+ {
+ /* Reserve state for publish message. Only to be done for QoS1 or QoS2. */
+ status = MQTT_ReserveState( pContext,
+ packetId,
+ pPublishInfo->qos );
+
+ /* State already exists for a duplicate packet.
+ * If a state doesn't exist, it will be handled as a new publish in
+ * state engine. */
+ if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) )
+ {
+ status = MQTTSuccess;
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Sends the serialized publish packet over network. */
+ status = sendPublish( pContext,
+ pPublishInfo,
+ headerSize );
+ }
+
+ if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
+ {
+ /* Update state machine after PUBLISH is sent.
+ * Only to be done for QoS1 or QoS2. */
+ status = MQTT_UpdateStatePublish( pContext,
+ packetId,
+ MQTT_SEND,
+ pPublishInfo->qos,
+ &publishStatus );
+
+ if( status != MQTTSuccess )
+ {
+ LogError( ( "Update state for publish failed with status %s."
+ " However PUBLISH packet was sent to the broker."
+ " Any further handling of ACKs for the packet Id"
+ " will fail.",
+ MQTT_Status_strerror( status ) ) );
+ }
+ }
+
+ if( status != MQTTSuccess )
+ {
+ LogError( ( "MQTT PUBLISH failed with status %s.",
+ MQTT_Status_strerror( status ) ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
+{
+ int32_t bytesSent = 0;
+ MQTTStatus_t status = MQTTSuccess;
+ size_t packetSize;
+
+ if( pContext == NULL )
+ {
+ LogError( ( "pContext is NULL." ) );
+ status = MQTTBadParameter;
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Get MQTT PINGREQ packet size. */
+ status = MQTT_GetPingreqPacketSize( &packetSize );
+
+ if( status == MQTTSuccess )
+ {
+ LogDebug( ( "MQTT PINGREQ packet size is %lu.",
+ packetSize ) );
+ }
+ else
+ {
+ LogError( ( "Failed to get the PINGREQ packet size." ) );
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Serialize MQTT PINGREQ. */
+ status = MQTT_SerializePingreq( &( pContext->networkBuffer ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Send the serialized PINGREQ packet to transport layer. */
+ bytesSent = sendPacket( pContext,
+ pContext->networkBuffer.pBuffer,
+ packetSize );
+
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed for PINGREQ packet." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ pContext->pingReqSendTimeMs = pContext->lastPacketTime;
+ pContext->waitingForPingResp = true;
+ LogDebug( ( "Sent %d bytes of PINGREQ packet.",
+ bytesSent ) );
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
+ const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId )
+{
+ size_t remainingLength = 0UL, packetSize = 0UL;
+ int32_t bytesSent = 0;
+
+ /* Validate arguments. */
+ MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
+ pSubscriptionList,
+ subscriptionCount,
+ packetId );
+
+ if( status == MQTTSuccess )
+ {
+ /* Get the remaining length and packet size.*/
+ status = MQTT_GetUnsubscribePacketSize( pSubscriptionList,
+ subscriptionCount,
+ &remainingLength,
+ &packetSize );
+ LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.",
+ packetSize,
+ remainingLength ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Serialize MQTT UNSUBSCRIBE packet. */
+ status = MQTT_SerializeUnsubscribe( pSubscriptionList,
+ subscriptionCount,
+ packetId,
+ remainingLength,
+ &( pContext->networkBuffer ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Send serialized MQTT UNSUBSCRIBE packet to transport layer. */
+ bytesSent = sendPacket( pContext,
+ pContext->networkBuffer.pBuffer,
+ packetSize );
+
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed for UNSUBSCRIBE packet." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ LogDebug( ( "Sent %d bytes of UNSUBSCRIBE packet.",
+ bytesSent ) );
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext )
+{
+ size_t packetSize;
+ int32_t bytesSent;
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Validate arguments. */
+ if( pContext == NULL )
+ {
+ LogError( ( "pContext cannot be NULL." ) );
+ status = MQTTBadParameter;
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Get MQTT DISCONNECT packet size. */
+ status = MQTT_GetDisconnectPacketSize( &packetSize );
+ LogDebug( ( "MQTT DISCONNECT packet size is %lu.",
+ packetSize ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Serialize MQTT DISCONNECT packet. */
+ status = MQTT_SerializeDisconnect( &( pContext->networkBuffer ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ bytesSent = sendPacket( pContext,
+ pContext->networkBuffer.pBuffer,
+ packetSize );
+
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed for DISCONNECT packet." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ LogDebug( ( "Sent %d bytes of DISCONNECT packet.",
+ bytesSent ) );
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ LogInfo( ( "Disconnected from the broker." ) );
+ pContext->connectStatus = MQTTNotConnected;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext,
+ uint32_t timeoutMs )
+{
+ MQTTStatus_t status = MQTTBadParameter;
+ MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
+ uint32_t entryTimeMs = 0U, remainingTimeMs = timeoutMs, elapsedTimeMs = 0U;
+
+ if( ( pContext != NULL ) && ( pContext->callbacks.getTime != NULL ) )
+ {
+ getTimeStampMs = pContext->callbacks.getTime;
+ entryTimeMs = getTimeStampMs();
+ status = MQTTSuccess;
+ pContext->controlPacketSent = false;
+ }
+ else if( pContext == NULL )
+ {
+ LogError( ( "MQTT Context cannot be NULL." ) );
+ }
+ else
+ {
+ LogError( ( "MQTT Context must set callbacks.getTime." ) );
+ }
+
+ while( status == MQTTSuccess )
+ {
+ status = receiveSingleIteration( pContext, remainingTimeMs, true );
+
+ /* We don't need to break here since the status is already checked in
+ * the loop condition, and we do not want multiple breaks in a loop. */
+ if( status != MQTTSuccess )
+ {
+ LogError( ( "Exiting process loop. Error status=%s",
+ MQTT_Status_strerror( status ) ) );
+ }
+ else
+ {
+ /* Recalculate remaining time and check if loop should exit. This is
+ * done at the end so the loop will run at least a single iteration. */
+ elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
+
+ if( elapsedTimeMs > timeoutMs )
+ {
+ break;
+ }
+
+ remainingTimeMs = timeoutMs - elapsedTimeMs;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext,
+ uint32_t timeoutMs )
+{
+ MQTTStatus_t status = MQTTBadParameter;
+ MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
+ uint32_t entryTimeMs = 0U, remainingTimeMs = timeoutMs, elapsedTimeMs = 0U;
+
+ if( ( pContext != NULL ) && ( pContext->callbacks.getTime != NULL ) )
+ {
+ getTimeStampMs = pContext->callbacks.getTime;
+ entryTimeMs = getTimeStampMs();
+ status = MQTTSuccess;
+ }
+ else if( pContext == NULL )
+ {
+ LogError( ( "MQTT Context cannot be NULL." ) );
+ }
+ else
+ {
+ LogError( ( "MQTT Context must set callbacks.getTime." ) );
+ }
+
+ while( status == MQTTSuccess )
+ {
+ status = receiveSingleIteration( pContext, remainingTimeMs, false );
+
+ /* We don't need to break here since the status is already checked in
+ * the loop condition, and we do not want multiple breaks in a loop. */
+ if( status != MQTTSuccess )
+ {
+ LogError( ( "Exiting receive loop. Error status=%s",
+ MQTT_Status_strerror( status ) ) );
+ }
+ else
+ {
+ /* Recalculate remaining time and check if loop should exit. This is
+ * done at the end so the loop will run at least a single iteration. */
+ elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
+
+ if( elapsedTimeMs >= timeoutMs )
+ {
+ break;
+ }
+
+ remainingTimeMs = timeoutMs - elapsedTimeMs;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+uint16_t MQTT_GetPacketId( MQTTContext_t * pContext )
+{
+ uint16_t packetId = 0U;
+
+ if( pContext != NULL )
+ {
+ packetId = pContext->nextPacketId;
+
+ if( pContext->nextPacketId == ( uint16_t ) UINT16_MAX )
+ {
+ pContext->nextPacketId = 1;
+ }
+ else
+ {
+ pContext->nextPacketId++;
+ }
+ }
+
+ return packetId;
+}
+
+/*-----------------------------------------------------------*/
+
+const char * MQTT_Status_strerror( MQTTStatus_t status )
+{
+ const char * str = NULL;
+
+ switch( status )
+ {
+ case MQTTSuccess:
+ str = "MQTTSuccess";
+ break;
+
+ case MQTTBadParameter:
+ str = "MQTTBadParameter";
+ break;
+
+ case MQTTNoMemory:
+ str = "MQTTNoMemory";
+ break;
+
+ case MQTTSendFailed:
+ str = "MQTTSendFailed";
+ break;
+
+ case MQTTRecvFailed:
+ str = "MQTTRecvFailed";
+ break;
+
+ case MQTTBadResponse:
+ str = "MQTTBadResponse";
+ break;
+
+ case MQTTServerRefused:
+ str = "MQTTServerRefused";
+ break;
+
+ case MQTTNoDataAvailable:
+ str = "MQTTNoDataAvailable";
+ break;
+
+ case MQTTIllegalState:
+ str = "MQTTIllegalState";
+ break;
+
+ case MQTTStateCollision:
+ str = "MQTTStateCollision";
+ break;
+
+ case MQTTKeepAliveTimeout:
+ str = "MQTTKeepAliveTimeout";
+ break;
+
+ default:
+ str = "Invalid MQTT Status code";
+ break;
+ }
+
+ return str;
+}
+
+/*-----------------------------------------------------------*/
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c
new file mode 100644
index 000000000..302dd6f1b
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c
@@ -0,0 +1,2094 @@
+/*
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <string.h>
+#include <assert.h>
+
+#include "mqtt_lightweight.h"
+#include "private/mqtt_internal.h"
+
+/**
+ * @brief MQTT protocol version 3.1.1.
+ */
+#define MQTT_VERSION_3_1_1 ( ( uint8_t ) 4U )
+
+/**
+ * @brief Size of the fixed and variable header of a CONNECT packet.
+ */
+#define MQTT_PACKET_CONNECT_HEADER_SIZE ( 10UL )
+
+/**
+ * @brief Maximum size of an MQTT CONNECT packet, per MQTT spec.
+ */
+#define MQTT_PACKET_CONNECT_MAX_SIZE ( 327700UL )
+
+/* MQTT CONNECT flags. */
+#define MQTT_CONNECT_FLAG_CLEAN ( 1 ) /**< @brief Clean session. */
+#define MQTT_CONNECT_FLAG_WILL ( 2 ) /**< @brief Will present. */
+#define MQTT_CONNECT_FLAG_WILL_QOS1 ( 3 ) /**< @brief Will QoS 1. */
+#define MQTT_CONNECT_FLAG_WILL_QOS2 ( 4 ) /**< @brief Will QoS 2. */
+#define MQTT_CONNECT_FLAG_WILL_RETAIN ( 5 ) /**< @brief Will retain. */
+#define MQTT_CONNECT_FLAG_PASSWORD ( 6 ) /**< @brief Password present. */
+#define MQTT_CONNECT_FLAG_USERNAME ( 7 ) /**< @brief User name present. */
+
+/*
+ * Positions of each flag in the first byte of an MQTT PUBLISH packet's
+ * fixed header.
+ */
+#define MQTT_PUBLISH_FLAG_RETAIN ( 0 ) /**< @brief MQTT PUBLISH retain flag. */
+#define MQTT_PUBLISH_FLAG_QOS1 ( 1 ) /**< @brief MQTT PUBLISH QoS1 flag. */
+#define MQTT_PUBLISH_FLAG_QOS2 ( 2 ) /**< @brief MQTT PUBLISH QoS2 flag. */
+#define MQTT_PUBLISH_FLAG_DUP ( 3 ) /**< @brief MQTT PUBLISH duplicate flag. */
+
+/**
+ * @brief The size of MQTT DISCONNECT packets, per MQTT spec.
+ */
+#define MQTT_DISCONNECT_PACKET_SIZE ( 2UL )
+
+/*
+ * @brief A PINGREQ packet is always 2 bytes in size, defined by MQTT 3.1.1 spec.
+ */
+#define MQTT_PACKET_PINGREQ_SIZE ( 2U )
+
+/**
+ * @brief The Remaining Length field of MQTT disconnect packets, per MQTT spec.
+ */
+#define MQTT_DISCONNECT_REMAINING_LENGTH ( ( uint8_t ) 0 )
+
+/*
+ * Constants relating to CONNACK packets, defined by MQTT 3.1.1 spec.
+ */
+#define MQTT_PACKET_CONNACK_REMAINING_LENGTH ( ( uint8_t ) 2U ) /**< @brief A CONNACK packet always has a "Remaining length" of 2. */
+#define MQTT_PACKET_CONNACK_SESSION_PRESENT_MASK ( ( uint8_t ) 0x01U ) /**< @brief The "Session Present" bit is always the lowest bit. */
+
+/*
+ * UNSUBACK, PUBACK, PUBREC, PUBREL, and PUBCOMP always have a remaining length
+ * of 2.
+ */
+#define MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH ( ( uint8_t ) 2 ) /**< @brief PUBACK, PUBREC, PUBREl, PUBCOMP, UNSUBACK Remaining length. */
+#define MQTT_PACKET_PINGRESP_REMAINING_LENGTH ( 0U ) /**< @brief A PINGRESP packet always has a "Remaining length" of 0. */
+
+/**
+ * @brief Per the MQTT 3.1.1 spec, the largest "Remaining Length" of an MQTT
+ * packet is this value.
+ */
+#define MQTT_MAX_REMAINING_LENGTH ( 268435455UL )
+
+/**
+ * @brief Set a bit in an 8-bit unsigned integer.
+ */
+#define UINT8_SET_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) | ( 0x01U << ( position ) ) ) )
+
+/**
+ * @brief Macro for checking if a bit is set in a 1-byte unsigned int.
+ *
+ * @param[in] x The unsigned int to check.
+ * @param[in] position Which bit to check.
+ */
+#define UINT8_CHECK_BIT( x, position ) ( ( ( x ) & ( 0x01U << ( position ) ) ) == ( 0x01U << ( position ) ) )
+
+/**
+ * @brief Get the high byte of a 16-bit unsigned integer.
+ */
+#define UINT16_HIGH_BYTE( x ) ( ( uint8_t ) ( ( x ) >> 8 ) )
+
+/**
+ * @brief Get the low byte of a 16-bit unsigned integer.
+ */
+#define UINT16_LOW_BYTE( x ) ( ( uint8_t ) ( ( x ) & 0x00ffU ) )
+
+/**
+ * @brief Macro for decoding a 2-byte unsigned int from a sequence of bytes.
+ *
+ * @param[in] ptr A uint8_t* that points to the high byte.
+ */
+#define UINT16_DECODE( ptr ) \
+ ( uint16_t ) ( ( ( ( uint16_t ) ( *( ptr ) ) ) << 8 ) | \
+ ( ( uint16_t ) ( *( ( ptr ) + 1 ) ) ) )
+
+/**
+ * @brief A value that represents an invalid remaining length.
+ *
+ * This value is greater than what is allowed by the MQTT specification.
+ */
+#define MQTT_REMAINING_LENGTH_INVALID ( ( size_t ) 268435456 )
+
+/**
+ * @brief The minimum remaining length for a QoS 0 PUBLISH.
+ *
+ * Includes two bytes for topic name length and one byte for topic name.
+ */
+#define MQTT_MIN_PUBLISH_REMAINING_LENGTH_QOS0 ( 3U )
+
+/*-----------------------------------------------------------*/
+
+/* MQTT Subscription packet types. */
+typedef enum MQTTSubscriptionType
+{
+ MQTT_SUBSCRIBE,
+ MQTT_UNSUBSCRIBE
+} MQTTSubscriptionType_t;
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Serializes MQTT PUBLISH packet into the buffer provided.
+ *
+ * This function serializes MQTT PUBLISH packet into #pFixedBuffer.pBuffer.
+ * Copy of the payload into the buffer is done as part of the serialization
+ * only if #serializePayload is true.
+ *
+ * @brief param[in] pPublishInfo Publish information.
+ * @brief param[in] remainingLength Remaining length of the PUBLISH packet.
+ * @brief param[in] packetIdentifier Packet identifier of PUBLISH packet.
+ * @brief param[in, out] pFixedBuffer Buffer to which PUBLISH packet will be
+ * serialized.
+ * @brief param[in] serializePayload Copy payload to the serialized buffer
+ * only if true. Only PUBLISH header will be serialized if false.
+ *
+ * @return Total number of bytes sent; -1 if there is an error.
+ */
+static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo,
+ size_t remainingLength,
+ uint16_t packetIdentifier,
+ const MQTTFixedBuffer_t * pFixedBuffer,
+ bool serializePayload );
+
+/**
+ * @brief Calculates the packet size and remaining length of an MQTT
+ * PUBLISH packet.
+ *
+ * @param[in] pPublishInfo MQTT PUBLISH packet parameters.
+ * @param[out] pRemainingLength The Remaining Length of the MQTT PUBLISH packet.
+ * @param[out] pPacketSize The total size of the MQTT PUBLISH packet.
+ *
+ * @return false if the packet would exceed the size allowed by the
+ * MQTT spec; true otherwise.
+ */
+static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo,
+ size_t * pRemainingLength,
+ size_t * pPacketSize );
+
+/**
+ * @brief Calculates the packet size and remaining length of an MQTT
+ * SUBSCRIBE or UNSUBSCRIBE packet.
+ *
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[out] pRemainingLength The Remaining Length of the MQTT SUBSCRIBE or
+ * UNSUBSCRIBE packet.
+ * @param[out] pPacketSize The total size of the MQTT MQTT SUBSCRIBE or
+ * UNSUBSCRIBE packet.
+ * @param[in] subscriptionType #MQTT_SUBSCRIBE or #MQTT_UNSUBSCRIBE.
+ *
+ * #MQTTBadParameter if the packet would exceed the size allowed by the
+ * MQTT spec; #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t calculateSubscriptionPacketSize( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ size_t * pRemainingLength,
+ size_t * pPacketSize,
+ MQTTSubscriptionType_t subscriptionType );
+
+/**
+ * @brief Validates parameters of #MQTT_SerializeSubscribe or
+ * #MQTT_SerializeUnsubscribe.
+ *
+ * @param[in] pSubscriptionList List of MQTT subscription info.
+ * @param[in] subscriptionCount The number of elements in pSubscriptionList.
+ * @param[in] packetId Packet identifier.
+ * @param[in] remainingLength Remaining length of the packet.
+ * @param[in] pBuffer Buffer for packet serialization.
+ *
+ * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
+ * #MQTTBadParameter if invalid parameters are passed;
+ * #MQTTSuccess otherwise.
+ */
+static MQTTStatus_t validateSubscriptionSerializeParams( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer );
+
+/**
+ * @brief Serialize an MQTT CONNECT packet in the given buffer.
+ *
+ * @param[in] pConnectInfo MQTT CONNECT packet parameters.
+ * @param[in] pWillInfo Last Will and Testament. Pass NULL if not used.
+ * @param[in] remainingLength Remaining Length of MQTT CONNECT packet.
+ * @param[out] pBuffer Buffer for packet serialization.
+ */
+static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer );
+
+/*-----------------------------------------------------------*/
+
+static size_t remainingLengthEncodedSize( size_t length )
+{
+ size_t encodedSize;
+
+ /* Determine how many bytes are needed to encode length.
+ * The values below are taken from the MQTT 3.1.1 spec. */
+
+ /* 1 byte is needed to encode lengths between 0 and 127. */
+ if( length < 128U )
+ {
+ encodedSize = 1U;
+ }
+ /* 2 bytes are needed to encode lengths between 128 and 16,383. */
+ else if( length < 16384U )
+ {
+ encodedSize = 2U;
+ }
+ /* 3 bytes are needed to encode lengths between 16,384 and 2,097,151. */
+ else if( length < 2097152U )
+ {
+ encodedSize = 3U;
+ }
+ /* 4 bytes are needed to encode lengths between 2,097,152 and 268,435,455. */
+ else
+ {
+ encodedSize = 4U;
+ }
+
+ LogDebug( ( "Encoded size for length =%ul is %ul.",
+ length,
+ encodedSize ) );
+
+ return encodedSize;
+}
+
+/*-----------------------------------------------------------*/
+
+static uint8_t * encodeRemainingLength( uint8_t * pDestination,
+ size_t length )
+{
+ uint8_t lengthByte;
+ uint8_t * pLengthEnd = NULL;
+ size_t remainingLength = length;
+
+ assert( pDestination != NULL );
+
+ pLengthEnd = pDestination;
+
+ /* This algorithm is copied from the MQTT v3.1.1 spec. */
+ do
+ {
+ lengthByte = ( uint8_t ) ( remainingLength % 128U );
+ remainingLength = remainingLength / 128U;
+
+ /* Set the high bit of this byte, indicating that there's more data. */
+ if( remainingLength > 0U )
+ {
+ UINT8_SET_BIT( lengthByte, 7 );
+ }
+
+ /* Output a single encoded byte. */
+ *pLengthEnd = lengthByte;
+ pLengthEnd++;
+ } while( remainingLength > 0U );
+
+ return pLengthEnd;
+}
+
+/*-----------------------------------------------------------*/
+
+static uint8_t * encodeString( uint8_t * pDestination,
+ const char * source,
+ uint16_t sourceLength )
+{
+ uint8_t * pBuffer = NULL;
+
+ /* Typecast const char * typed source buffer to const uint8_t *.
+ * This is to use same type buffers in memcpy. */
+ const uint8_t * pSourceBuffer = ( const uint8_t * ) source;
+
+ assert( pDestination != NULL );
+
+ pBuffer = pDestination;
+
+ /* The first byte of a UTF-8 string is the high byte of the string length. */
+ *pBuffer = UINT16_HIGH_BYTE( sourceLength );
+ pBuffer++;
+
+ /* The second byte of a UTF-8 string is the low byte of the string length. */
+ *pBuffer = UINT16_LOW_BYTE( sourceLength );
+ pBuffer++;
+
+ /* Copy the string into pBuffer. */
+ if( pSourceBuffer != NULL )
+ {
+ ( void ) memcpy( pBuffer, pSourceBuffer, sourceLength );
+ }
+
+ /* Return the pointer to the end of the encoded string. */
+ pBuffer += sourceLength;
+
+ return pBuffer;
+}
+
+/*-----------------------------------------------------------*/
+
+static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo,
+ size_t * pRemainingLength,
+ size_t * pPacketSize )
+{
+ bool status = true;
+ size_t packetSize = 0, payloadLimit = 0;
+
+ assert( pPublishInfo != NULL );
+ assert( pRemainingLength != NULL );
+ assert( pPacketSize != NULL );
+
+ /* The variable header of a PUBLISH packet always contains the topic name.
+ * The first 2 bytes of UTF-8 string contains length of the string.
+ */
+ packetSize += pPublishInfo->topicNameLength + sizeof( uint16_t );
+
+ /* The variable header of a QoS 1 or 2 PUBLISH packet contains a 2-byte
+ * packet identifier. */
+ if( pPublishInfo->qos > MQTTQoS0 )
+ {
+ packetSize += sizeof( uint16_t );
+ }
+
+ /* Calculate the maximum allowed size of the payload for the given parameters.
+ * This calculation excludes the "Remaining length" encoding, whose size is not
+ * yet known. */
+ payloadLimit = MQTT_MAX_REMAINING_LENGTH - packetSize - 1U;
+
+ /* Ensure that the given payload fits within the calculated limit. */
+ if( pPublishInfo->payloadLength > payloadLimit )
+ {
+ LogError( ( "PUBLISH payload length of %lu cannot exceed "
+ "%lu so as not to exceed the maximum "
+ "remaining length of MQTT 3.1.1 packet( %lu ).",
+ pPublishInfo->payloadLength,
+ payloadLimit,
+ MQTT_MAX_REMAINING_LENGTH ) );
+ status = false;
+ }
+ else
+ {
+ /* Add the length of the PUBLISH payload. At this point, the "Remaining length"
+ * has been calculated. */
+ packetSize += pPublishInfo->payloadLength;
+
+ /* Now that the "Remaining length" is known, recalculate the payload limit
+ * based on the size of its encoding. */
+ payloadLimit -= remainingLengthEncodedSize( packetSize );
+
+ /* Check that the given payload fits within the size allowed by MQTT spec. */
+ if( pPublishInfo->payloadLength > payloadLimit )
+ {
+ LogError( ( "PUBLISH payload length of %lu cannot exceed "
+ "%lu so as not to exceed the maximum "
+ "remaining length of MQTT 3.1.1 packet( %lu ).",
+ pPublishInfo->payloadLength,
+ payloadLimit,
+ MQTT_MAX_REMAINING_LENGTH ) );
+ status = false;
+ }
+ else
+ {
+ /* Set the "Remaining length" output parameter and calculate the full
+ * size of the PUBLISH packet. */
+ *pRemainingLength = packetSize;
+
+ packetSize += 1U + remainingLengthEncodedSize( packetSize );
+ *pPacketSize = packetSize;
+ }
+ }
+
+ LogDebug( ( "PUBLISH packet remaining length=%lu and packet size=%lu.",
+ *pRemainingLength,
+ *pPacketSize ) );
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo,
+ size_t remainingLength,
+ uint16_t packetIdentifier,
+ const MQTTFixedBuffer_t * pFixedBuffer,
+ bool serializePayload )
+{
+ uint8_t * pIndex = NULL;
+ const uint8_t * pPayloadBuffer = NULL;
+
+ /* The first byte of a PUBLISH packet contains the packet type and flags. */
+ uint8_t publishFlags = MQTT_PACKET_TYPE_PUBLISH;
+
+ assert( pPublishInfo != NULL );
+ assert( pFixedBuffer != NULL );
+ /* Packet Id should be non zero for QoS1 and QoS2. */
+ assert( pPublishInfo->qos == MQTTQoS0 || packetIdentifier != 0U );
+ /* Duplicate flag should be set only for Qos1 or Qos2. */
+ assert( ( !pPublishInfo->dup ) || ( pPublishInfo->qos > MQTTQoS0 ) );
+
+ /* Get the start address of the buffer. */
+ pIndex = pFixedBuffer->pBuffer;
+
+ if( pPublishInfo->qos == MQTTQoS1 )
+ {
+ LogDebug( ( "Adding QoS as QoS1 in PUBLISH flags." ) );
+ UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS1 );
+ }
+ else if( pPublishInfo->qos == MQTTQoS2 )
+ {
+ LogDebug( ( "Adding QoS as QoS2 in PUBLISH flags." ) );
+ UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS2 );
+ }
+ else
+ {
+ /* Empty else MISRA 15.7 */
+ }
+
+ if( pPublishInfo->retain == true )
+ {
+ LogDebug( ( "Adding retain bit in PUBLISH flags." ) );
+ UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_RETAIN );
+ }
+
+ if( pPublishInfo->dup == true )
+ {
+ LogDebug( ( "Adding dup bit in PUBLISH flags." ) );
+ UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_DUP );
+ }
+
+ *pIndex = publishFlags;
+ pIndex++;
+
+ /* The "Remaining length" is encoded from the second byte. */
+ pIndex = encodeRemainingLength( pIndex, remainingLength );
+
+ /* The topic name is placed after the "Remaining length". */
+ pIndex = encodeString( pIndex,
+ pPublishInfo->pTopicName,
+ pPublishInfo->topicNameLength );
+
+ /* A packet identifier is required for QoS 1 and 2 messages. */
+ if( pPublishInfo->qos > MQTTQoS0 )
+ {
+ LogDebug( ( "Adding packet Id in PUBLISH packet." ) );
+ /* Place the packet identifier into the PUBLISH packet. */
+ *pIndex = UINT16_HIGH_BYTE( packetIdentifier );
+ *( pIndex + 1 ) = UINT16_LOW_BYTE( packetIdentifier );
+ pIndex += 2;
+ }
+
+ /* The payload is placed after the packet identifier.
+ * Payload is copied over only if required by the flag serializePayload.
+ * This will help reduce an unnecessary copy of the payload into the buffer.
+ */
+ if( ( pPublishInfo->payloadLength > 0U ) &&
+ ( serializePayload == true ) )
+ {
+ LogDebug( ( "Copying PUBLISH payload of length =%lu to buffer",
+ pPublishInfo->payloadLength ) );
+
+ /* Typecast const void * typed payload buffer to const uint8_t *.
+ * This is to use same type buffers in memcpy. */
+ pPayloadBuffer = ( const uint8_t * ) pPublishInfo->pPayload;
+
+ ( void ) memcpy( pIndex, pPayloadBuffer, pPublishInfo->payloadLength );
+ pIndex += pPublishInfo->payloadLength;
+ }
+
+ /* Ensure that the difference between the end and beginning of the buffer
+ * is less than the buffer size. */
+ assert( ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) <= pFixedBuffer->size );
+}
+
+static size_t getRemainingLength( MQTTTransportRecvFunc_t recvFunc,
+ NetworkContext_t networkContext )
+{
+ size_t remainingLength = 0, multiplier = 1, bytesDecoded = 0, expectedSize = 0;
+ uint8_t encodedByte = 0;
+ int32_t bytesReceived = 0;
+
+ /* This algorithm is copied from the MQTT v3.1.1 spec. */
+ do
+ {
+ if( multiplier > 2097152U ) /* 128 ^ 3 */
+ {
+ remainingLength = MQTT_REMAINING_LENGTH_INVALID;
+ }
+ else
+ {
+ bytesReceived = recvFunc( networkContext, &encodedByte, 1U );
+
+ if( bytesReceived == 1 )
+ {
+ remainingLength += ( ( size_t ) encodedByte & 0x7FU ) * multiplier;
+ multiplier *= 128U;
+ bytesDecoded++;
+ }
+ else
+ {
+ remainingLength = MQTT_REMAINING_LENGTH_INVALID;
+ }
+ }
+
+ if( remainingLength == MQTT_REMAINING_LENGTH_INVALID )
+ {
+ break;
+ }
+ } while( ( encodedByte & 0x80U ) != 0U );
+
+ /* Check that the decoded remaining length conforms to the MQTT specification. */
+ if( remainingLength != MQTT_REMAINING_LENGTH_INVALID )
+ {
+ expectedSize = remainingLengthEncodedSize( remainingLength );
+
+ if( bytesDecoded != expectedSize )
+ {
+ remainingLength = MQTT_REMAINING_LENGTH_INVALID;
+ }
+ }
+
+ return remainingLength;
+}
+
+/*-----------------------------------------------------------*/
+
+static bool incomingPacketValid( uint8_t packetType )
+{
+ bool status = false;
+
+ /* Check packet type. Mask out lower bits to ignore flags. */
+ switch( packetType & 0xF0U )
+ {
+ /* Valid incoming packet types. */
+ case MQTT_PACKET_TYPE_CONNACK:
+ case MQTT_PACKET_TYPE_PUBLISH:
+ case MQTT_PACKET_TYPE_PUBACK:
+ case MQTT_PACKET_TYPE_PUBREC:
+ case MQTT_PACKET_TYPE_PUBCOMP:
+ case MQTT_PACKET_TYPE_SUBACK:
+ case MQTT_PACKET_TYPE_UNSUBACK:
+ case MQTT_PACKET_TYPE_PINGRESP:
+ status = true;
+ break;
+
+ case ( MQTT_PACKET_TYPE_PUBREL & 0xF0U ):
+
+ /* The second bit of a PUBREL must be set. */
+ if( ( packetType & 0x02U ) > 0U )
+ {
+ status = true;
+ }
+
+ break;
+
+ /* Any other packet type is invalid. */
+ default:
+ LogWarn( ( "Incoming packet invalid: Packet type=%u",
+ packetType ) );
+ break;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t checkPublishRemainingLength( size_t remainingLength,
+ MQTTQoS_t qos,
+ size_t qos0Minimum )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Sanity checks for "Remaining length". */
+ if( qos == MQTTQoS0 )
+ {
+ /* Check that the "Remaining length" is greater than the minimum. */
+ if( remainingLength < qos0Minimum )
+ {
+ LogDebug( ( "QoS 0 PUBLISH cannot have a remaining length less than %lu.",
+ qos0Minimum ) );
+
+ status = MQTTBadResponse;
+ }
+ }
+ else
+ {
+ /* Check that the "Remaining length" is greater than the minimum. For
+ * QoS 1 or 2, this will be two bytes greater than for QoS 0 due to the
+ * packet identifier. */
+ if( remainingLength < ( qos0Minimum + 2U ) )
+ {
+ LogDebug( ( "QoS 1 or 2 PUBLISH cannot have a remaining length less than %lu.",
+ qos0Minimum + 2U ) );
+
+ status = MQTTBadResponse;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t processPublishFlags( uint8_t publishFlags,
+ MQTTPublishInfo_t * pPublishInfo )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ assert( pPublishInfo != NULL );
+
+ /* Check for QoS 2. */
+ if( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS2 ) )
+ {
+ /* PUBLISH packet is invalid if both QoS 1 and QoS 2 bits are set. */
+ if( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS1 ) )
+ {
+ LogDebug( ( "Bad QoS: 3." ) );
+
+ status = MQTTBadResponse;
+ }
+ else
+ {
+ pPublishInfo->qos = MQTTQoS2;
+ }
+ }
+ /* Check for QoS 1. */
+ else if( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS1 ) )
+ {
+ pPublishInfo->qos = MQTTQoS1;
+ }
+ /* If the PUBLISH isn't QoS 1 or 2, then it's QoS 0. */
+ else
+ {
+ pPublishInfo->qos = MQTTQoS0;
+ }
+
+ if( status == MQTTSuccess )
+ {
+ LogDebug( ( "QoS is %d.", pPublishInfo->qos ) );
+
+ /* Parse the Retain bit. */
+ pPublishInfo->retain = ( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_RETAIN ) ) ? true : false;
+
+ LogDebug( ( "Retain bit is %d.", pPublishInfo->retain ) );
+
+ /* Parse the DUP bit. */
+ pPublishInfo->dup = ( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_DUP ) ) ? true : false;
+
+ LogDebug( ( "DUP bit is %d.", pPublishInfo->dup ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static void logConnackResponse( uint8_t responseCode )
+{
+ const char * const pConnackResponses[ 6 ] =
+ {
+ "Connection accepted.", /* 0 */
+ "Connection refused: unacceptable protocol version.", /* 1 */
+ "Connection refused: identifier rejected.", /* 2 */
+ "Connection refused: server unavailable", /* 3 */
+ "Connection refused: bad user name or password.", /* 4 */
+ "Connection refused: not authorized." /* 5 */
+ };
+
+ /* Avoid unused parameter warning when assert and logs are disabled. */
+ ( void ) responseCode;
+ ( void ) pConnackResponses;
+
+ assert( responseCode <= 5 );
+
+ if( responseCode == 0u )
+ {
+ /* Log at Info level for a success CONNACK response. */
+ LogInfo( ( "%s", pConnackResponses[ 0 ] ) );
+ }
+ else
+ {
+ /* Log an error based on the CONNACK response code. */
+ LogError( ( "%s", pConnackResponses[ responseCode ] ) );
+ }
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t deserializeConnack( const MQTTPacketInfo_t * pConnack,
+ bool * pSessionPresent )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ const uint8_t * pRemainingData = NULL;
+
+ assert( pConnack != NULL );
+ assert( pSessionPresent != NULL );
+ pRemainingData = pConnack->pRemainingData;
+
+ /* According to MQTT 3.1.1, the second byte of CONNACK must specify a
+ * "Remaining length" of 2. */
+ if( pConnack->remainingLength != MQTT_PACKET_CONNACK_REMAINING_LENGTH )
+ {
+ LogError( ( "CONNACK does not have remaining length of %d.",
+ MQTT_PACKET_CONNACK_REMAINING_LENGTH ) );
+
+ status = MQTTBadResponse;
+ }
+
+ /* Check the reserved bits in CONNACK. The high 7 bits of the second byte
+ * in CONNACK must be 0. */
+ else if( ( pRemainingData[ 0 ] | 0x01U ) != 0x01U )
+ {
+ LogError( ( "Reserved bits in CONNACK incorrect." ) );
+
+ status = MQTTBadResponse;
+ }
+ else
+ {
+ /* Determine if the "Session Present" bit is set. This is the lowest bit of
+ * the second byte in CONNACK. */
+ if( ( pRemainingData[ 0 ] & MQTT_PACKET_CONNACK_SESSION_PRESENT_MASK )
+ == MQTT_PACKET_CONNACK_SESSION_PRESENT_MASK )
+ {
+ LogWarn( ( "CONNACK session present bit set." ) );
+ *pSessionPresent = true;
+
+ /* MQTT 3.1.1 specifies that the fourth byte in CONNACK must be 0 if the
+ * "Session Present" bit is set. */
+ if( pRemainingData[ 1 ] != 0U )
+ {
+ status = MQTTBadResponse;
+ }
+ }
+ else
+ {
+ LogInfo( ( "CONNACK session present bit not set." ) );
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* In MQTT 3.1.1, only values 0 through 5 are valid CONNACK response codes. */
+ if( pRemainingData[ 1 ] > 5U )
+ {
+ LogError( ( "CONNACK response %hhu is not valid.",
+ pRemainingData[ 1 ] ) );
+
+ status = MQTTBadResponse;
+ }
+ else
+ {
+ /* Print the appropriate message for the CONNACK response code if logs are
+ * enabled. */
+ logConnackResponse( pRemainingData[ 1 ] );
+
+ /* A nonzero CONNACK response code means the connection was refused. */
+ if( pRemainingData[ 1 ] > 0U )
+ {
+ status = MQTTServerRefused;
+ }
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t calculateSubscriptionPacketSize( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ size_t * pRemainingLength,
+ size_t * pPacketSize,
+ MQTTSubscriptionType_t subscriptionType )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ size_t i = 0, packetSize = 0;
+
+ assert( pSubscriptionList != NULL );
+ assert( subscriptionCount != 0U );
+ assert( pRemainingLength != NULL );
+ assert( pPacketSize != NULL );
+
+ /* The variable header of a subscription packet consists of a 2-byte packet
+ * identifier. */
+ packetSize += sizeof( uint16_t );
+
+ /* Sum the lengths of all subscription topic filters; add 1 byte for each
+ * subscription's QoS if type is MQTT_SUBSCRIBE. */
+ for( i = 0; i < subscriptionCount; i++ )
+ {
+ /* Add the length of the topic filter. MQTT strings are prepended
+ * with 2 byte string length field. Hence 2 bytes are added to size. */
+ packetSize += pSubscriptionList[ i ].topicFilterLength + sizeof( uint16_t );
+
+ /* Only SUBSCRIBE packets include the QoS. */
+ if( subscriptionType == MQTT_SUBSCRIBE )
+ {
+ packetSize += 1U;
+ }
+ }
+
+ /* At this point, the "Remaining length" has been calculated. Return error
+ * if the "Remaining length" exceeds what is allowed by MQTT 3.1.1. Otherwise,
+ * set the output parameter.*/
+ if( packetSize > MQTT_MAX_REMAINING_LENGTH )
+ {
+ LogError( ( "Subscription packet length of %lu exceeds"
+ "the MQTT 3.1.1 maximum packet length of %lu.",
+ packetSize,
+ MQTT_MAX_REMAINING_LENGTH ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ *pRemainingLength = packetSize;
+
+ /* Calculate the full size of the subscription packet by adding
+ * number of bytes required to encode the "Remaining length" field
+ * plus 1 byte for the "Packet type" field. */
+ packetSize += 1U + remainingLengthEncodedSize( packetSize );
+
+ /*Set the pPacketSize output parameter. */
+ *pPacketSize = packetSize;
+ }
+
+ LogDebug( ( "Subscription packet remaining length=%lu and packet size=%lu.",
+ *pRemainingLength,
+ *pPacketSize ) );
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t readSubackStatus( size_t statusCount,
+ const uint8_t * pStatusStart )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ uint8_t subscriptionStatus = 0;
+ size_t i = 0;
+
+ assert( pStatusStart != NULL );
+
+ /* Iterate through each status byte in the SUBACK packet. */
+ for( i = 0; i < statusCount; i++ )
+ {
+ /* Read a single status byte in SUBACK. */
+ subscriptionStatus = pStatusStart[ i ];
+
+ /* MQTT 3.1.1 defines the following values as status codes. */
+ switch( subscriptionStatus )
+ {
+ case 0x00:
+ case 0x01:
+ case 0x02:
+
+ LogDebug( ( "Topic filter %lu accepted, max QoS %hhu.",
+ ( unsigned long ) i, subscriptionStatus ) );
+ break;
+
+ case 0x80:
+
+ LogDebug( ( "Topic filter %lu refused.", ( unsigned long ) i ) );
+
+ /* Application should remove subscription from the list */
+ status = MQTTServerRefused;
+
+ break;
+
+ default:
+ LogDebug( ( "Bad SUBSCRIBE status %hhu.", subscriptionStatus ) );
+
+ status = MQTTBadResponse;
+
+ break;
+ }
+
+ /* Stop parsing the subscription statuses if a bad response was received. */
+ if( status == MQTTBadResponse )
+ {
+ break;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t deserializeSuback( const MQTTPacketInfo_t * pSuback,
+ uint16_t * pPacketIdentifier )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ size_t remainingLength;
+ const uint8_t * pVariableHeader = NULL;
+
+ assert( pSuback != NULL );
+ assert( pPacketIdentifier != NULL );
+
+ remainingLength = pSuback->remainingLength;
+ pVariableHeader = pSuback->pRemainingData;
+
+ /* A SUBACK must have a remaining length of at least 3 to accommodate the
+ * packet identifier and at least 1 return code. */
+ if( remainingLength < 3U )
+ {
+ LogDebug( ( "SUBACK cannot have a remaining length less than 3." ) );
+ status = MQTTBadResponse;
+ }
+ else
+ {
+ /* Extract the packet identifier (first 2 bytes of variable header) from SUBACK. */
+ *pPacketIdentifier = UINT16_DECODE( pVariableHeader );
+
+ LogDebug( ( "Packet identifier %hu.", *pPacketIdentifier ) );
+
+ status = readSubackStatus( remainingLength - sizeof( uint16_t ),
+ pVariableHeader + sizeof( uint16_t ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t validateSubscriptionSerializeParams( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* The serialized packet size = First byte
+ * + length of encoded size of remaining length
+ * + remaining length. */
+ size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength )
+ + remainingLength;
+
+ /* Validate all the parameters. */
+ if( ( pBuffer == NULL ) || ( pSubscriptionList == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pBuffer=%p, "
+ "pSubscriptionList=%p.",
+ pBuffer,
+ pSubscriptionList ) );
+ status = MQTTBadParameter;
+ }
+ else if( subscriptionCount == 0U )
+ {
+ LogError( ( "Subscription count is 0." ) );
+ status = MQTTBadParameter;
+ }
+ else if( packetId == 0U )
+ {
+ LogError( ( "Packet Id for subscription packet is 0." ) );
+ status = MQTTBadParameter;
+ }
+ else if( packetSize > pBuffer->size )
+ {
+ LogError( ( "Buffer size of %lu is not sufficient to hold "
+ "serialized packet of size of %lu.",
+ pBuffer->size,
+ packetSize ) );
+ status = MQTTNoMemory;
+ }
+ else
+ {
+ /* Empty else MISRA 15.7 */
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t deserializePublish( const MQTTPacketInfo_t * pIncomingPacket,
+ uint16_t * pPacketId,
+ MQTTPublishInfo_t * pPublishInfo )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ const uint8_t * pVariableHeader, * pPacketIdentifierHigh;
+
+ assert( pIncomingPacket != NULL );
+ assert( pPacketId != NULL );
+ assert( pPublishInfo != NULL );
+ pVariableHeader = pIncomingPacket->pRemainingData;
+ /* The flags are the lower 4 bits of the first byte in PUBLISH. */
+ status = processPublishFlags( ( pIncomingPacket->type & 0x0FU ), pPublishInfo );
+
+ if( status == MQTTSuccess )
+ {
+ /* Sanity checks for "Remaining length". A QoS 0 PUBLISH must have a remaining
+ * length of at least 3 to accommodate topic name length (2 bytes) and topic
+ * name (at least 1 byte). A QoS 1 or 2 PUBLISH must have a remaining length of
+ * at least 5 for the packet identifier in addition to the topic name length and
+ * topic name. */
+ status = checkPublishRemainingLength( pIncomingPacket->remainingLength,
+ pPublishInfo->qos,
+ MQTT_MIN_PUBLISH_REMAINING_LENGTH_QOS0 );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Extract the topic name starting from the first byte of the variable header.
+ * The topic name string starts at byte 3 in the variable header. */
+ pPublishInfo->topicNameLength = UINT16_DECODE( pVariableHeader );
+
+ /* Sanity checks for topic name length and "Remaining length". The remaining
+ * length must be at least as large as the variable length header. */
+ status = checkPublishRemainingLength( pIncomingPacket->remainingLength,
+ pPublishInfo->qos,
+ pPublishInfo->topicNameLength + sizeof( uint16_t ) );
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Parse the topic. */
+ pPublishInfo->pTopicName = ( const char * ) ( pVariableHeader + sizeof( uint16_t ) );
+ LogDebug( ( "Topic name length %hu: %.*s",
+ pPublishInfo->topicNameLength,
+ pPublishInfo->topicNameLength,
+ pPublishInfo->pTopicName ) );
+
+ /* Extract the packet identifier for QoS 1 or 2 PUBLISH packets. Packet
+ * identifier starts immediately after the topic name. */
+ pPacketIdentifierHigh = ( const uint8_t * ) ( pPublishInfo->pTopicName + pPublishInfo->topicNameLength );
+
+ if( pPublishInfo->qos > MQTTQoS0 )
+ {
+ *pPacketId = UINT16_DECODE( pPacketIdentifierHigh );
+
+ LogDebug( ( "Packet identifier %hu.", *pPacketId ) );
+
+ /* Packet identifier cannot be 0. */
+ if( *pPacketId == 0U )
+ {
+ status = MQTTBadResponse;
+ }
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Calculate the length of the payload. QoS 1 or 2 PUBLISH packets contain
+ * a packet identifier, but QoS 0 PUBLISH packets do not. */
+ if( pPublishInfo->qos == MQTTQoS0 )
+ {
+ pPublishInfo->payloadLength = ( pIncomingPacket->remainingLength - pPublishInfo->topicNameLength - sizeof( uint16_t ) );
+ pPublishInfo->pPayload = pPacketIdentifierHigh;
+ }
+ else
+ {
+ pPublishInfo->payloadLength = ( pIncomingPacket->remainingLength - pPublishInfo->topicNameLength - 2U * sizeof( uint16_t ) );
+ pPublishInfo->pPayload = pPacketIdentifierHigh + sizeof( uint16_t );
+ }
+
+ LogDebug( ( "Payload length %hu.", pPublishInfo->payloadLength ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t deserializeSimpleAck( const MQTTPacketInfo_t * pAck,
+ uint16_t * pPacketIdentifier )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ assert( pAck != NULL );
+ assert( pPacketIdentifier != NULL );
+
+ /* Check that the "Remaining length" of the received ACK is 2. */
+ if( pAck->remainingLength != MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH )
+ {
+ LogError( ( "ACK does not have remaining length of %d.",
+ MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH ) );
+
+ status = MQTTBadResponse;
+ }
+ else
+ {
+ /* Extract the packet identifier (third and fourth bytes) from ACK. */
+ *pPacketIdentifier = UINT16_DECODE( pAck->pRemainingData );
+
+ LogDebug( ( "Packet identifier %hu.", *pPacketIdentifier ) );
+
+ /* Packet identifier cannot be 0. */
+ if( *pPacketIdentifier == 0U )
+ {
+ status = MQTTBadResponse;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t deserializePingresp( const MQTTPacketInfo_t * pPingresp )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ assert( pPingresp != NULL );
+
+ /* Check the "Remaining length" (second byte) of the received PINGRESP is 0. */
+ if( pPingresp->remainingLength != MQTT_PACKET_PINGRESP_REMAINING_LENGTH )
+ {
+ LogError( ( "PINGRESP does not have remaining length of %d.",
+ MQTT_PACKET_PINGRESP_REMAINING_LENGTH ) );
+
+ status = MQTTBadResponse;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer )
+{
+ uint8_t connectFlags = 0U;
+ uint8_t * pIndex = NULL;
+
+ assert( pConnectInfo != NULL );
+ assert( pBuffer != NULL );
+
+ pIndex = pBuffer->pBuffer;
+ /* The first byte in the CONNECT packet is the control packet type. */
+ *pIndex = MQTT_PACKET_TYPE_CONNECT;
+ pIndex++;
+
+ /* The remaining length of the CONNECT packet is encoded starting from the
+ * second byte. The remaining length does not include the length of the fixed
+ * header or the encoding of the remaining length. */
+ pIndex = encodeRemainingLength( pIndex, remainingLength );
+
+ /* The string "MQTT" is placed at the beginning of the CONNECT packet's variable
+ * header. This string is 4 bytes long. */
+ pIndex = encodeString( pIndex, "MQTT", 4 );
+
+ /* The MQTT protocol version is the second field of the variable header. */
+ *pIndex = MQTT_VERSION_3_1_1;
+ pIndex++;
+
+ /* Set the clean session flag if needed. */
+ if( pConnectInfo->cleanSession == true )
+ {
+ UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_CLEAN );
+ }
+
+ /* Set the flags for username and password if provided. */
+ if( pConnectInfo->pUserName != NULL )
+ {
+ UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_USERNAME );
+ }
+
+ if( pConnectInfo->pPassword != NULL )
+ {
+ UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_PASSWORD );
+ }
+
+ /* Set will flag if a Last Will and Testament is provided. */
+ if( pWillInfo != NULL )
+ {
+ UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_WILL );
+
+ /* Flags only need to be changed for Will QoS 1 or 2. */
+ if( pWillInfo->qos == MQTTQoS1 )
+ {
+ UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_WILL_QOS1 );
+ }
+ else if( pWillInfo->qos == MQTTQoS2 )
+ {
+ UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_WILL_QOS2 );
+ }
+ else
+ {
+ /* Empty else MISRA 15.7 */
+ }
+
+ if( pWillInfo->retain == true )
+ {
+ UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_WILL_RETAIN );
+ }
+ }
+
+ *pIndex = connectFlags;
+ pIndex++;
+
+ /* Write the 2 bytes of the keep alive interval into the CONNECT packet. */
+ *pIndex = UINT16_HIGH_BYTE( pConnectInfo->keepAliveSeconds );
+ *( pIndex + 1 ) = UINT16_LOW_BYTE( pConnectInfo->keepAliveSeconds );
+ pIndex += 2;
+
+ /* Write the client identifier into the CONNECT packet. */
+ pIndex = encodeString( pIndex,
+ pConnectInfo->pClientIdentifier,
+ pConnectInfo->clientIdentifierLength );
+
+ /* Write the will topic name and message into the CONNECT packet if provided. */
+ if( pWillInfo != NULL )
+ {
+ pIndex = encodeString( pIndex,
+ pWillInfo->pTopicName,
+ pWillInfo->topicNameLength );
+
+ pIndex = encodeString( pIndex,
+ pWillInfo->pPayload,
+ ( uint16_t ) pWillInfo->payloadLength );
+ }
+
+ /* Encode the user name if provided. */
+ if( pConnectInfo->pUserName != NULL )
+ {
+ pIndex = encodeString( pIndex, pConnectInfo->pUserName, pConnectInfo->userNameLength );
+ }
+
+ /* Encode the password if provided. */
+ if( pConnectInfo->pPassword != NULL )
+ {
+ pIndex = encodeString( pIndex, pConnectInfo->pPassword, pConnectInfo->passwordLength );
+ }
+
+ LogDebug( ( "Length of serialized CONNECT packet is %lu.",
+ ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) );
+
+ /* Ensure that the difference between the end and beginning of the buffer
+ * is less than the buffer size. */
+ assert( ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) <= pBuffer->size );
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t * pRemainingLength,
+ size_t * pPacketSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ size_t remainingLength;
+
+ /* The CONNECT packet will always include a 10-byte variable header. */
+ size_t connectPacketSize = MQTT_PACKET_CONNECT_HEADER_SIZE;
+
+ /* Validate arguments. */
+ if( ( pConnectInfo == NULL ) || ( pRemainingLength == NULL ) ||
+ ( pPacketSize == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pConnectInfo=%p, "
+ "pRemainingLength=%p, pPacketSize=%p.",
+ pConnectInfo,
+ pRemainingLength,
+ pPacketSize ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pConnectInfo->clientIdentifierLength == 0U ) || ( pConnectInfo->pClientIdentifier == NULL ) )
+ {
+ LogError( ( "Mqtt_GetConnectPacketSize() client identifier must be set." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Add the length of the client identifier. */
+ connectPacketSize += pConnectInfo->clientIdentifierLength + sizeof( uint16_t );
+
+ /* Add the lengths of the will message and topic name if provided. */
+ if( pWillInfo != NULL )
+ {
+ connectPacketSize += pWillInfo->topicNameLength + sizeof( uint16_t ) +
+ pWillInfo->payloadLength + sizeof( uint16_t );
+ }
+
+ /* Add the lengths of the user name and password if provided. */
+ if( pConnectInfo->pUserName != NULL )
+ {
+ connectPacketSize += pConnectInfo->userNameLength + sizeof( uint16_t );
+ }
+
+ if( pConnectInfo->pPassword != NULL )
+ {
+ connectPacketSize += pConnectInfo->passwordLength + sizeof( uint16_t );
+ }
+
+ /* At this point, the "Remaining Length" field of the MQTT CONNECT packet has
+ * been calculated. */
+ remainingLength = connectPacketSize;
+
+ /* Calculate the full size of the MQTT CONNECT packet by adding the size of
+ * the "Remaining Length" field plus 1 byte for the "Packet Type" field. */
+ connectPacketSize += 1U + remainingLengthEncodedSize( connectPacketSize );
+
+ /* Check that the CONNECT packet is within the bounds of the MQTT spec. */
+ if( connectPacketSize > MQTT_PACKET_CONNECT_MAX_SIZE )
+ {
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ *pRemainingLength = remainingLength;
+ *pPacketSize = connectPacketSize;
+ }
+
+ LogDebug( ( "CONNECT packet remaining length=%lu and packet size=%lu.",
+ *pRemainingLength,
+ *pPacketSize ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_SerializeConnect( const MQTTConnectInfo_t * pConnectInfo,
+ const MQTTPublishInfo_t * pWillInfo,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Calculate CONNECT packet size. */
+ size_t connectPacketSize = remainingLength + remainingLengthEncodedSize( remainingLength ) + 1U;
+
+ /* Validate arguments. */
+ if( ( pConnectInfo == NULL ) || ( pBuffer == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pConnectInfo=%p, "
+ "pBuffer=%p.",
+ pConnectInfo,
+ pBuffer ) );
+ status = MQTTBadParameter;
+ }
+ /* Check that the full packet size fits within the given buffer. */
+ else if( connectPacketSize > pBuffer->size )
+ {
+ LogError( ( "Buffer size of %lu is not sufficient to hold "
+ "serialized CONNECT packet of size of %lu.",
+ pBuffer->size,
+ connectPacketSize ) );
+ status = MQTTNoMemory;
+ }
+ else if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) )
+ {
+ LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ serializeConnectPacket( pConnectInfo,
+ pWillInfo,
+ remainingLength,
+ pBuffer );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ size_t * pRemainingLength,
+ size_t * pPacketSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Validate parameters. */
+ if( ( pSubscriptionList == NULL ) || ( pRemainingLength == NULL ) ||
+ ( pPacketSize == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pSubscriptionList=%p, "
+ "pRemainingLength=%p, pPacketSize=%p.",
+ pSubscriptionList,
+ pRemainingLength,
+ pPacketSize ) );
+ status = MQTTBadParameter;
+ }
+ else if( subscriptionCount == 0U )
+ {
+ LogError( ( " subscriptionCount is 0." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Calculate the MQTT UNSUBSCRIBE packet size. */
+ status = calculateSubscriptionPacketSize( pSubscriptionList,
+ subscriptionCount,
+ pRemainingLength,
+ pPacketSize,
+ MQTT_SUBSCRIBE );
+
+ if( status == MQTTBadParameter )
+ {
+ LogError( ( "SUBSCRIBE packet remaining length exceeds %lu, which is the "
+ "maximum size allowed by MQTT 3.1.1.",
+ MQTT_MAX_REMAINING_LENGTH ) );
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer )
+{
+ size_t i = 0;
+ uint8_t * pIndex = NULL;
+
+ /* Validate all the parameters. */
+ MQTTStatus_t status =
+ validateSubscriptionSerializeParams( pSubscriptionList,
+ subscriptionCount,
+ packetId,
+ remainingLength,
+ pBuffer );
+
+ if( status == MQTTSuccess )
+ {
+ pIndex = pBuffer->pBuffer;
+
+ /* The first byte in SUBSCRIBE is the packet type. */
+ *pIndex = MQTT_PACKET_TYPE_SUBSCRIBE;
+ pIndex++;
+
+ /* Encode the "Remaining length" starting from the second byte. */
+ pIndex = encodeRemainingLength( pIndex, remainingLength );
+
+ /* Place the packet identifier into the SUBSCRIBE packet. */
+ *pIndex = UINT16_HIGH_BYTE( packetId );
+ *( pIndex + 1 ) = UINT16_LOW_BYTE( packetId );
+ pIndex += 2;
+
+ /* Serialize each subscription topic filter and QoS. */
+ for( i = 0; i < subscriptionCount; i++ )
+ {
+ pIndex = encodeString( pIndex,
+ pSubscriptionList[ i ].pTopicFilter,
+ pSubscriptionList[ i ].topicFilterLength );
+
+ /* Place the QoS in the SUBSCRIBE packet. */
+ *pIndex = ( uint8_t ) ( pSubscriptionList[ i ].qos );
+ pIndex++;
+ }
+
+ LogDebug( ( "Length of serialized SUBSCRIBE packet is %lu.",
+ ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_GetUnsubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ size_t * pRemainingLength,
+ size_t * pPacketSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Validate parameters. */
+ if( ( pSubscriptionList == NULL ) || ( pRemainingLength == NULL ) ||
+ ( pPacketSize == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pSubscriptionList=%p, "
+ "pRemainingLength=%p, pPacketSize=%p.",
+ pSubscriptionList,
+ pRemainingLength,
+ pPacketSize ) );
+ status = MQTTBadParameter;
+ }
+ else if( subscriptionCount == 0U )
+ {
+ LogError( ( "Subscription count is 0." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Calculate the MQTT SUBSCRIBE packet size. */
+ status = calculateSubscriptionPacketSize( pSubscriptionList,
+ subscriptionCount,
+ pRemainingLength,
+ pPacketSize,
+ MQTT_UNSUBSCRIBE );
+
+ if( status == MQTTBadParameter )
+ {
+ LogError( ( "UNSUBSCRIBE packet remaining length exceeds %lu, which is the "
+ "maximum size allowed by MQTT 3.1.1.",
+ MQTT_MAX_REMAINING_LENGTH ) );
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptionList,
+ size_t subscriptionCount,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ size_t i = 0;
+ uint8_t * pIndex = NULL;
+
+ /* Validate all the parameters. */
+ status = validateSubscriptionSerializeParams( pSubscriptionList,
+ subscriptionCount,
+ packetId,
+ remainingLength,
+ pBuffer );
+
+ if( status == MQTTSuccess )
+ {
+ /* Get the start of the buffer to the iterator variable. */
+ pIndex = pBuffer->pBuffer;
+
+ /* The first byte in UNSUBSCRIBE is the packet type. */
+ *pIndex = MQTT_PACKET_TYPE_UNSUBSCRIBE;
+ pIndex++;
+
+ /* Encode the "Remaining length" starting from the second byte. */
+ pIndex = encodeRemainingLength( pIndex, remainingLength );
+
+ /* Place the packet identifier into the UNSUBSCRIBE packet. */
+ *pIndex = UINT16_HIGH_BYTE( packetId );
+ *( pIndex + 1 ) = UINT16_LOW_BYTE( packetId );
+ pIndex += 2;
+
+ /* Serialize each subscription topic filter. */
+ for( i = 0; i < subscriptionCount; i++ )
+ {
+ pIndex = encodeString( pIndex,
+ pSubscriptionList[ i ].pTopicFilter,
+ pSubscriptionList[ i ].topicFilterLength );
+ }
+
+ LogDebug( ( "Length of serialized UNSUBSCRIBE packet is %lu.",
+ ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_GetPublishPacketSize( const MQTTPublishInfo_t * pPublishInfo,
+ size_t * pRemainingLength,
+ size_t * pPacketSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( ( pPublishInfo == NULL ) || ( pRemainingLength == NULL ) || ( pPacketSize == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pPublishInfo=%p, "
+ "pRemainingLength=%p, pPacketSize=%p.",
+ pPublishInfo,
+ pRemainingLength,
+ pPacketSize ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pPublishInfo->pTopicName == NULL ) || ( pPublishInfo->topicNameLength == 0U ) )
+ {
+ LogError( ( "Invalid topic name for PUBLISH: pTopicName=%p, "
+ "topicNameLength=%u.",
+ pPublishInfo->pTopicName,
+ pPublishInfo->topicNameLength ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Calculate the "Remaining length" field and total packet size. If it exceeds
+ * what is allowed in the MQTT standard, return an error. */
+ if( calculatePublishPacketSize( pPublishInfo, pRemainingLength, pPacketSize ) == false )
+ {
+ LogError( ( "PUBLISH packet remaining length exceeds %lu, which is the "
+ "maximum size allowed by MQTT 3.1.1.",
+ MQTT_MAX_REMAINING_LENGTH ) );
+ status = MQTTBadParameter;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Length of serialized packet = First byte
+ * + Length of encoded remaining length
+ * + Remaining length. */
+ size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength )
+ + remainingLength;
+
+ if( ( pBuffer == NULL ) || ( pPublishInfo == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pBuffer=%p, "
+ "pPublishInfo=%p.",
+ pBuffer,
+ pPublishInfo ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pPublishInfo->pTopicName == NULL ) || ( pPublishInfo->topicNameLength == 0U ) )
+ {
+ LogError( ( "Invalid topic name for PUBLISH: pTopicName=%p, "
+ "topicNameLength=%u.",
+ pPublishInfo->pTopicName,
+ pPublishInfo->topicNameLength ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) )
+ {
+ LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.",
+ pPublishInfo->qos ) );
+ status = MQTTBadParameter;
+ }
+ else if( packetSize > pBuffer->size )
+ {
+ LogError( ( "Buffer size of %lu is not sufficient to hold "
+ "serialized PUBLISH packet of size of %lu.",
+ pBuffer->size,
+ packetSize ) );
+ status = MQTTNoMemory;
+ }
+ else
+ {
+ /* Serialize publish with header and payload. */
+ serializePublishCommon( pPublishInfo,
+ remainingLength,
+ packetId,
+ pBuffer,
+ true );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo,
+ uint16_t packetId,
+ size_t remainingLength,
+ const MQTTFixedBuffer_t * pBuffer,
+ size_t * pHeaderSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Length of serialized packet = First byte
+ * + Length of encoded remaining length
+ * + Remaining length
+ * - Payload Length.
+ * Payload length will be subtracted after verifying pPublishInfo parameter.
+ */
+ size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength )
+ + remainingLength;
+
+ if( ( pBuffer == NULL ) || ( pPublishInfo == NULL ) ||
+ ( pHeaderSize == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pBuffer=%p, "
+ "pPublishInfo=%p, pHeaderSize=%p.",
+ pBuffer,
+ pPublishInfo,
+ pHeaderSize ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pPublishInfo->pTopicName == NULL ) || ( pPublishInfo->topicNameLength == 0U ) )
+ {
+ LogError( ( "Invalid topic name for publish: pTopicName=%p, "
+ "topicNameLength=%u.",
+ pPublishInfo->pTopicName,
+ pPublishInfo->topicNameLength ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) )
+ {
+ LogError( ( "Packet Id is 0 for publish with QoS=%u.",
+ pPublishInfo->qos ) );
+ status = MQTTBadParameter;
+ }
+
+
+ else if( ( packetSize - pPublishInfo->payloadLength ) > pBuffer->size )
+ {
+ LogError( ( "Buffer size of %lu is not sufficient to hold "
+ "serialized PUBLISH header packet of size of %lu.",
+ pBuffer->size,
+ ( packetSize - pPublishInfo->payloadLength ) ) );
+ status = MQTTNoMemory;
+ }
+ else
+ {
+ /* Serialize publish without copying the payload. */
+ serializePublishCommon( pPublishInfo,
+ remainingLength,
+ packetId,
+ pBuffer,
+ false );
+
+ /* Header size is the same as calculated packet size. */
+ *pHeaderSize = ( packetSize - pPublishInfo->payloadLength );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_SerializeAck( const MQTTFixedBuffer_t * pBuffer,
+ uint8_t packetType,
+ uint16_t packetId )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( pBuffer == NULL )
+ {
+ LogError( ( "Provided buffer is NULL." ) );
+ status = MQTTBadParameter;
+ }
+ /* The buffer must be able to fit 4 bytes for the packet. */
+ else if( pBuffer->size < MQTT_PUBLISH_ACK_PACKET_SIZE )
+ {
+ LogError( ( "Insufficient memory for packet." ) );
+ status = MQTTNoMemory;
+ }
+ else if( packetId == 0U )
+ {
+ LogError( ( "Packet ID cannot be 0." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ switch( packetType )
+ {
+ /* Only publish acks are serialized by the client. */
+ case MQTT_PACKET_TYPE_PUBACK:
+ case MQTT_PACKET_TYPE_PUBREC:
+ case MQTT_PACKET_TYPE_PUBREL:
+ case MQTT_PACKET_TYPE_PUBCOMP:
+ pBuffer->pBuffer[ 0 ] = packetType;
+ pBuffer->pBuffer[ 1 ] = MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH;
+ pBuffer->pBuffer[ 2 ] = UINT16_HIGH_BYTE( packetId );
+ pBuffer->pBuffer[ 3 ] = UINT16_LOW_BYTE( packetId );
+ break;
+
+ default:
+ LogError( ( "Packet type is not a publish ACK: Packet type=%02x",
+ packetType ) );
+ status = MQTTBadParameter;
+ break;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_GetDisconnectPacketSize( size_t * pPacketSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( pPacketSize == NULL )
+ {
+ LogError( ( "pPacketSize is NULL." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* MQTT DISCONNECT packets always have the same size. */
+ *pPacketSize = MQTT_DISCONNECT_PACKET_SIZE;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_SerializeDisconnect( const MQTTFixedBuffer_t * pBuffer )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ /* Validate arguments. */
+ if( pBuffer == NULL )
+ {
+ LogError( ( "pBuffer cannot be NULL." ) );
+ status = MQTTBadParameter;
+ }
+
+ if( status == MQTTSuccess )
+ {
+ if( pBuffer->size < MQTT_DISCONNECT_PACKET_SIZE )
+ {
+ LogError( ( "Buffer size of %lu is not sufficient to hold "
+ "serialized DISCONNECT packet of size of %lu.",
+ pBuffer->size,
+ MQTT_DISCONNECT_PACKET_SIZE ) );
+ status = MQTTNoMemory;
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ pBuffer->pBuffer[ 0 ] = MQTT_PACKET_TYPE_DISCONNECT;
+ pBuffer->pBuffer[ 1 ] = MQTT_DISCONNECT_REMAINING_LENGTH;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_GetPingreqPacketSize( size_t * pPacketSize )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( pPacketSize == NULL )
+ {
+ LogError( ( "pPacketSize is NULL." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* MQTT PINGREQ packets always have the same size. */
+ *pPacketSize = MQTT_PACKET_PINGREQ_SIZE;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_SerializePingreq( const MQTTFixedBuffer_t * pBuffer )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( pBuffer == NULL )
+ {
+ LogError( ( "pBuffer is NULL." ) );
+ status = MQTTBadParameter;
+ }
+
+ if( status == MQTTSuccess )
+ {
+ if( pBuffer->size < MQTT_PACKET_PINGREQ_SIZE )
+ {
+ LogError( ( "Buffer size of %lu is not sufficient to hold "
+ "serialized PINGREQ packet of size of %u.",
+ pBuffer->size,
+ MQTT_PACKET_PINGREQ_SIZE ) );
+ status = MQTTNoMemory;
+ }
+ }
+
+ if( status == MQTTSuccess )
+ {
+ /* Ping request packets are always the same. */
+ pBuffer->pBuffer[ 0 ] = MQTT_PACKET_TYPE_PINGREQ;
+ pBuffer->pBuffer[ 1 ] = 0x00;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_DeserializePublish( const MQTTPacketInfo_t * pIncomingPacket,
+ uint16_t * pPacketId,
+ MQTTPublishInfo_t * pPublishInfo )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( ( pIncomingPacket == NULL ) || ( pPacketId == NULL ) || ( pPublishInfo == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pIncomingPacket=%p, "
+ "pPacketId=%p, pPublishInfo=%p",
+ pIncomingPacket,
+ pPacketId,
+ pPublishInfo ) );
+ status = MQTTBadParameter;
+ }
+ else if( ( pIncomingPacket->type & 0xF0U ) != MQTT_PACKET_TYPE_PUBLISH )
+ {
+ LogError( ( "Packet is not publish. Packet type: %hu.",
+ pIncomingPacket->type ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ status = deserializePublish( pIncomingPacket, pPacketId, pPublishInfo );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_DeserializeAck( const MQTTPacketInfo_t * pIncomingPacket,
+ uint16_t * pPacketId,
+ bool * pSessionPresent )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( ( pIncomingPacket == NULL ) )
+ {
+ LogError( ( "pIncomingPacket cannot be NULL." ) );
+ status = MQTTBadParameter;
+ }
+
+ /* Pointer for packet identifier cannot be NULL for packets other than
+ * CONNACK and PINGRESP. */
+ else if( ( pPacketId == NULL ) &&
+ ( ( pIncomingPacket->type != MQTT_PACKET_TYPE_CONNACK ) &&
+ ( pIncomingPacket->type != MQTT_PACKET_TYPE_PINGRESP ) ) )
+ {
+ LogError( ( "pPacketId cannot be NULL for packet type %02x.",
+ pIncomingPacket->type ) );
+ status = MQTTBadParameter;
+ }
+ /* Pointer for session present cannot be NULL for CONNACK. */
+ else if( ( pSessionPresent == NULL ) &&
+ ( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK ) )
+ {
+ LogError( ( "pSessionPresent cannot be NULL for CONNACK packet." ) );
+ status = MQTTBadParameter;
+ }
+ else if( pIncomingPacket->pRemainingData == NULL )
+ {
+ LogError( ( "Remaining data of incoming packet is NULL." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Make sure response packet is a valid ack. */
+ switch( pIncomingPacket->type )
+ {
+ case MQTT_PACKET_TYPE_CONNACK:
+ status = deserializeConnack( pIncomingPacket, pSessionPresent );
+ break;
+
+ case MQTT_PACKET_TYPE_SUBACK:
+ status = deserializeSuback( pIncomingPacket, pPacketId );
+ break;
+
+ case MQTT_PACKET_TYPE_PINGRESP:
+ status = deserializePingresp( pIncomingPacket );
+ break;
+
+ case MQTT_PACKET_TYPE_UNSUBACK:
+ case MQTT_PACKET_TYPE_PUBACK:
+ case MQTT_PACKET_TYPE_PUBREC:
+ case MQTT_PACKET_TYPE_PUBREL:
+ case MQTT_PACKET_TYPE_PUBCOMP:
+ status = deserializeSimpleAck( pIncomingPacket, pPacketId );
+ break;
+
+ /* Any other packet type is invalid. */
+ default:
+ LogError( ( "IotMqtt_DeserializeResponse() called with unknown packet type:(%02x).", pIncomingPacket->type ) );
+ status = MQTTBadResponse;
+ break;
+ }
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( MQTTTransportRecvFunc_t readFunc,
+ NetworkContext_t networkContext,
+ MQTTPacketInfo_t * pIncomingPacket )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ int32_t bytesReceived = 0;
+
+ if( pIncomingPacket == NULL )
+ {
+ LogError( ( "Invalid parameter: pIncomingPacket is NULL." ) );
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Read a single byte. */
+ bytesReceived = readFunc( networkContext, &( pIncomingPacket->type ), 1U );
+ }
+
+ if( bytesReceived == 1 )
+ {
+ /* Check validity. */
+ if( incomingPacketValid( pIncomingPacket->type ) == true )
+ {
+ pIncomingPacket->remainingLength = getRemainingLength( readFunc, networkContext );
+
+ if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )
+ {
+ status = MQTTBadResponse;
+ }
+ }
+ else
+ {
+ LogError( ( "Incoming packet invalid: Packet type=%u",
+ pIncomingPacket->type ) );
+ status = MQTTBadResponse;
+ }
+ }
+ else if( ( status != MQTTBadParameter ) && ( bytesReceived == 0 ) )
+ {
+ status = MQTTNoDataAvailable;
+ }
+
+ /* If the input packet was valid, then any other number of bytes received is
+ * a failure. */
+ else if( status != MQTTBadParameter )
+ {
+ status = MQTTRecvFailed;
+ }
+ else
+ {
+ /* Empty else MISRA 15.7 */
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c
new file mode 100644
index 000000000..8afd93050
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c
@@ -0,0 +1,1100 @@
+/*
+ * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include <assert.h>
+#include <string.h>
+#include "mqtt_state.h"
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Create a 16-bit bitmap with bit set at specified position.
+ *
+ * @param[in] position The position at which the bit need to be set.
+ */
+#define UINT16_BITMAP_BIT_SET_AT( position ) ( ( uint16_t ) 0x01U << ( ( uint16_t ) position ) )
+
+/**
+ * @brief Set a bit in an 16-bit unsigned integer.
+ *
+ * @param[in] x The 16-bit unsigned integer to set a bit.
+ * @param[in] position The position at which the bit need to be set.
+ */
+#define UINT16_SET_BIT( x, position ) ( ( x ) = ( uint16_t ) ( ( x ) | ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) )
+
+/**
+ * @brief Macro for checking if a bit is set in a 16-bit unsigned integer.
+ *
+ * @param[in] x The unsigned 16-bit integer to check.
+ * @param[in] position Which bit to check.
+ */
+#define UINT16_CHECK_BIT( x, position ) ( ( ( x ) & ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) == ( UINT16_BITMAP_BIT_SET_AT( position ) ) )
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Test if a transition to new state is possible, when dealing with PUBLISHes.
+ *
+ * @param[in] currentState The current state.
+ * @param[in] newState State to transition to.
+ * @param[in] opType Reserve, Send, or Receive.
+ * @param[in] qos 0, 1, or 2.
+ *
+ * @note This function does not validate the current state, or the new state
+ * based on either the operation type or QoS. It assumes the new state is valid
+ * given the opType and QoS, which will be the case if calculated by
+ * MQTT_CalculateStatePublish().
+ *
+ * @return `true` if transition is possible, else `false`
+ */
+static bool validateTransitionPublish( MQTTPublishState_t currentState,
+ MQTTPublishState_t newState,
+ MQTTStateOperation_t opType,
+ MQTTQoS_t qos );
+
+/**
+ * @brief Test if a transition to a new state is possible, when dealing with acks.
+ *
+ * @param[in] currentState The current state.
+ * @param[in] newState State to transition to.
+ *
+ * @return `true` if transition is possible, else `false`.
+ */
+static bool validateTransitionAck( MQTTPublishState_t currentState,
+ MQTTPublishState_t newState );
+
+/**
+ * @brief Test if the publish corresponding to an ack is outgoing or incoming.
+ *
+ * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP.
+ * @param[in] opType Send, or Receive.
+ *
+ * @return `true` if corresponds to outgoing publish, else `false`.
+ */
+static bool isPublishOutgoing( MQTTPubAckType_t packetType,
+ MQTTStateOperation_t opType );
+
+/**
+ * @brief Find a packet ID in the state record.
+ *
+ * @param[in] records State record array.
+ * @param[in] recordCount Length of record array.
+ * @param[in] packetId packet ID to search for.
+ * @param[out] pQos QoS retrieved from record.
+ * @param[out] pCurrentState state retrieved from record.
+ *
+ * @return index of the packet id in the record if it exists, else the record length.
+ */
+static size_t findInRecord( const MQTTPubAckInfo_t * records,
+ size_t recordCount,
+ uint16_t packetId,
+ MQTTQoS_t * pQos,
+ MQTTPublishState_t * pCurrentState );
+
+/**
+ * @brief Compact records.
+ *
+ * Records are arranged in the relative order to maintain message ordering.
+ * This will lead to fragmentation and this function will help in defragmenting
+ * the records array.
+ *
+ * @param[in] records State record array.
+ * @param[in] recordCount Length of record array.
+ */
+static void compactRecords( MQTTPubAckInfo_t * records,
+ size_t recordCount );
+
+/**
+ * @brief Store a new entry in the state record.
+ *
+ * @param[in] records State record array.
+ * @param[in] recordCount Length of record array.
+ * @param[in] packetId, packet ID of new entry.
+ * @param[in] qos QoS of new entry.
+ * @param[in] publishState state of new entry.
+ *
+ * @return MQTTSuccess, MQTTNoMemory, MQTTStateCollision.
+ */
+static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
+ size_t recordCount,
+ uint16_t packetId,
+ MQTTQoS_t qos,
+ MQTTPublishState_t publishState );
+
+/**
+ * @brief Update and possibly delete an entry in the state record.
+ *
+ * @param[in] records State record array.
+ * @param[in] recordIndex index of record to update.
+ * @param[in] newState New state to update.
+ * @param[in] shouldDelete Whether an existing entry should be deleted.
+ */
+static void updateRecord( MQTTPubAckInfo_t * records,
+ size_t recordIndex,
+ MQTTPublishState_t newState,
+ bool shouldDelete );
+
+/**
+ * @brief Get the packet ID and index of an outgoing publish in specified
+ * states.
+ *
+ * @param[in] pMqttContext Initialized MQTT context.
+ * @param[in] searchStates The states to search for in 2-byte bit map.
+ * @param[in,out] pCursor Index at which to start searching.
+ */
+static uint16_t stateSelect( const MQTTContext_t * pMqttContext,
+ uint16_t searchStates,
+ MQTTStateCursor_t * pCursor );
+
+/**
+ * @brief Update the state records for an ACK after state transition
+ * validations.
+ *
+ * @param[in] records State records pointer.
+ * @param[in] recordIndex Index at which the record is stored.
+ * @param[in] packetId Packet id of the packet.
+ * @param[in] currentState Current state of the publish record.
+ * @param[in] newState New state of the publish.
+ *
+ * @return #MQTTIllegalState, or #MQTTSuccess.
+ */
+static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records,
+ size_t recordIndex,
+ uint16_t packetId,
+ MQTTPublishState_t currentState,
+ MQTTPublishState_t newState );
+
+/**
+ * @brief Update the state record for a PUBLISH packet after validating
+ * the state transitions.
+ *
+ * @param[in] pMqttContext Initialized MQTT context.
+ * @param[in] recordIndex Index in state records at which publish record exists.
+ * @param[in] packetId ID of the PUBLISH packet.
+ * @param[in] opType Send or Receive.
+ * @param[in] qos 0, 1, or 2.
+ * @param[in] currentState Current state of the publish record.
+ * @param[in] newState New state of the publish record.
+ *
+ * @return #MQTTIllegalState, #MQTTStateCollision or #MQTTSuccess.
+ */
+static MQTTStatus_t updateStatePublish( MQTTContext_t * pMqttContext,
+ size_t recordIndex,
+ uint16_t packetId,
+ MQTTStateOperation_t opType,
+ MQTTQoS_t qos,
+ MQTTPublishState_t currentState,
+ MQTTPublishState_t newState );
+
+/*-----------------------------------------------------------*/
+
+static bool validateTransitionPublish( MQTTPublishState_t currentState,
+ MQTTPublishState_t newState,
+ MQTTStateOperation_t opType,
+ MQTTQoS_t qos )
+{
+ bool isValid = false;
+
+ switch( currentState )
+ {
+ case MQTTStateNull:
+
+ /* Transitions from null occur when storing a new entry into the record. */
+ if( opType == MQTT_RECEIVE )
+ {
+ isValid = ( ( newState == MQTTPubAckSend ) || ( newState == MQTTPubRecSend ) ) ? true : false;
+ }
+
+ break;
+
+ case MQTTPublishSend:
+
+ /* Outgoing publish. All such publishes start in this state due to
+ * the reserve operation. */
+ switch( qos )
+ {
+ case MQTTQoS1:
+ isValid = ( newState == MQTTPubAckPending ) ? true : false;
+ break;
+
+ case MQTTQoS2:
+ isValid = ( newState == MQTTPubRecPending ) ? true : false;
+ break;
+
+ default:
+ /* QoS 0 is checked before calling this function. */
+ break;
+ }
+
+ break;
+
+ /* Below cases are for validating the resends of publish when a session is
+ * reestablished. */
+ case MQTTPubAckPending:
+
+ /* When a session is reestablished, outgoing QoS1 publishes in state
+ * #MQTTPubAckPending can be resent. The state remains the same. */
+ isValid = ( newState == MQTTPubAckPending ) ? true : false;
+
+ break;
+
+ case MQTTPubRecPending:
+
+ /* When a session is reestablished, outgoing QoS2 publishes in state
+ * #MQTTPubRecPending can be resent. The state remains the same. */
+ isValid = ( newState == MQTTPubRecPending ) ? true : false;
+
+ break;
+
+ default:
+ /* For a PUBLISH, we should not start from any other state. */
+ break;
+ }
+
+ return isValid;
+}
+
+/*-----------------------------------------------------------*/
+
+static bool validateTransitionAck( MQTTPublishState_t currentState,
+ MQTTPublishState_t newState )
+{
+ bool isValid = false;
+
+ switch( currentState )
+ {
+ case MQTTPubAckSend:
+ /* Incoming publish, QoS 1. */
+ case MQTTPubAckPending:
+ /* Outgoing publish, QoS 1. */
+ isValid = ( newState == MQTTPublishDone ) ? true : false;
+ break;
+
+ case MQTTPubRecSend:
+ /* Incoming publish, QoS 2. */
+ isValid = ( newState == MQTTPubRelPending ) ? true : false;
+ break;
+
+ case MQTTPubRelPending:
+
+ /* Incoming publish, QoS 2.
+ * There are 2 valid transitions possible.
+ * 1. MQTTPubRelPending -> MQTTPubCompSend : A PUBREL ack is received
+ * when publish record state is MQTTPubRelPending. This is the
+ * normal state transition without any connection interuptions.
+ * 2. MQTTPubRelPending -> MQTTPubRelPending : Receiving a duplicate
+ * QoS2 publish can result in a transition to the same state.
+ * This can happen in the below state transition.
+ * 1. Incoming publish received.
+ * 2. PUBREC ack sent and state is now MQTTPubRelPending.
+ * 3. TCP connection failure and broker didn't receive the PUBREC.
+ * 4. Reestablished MQTT session.
+ * 5. MQTT broker resent the un-acked publish.
+ * 6. Publish is received when publish record state is in
+ * MQTTPubRelPending.
+ * 7. Sending out a PUBREC will result in this transition
+ * to the same state. */
+ isValid = ( ( newState == MQTTPubCompSend ) ||
+ ( newState == MQTTPubRelPending ) ) ? true : false;
+ break;
+
+ case MQTTPubCompSend:
+
+ /* Incoming publish, QoS 2.
+ * There are 2 valid transitions possible.
+ * 1. MQTTPubCompSend -> MQTTPublishDone : A PUBCOMP ack is sent
+ * after receiving a PUBREL from broker. This is the
+ * normal state transition without any connection interuptions.
+ * 2. MQTTPubCompSend -> MQTTPubCompSend : Receiving a duplicate PUBREL
+ * can result in a transition to the same state.
+ * This can happen in the below state transition.
+ * 1. A TCP connection failure happened before sending a PUBCOMP
+ * for an incoming PUBREL.
+ * 2. Reestablished an MQTT session.
+ * 3. MQTT broker resent the un-acked PUBREL.
+ * 4. Receiving the PUBREL again will result in this transition
+ * to the same state. */
+ isValid = ( ( newState == MQTTPublishDone ) ||
+ ( newState == MQTTPubCompSend ) ) ? true : false;
+ break;
+
+ case MQTTPubRecPending:
+ /* Outgoing publish, Qos 2. */
+ isValid = ( newState == MQTTPubRelSend ) ? true : false;
+ break;
+
+ case MQTTPubRelSend:
+ /* Outgoing publish, Qos 2. */
+ isValid = ( newState == MQTTPubCompPending ) ? true : false;
+ break;
+
+ case MQTTPubCompPending:
+
+ /* Outgoing publish, Qos 2.
+ * There are 2 valid transitions possible.
+ * 1. MQTTPubCompPending -> MQTTPublishDone : A PUBCOMP is received.
+ * This marks the complete state transistion for the publish packet.
+ * This is the normal state transition without any connection
+ * interuptions.
+ * 2. MQTTPubCompPending -> MQTTPubCompPending : Resending a PUBREL for
+ * packets in state #MQTTPubCompPending can result in this
+ * transition to the same state.
+ * This can happen in the below state transition.
+ * 1. A TCP connection failure happened before receiving a PUBCOMP
+ * for an outgoing PUBREL.
+ * 2. An MQTT session is reestablished.
+ * 3. Resending the un-acked PUBREL results in this transition
+ * to the same state. */
+ isValid = ( ( newState == MQTTPublishDone ) ||
+ ( newState == MQTTPubCompPending ) ) ? true : false;
+ break;
+
+ case MQTTPublishDone:
+ /* Done state should transition to invalid since it will be removed from the record. */
+ case MQTTPublishSend:
+ /* If an ack was sent/received we shouldn't have been in this state. */
+ case MQTTStateNull:
+ /* If an ack was sent/received the record should exist. */
+ default:
+ /* Invalid. */
+ break;
+ }
+
+ return isValid;
+}
+
+/*-----------------------------------------------------------*/
+
+static bool isPublishOutgoing( MQTTPubAckType_t packetType,
+ MQTTStateOperation_t opType )
+{
+ bool isOutgoing = false;
+
+ switch( packetType )
+ {
+ case MQTTPuback:
+ case MQTTPubrec:
+ case MQTTPubcomp:
+ isOutgoing = ( opType == MQTT_RECEIVE ) ? true : false;
+ break;
+
+ case MQTTPubrel:
+ isOutgoing = ( opType == MQTT_SEND ) ? true : false;
+ break;
+
+ default:
+ /* No other ack type. */
+ break;
+ }
+
+ return isOutgoing;
+}
+
+/*-----------------------------------------------------------*/
+
+static size_t findInRecord( const MQTTPubAckInfo_t * records,
+ size_t recordCount,
+ uint16_t packetId,
+ MQTTQoS_t * pQos,
+ MQTTPublishState_t * pCurrentState )
+{
+ size_t index = 0;
+
+ *pCurrentState = MQTTStateNull;
+
+ if( packetId == MQTT_PACKET_ID_INVALID )
+ {
+ index = recordCount;
+ }
+ else
+ {
+ for( index = 0; index < recordCount; index++ )
+ {
+ if( records[ index ].packetId == packetId )
+ {
+ *pQos = records[ index ].qos;
+ *pCurrentState = records[ index ].publishState;
+ break;
+ }
+ }
+ }
+
+ return index;
+}
+
+/*-----------------------------------------------------------*/
+
+static void compactRecords( MQTTPubAckInfo_t * records,
+ size_t recordCount )
+{
+ size_t index = 0;
+ size_t emptyIndex = MQTT_STATE_ARRAY_MAX_COUNT;
+
+ assert( records != NULL );
+
+ /* Find the empty spots and fill those with non empty values. */
+ for( ; index < recordCount; index++ )
+ {
+ /* Find the first empty spot. */
+ if( records[ index ].packetId == MQTT_PACKET_ID_INVALID )
+ {
+ if( emptyIndex == MQTT_STATE_ARRAY_MAX_COUNT )
+ {
+ emptyIndex = index;
+ }
+ }
+ else
+ {
+ if( emptyIndex != MQTT_STATE_ARRAY_MAX_COUNT )
+ {
+ /* Copy over the contents at non empty index to empty index. */
+ records[ emptyIndex ].packetId = records[ index ].packetId;
+ records[ emptyIndex ].qos = records[ index ].qos;
+ records[ emptyIndex ].publishState = records[ index ].publishState;
+
+ /* Mark the record at current non empty index as invalid. */
+ records[ index ].packetId = MQTT_PACKET_ID_INVALID;
+
+ /* Advance the emptyIndex. */
+ emptyIndex++;
+ }
+ }
+ }
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
+ size_t recordCount,
+ uint16_t packetId,
+ MQTTQoS_t qos,
+ MQTTPublishState_t publishState )
+{
+ MQTTStatus_t status = MQTTNoMemory;
+ int32_t index = 0;
+ size_t availableIndex = recordCount;
+ bool validEntryFound = false;
+
+ assert( packetId != MQTT_PACKET_ID_INVALID );
+ assert( qos != MQTTQoS0 );
+
+ /* Check if we have to compact the records. This is known by checking if
+ * the last spot in the array is filled. */
+ if( records[ recordCount - 1U ].packetId != MQTT_PACKET_ID_INVALID )
+ {
+ compactRecords( records, recordCount );
+ }
+
+ /* Start from end so first available index will be populated.
+ * Available index is always found after the last element in the records.
+ * This is to make sure the relative order of the records in order to meet
+ * the message ordering requirement of MQTT spec 3.1.1. */
+ for( index = ( ( int32_t ) recordCount - 1 ); index >= 0; index-- )
+ {
+ /* Available index is only found after packet at the highest index. */
+ if( records[ index ].packetId == MQTT_PACKET_ID_INVALID )
+ {
+ if( validEntryFound == false )
+ {
+ availableIndex = ( size_t ) index;
+ }
+ }
+ else
+ {
+ /* A non-empty spot found in the records. */
+ validEntryFound = true;
+
+ if( records[ index ].packetId == packetId )
+ {
+ /* Collision. */
+ LogError( ( "Collision when adding PacketID=%u at index=%u",
+ packetId,
+ index ) );
+
+ status = MQTTStateCollision;
+ availableIndex = recordCount;
+ break;
+ }
+ }
+ }
+
+ if( availableIndex < recordCount )
+ {
+ records[ availableIndex ].packetId = packetId;
+ records[ availableIndex ].qos = qos;
+ records[ availableIndex ].publishState = publishState;
+ status = MQTTSuccess;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static void updateRecord( MQTTPubAckInfo_t * records,
+ size_t recordIndex,
+ MQTTPublishState_t newState,
+ bool shouldDelete )
+{
+ assert( records != NULL );
+
+ if( shouldDelete == true )
+ {
+ /* Mark the record as invalid. */
+ records[ recordIndex ].packetId = MQTT_PACKET_ID_INVALID;
+ }
+ else
+ {
+ records[ recordIndex ].publishState = newState;
+ }
+}
+
+/*-----------------------------------------------------------*/
+
+static uint16_t stateSelect( const MQTTContext_t * pMqttContext,
+ uint16_t searchStates,
+ MQTTStateCursor_t * pCursor )
+{
+ uint16_t packetId = MQTT_PACKET_ID_INVALID;
+ uint16_t outgoingStates = 0U;
+ const MQTTPubAckInfo_t * records = NULL;
+ bool stateCheck = false;
+
+ assert( pMqttContext != NULL );
+ assert( searchStates != 0U );
+ assert( pCursor != NULL );
+
+ /* Create a bit map with all the outgoing publish states. */
+ UINT16_SET_BIT( outgoingStates, MQTTPublishSend );
+ UINT16_SET_BIT( outgoingStates, MQTTPubAckPending );
+ UINT16_SET_BIT( outgoingStates, MQTTPubRecPending );
+ UINT16_SET_BIT( outgoingStates, MQTTPubRelSend );
+ UINT16_SET_BIT( outgoingStates, MQTTPubCompPending );
+
+ /* Only outgoing publish records need to be searched. */
+ assert( ( outgoingStates & searchStates ) > 0U );
+ assert( ( ~outgoingStates & searchStates ) == 0 );
+
+ records = pMqttContext->outgoingPublishRecords;
+
+ while( *pCursor < MQTT_STATE_ARRAY_MAX_COUNT )
+ {
+ /* Check if any of the search states are present. */
+ stateCheck = UINT16_CHECK_BIT( searchStates, records[ *pCursor ].publishState ) ? true : false;
+
+ if( stateCheck == true )
+ {
+ packetId = records[ *pCursor ].packetId;
+ ( *pCursor )++;
+ break;
+ }
+
+ ( *pCursor )++;
+ }
+
+ return packetId;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType,
+ MQTTStateOperation_t opType,
+ MQTTQoS_t qos )
+{
+ MQTTPublishState_t calculatedState = MQTTStateNull;
+ /* There are more QoS2 cases than QoS1, so initialize to that. */
+ bool qosValid = ( qos == MQTTQoS2 ) ? true : false;
+
+ switch( packetType )
+ {
+ case MQTTPuback:
+ qosValid = ( qos == MQTTQoS1 ) ? true : false;
+ calculatedState = MQTTPublishDone;
+ break;
+
+ case MQTTPubrec:
+
+ /* Incoming publish: send PUBREC, PUBREL pending.
+ * Outgoing publish: receive PUBREC, send PUBREL. */
+ calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRelPending : MQTTPubRelSend;
+ break;
+
+ case MQTTPubrel:
+
+ /* Incoming publish: receive PUBREL, send PUBCOMP.
+ * Outgoing publish: send PUBREL, PUBCOMP pending. */
+ calculatedState = ( opType == MQTT_SEND ) ? MQTTPubCompPending : MQTTPubCompSend;
+ break;
+
+ case MQTTPubcomp:
+ calculatedState = MQTTPublishDone;
+ break;
+
+ default:
+ /* No other ack type. */
+ break;
+ }
+
+ /* Sanity check, make sure ack and QoS agree. */
+ if( qosValid == false )
+ {
+ calculatedState = MQTTStateNull;
+ }
+
+ return calculatedState;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records,
+ size_t recordIndex,
+ uint16_t packetId,
+ MQTTPublishState_t currentState,
+ MQTTPublishState_t newState )
+{
+ MQTTStatus_t status = MQTTIllegalState;
+ bool shouldDeleteRecord = false;
+ bool isTransitionValid = false;
+
+ assert( records != NULL );
+
+ /* Record to be deleted if the state transition is completed or if a PUBREC
+ * is received for an outgoing QoS2 publish. When a PUBREC is received,
+ * record is deleted and added back to the end of the records to maintain
+ * ordering for PUBRELs. */
+ shouldDeleteRecord = ( ( newState == MQTTPublishDone ) || ( newState == MQTTPubRelSend ) ) ? true : false;
+ isTransitionValid = validateTransitionAck( currentState, newState );
+
+ if( isTransitionValid == true )
+ {
+ status = MQTTSuccess;
+
+ /* Update record for acks. When sending or receiving acks for packets that
+ * are resent during a session reestablishment, the new state and
+ * current state can be the same. No update of record required in that case. */
+ if( currentState != newState )
+ {
+ updateRecord( records,
+ recordIndex,
+ newState,
+ shouldDeleteRecord );
+
+ /* For QoS2 messages, in order to preserve the message ordering, when
+ * a PUBREC is received for an outgoing publish, the record should be
+ * moved to the last. This move will help preserve the order in which
+ * a PUBREL needs to be resent in case of a session reestablishment. */
+ if( newState == MQTTPubRelSend )
+ {
+ status = addRecord( records,
+ MQTT_STATE_ARRAY_MAX_COUNT,
+ packetId,
+ MQTTQoS2,
+ MQTTPubRelSend );
+ }
+ }
+ }
+ else
+ {
+ /* Invalid state transition. */
+ LogError( ( "Invalid transition from state %s to state %s.",
+ MQTT_State_strerror( currentState ),
+ MQTT_State_strerror( newState ) ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+static MQTTStatus_t updateStatePublish( MQTTContext_t * pMqttContext,
+ size_t recordIndex,
+ uint16_t packetId,
+ MQTTStateOperation_t opType,
+ MQTTQoS_t qos,
+ MQTTPublishState_t currentState,
+ MQTTPublishState_t newState )
+{
+ MQTTStatus_t status = MQTTSuccess;
+ bool isTransitionValid = false;
+
+ assert( pMqttContext != NULL );
+ assert( packetId != MQTT_PACKET_ID_INVALID );
+ assert( qos != MQTTQoS0 );
+
+ /* This will always succeed for an incoming publish. This is due to the fact
+ * that the passed in currentState must be MQTTStateNull, since
+ * #MQTT_UpdateStatePublish does not perform a lookup for receives. */
+ isTransitionValid = validateTransitionPublish( currentState, newState, opType, qos );
+
+ if( isTransitionValid == true )
+ {
+ /* addRecord will check for collisions. */
+ if( opType == MQTT_RECEIVE )
+ {
+ status = addRecord( pMqttContext->incomingPublishRecords,
+ MQTT_STATE_ARRAY_MAX_COUNT,
+ packetId,
+ qos,
+ newState );
+ }
+ /* Send operation. */
+ else
+ {
+ /* Skip updating record when publish is resend and no state
+ * update is required. */
+ if( currentState != newState )
+ {
+ updateRecord( pMqttContext->outgoingPublishRecords, recordIndex, newState, false );
+ }
+ }
+ }
+ else
+ {
+ status = MQTTIllegalState;
+ LogError( ( "Invalid transition from state %s to state %s.",
+ MQTT_State_strerror( currentState ),
+ MQTT_State_strerror( newState ) ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_ReserveState( MQTTContext_t * pMqttContext,
+ uint16_t packetId,
+ MQTTQoS_t qos )
+{
+ MQTTStatus_t status = MQTTSuccess;
+
+ if( qos == MQTTQoS0 )
+ {
+ status = MQTTSuccess;
+ }
+ else if( ( packetId == MQTT_PACKET_ID_INVALID ) || ( pMqttContext == NULL ) )
+ {
+ status = MQTTBadParameter;
+ }
+ else
+ {
+ /* Collisions are detected when adding the record. */
+ status = addRecord( pMqttContext->outgoingPublishRecords,
+ MQTT_STATE_ARRAY_MAX_COUNT,
+ packetId,
+ qos,
+ MQTTPublishSend );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType,
+ MQTTQoS_t qos )
+{
+ MQTTPublishState_t calculatedState = MQTTStateNull;
+
+ switch( qos )
+ {
+ case MQTTQoS0:
+ calculatedState = MQTTPublishDone;
+ break;
+
+ case MQTTQoS1:
+ calculatedState = ( opType == MQTT_SEND ) ? MQTTPubAckPending : MQTTPubAckSend;
+ break;
+
+ case MQTTQoS2:
+ calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRecPending : MQTTPubRecSend;
+ break;
+
+ default:
+ /* No other QoS values. */
+ break;
+ }
+
+ return calculatedState;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext,
+ uint16_t packetId,
+ MQTTStateOperation_t opType,
+ MQTTQoS_t qos,
+ MQTTPublishState_t * pNewState )
+{
+ MQTTPublishState_t newState = MQTTStateNull;
+ MQTTPublishState_t currentState = MQTTStateNull;
+ MQTTStatus_t mqttStatus = MQTTSuccess;
+ size_t recordIndex = MQTT_STATE_ARRAY_MAX_COUNT;
+ MQTTQoS_t foundQoS = MQTTQoS0;
+
+ if( ( pMqttContext == NULL ) || ( pNewState == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p",
+ pMqttContext,
+ pNewState ) );
+
+ mqttStatus = MQTTBadParameter;
+ }
+ else if( qos == MQTTQoS0 )
+ {
+ /* QoS 0 publish. Do nothing. */
+ *pNewState = MQTTPublishDone;
+ }
+ else if( packetId == MQTT_PACKET_ID_INVALID )
+ {
+ /* Publishes > QoS 0 need a valid packet ID. */
+ mqttStatus = MQTTBadParameter;
+ }
+ else if( opType == MQTT_SEND )
+ {
+ /* Search record for entry so we can check QoS. */
+ recordIndex = findInRecord( pMqttContext->outgoingPublishRecords,
+ MQTT_STATE_ARRAY_MAX_COUNT,
+ packetId,
+ &foundQoS,
+ &currentState );
+
+ if( ( recordIndex == MQTT_STATE_ARRAY_MAX_COUNT ) || ( foundQoS != qos ) )
+ {
+ /* Entry should match with supplied QoS. */
+ mqttStatus = MQTTBadParameter;
+ }
+ }
+ else
+ {
+ /* QoS 1 or 2 receive. Nothing to be done. */
+ }
+
+ if( ( qos != MQTTQoS0 ) && ( mqttStatus == MQTTSuccess ) )
+ {
+ newState = MQTT_CalculateStatePublish( opType, qos );
+ /* Validate state transition and update state records. */
+ mqttStatus = updateStatePublish( pMqttContext,
+ recordIndex,
+ packetId,
+ opType,
+ qos,
+ currentState,
+ newState );
+
+ /* Update output parameter on success. */
+ if( mqttStatus == MQTTSuccess )
+ {
+ *pNewState = newState;
+ }
+ }
+
+ return mqttStatus;
+}
+
+/*-----------------------------------------------------------*/
+
+MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext,
+ uint16_t packetId,
+ MQTTPubAckType_t packetType,
+ MQTTStateOperation_t opType,
+ MQTTPublishState_t * pNewState )
+{
+ MQTTPublishState_t newState = MQTTStateNull;
+ MQTTPublishState_t currentState = MQTTStateNull;
+ bool isOutgoingPublish = isPublishOutgoing( packetType, opType );
+ MQTTQoS_t qos = MQTTQoS0;
+ size_t recordIndex = MQTT_STATE_ARRAY_MAX_COUNT;
+ MQTTPubAckInfo_t * records = NULL;
+ MQTTStatus_t status = MQTTBadParameter;
+
+ if( ( pMqttContext == NULL ) || ( pNewState == NULL ) )
+ {
+ LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p.",
+ pMqttContext,
+ pNewState ) );
+ }
+ else
+ {
+ if( isOutgoingPublish == true )
+ {
+ records = pMqttContext->outgoingPublishRecords;
+ }
+ else
+ {
+ records = pMqttContext->incomingPublishRecords;
+ }
+
+ recordIndex = findInRecord( records,
+ MQTT_STATE_ARRAY_MAX_COUNT,
+ packetId,
+ &qos,
+ &currentState );
+ }
+
+ if( recordIndex < MQTT_STATE_ARRAY_MAX_COUNT )
+ {
+ newState = MQTT_CalculateStateAck( packetType, opType, qos );
+
+ /* Validate state transition and update state record. */
+ status = updateStateAck( records, recordIndex, packetId, currentState, newState );
+
+ /* Update the output parameter. */
+ if( status == MQTTSuccess )
+ {
+ *pNewState = newState;
+ }
+ }
+ else
+ {
+ LogError( ( "No matching record found for publish %u.", packetId ) );
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext,
+ MQTTStateCursor_t * pCursor,
+ MQTTPublishState_t * pState )
+{
+ uint16_t packetId = MQTT_PACKET_ID_INVALID;
+ uint16_t searchStates = 0U;
+
+ /* Validate arguments. */
+ if( ( pMqttContext == NULL ) || ( pCursor == NULL ) || ( pState == NULL ) )
+ {
+ LogError( ( "Arguments cannot be NULL pMqttContext =%p, pCursor=%p"
+ " pState=%p.",
+ pMqttContext,
+ pCursor,
+ pState ) );
+ }
+ else
+ {
+ /* PUBREL for packets in state #MQTTPubCompPending and #MQTTPubRelSend
+ * would need to be resent when a session is reestablished.*/
+ UINT16_SET_BIT( searchStates, MQTTPubCompPending );
+ UINT16_SET_BIT( searchStates, MQTTPubRelSend );
+ packetId = stateSelect( pMqttContext, searchStates, pCursor );
+
+ /* The state needs to be in #MQTTPubRelSend for sending PUBREL. */
+ if( packetId != MQTT_PACKET_ID_INVALID )
+ {
+ *pState = MQTTPubRelSend;
+ }
+ }
+
+ return packetId;
+}
+
+/*-----------------------------------------------------------*/
+
+uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext,
+ MQTTStateCursor_t * pCursor )
+{
+ uint16_t packetId = MQTT_PACKET_ID_INVALID;
+ uint16_t searchStates = 0U;
+
+ /* Validate arguments. */
+ if( ( pMqttContext == NULL ) || ( pCursor == NULL ) )
+ {
+ LogError( ( "Arguments cannot be NULL pMqttContext =%p, pCursor=%p",
+ pMqttContext,
+ pCursor ) );
+ }
+ else
+ {
+ /* Packets in state #MQTTPublishSend, #MQTTPubAckPending and
+ * #MQTTPubRecPending would need to be resent when a session is
+ * reestablished. */
+ UINT16_SET_BIT( searchStates, MQTTPublishSend );
+ UINT16_SET_BIT( searchStates, MQTTPubAckPending );
+ UINT16_SET_BIT( searchStates, MQTTPubRecPending );
+
+ packetId = stateSelect( pMqttContext, searchStates, pCursor );
+ }
+
+ return packetId;
+}
+
+/*-----------------------------------------------------------*/
+
+const char * MQTT_State_strerror( MQTTPublishState_t state )
+{
+ const char * str = NULL;
+
+ switch( state )
+ {
+ case MQTTStateNull:
+ str = "MQTTStateNull";
+ break;
+
+ case MQTTPublishSend:
+ str = "MQTTPublishSend";
+ break;
+
+ case MQTTPubAckSend:
+ str = "MQTTPubAckSend";
+ break;
+
+ case MQTTPubRecSend:
+ str = "MQTTPubRecSend";
+ break;
+
+ case MQTTPubRelSend:
+ str = "MQTTPubRelSend";
+ break;
+
+ case MQTTPubCompSend:
+ str = "MQTTPubCompSend";
+ break;
+
+ case MQTTPubAckPending:
+ str = "MQTTPubAckPending";
+ break;
+
+ case MQTTPubRecPending:
+ str = "MQTTPubRecPending";
+ break;
+
+ case MQTTPubRelPending:
+ str = "MQTTPubRelPending";
+ break;
+
+ case MQTTPubCompPending:
+ str = "MQTTPubCompPending";
+ break;
+
+ case MQTTPublishDone:
+ str = "MQTTPublishDone";
+ break;
+
+ default:
+ /* Invalid state received. */
+ str = "Invalid MQTT State";
+ break;
+ }
+
+ return str;
+}
+
+/*-----------------------------------------------------------*/
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h
new file mode 100644
index 000000000..c62489573
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h
@@ -0,0 +1,23 @@
+#ifndef MQTT_INTERNAL_H_
+#define MQTT_INTERNAL_H_
+
+/* Include config file before other headers. */
+#include "mqtt_config.h"
+
+#ifndef LogError
+ #define LogError( message )
+#endif
+
+#ifndef LogWarn
+ #define LogWarn( message )
+#endif
+
+#ifndef LogInfo
+ #define LogInfo( message )
+#endif
+
+#ifndef LogDebug
+ #define LogDebug( message )
+#endif
+
+#endif /* ifndef MQTT_INTERNAL_H_ */