diff options
author | Muneeb Ahmed <54290492+muneebahmed10@users.noreply.github.com> | 2020-06-30 16:57:50 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-30 16:57:50 -0700 |
commit | 57df8b82109bc1e20a7a1816e985feb55975ce90 (patch) | |
tree | ce87879f4d05803b02a0cdf2f032e339e8d0d0c1 | |
parent | 0bcbf43fee889df413137678094a8bb314591ed7 (diff) | |
download | freertos-git-57df8b82109bc1e20a7a1816e985feb55975ce90.tar.gz |
Copy MQTT files from 10174c4d of CSDK development (#111)
* Copy MQTT files from 6d4e47f3 of CSDK development
* Change case
* Update MQTT to commit a4ad8baf
* Update MQTT to commit 10174c4d
9 files changed, 6195 insertions, 0 deletions
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/directories.txt b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/directories.txt new file mode 100755 index 000000000..ad96185c7 --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/directories.txt @@ -0,0 +1,3 @@ ++ standard +Contains the implementation of IoT libraries that implement standard protocols +and interfaces, such as MQTT. diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/directories.txt b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/directories.txt new file mode 100755 index 000000000..721866f72 --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/directories.txt @@ -0,0 +1,2 @@ ++ mqtt +Contains the implementation of the MQTT library. diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h new file mode 100644 index 000000000..7dfef1285 --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h @@ -0,0 +1,353 @@ +/* + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef MQTT_H +#define MQTT_H + +/* Include config file before other headers. */ +#include "mqtt_config.h" +#include "mqtt_lightweight.h" + +/** + * @brief Invalid packet identifier. + * + * Zero is an invalid packet identifier as per MQTT v3.1.1 spec. + */ +#define MQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U ) + +struct MQTTApplicationCallbacks; +typedef struct MQTTApplicationCallbacks MQTTApplicationCallbacks_t; + +struct MQTTPubAckInfo; +typedef struct MQTTPubAckInfo MQTTPubAckInfo_t; + +struct MQTTContext; +typedef struct MQTTContext MQTTContext_t; + +struct MQTTTransportInterface; +typedef struct MQTTTransportInterface MQTTTransportInterface_t; + +typedef int32_t (* MQTTTransportSendFunc_t )( NetworkContext_t context, + const void * pMessage, + size_t bytesToSend ); + +typedef uint32_t (* MQTTGetCurrentTimeFunc_t )( void ); + +typedef void (* MQTTEventCallback_t )( MQTTContext_t * pContext, + MQTTPacketInfo_t * pPacketInfo, + uint16_t packetIdentifier, + MQTTPublishInfo_t * pPublishInfo ); + +typedef enum MQTTConnectionStatus +{ + MQTTNotConnected, + MQTTConnected +} MQTTConnectionStatus_t; + +typedef enum MQTTPublishState +{ + MQTTStateNull = 0, + MQTTPublishSend, + MQTTPubAckSend, + MQTTPubRecSend, + MQTTPubRelSend, + MQTTPubCompSend, + MQTTPubAckPending, + MQTTPubRelPending, + MQTTPubRecPending, + MQTTPubCompPending, + MQTTPublishDone +} MQTTPublishState_t; + +typedef enum MQTTPubAckType +{ + MQTTPuback, + MQTTPubrec, + MQTTPubrel, + MQTTPubcomp +} MQTTPubAckType_t; + +struct MQTTTransportInterface +{ + MQTTTransportSendFunc_t send; + MQTTTransportRecvFunc_t recv; + NetworkContext_t networkContext; +}; + +struct MQTTApplicationCallbacks +{ + MQTTGetCurrentTimeFunc_t getTime; + MQTTEventCallback_t appCallback; +}; + +struct MQTTPubAckInfo +{ + uint16_t packetId; + MQTTQoS_t qos; + MQTTPublishState_t publishState; +}; + +struct MQTTContext +{ + MQTTPubAckInfo_t outgoingPublishRecords[ MQTT_STATE_ARRAY_MAX_COUNT ]; + size_t outgoingPublishCount; + MQTTPubAckInfo_t incomingPublishRecords[ MQTT_STATE_ARRAY_MAX_COUNT ]; + size_t incomingPublishCount; + + MQTTTransportInterface_t transportInterface; + MQTTFixedBuffer_t networkBuffer; + + uint16_t nextPacketId; + MQTTConnectionStatus_t connectStatus; + MQTTApplicationCallbacks_t callbacks; + uint32_t lastPacketTime; + bool controlPacketSent; + + /* Keep alive members. */ + uint16_t keepAliveIntervalSec; + uint32_t pingReqSendTimeMs; + uint32_t pingRespTimeoutMs; + bool waitingForPingResp; +}; + +/** + * @brief Initialize an MQTT context. + * + * This function must be called on an MQTT context before any other function. + * + * @note The getTime callback function must be defined. If there is no time + * implementation, it is the responsibility of the application to provide a + * dummy function to always return 0, and provide 0 timeouts for functions. This + * will ensure all time based functions will run for a single iteration. + * + * @brief param[in] pContext The context to initialize. + * @brief param[in] pTransportInterface The transport interface to use with the context. + * @brief param[in] pCallbacks Callbacks to use with the context. + * @brief param[in] pNetworkBuffer Network buffer provided for the context. + * + * @return #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_Init( MQTTContext_t * pContext, + const MQTTTransportInterface_t * pTransportInterface, + const MQTTApplicationCallbacks_t * pCallbacks, + const MQTTFixedBuffer_t * pNetworkBuffer ); + +/** + * @brief Establish an MQTT session. + * + * This function will send MQTT CONNECT packet and receive a CONNACK packet. The + * send and receive from the network is done through the transport interface. + * + * The maximum time this function waits for a CONNACK is decided in one of the + * following ways: + * 1. If #timeoutMs is greater than 0: + * #getTime is used to ensure that the function does not wait more than #timeoutMs + * for CONNACK. + * 2. If #timeoutMs is 0: + * The network receive for CONNACK is retried up to the number of times configured + * by #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. + * + * @param[in] pContext Initialized MQTT context. + * @param[in] pConnectInfo MQTT CONNECT packet information. + * @param[in] pWillInfo Last Will and Testament. Pass NULL if Last Will and + * Testament is not used. + * @param[in] timeoutMs Maximum time in milliseconds to wait for a CONNACK packet. + * A zero timeout makes use of the retries for receiving CONNACK as configured with + * #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT . + * @param[out] pSessionPresent Whether a previous session was present. + * Only relevant if not establishing a clean session. + * + * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to + * hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSendFailed if transport send failed; + * #MQTTRecvFailed if transport receive failed for CONNACK; + * #MQTTNoDataAvailable if no data available to receive in transport until + * the #timeoutMs for CONNACK; + * #MQTTSuccess otherwise. + * + * @note This API may spend more time than provided in the timeoutMS parameters in + * certain conditions as listed below: + * + * 1. Timeouts are incorrectly configured - If the timeoutMS is less than the + * transport receive timeout and if a CONNACK packet is not received within + * the transport receive timeout, the API will spend the transport receive + * timeout (which is more time than the timeoutMs). It is the case of incorrect + * timeout configuration as the timeoutMs parameter passed to this API must be + * greater than the transport receive timeout. Please refer to the transport + * interface documentation for more details about timeout configurations. + * + * 2. Partial CONNACK packet is received right before the expiry of the timeout - It + * is possible that first two bytes of CONNACK packet (packet type and remaining + * length) are received right before the expiry of the timeoutMS. In that case, + * the API makes one more network receive call in an attempt to receive the remaining + * 2 bytes. In the worst case, it can happen that the remaining 2 bytes are never + * received and this API will end up spending timeoutMs + transport receive timeout. + */ +MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, + const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + uint32_t timeoutMs, + bool * pSessionPresent ); + +/** + * @brief Sends MQTT SUBSCRIBE for the given list of topic filters to + * the broker. + * + * @param[in] pContext Initialized MQTT context. + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[in] packetId packet ID generated by #MQTT_GetPacketId. + * + * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to + * hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSendFailed if transport write failed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId ); + +/** + * @brief Publishes a message to the given topic name. + * + * @brief param[in] pContext Initialized MQTT context. + * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @brief param[in] packetId packet ID generated by #MQTT_GetPacketId. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSendFailed if transport write failed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId ); + +/** + * @brief Sends an MQTT PINGREQ to broker. + * + * @param[in] pContext Initialized and connected MQTT context. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSendFailed if transport write failed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ); + +/** + * @brief Sends MQTT UNSUBSCRIBE for the given list of topic filters to + * the broker. + * + * @param[in] pContext Initialized MQTT context. + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[in] packetId packet ID generated by #MQTT_GetPacketId. + * + * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to + * hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSendFailed if transport write failed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId ); + +/** + * @brief Disconnect an MQTT session. + * + * @param[in] pContext Initialized and connected MQTT context. + * + * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to + * hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSendFailed if transport send failed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ); + +/** + * @brief Loop to receive packets from the transport interface. Handles keep + * alive. + * + * @param[in] pContext Initialized and connected MQTT context. + * @param[in] timeoutMs Minimum time in milliseconds that the receive loop will + * run, unless an error occurs. + * + * @return #MQTTBadParameter if context is NULL; + * #MQTTRecvFailed if a network error occurs during reception; + * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ; + * #MQTTBadResponse if an invalid packet is received; + * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before + * pContext->pingRespTimeoutMs milliseconds; + * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an + * invalid transition for the internal state machine; + * #MQTTSuccess on success. + */ +MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext, + uint32_t timeoutMs ); + +/** + * @brief Loop to receive packets from the transport interface. Does not handle + * keep alive. + * + * @note Passing a timeout value of 0 will run the loop for a single iteration. + * + * @param[in] pContext Initialized and connected MQTT context. + * @param[in] timeoutMs Minimum time in milliseconds that the receive loop will + * run, unless an error occurs. + * + * @return #MQTTBadParameter if context is NULL; + * #MQTTRecvFailed if a network error occurs during reception; + * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ; + * #MQTTBadResponse if an invalid packet is received; + * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an + * invalid transition for the internal state machine; + * #MQTTSuccess on success. + */ +MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext, + uint32_t timeoutMs ); + +/** + * @brief Get a packet ID that is valid according to the MQTT 3.1.1 spec. + * + * @param[in] pContext Initialized MQTT context. + * + * @return A non-zero number. + */ +uint16_t MQTT_GetPacketId( MQTTContext_t * pContext ); + +/** + * @brief Error code to string conversion for MQTT statuses. + * + * @param[in] status The status to convert to a string. + * + * @return The string representation of the status. + */ +const char * MQTT_Status_strerror( MQTTStatus_t status ); + +#endif /* ifndef MQTT_H */ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h new file mode 100644 index 000000000..e52b0790f --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h @@ -0,0 +1,527 @@ +/* + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef MQTT_LIGHTWEIGHT_H +#define MQTT_LIGHTWEIGHT_H + +#include <stddef.h> +#include <stdint.h> + +/* bools are only defined in C99+ */ +#if defined( __cplusplus ) || ( defined( __STDC_VERSION__ ) && ( __STDC_VERSION__ >= 199901L ) ) + #include <stdbool.h> +#elif !defined( bool ) + #define bool int8_t + #define false ( int8_t ) 0 + #define true ( int8_t ) 1 +#endif + +/* Include config file before other headers. */ +#include "mqtt_config.h" + +/* MQTT packet types. */ +#define MQTT_PACKET_TYPE_CONNECT ( ( uint8_t ) 0x10U ) /**< @brief CONNECT (client-to-server). */ +#define MQTT_PACKET_TYPE_CONNACK ( ( uint8_t ) 0x20U ) /**< @brief CONNACK (server-to-client). */ +#define MQTT_PACKET_TYPE_PUBLISH ( ( uint8_t ) 0x30U ) /**< @brief PUBLISH (bidirectional). */ +#define MQTT_PACKET_TYPE_PUBACK ( ( uint8_t ) 0x40U ) /**< @brief PUBACK (bidirectional). */ +#define MQTT_PACKET_TYPE_PUBREC ( ( uint8_t ) 0x50U ) /**< @brief PUBREC (bidirectional). */ +#define MQTT_PACKET_TYPE_PUBREL ( ( uint8_t ) 0x62U ) /**< @brief PUBREL (bidirectional). */ +#define MQTT_PACKET_TYPE_PUBCOMP ( ( uint8_t ) 0x70U ) /**< @brief PUBCOMP (bidirectional). */ +#define MQTT_PACKET_TYPE_SUBSCRIBE ( ( uint8_t ) 0x82U ) /**< @brief SUBSCRIBE (client-to-server). */ +#define MQTT_PACKET_TYPE_SUBACK ( ( uint8_t ) 0x90U ) /**< @brief SUBACK (server-to-client). */ +#define MQTT_PACKET_TYPE_UNSUBSCRIBE ( ( uint8_t ) 0xA2U ) /**< @brief UNSUBSCRIBE (client-to-server). */ +#define MQTT_PACKET_TYPE_UNSUBACK ( ( uint8_t ) 0xB0U ) /**< @brief UNSUBACK (server-to-client). */ +#define MQTT_PACKET_TYPE_PINGREQ ( ( uint8_t ) 0xC0U ) /**< @brief PINGREQ (client-to-server). */ +#define MQTT_PACKET_TYPE_PINGRESP ( ( uint8_t ) 0xD0U ) /**< @brief PINGRESP (server-to-client). */ +#define MQTT_PACKET_TYPE_DISCONNECT ( ( uint8_t ) 0xE0U ) /**< @brief DISCONNECT (client-to-server). */ + + +/** + * @brief The size of MQTT PUBACK, PUBREC, PUBREL, and PUBCOMP packets, per MQTT spec. + */ +#define MQTT_PUBLISH_ACK_PACKET_SIZE ( 4UL ) + +struct MQTTFixedBuffer; +typedef struct MQTTFixedBuffer MQTTFixedBuffer_t; + +struct MQTTConnectInfo; +typedef struct MQTTConnectInfo MQTTConnectInfo_t; + +struct MQTTSubscribeInfo; +typedef struct MQTTSubscribeInfo MQTTSubscribeInfo_t; + +struct MqttPublishInfo; +typedef struct MqttPublishInfo MQTTPublishInfo_t; + +struct MQTTPacketInfo; +typedef struct MQTTPacketInfo MQTTPacketInfo_t; + +/** + * @brief Signature of the transport interface receive function. + * + * A function with this signature must be provided to the MQTT library to read + * data off the network. + * + * @param[in] context The network context provided with this function. + * @param[out] pBuffer Buffer to receive network data. + * @param[in] bytesToRecv Bytes to receive from the network. pBuffer must be at + * least this size. + * + * @return The number of bytes received; negative value on failure. + */ +typedef int32_t (* MQTTTransportRecvFunc_t )( NetworkContext_t context, + void * pBuffer, + size_t bytesToRecv ); + +/** + * @brief Return codes from MQTT functions. + */ +typedef enum MQTTStatus +{ + MQTTSuccess = 0, /**< Function completed successfully. */ + MQTTBadParameter, /**< At least one parameter was invalid. */ + MQTTNoMemory, /**< A provided buffer was too small. */ + MQTTSendFailed, /**< The transport send function failed. */ + MQTTRecvFailed, /**< The transport receive function failed. */ + MQTTBadResponse, /**< An invalid packet was received from the server. */ + MQTTServerRefused, /**< The server refused a CONNECT or SUBSCRIBE. */ + MQTTNoDataAvailable, /**< No data available from the transport interface. */ + MQTTIllegalState, /**< An illegal state in the state record. */ + MQTTStateCollision, /**< A collision with an existing state record entry. */ + MQTTKeepAliveTimeout /**< Timeout while waiting for PINGRESP. */ +} MQTTStatus_t; + +/** + * @brief MQTT Quality of Service values. + */ +typedef enum MQTTQoS +{ + MQTTQoS0 = 0, /**< Delivery at most once. */ + MQTTQoS1 = 1, /**< Delivery at least once. */ + MQTTQoS2 = 2 /**< Delivery exactly once. */ +} MQTTQoS_t; + +/** + * @brief Buffer passed to MQTT library. + * + * These buffers are not copied and must remain in scope for the duration of the + * MQTT operation. + */ +struct MQTTFixedBuffer +{ + uint8_t * pBuffer; /**< @brief Pointer to buffer. */ + size_t size; /**< @brief Size of buffer. */ +}; + +/** + * @brief MQTT CONNECT packet parameters. + */ +struct MQTTConnectInfo +{ + /** + * @brief Whether to establish a new, clean session or resume a previous session. + */ + bool cleanSession; + + /** + * @brief MQTT keep alive period. + */ + uint16_t keepAliveSeconds; + + /** + * @brief MQTT client identifier. Must be unique per client. + */ + const char * pClientIdentifier; + + /** + * @brief Length of the client identifier. + */ + uint16_t clientIdentifierLength; + + /** + * @brief MQTT user name. Set to NULL if not used. + */ + const char * pUserName; + + /** + * @brief Length of MQTT user name. Set to 0 if not used. + */ + uint16_t userNameLength; + + /** + * @brief MQTT password. Set to NULL if not used. + */ + const char * pPassword; + + /** + * @brief Length of MQTT password. Set to 0 if not used. + */ + uint16_t passwordLength; +}; + +/** + * @brief MQTT SUBSCRIBE packet parameters. + */ +struct MQTTSubscribeInfo +{ + /** + * @brief Quality of Service for subscription. + */ + MQTTQoS_t qos; + + /** + * @brief Topic filter to subscribe to. + */ + const char * pTopicFilter; + + /** + * @brief Length of subscription topic filter. + */ + uint16_t topicFilterLength; +}; + +/** + * @brief MQTT PUBLISH packet parameters. + */ +struct MqttPublishInfo +{ + /** + * @brief Quality of Service for message. + */ + MQTTQoS_t qos; + + /** + * @brief Whether this is a retained message. + */ + bool retain; + + /** + * @brief Whether this is a duplicate publish message. + */ + bool dup; + + /** + * @brief Topic name on which the message is published. + */ + const char * pTopicName; + + /** + * @brief Length of topic name. + */ + uint16_t topicNameLength; + + /** + * @brief Message payload. + */ + const void * pPayload; + + /** + * @brief Message payload length. + */ + size_t payloadLength; +}; + +/** + * @brief MQTT incoming packet parameters. + */ +struct MQTTPacketInfo +{ + /** + * @brief Type of incoming MQTT packet. + */ + uint8_t type; + + /** + * @brief Remaining serialized data in the MQTT packet. + */ + uint8_t * pRemainingData; + + /** + * @brief Length of remaining serialized data. + */ + size_t remainingLength; +}; + +/** + * @brief Get the size and Remaining Length of an MQTT CONNECT packet. + * + * @param[in] pConnectInfo MQTT CONNECT packet parameters. + * @param[in] pWillInfo Last Will and Testament. Pass NULL if not used. + * @param[out] pRemainingLength The Remaining Length of the MQTT CONNECT packet. + * @param[out] pPacketSize The total size of the MQTT CONNECT packet. + * + * @return #MQTTBadParameter if the packet would exceed the size allowed by the + * MQTT spec; #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t * pRemainingLength, + size_t * pPacketSize ); + +/** + * @brief Serialize an MQTT CONNECT packet in the given buffer. + * + * @param[in] pConnectInfo MQTT CONNECT packet parameters. + * @param[in] pWillInfo Last Will and Testament. Pass NULL if not used. + * @param[in] remainingLength Remaining Length provided by #MQTT_GetConnectPacketSize. + * @param[out] pBuffer Buffer for packet serialization. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_SerializeConnect( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ); + +/** + * @brief Get packet size and Remaining Length of an MQTT SUBSCRIBE packet. + * + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[out] pRemainingLength The Remaining Length of the MQTT SUBSCRIBE packet. + * @param[out] pPacketSize The total size of the MQTT SUBSCRIBE packet. + * + * @return #MQTTBadParameter if the packet would exceed the size allowed by the + * MQTT spec; #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + size_t * pRemainingLength, + size_t * pPacketSize ); + +/** + * @brief Serialize an MQTT SUBSCRIBE packet in the given buffer. + * + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[in] packetId packet ID generated by #MQTT_GetPacketId. + * @param[in] remainingLength Remaining Length provided by #MQTT_GetSubscribePacketSize. + * @param[out] pBuffer Buffer for packet serialization. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ); + +/** + * @brief Get packet size and Remaining Length of an MQTT UNSUBSCRIBE packet. + * + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[out] pRemainingLength The Remaining Length of the MQTT UNSUBSCRIBE packet. + * @param[out] pPacketSize The total size of the MQTT UNSUBSCRIBE packet. + * + * @return #MQTTBadParameter if the packet would exceed the size allowed by the + * MQTT spec; #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_GetUnsubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + size_t * pRemainingLength, + size_t * pPacketSize ); + +/** + * @brief Serialize an MQTT UNSUBSCRIBE packet in the given buffer. + * + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[in] packetId packet ID generated by #MQTT_GetPacketId. + * @param[in] remainingLength Remaining Length provided by #MQTT_GetUnsubscribePacketSize. + * @param[out] pBuffer Buffer for packet serialization. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ); + +/** + * @brief Get the packet size and remaining length of an MQTT PUBLISH packet. + * + * @param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @param[out] pRemainingLength The Remaining Length of the MQTT PUBLISH packet. + * @param[out] pPacketSize The total size of the MQTT PUBLISH packet. + * + * @return #MQTTBadParameter if the packet would exceed the size allowed by the + * MQTT spec or if invalid parameters are passed; #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_GetPublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, + size_t * pRemainingLength, + size_t * pPacketSize ); + +/** + * @brief Serialize an MQTT PUBLISH packet in the given buffer. + * + * This function will serialize complete MQTT PUBLISH packet into + * the given buffer. If the PUBLISH payload can be sent separately, + * consider using #MQTT_SerializePublishHeader, which will serialize + * only the PUBLISH header into the buffer. + * + * @param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @param[in] packetId packet ID generated by #MQTT_GetPacketId. + * @param[in] remainingLength Remaining Length provided by #MQTT_GetConnectPacketSize. + * @param[out] pBuffer Buffer for packet serialization. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ); + +/** + * @brief Serialize an MQTT PUBLISH packet header in the given buffer. + * + * This function serializes PUBLISH header in to the given buffer. Payload + * for PUBLISH will not be copied over to the buffer. This will help reduce + * the memory needed for the buffer and avoid an unwanted copy operataion of + * PUBLISH payload into the buffer. If payload also would need to be part of + * the serialized buffer, consider using #MQTT_SerializePublish. + * + * @param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @param[in] packetId packet ID generated by #MQTT_GetPacketId. + * @param[in] remainingLength Remaining Length provided by #MQTT_GetConnectPacketSize. + * @param[out] pBuffer Buffer for packet serialization. + * @param[out] pHeaderSize Size of the serialized MQTT PUBLISH header. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer, + size_t * pHeaderSize ); + +/** + * @brief Serialize an MQTT PUBACK, PUBREC, PUBREL, or PUBCOMP into the given + * buffer. + * + * @param[out] pBuffer Buffer for packet serialization. + * @param[in] packetType Byte of the corresponding packet fixed header per the + * MQTT spec. + * @param[in] packetId Packet ID of the publish. + * + * @return #MQTTBadParameter, #MQTTNoMemory, or #MQTTSuccess. + */ +MQTTStatus_t MQTT_SerializeAck( const MQTTFixedBuffer_t * pBuffer, + uint8_t packetType, + uint16_t packetId ); + +/** + * @brief Get the size of an MQTT DISCONNECT packet. + * + * @param[out] pPacketSize The size of the MQTT DISCONNECT packet. + * + * @return Always returns #MQTTSuccess. + */ +MQTTStatus_t MQTT_GetDisconnectPacketSize( size_t * pPacketSize ); + +/** + * @brief Serialize an MQTT DISCONNECT packet into the given buffer. + * + * @param[out] pBuffer Buffer for packet serialization. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_SerializeDisconnect( const MQTTFixedBuffer_t * pBuffer ); + +/** + * @brief Get the size of an MQTT PINGREQ packet. + * + * @param[out] pPacketSize The size of the MQTT PINGREQ packet. + * + * @return #MQTTSuccess or #MQTTBadParameter if pPacketSize is NULL. + */ +MQTTStatus_t MQTT_GetPingreqPacketSize( size_t * pPacketSize ); + +/** + * @brief Serialize an MQTT PINGREQ packet into the given buffer. + * + * @param[out] pBuffer Buffer for packet serialization. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +MQTTStatus_t MQTT_SerializePingreq( const MQTTFixedBuffer_t * pBuffer ); + +/** + * @brief Deserialize an MQTT PUBLISH packet. + * + * @param[in] pIncomingPacket #MQTTPacketInfo_t containing the buffer. + * @param[out] pPacketId The packet ID obtained from the buffer. + * @param[out] pPublishInfo Struct containing information about the publish. + * + * @return #MQTTBadParameter, #MQTTBadResponse, or #MQTTSuccess. + */ +MQTTStatus_t MQTT_DeserializePublish( const MQTTPacketInfo_t * pIncomingPacket, + uint16_t * pPacketId, + MQTTPublishInfo_t * pPublishInfo ); + +/** + * @brief Deserialize an MQTT CONNACK, SUBACK, UNSUBACK, PUBACK, PUBREC, PUBREL, + * PUBCOMP, or PINGRESP. + * + * @param[in] pIncomingPacket #MQTTPacketInfo_t containing the buffer. + * @param[out] pPacketId The packet ID of obtained from the buffer. Not used + * in CONNACK or PINGRESP. + * @param[out] pSessionPresent Boolean flag from a CONNACK indicating present session. + * + * @return #MQTTBadParameter, #MQTTBadResponse, or #MQTTSuccess. + */ +MQTTStatus_t MQTT_DeserializeAck( const MQTTPacketInfo_t * pIncomingPacket, + uint16_t * pPacketId, + bool * pSessionPresent ); + +/** + * @brief Extract the MQTT packet type and length from incoming packet. + * + * @param[in] readFunc Transport layer read function pointer. + * @param[out] pIncomingPacket Pointer to MQTTPacketInfo_t structure. This is + * where type, remaining length and packet identifier are stored. + * + * @return #MQTTSuccess on successful extraction of type and length, + * #MQTTBadParameter if @p pIncomingPacket is invalid, + * #MQTTRecvFailed on transport receive failure, + * #MQTTBadResponse if an invalid packet is read, and + * #MQTTNoDataAvailable if there is nothing to read. + */ +MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( MQTTTransportRecvFunc_t readFunc, + NetworkContext_t networkContext, + MQTTPacketInfo_t * pIncomingPacket ); + +#endif /* ifndef MQTT_LIGHTWEIGHT_H */ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h new file mode 100644 index 000000000..75a54de7d --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h @@ -0,0 +1,154 @@ +/* + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef MQTT_STATE_H +#define MQTT_STATE_H + +#include "mqtt.h" + +#define MQTT_STATE_CURSOR_INITIALIZER ( size_t ) 0 + +/** + * @brief Value indicating either send or receive. + */ +typedef enum MQTTStateOperation +{ + MQTT_SEND, + MQTT_RECEIVE +} MQTTStateOperation_t; + +/** + * @brief Cursor for iterating through state records. + */ +typedef size_t MQTTStateCursor_t; + +/** + * @brief Reserve an entry for an outgoing QoS 1/2 publish. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in] packetId The ID of the publish packet. + * @param[in] qos 1 or 2. + * + * @return MQTTSuccess, MQTTNoMemory, or MQTTStateCollision. + */ +MQTTStatus_t MQTT_ReserveState( MQTTContext_t * pMqttContext, + uint16_t packetId, + MQTTQoS_t qos ); + +/** + * @brief Calculate the new state for a publish from its qos and operation type. + * + * @param[in] opType Send or Receive. + * @param[in] qos 0, 1, or 2. + * + * @return The calculated state. + */ +MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType, + MQTTQoS_t qos ); + +/** + * @brief Update the state record for a PUBLISH packet. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in] packetId ID of the PUBLISH packet. + * @param[in] opType Send or Receive. + * @param[in] qos 0, 1, or 2. + * @param[out] pNewState Updated state of the publish. + * + * @return #MQTTBadParameter, #MQTTIllegalState, #MQTTStateCollision or + * #MQTTSuccess. + */ +MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext, + uint16_t packetId, + MQTTStateOperation_t opType, + MQTTQoS_t qos, + MQTTPublishState_t * pNewState ); + +/** + * @brief Calculate the state from a PUBACK, PUBREC, PUBREL, or PUBCOMP. + * + * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP. + * @param[in] opType Send or Receive. + * @param[in] qos 1 or 2. + * + * @return The calculated state. + */ +MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType, + MQTTStateOperation_t opType, + MQTTQoS_t qos ); + +/** + * @brief Update the state record for an ACKed publish. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in] packetId ID of the ack packet. + * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP. + * @param[in] opType Send or Receive. + * @param[out] pNewState Updated state of the publish. + * + * @return #MQTTBadParameter, #MQTTIllegalState, or #MQTTSuccess. + */ +MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext, + uint16_t packetId, + MQTTPubAckType_t packetType, + MQTTStateOperation_t opType, + MQTTPublishState_t * pNewState ); + +/** + * @brief Get the packet ID of next pending PUBREL ack to be resent. + * + * This function will need to be called to get the packet for which a PUBREL + * need to be sent when a session is reestablished. Calling this function + * repeatedly until packet id is 0 will give all the packets for which + * a PUBREL need to be resent in the correct order. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in,out] pCursor Index at which to start searching. + * @param[out] pState State indicating that PUBREL packet need to be sent. + */ +uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext, + MQTTStateCursor_t * pCursor, + MQTTPublishState_t * pState ); + +/** + * @brief Get the packet ID of next pending publish to be resent. + * + * This function will need to be called to get the packet for which a publish + * need to be sent when a session is reestablished. Calling this function + * repeatedly until packet id is 0 will give all the packets for which + * a publish need to be resent in the correct order. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in,out] pCursor Index at which to start searching. + */ +uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext, + MQTTStateCursor_t * pCursor ); + +/** + * @brief State to string conversion for state engine. + * + * @param[in] state The state to convert to a string. + * + * @return The string representation of the state. + */ +const char * MQTT_State_strerror( MQTTPublishState_t state ); + +#endif /* ifndef MQTT_STATE_H */ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c new file mode 100644 index 000000000..9efe1b618 --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c @@ -0,0 +1,1939 @@ +/* + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include <string.h> +#include <assert.h> + +#include "mqtt.h" +#include "mqtt_state.h" +#include "private/mqtt_internal.h" + + +/** + * @brief The number of retries for receiving CONNACK. + * + * The MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT will be used only when the + * timeoutMs parameter of #MQTT_Connect() is passed as 0 . The transport + * receive for CONNACK will be retried MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT + * times before timing out. A value of 0 for this config will cause the + * transport receive for CONNACK to be invoked only once. + */ +#ifndef MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT + /* Default value for the CONNACK receive retries. */ + #define MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT ( 5U ) +#endif + +/*-----------------------------------------------------------*/ + +/** + * @brief Sends provided buffer to network using transport send. + * + * @brief param[in] pContext Initialized MQTT context. + * @brief param[in] pBufferToSend Buffer to be sent to network. + * @brief param[in] bytesToSend Number of bytes to be sent. + * + * @return Total number of bytes sent; -1 if there is an error. + */ +static int32_t sendPacket( MQTTContext_t * pContext, + const uint8_t * pBufferToSend, + size_t bytesToSend ); + +/** + * @brief Calculate the interval between two millisecond timestamps, including + * when the later value has overflowed. + * + * @note In C, the operands are promoted to signed integers in subtraction. + * Using this function avoids the need to cast the result of subtractions back + * to uint32_t. + * + * @param[in] later The later time stamp, in milliseconds. + * @param[in] start The earlier time stamp, in milliseconds. + * + * @return later - start. + */ +static uint32_t calculateElapsedTime( uint32_t later, + uint32_t start ); + +/** + * @brief Convert a byte indicating a publish ack type to an #MQTTPubAckType_t. + * + * @param[in] packetType First byte of fixed header. + * + * @return Type of ack. + */ +static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ); + +/** + * @brief Receive bytes into the network buffer, with a timeout. + * + * @param[in] pContext Initialized MQTT Context. + * @param[in] bytesToRecv Number of bytes to receive. + * @param[in] timeoutMs Time remaining to receive the packet. + * + * @return Number of bytes received, or negative number on network error. + */ +static int32_t recvExact( const MQTTContext_t * pContext, + size_t bytesToRecv, + uint32_t timeoutMs ); + +/** + * @brief Discard a packet from the transport interface. + * + * @param[in] PContext MQTT Connection context. + * @param[in] remainingLength Remaining length of the packet to dump. + * @param[in] timeoutMs Time remaining to discard the packet. + * + * @return #MQTTRecvFailed or #MQTTNoDataAvailable. + */ +static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, + size_t remainingLength, + uint32_t timeoutMs ); + +/** + * @brief Receive a packet from the transport interface. + * + * @param[in] pContext MQTT Connection context. + * @param[in] incomingPacket packet struct with remaining length. + * @param[in] remainingTimeMs Time remaining to receive the packet. + * + * @return #MQTTSuccess or #MQTTRecvFailed. + */ +static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, + MQTTPacketInfo_t incomingPacket, + uint32_t remainingTimeMs ); + +/** + * @brief Get the correct ack type to send. + * + * @param[in] state Current state of publish. + * + * @return Packet Type byte of PUBACK, PUBREC, PUBREL, or PUBCOMP if one of + * those should be sent, else 0. + */ +static uint8_t getAckTypeToSend( MQTTPublishState_t state ); + +/** + * @brief Send acks for received QoS 1/2 publishes. + * + * @param[in] pContext MQTT Connection context. + * @param[in] packetId packet ID of original PUBLISH. + * @param[in] publishState Current publish state in record. + * + * @return #MQTTSuccess, #MQTTIllegalState or #MQTTSendFailed. + */ +static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, + uint16_t packetId, + MQTTPublishState_t publishState ); + +/** + * @brief Send a keep alive PINGREQ if the keep alive interval has elapsed. + * + * @param[in] pContext Initialized MQTT Context. + * + * @return #MQTTKeepAliveTimeout if a PINGRESP is not received in time, + * #MQTTSendFailed if the PINGREQ cannot be sent, or #MQTTSuccess. + */ +static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext ); + +/** + * @brief Handle received MQTT PUBLISH packet. + * + * @param[in] pContext MQTT Connection context. + * @param[in] pIncomingPacket Incoming packet. + * + * @return MQTTSuccess, MQTTIllegalState or deserialization error. + */ +static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext, + MQTTPacketInfo_t * pIncomingPacket ); + +/** + * @brief Handle received MQTT publish acks. + * + * @param[in] pContext MQTT Connection context. + * @param[in] pIncomingPacket Incoming packet. + * + * @return MQTTSuccess, MQTTIllegalState, or deserialization error. + */ +static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext, + MQTTPacketInfo_t * pIncomingPacket ); + +/** + * @brief Handle received MQTT ack. + * + * @param[in] pContext MQTT Connection context. + * @param[in] pIncomingPacket Incoming packet. + * @param[in] manageKeepAlive Flag indicating if PINGRESPs should not be given + * to the application + * + * @return MQTTSuccess, MQTTIllegalState, or deserialization error. + */ +static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, + MQTTPacketInfo_t * pIncomingPacket, + bool manageKeepAlive ); + +/** + * @brief Run a single iteration of the receive loop. + * + * @param[in] pContext MQTT Connection context. + * @param[in] remainingTimeMs Remaining time for the loop in milliseconds. + * @param[in] manageKeepAlive Flag indicating if keep alive should be handled. + * + * @return #MQTTRecvFailed if a network error occurs during reception; + * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ; + * #MQTTBadResponse if an invalid packet is received; + * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before + * pContext->pingRespTimeoutMs milliseconds; + * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an + * invalid transition for the internal state machine; + * #MQTTSuccess on success. + */ +static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext, + uint32_t remainingTimeMs, + bool manageKeepAlive ); + +/** + * @brief Validates parameters of #MQTT_Subscribe or #MQTT_Unsubscribe. + * + * @param[in] pContext Initialized MQTT context. + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[in] packetId Packet identifier. + * + * @return #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId ); + +/** + * @brief Send serialized publish packet using transport send. + * + * @brief param[in] pContext Initialized MQTT context. + * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @brief param[in] headerSize Header size of the PUBLISH packet. + * + * @return #MQTTSendFailed if transport write failed; + * #MQTTSuccess otherwise. + */ +static MQTTStatus_t sendPublish( MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + size_t headerSize ); + +/** + * @brief Receives a CONNACK MQTT packet. + * + * @param[in] pContext Initialized MQTT context. + * @param[in] timeoutMs Timeout for waiting for CONNACK packet. + * @param[in] cleanSession Clean session flag set by application. + * @param[out] pIncomingPacket List of MQTT subscription info. + * @param[out] pSessionPresent Whether a previous session was present. + * Only relevant if not establishing a clean session. + * + * @return #MQTTBadResponse if a bad response is received; + * #MQTTNoDataAvailable if no data available for transport recv; + * ##MQTTRecvFailed if transport recv failed; + * #MQTTSuccess otherwise. + */ +static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, + uint32_t timeoutMs, + bool cleanSession, + MQTTPacketInfo_t * pIncomingPacket, + bool * pSessionPresent ); + +/** + * @brief Resends pending acks for a re-established MQTT session. + * + * @param[in] pContext Initialized MQTT context. + * + * @return #MQTTSendFailed if transport send failed; + * #MQTTSuccess otherwise. + */ +static MQTTStatus_t resendPendingAcks( MQTTContext_t * pContext ); + +/** + * @brief Serializes a PUBLISH message. + * + * @brief param[in] pContext Initialized MQTT context. + * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @brief param[in] packetId Packet Id of the publish packet. + * @brief param[out] pHeaderSize Size of the serialized PUBLISH header. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +static MQTTStatus_t serializePublish( const MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId, + size_t * const pHeaderSize ); + +/** + * @brief Function to validate #MQTT_Publish parameters. + * + * @brief param[in] pContext Initialized MQTT context. + * @brief param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @brief param[in] packetId Packet Id for the MQTT PUBLISH packet. + * + * @return #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId ); + +/*-----------------------------------------------------------*/ + +static int32_t sendPacket( MQTTContext_t * pContext, + const uint8_t * pBufferToSend, + size_t bytesToSend ) +{ + const uint8_t * pIndex = pBufferToSend; + size_t bytesRemaining = bytesToSend; + int32_t totalBytesSent = 0, bytesSent; + uint32_t sendTime = 0U; + + assert( pContext != NULL ); + assert( pContext->callbacks.getTime != NULL ); + + bytesRemaining = bytesToSend; + + /* Record the time of transmission. */ + sendTime = pContext->callbacks.getTime(); + + /* Loop until the entire packet is sent. */ + while( bytesRemaining > 0UL ) + { + bytesSent = pContext->transportInterface.send( pContext->transportInterface.networkContext, + pIndex, + bytesRemaining ); + + if( bytesSent > 0 ) + { + bytesRemaining -= ( size_t ) bytesSent; + totalBytesSent += bytesSent; + pIndex += bytesSent; + LogDebug( ( "Bytes sent=%d, bytes remaining=%ul," + "total bytes sent=%d.", + bytesSent, + bytesRemaining, + totalBytesSent ) ); + } + else + { + LogError( ( "Transport send failed." ) ); + totalBytesSent = -1; + break; + } + } + + /* Update time of last transmission if the entire packet is successfully sent. */ + if( totalBytesSent > 0 ) + { + pContext->lastPacketTime = sendTime; + LogDebug( ( "Successfully sent packet at time %u.", + sendTime ) ); + } + + return totalBytesSent; +} + +/*-----------------------------------------------------------*/ + +static uint32_t calculateElapsedTime( uint32_t later, + uint32_t start ) +{ + return later - start; +} + +/*-----------------------------------------------------------*/ + +static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ) +{ + MQTTPubAckType_t ackType = MQTTPuback; + + switch( packetType ) + { + case MQTT_PACKET_TYPE_PUBACK: + ackType = MQTTPuback; + break; + + case MQTT_PACKET_TYPE_PUBREC: + ackType = MQTTPubrec; + break; + + case MQTT_PACKET_TYPE_PUBREL: + ackType = MQTTPubrel; + break; + + case MQTT_PACKET_TYPE_PUBCOMP: + default: + + /* This function is only called after checking the type is one of + * the above four values, so packet type must be PUBCOMP here. */ + assert( packetType == MQTT_PACKET_TYPE_PUBCOMP ); + ackType = MQTTPubcomp; + break; + } + + return ackType; +} + +/*-----------------------------------------------------------*/ + +static int32_t recvExact( const MQTTContext_t * pContext, + size_t bytesToRecv, + uint32_t timeoutMs ) +{ + uint8_t * pIndex = NULL; + size_t bytesRemaining = bytesToRecv; + int32_t totalBytesRecvd = 0, bytesRecvd; + uint32_t entryTimeMs = 0U, elapsedTimeMs = 0U; + MQTTTransportRecvFunc_t recvFunc = NULL; + MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL; + bool receiveError = false; + + assert( pContext != NULL ); + assert( bytesToRecv <= pContext->networkBuffer.size ); + assert( pContext->callbacks.getTime != NULL ); + pIndex = pContext->networkBuffer.pBuffer; + recvFunc = pContext->transportInterface.recv; + getTimeStampMs = pContext->callbacks.getTime; + + entryTimeMs = getTimeStampMs(); + + while( ( bytesRemaining > 0U ) && ( receiveError == false ) ) + { + bytesRecvd = recvFunc( pContext->transportInterface.networkContext, + pIndex, + bytesRemaining ); + + if( bytesRecvd >= 0 ) + { + bytesRemaining -= ( size_t ) bytesRecvd; + totalBytesRecvd += ( int32_t ) bytesRecvd; + pIndex += bytesRecvd; + } + else + { + LogError( ( "Network error while receiving packet: ReturnCode=%d", + bytesRecvd ) ); + totalBytesRecvd = bytesRecvd; + receiveError = true; + } + + elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs ); + + if( ( bytesRemaining > 0U ) && ( elapsedTimeMs >= timeoutMs ) ) + { + LogError( ( "Time expired while receiving packet." ) ); + receiveError = true; + } + } + + return totalBytesRecvd; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, + size_t remainingLength, + uint32_t timeoutMs ) +{ + MQTTStatus_t status = MQTTRecvFailed; + int32_t bytesReceived = 0; + size_t bytesToReceive = 0U; + uint32_t totalBytesReceived = 0U, entryTimeMs = 0U, elapsedTimeMs = 0U; + uint32_t remainingTimeMs = timeoutMs; + MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL; + bool receiveError = false; + + assert( pContext != NULL ); + assert( pContext->callbacks.getTime != NULL ); + bytesToReceive = pContext->networkBuffer.size; + getTimeStampMs = pContext->callbacks.getTime; + + entryTimeMs = getTimeStampMs(); + + while( ( totalBytesReceived < remainingLength ) && ( receiveError == false ) ) + { + if( ( remainingLength - totalBytesReceived ) < bytesToReceive ) + { + bytesToReceive = remainingLength - totalBytesReceived; + } + + bytesReceived = recvExact( pContext, bytesToReceive, remainingTimeMs ); + + if( bytesReceived != ( int32_t ) bytesToReceive ) + { + LogError( ( "Receive error while discarding packet." + "ReceivedBytes=%d, ExpectedBytes=%lu.", + bytesReceived, + bytesToReceive ) ); + receiveError = true; + } + else + { + totalBytesReceived += ( uint32_t ) bytesReceived; + + elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs ); + + /* Update remaining time and check for timeout. */ + if( elapsedTimeMs < timeoutMs ) + { + remainingTimeMs = timeoutMs - elapsedTimeMs; + } + else + { + LogError( ( "Time expired while discarding packet." ) ); + receiveError = true; + } + } + } + + if( totalBytesReceived == remainingLength ) + { + LogError( ( "Dumped packet. DumpedBytes=%d.", + totalBytesReceived ) ); + /* Packet dumped, so no data is available. */ + status = MQTTNoDataAvailable; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, + MQTTPacketInfo_t incomingPacket, + uint32_t remainingTimeMs ) +{ + MQTTStatus_t status = MQTTSuccess; + int32_t bytesReceived = 0; + size_t bytesToReceive = 0U; + + assert( pContext != NULL ); + + if( incomingPacket.remainingLength > pContext->networkBuffer.size ) + { + LogError( ( "Incoming packet will be dumped: " + "Packet length exceeds network buffer size." + "PacketSize=%lu, NetworkBufferSize=%lu", + incomingPacket.remainingLength, + pContext->networkBuffer.size ) ); + status = discardPacket( pContext, + incomingPacket.remainingLength, + remainingTimeMs ); + } + else + { + bytesToReceive = incomingPacket.remainingLength; + bytesReceived = recvExact( pContext, bytesToReceive, remainingTimeMs ); + + if( bytesReceived == ( int32_t ) bytesToReceive ) + { + /* Receive successful, bytesReceived == bytesToReceive. */ + LogInfo( ( "Packet received. ReceivedBytes=%d.", + bytesReceived ) ); + } + else + { + LogError( ( "Packet reception failed. ReceivedBytes=%d, " + "ExpectedBytes=%lu.", + bytesReceived, + bytesToReceive ) ); + status = MQTTRecvFailed; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static uint8_t getAckTypeToSend( MQTTPublishState_t state ) +{ + uint8_t packetTypeByte = 0U; + + switch( state ) + { + case MQTTPubAckSend: + packetTypeByte = MQTT_PACKET_TYPE_PUBACK; + break; + + case MQTTPubRecSend: + packetTypeByte = MQTT_PACKET_TYPE_PUBREC; + break; + + case MQTTPubRelSend: + packetTypeByte = MQTT_PACKET_TYPE_PUBREL; + break; + + case MQTTPubCompSend: + packetTypeByte = MQTT_PACKET_TYPE_PUBCOMP; + break; + + default: + /* Take no action for states that do not require sending an ack. */ + break; + } + + return packetTypeByte; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, + uint16_t packetId, + MQTTPublishState_t publishState ) +{ + MQTTStatus_t status = MQTTSuccess; + MQTTPublishState_t newState = MQTTStateNull; + int32_t bytesSent = 0; + uint8_t packetTypeByte = 0U; + MQTTPubAckType_t packetType; + + assert( pContext != NULL ); + + packetTypeByte = getAckTypeToSend( publishState ); + + if( packetTypeByte != 0U ) + { + packetType = getAckFromPacketType( packetTypeByte ); + + status = MQTT_SerializeAck( &( pContext->networkBuffer ), + packetTypeByte, + packetId ); + + if( status == MQTTSuccess ) + { + bytesSent = sendPacket( pContext, + pContext->networkBuffer.pBuffer, + MQTT_PUBLISH_ACK_PACKET_SIZE ); + } + + if( bytesSent == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) + { + pContext->controlPacketSent = true; + status = MQTT_UpdateStateAck( pContext, + packetId, + packetType, + MQTT_SEND, + &newState ); + + if( status != MQTTSuccess ) + { + LogError( ( "Failed to update state of publish %u.", packetId ) ); + } + } + else + { + LogError( ( "Failed to send ACK packet: PacketType=%02x, " + "SentBytes=%d, " + "PacketSize=%lu.", + packetTypeByte, + bytesSent, + MQTT_PUBLISH_ACK_PACKET_SIZE ) ); + status = MQTTSendFailed; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext ) +{ + MQTTStatus_t status = MQTTSuccess; + uint32_t now = 0U, keepAliveMs = 0U; + + assert( pContext != NULL ); + now = pContext->callbacks.getTime(); + keepAliveMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec; + + /* If keep alive interval is 0, it is disabled. */ + if( ( keepAliveMs != 0U ) && + ( calculateElapsedTime( now, pContext->lastPacketTime ) > keepAliveMs ) ) + { + if( pContext->waitingForPingResp == true ) + { + /* Has time expired? */ + if( calculateElapsedTime( now, pContext->pingReqSendTimeMs ) > + pContext->pingRespTimeoutMs ) + { + status = MQTTKeepAliveTimeout; + } + } + else + { + status = MQTT_Ping( pContext ); + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext, + MQTTPacketInfo_t * pIncomingPacket ) +{ + MQTTStatus_t status = MQTTBadParameter; + MQTTPublishState_t publishRecordState = MQTTStateNull; + uint16_t packetIdentifier = 0U; + MQTTPublishInfo_t publishInfo; + bool duplicatePublish = false; + + assert( pContext != NULL ); + assert( pIncomingPacket != NULL ); + + status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo ); + LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%d", status ) ); + + if( status == MQTTSuccess ) + { + status = MQTT_UpdateStatePublish( pContext, + packetIdentifier, + MQTT_RECEIVE, + publishInfo.qos, + &publishRecordState ); + + if( status == MQTTSuccess ) + { + LogInfo( ( "State record updated. New state=%s.", + MQTT_State_strerror( publishRecordState ) ) ); + } + + /* Different cases in which an incoming publish with duplicate flag is + * handled are as listed below. + * 1. No collision - This is the first instance of the incoming publish + * packet received or an earlier received packet state is lost. This + * will be handled as a new incoming publish for both QoS1 and QoS2 + * publishes. + * 2. Collision - The incoming packet was received before and a state + * record is present in the state engine. For QoS1 and QoS2 publishes + * this case can happen at 2 different cases and handling is + * different. + * a. QoS1 - If a PUBACK is not successfully sent for the incoming + * publish due to a connection issue, it can result in broker + * sending out a duplicate publish with dup flag set, when a + * session is reestablished. It can result in a collision in + * state engine. This will be handled by processing the incoming + * publish as a new publish ignoring the + * #MQTTStateCollision status from the state engine. The publish + * data is not passed to the application. + * b. QoS2 - If a PUBREC is not successfully sent for the incoming + * publish or the PUBREC sent is not successfully received by the + * broker due to a connection issue, it can result in broker + * sending out a duplicate publish with dup flag set, when a + * session is reestablished. It can result in a collision in + * state engine. This will be handled by ignoring the + * #MQTTStateCollision status from the state engine. The publish + * data is not passed to the application. */ + else if( ( status == MQTTStateCollision ) && ( publishInfo.dup == true ) ) + { + status = MQTTSuccess; + duplicatePublish = true; + + /* Calculate the state for the ack packet that needs to be sent out + * for the duplicate incoming publish. */ + publishRecordState = MQTT_CalculateStatePublish( MQTT_RECEIVE, + publishInfo.qos ); + LogDebug( ( "Incoming publish packet with packet id %u already exists.", + packetIdentifier ) ); + } + else + { + LogError( ( "Error in updating publish state for incoming publish with packet id %u." + " Error is %s", + packetIdentifier, + MQTT_Status_strerror( status ) ) ); + } + } + + if( status == MQTTSuccess ) + { + /* Invoke application callback to hand the buffer over to application + * before sending acks. + * Application callback will be invoked for all publishes, except for + * duplicate incoming publishes. */ + if( duplicatePublish == false ) + { + pContext->callbacks.appCallback( pContext, + pIncomingPacket, + packetIdentifier, + &publishInfo ); + } + + /* Send PUBACK or PUBREC if necessary. */ + status = sendPublishAcks( pContext, + packetIdentifier, + publishRecordState ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext, + MQTTPacketInfo_t * pIncomingPacket ) +{ + MQTTStatus_t status = MQTTBadResponse; + MQTTPublishState_t publishRecordState = MQTTStateNull; + uint16_t packetIdentifier; + MQTTPubAckType_t ackType; + MQTTEventCallback_t appCallback; + + assert( pContext != NULL ); + assert( pIncomingPacket != NULL ); + assert( pContext->callbacks.appCallback != NULL ); + + appCallback = pContext->callbacks.appCallback; + + ackType = getAckFromPacketType( pIncomingPacket->type ); + status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL ); + LogInfo( ( "Ack packet deserialized with result: %s.", + MQTT_Status_strerror( status ) ) ); + + if( status == MQTTSuccess ) + { + status = MQTT_UpdateStateAck( pContext, + packetIdentifier, + ackType, + MQTT_RECEIVE, + &publishRecordState ); + + if( status == MQTTSuccess ) + { + LogInfo( ( "State record updated. New state=%s.", + MQTT_State_strerror( publishRecordState ) ) ); + } + else + { + LogError( ( "Updating the state engine for packet id %u" + " failed with error %s.", + packetIdentifier, + MQTT_Status_strerror( status ) ) ); + } + } + + if( status == MQTTSuccess ) + { + /* Invoke application callback to hand the buffer over to application + * before sending acks. */ + appCallback( pContext, pIncomingPacket, packetIdentifier, NULL ); + + /* Send PUBREL or PUBCOMP if necessary. */ + status = sendPublishAcks( pContext, + packetIdentifier, + publishRecordState ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, + MQTTPacketInfo_t * pIncomingPacket, + bool manageKeepAlive ) +{ + MQTTStatus_t status = MQTTBadResponse; + uint16_t packetIdentifier; + /* Need a dummy variable for MQTT_DeserializeAck(). */ + bool sessionPresent = false; + + /* We should always invoke the app callback unless we receive a PINGRESP + * and are managing keep alive, or if we receive an unknown packet. We + * initialize this to false since the callback must be invoked before + * sending any PUBREL or PUBCOMP. However, for other cases, we invoke it + * at the end to reduce the complexity of this function. */ + bool invokeAppCallback = false; + MQTTEventCallback_t appCallback = NULL; + + assert( pContext != NULL ); + assert( pIncomingPacket != NULL ); + + appCallback = pContext->callbacks.appCallback; + + switch( pIncomingPacket->type ) + { + case MQTT_PACKET_TYPE_PUBACK: + case MQTT_PACKET_TYPE_PUBREC: + case MQTT_PACKET_TYPE_PUBREL: + case MQTT_PACKET_TYPE_PUBCOMP: + + /* Handle all the publish acks. */ + status = handlePublishAcks( pContext, pIncomingPacket ); + + break; + + case MQTT_PACKET_TYPE_PINGRESP: + status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, &sessionPresent ); + invokeAppCallback = ( manageKeepAlive == true ) ? false : true; + + if( ( status == MQTTSuccess ) && ( manageKeepAlive == true ) ) + { + pContext->waitingForPingResp = false; + } + + break; + + case MQTT_PACKET_TYPE_SUBACK: + case MQTT_PACKET_TYPE_UNSUBACK: + /* Deserialize and give these to the app provided callback. */ + status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, &sessionPresent ); + invokeAppCallback = true; + break; + + default: + /* Bad response from the server. */ + LogError( ( "Unexpected packet type from server: PacketType=%02x.", + pIncomingPacket->type ) ); + status = MQTTBadResponse; + break; + } + + if( ( status == MQTTSuccess ) && ( invokeAppCallback == true ) ) + { + appCallback( pContext, pIncomingPacket, packetIdentifier, NULL ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext, + uint32_t remainingTimeMs, + bool manageKeepAlive ) +{ + MQTTStatus_t status = MQTTSuccess; + MQTTPacketInfo_t incomingPacket; + + assert( pContext != NULL ); + + status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv, + pContext->transportInterface.networkContext, + &incomingPacket ); + + if( status == MQTTNoDataAvailable ) + { + if( manageKeepAlive == true ) + { + /* Assign status so an error can be bubbled up to application, + * but reset it on success. */ + status = handleKeepAlive( pContext ); + } + + if( status == MQTTSuccess ) + { + /* Reset the status to indicate that we should not try to read + * a packet from the transport interface. */ + status = MQTTNoDataAvailable; + } + } + else if( status != MQTTSuccess ) + { + LogError( ( "Receiving incoming packet length failed. Status=%s", + MQTT_Status_strerror( status ) ) ); + } + else + { + /* Receive packet. Remaining time is recalculated before calling this + * function. */ + status = receivePacket( pContext, incomingPacket, remainingTimeMs ); + } + + /* Handle received packet. If no data was read then this will not execute. */ + if( status == MQTTSuccess ) + { + incomingPacket.pRemainingData = pContext->networkBuffer.pBuffer; + + /* PUBLISH packets allow flags in the lower four bits. For other + * packet types, they are reserved. */ + if( ( incomingPacket.type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) + { + status = handleIncomingPublish( pContext, &incomingPacket ); + } + else + { + status = handleIncomingAck( pContext, &incomingPacket, manageKeepAlive ); + } + } + + if( status == MQTTNoDataAvailable ) + { + /* No data available is not an error. Reset to MQTTSuccess so the + * return code will indicate success. */ + status = MQTTSuccess; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Validate all the parameters. */ + if( ( pContext == NULL ) || ( pSubscriptionList == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pContext=%p, " + "pSubscriptionList=%p.", + pContext, + pSubscriptionList ) ); + status = MQTTBadParameter; + } + else if( subscriptionCount == 0UL ) + { + LogError( ( "Subscription count is 0." ) ); + status = MQTTBadParameter; + } + else if( packetId == 0U ) + { + LogError( ( "Packet Id for subscription packet is 0." ) ); + status = MQTTBadParameter; + } + else + { + /* Empty else MISRA 15.7 */ + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t sendPublish( MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + size_t headerSize ) +{ + MQTTStatus_t status = MQTTSuccess; + int32_t bytesSent = 0; + + assert( pContext != NULL ); + assert( pPublishInfo != NULL ); + assert( headerSize > 0 ); + + /* Send header first. */ + bytesSent = sendPacket( pContext, + pContext->networkBuffer.pBuffer, + headerSize ); + + if( bytesSent < 0 ) + { + LogError( ( "Transport send failed for PUBLISH header." ) ); + status = MQTTSendFailed; + } + else + { + LogDebug( ( "Sent %d bytes of PUBLISH header.", + bytesSent ) ); + + /* Send Payload. */ + bytesSent = sendPacket( pContext, + pPublishInfo->pPayload, + pPublishInfo->payloadLength ); + + if( bytesSent < 0 ) + { + LogError( ( "Transport send failed for PUBLISH payload." ) ); + status = MQTTSendFailed; + } + else + { + LogDebug( ( "Sent %d bytes of PUBLISH payload.", + bytesSent ) ); + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, + uint32_t timeoutMs, + bool cleanSession, + MQTTPacketInfo_t * pIncomingPacket, + bool * pSessionPresent ) +{ + MQTTStatus_t status = MQTTSuccess; + MQTTGetCurrentTimeFunc_t getTimeStamp = NULL; + uint32_t entryTimeMs = 0U, remainingTimeMs = 0U, timeTakenMs = 0U; + bool breakFromLoop = false; + uint16_t loopCount = 0U; + + assert( pContext != NULL ); + assert( pIncomingPacket != NULL ); + assert( pContext->callbacks.getTime != NULL ); + + getTimeStamp = pContext->callbacks.getTime; + + /* Get the entry time for the function. */ + entryTimeMs = getTimeStamp(); + + do + { + /* Transport read for incoming CONNACK packet type and length. + * MQTT_GetIncomingPacketTypeAndLength is a blocking call and it is + * returned after a transport receive timeout, an error, or a successful + * receive of packet type and length. */ + status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv, + pContext->transportInterface.networkContext, + pIncomingPacket ); + + /* The loop times out based on 2 conditions. + * 1. If timeoutMs is greater than 0: + * Loop times out based on the timeout calculated by getTime() + * function. + * 2. If timeoutMs is 0: + * Loop times out based on the maximum number of retries config + * MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. This config will control + * maximum the number of retry attempts to read the CONNACK packet. + * A value of 0 for the config will try once to read CONNACK. */ + if( timeoutMs > 0U ) + { + breakFromLoop = ( calculateElapsedTime( getTimeStamp(), entryTimeMs ) >= timeoutMs ) ? true : false; + } + else + { + breakFromLoop = ( loopCount >= MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT ) ? true : false; + loopCount++; + } + + /* Loop until there is data to read or if we have exceeded the timeout/retries. */ + } while( ( status == MQTTNoDataAvailable ) && ( breakFromLoop == false ) ); + + if( status == MQTTSuccess ) + { + /* Time taken in this function so far. */ + timeTakenMs = calculateElapsedTime( getTimeStamp(), entryTimeMs ); + + if( timeTakenMs < timeoutMs ) + { + /* Calculate remaining time for receiving the remainder of + * the packet. */ + remainingTimeMs = timeoutMs - timeTakenMs; + } + + /* Reading the remainder of the packet by transport recv. + * Attempt to read once even if the timeout has expired. + * Invoking receivePacket with remainingTime as 0 would attempt to + * recv from network once. If using retries, the remainder of the + * CONNACK packet is tried to be read only once. Reading once would be + * good as the packet type and remaining length was already read. Hence, + * the probability of the remaining 2 bytes available to read is very high. */ + if( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK ) + { + status = receivePacket( pContext, + *pIncomingPacket, + remainingTimeMs ); + } + else + { + LogError( ( "Incorrect packet type %X received while expecting" + " CONNACK(%X).", + pIncomingPacket->type, + MQTT_PACKET_TYPE_CONNACK ) ); + status = MQTTBadResponse; + } + } + + if( status == MQTTSuccess ) + { + /* Update the packet info pointer to the buffer read. */ + pIncomingPacket->pRemainingData = pContext->networkBuffer.pBuffer; + + /* Deserialize CONNACK. */ + status = MQTT_DeserializeAck( pIncomingPacket, NULL, pSessionPresent ); + } + + /* If a clean session is requested, a session present should not be set by + * broker. */ + if( status == MQTTSuccess ) + { + if( ( cleanSession == true ) && ( *pSessionPresent == true ) ) + { + LogError( ( "Unexpected session present flag in CONNACK response from broker." + " CONNECT request with clean session was made with broker." ) ); + status = MQTTBadResponse; + } + } + + if( status == MQTTSuccess ) + { + LogInfo( ( "Received MQTT CONNACK successfully from broker." ) ); + } + else + { + LogError( ( "CONNACK recv failed with status = %s.", + MQTT_Status_strerror( status ) ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t resendPendingAcks( MQTTContext_t * pContext ) +{ + MQTTStatus_t status = MQTTSuccess; + MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER; + uint16_t packetId = MQTT_PACKET_ID_INVALID; + MQTTPublishState_t state = MQTTStateNull; + + assert( pContext != NULL ); + + /* Get the next packet Id for which a PUBREL need to be resent. */ + packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); + + /* Resend all the PUBREL acks after session is reestablished. */ + while( ( packetId != MQTT_PACKET_ID_INVALID ) && + ( status == MQTTSuccess ) ) + { + status = sendPublishAcks( pContext, packetId, state ); + + packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t serializePublish( const MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId, + size_t * const pHeaderSize ) +{ + MQTTStatus_t status = MQTTSuccess; + size_t remainingLength = 0UL, packetSize = 0UL; + + assert( pContext != NULL ); + assert( pPublishInfo != NULL ); + assert( pHeaderSize != NULL ); + + /* Get the remaining length and packet size.*/ + status = MQTT_GetPublishPacketSize( pPublishInfo, + &remainingLength, + &packetSize ); + LogDebug( ( "PUBLISH packet size is %lu and remaining length is %lu.", + packetSize, + remainingLength ) ); + + if( status == MQTTSuccess ) + { + status = MQTT_SerializePublishHeader( pPublishInfo, + packetId, + remainingLength, + &( pContext->networkBuffer ), + pHeaderSize ); + LogDebug( ( "Serialized PUBLISH header size is %lu.", + *pHeaderSize ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Validate arguments. */ + if( ( pContext == NULL ) || ( pPublishInfo == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pContext=%p, " + "pPublishInfo=%p.", + pContext, + pPublishInfo ) ); + status = MQTTBadParameter; + } + else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) ) + { + LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.", + pPublishInfo->qos ) ); + status = MQTTBadParameter; + } + else + { + /* Empty else MISRA 15.7 */ + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_Init( MQTTContext_t * pContext, + const MQTTTransportInterface_t * pTransportInterface, + const MQTTApplicationCallbacks_t * pCallbacks, + const MQTTFixedBuffer_t * pNetworkBuffer ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Validate arguments. */ + if( ( pContext == NULL ) || ( pTransportInterface == NULL ) || + ( pCallbacks == NULL ) || ( pNetworkBuffer == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pContext=%p, " + "pTransportInterface=%p, " + "pCallbacks=%p, " + "pNetworkBuffer=%p.", + pContext, + pTransportInterface, + pCallbacks, + pNetworkBuffer ) ); + status = MQTTBadParameter; + } + else if( ( pCallbacks->getTime == NULL ) || ( pCallbacks->appCallback == NULL ) || + ( pTransportInterface->recv == NULL ) || ( pTransportInterface->send == NULL ) ) + { + LogError( ( "Functions cannot be NULL: getTime=%p, appCallback=%p, recv=%p, send=%p.", + pCallbacks->getTime, + pCallbacks->appCallback, + pTransportInterface->recv, + pTransportInterface->send ) ); + status = MQTTBadParameter; + } + else + { + ( void ) memset( pContext, 0x00, sizeof( MQTTContext_t ) ); + + pContext->connectStatus = MQTTNotConnected; + pContext->transportInterface = *pTransportInterface; + pContext->callbacks = *pCallbacks; + pContext->networkBuffer = *pNetworkBuffer; + + /* Zero is not a valid packet ID per MQTT spec. Start from 1. */ + pContext->nextPacketId = 1; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, + const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + uint32_t timeoutMs, + bool * pSessionPresent ) +{ + size_t remainingLength = 0UL, packetSize = 0UL; + int32_t bytesSent; + MQTTStatus_t status = MQTTSuccess; + MQTTPacketInfo_t incomingPacket = { 0 }; + + incomingPacket.type = ( uint8_t ) 0; + + if( ( pContext == NULL ) || ( pConnectInfo == NULL ) || ( pSessionPresent == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pContext=%p, " + "pConnectInfo=%p, pSessionPresent=%p.", + pContext, + pConnectInfo, + pSessionPresent ) ); + status = MQTTBadParameter; + } + + if( status == MQTTSuccess ) + { + /* Get MQTT connect packet size and remaining length. */ + status = MQTT_GetConnectPacketSize( pConnectInfo, + pWillInfo, + &remainingLength, + &packetSize ); + LogDebug( ( "CONNECT packet size is %lu and remaining length is %lu.", + packetSize, + remainingLength ) ); + } + + if( status == MQTTSuccess ) + { + status = MQTT_SerializeConnect( pConnectInfo, + pWillInfo, + remainingLength, + &( pContext->networkBuffer ) ); + } + + if( status == MQTTSuccess ) + { + bytesSent = sendPacket( pContext, + pContext->networkBuffer.pBuffer, + packetSize ); + + if( bytesSent < 0 ) + { + LogError( ( "Transport send failed for CONNECT packet." ) ); + status = MQTTSendFailed; + } + else + { + LogDebug( ( "Sent %d bytes of CONNECT packet.", + bytesSent ) ); + } + } + + /* Read CONNACK from transport layer. */ + if( status == MQTTSuccess ) + { + status = receiveConnack( pContext, + timeoutMs, + pConnectInfo->cleanSession, + &incomingPacket, + pSessionPresent ); + } + + /* Resend all the PUBREL when reestablishing a session. */ + if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) ) + { + status = resendPendingAcks( pContext ); + } + + if( status == MQTTSuccess ) + { + LogInfo( ( "MQTT connection established with the broker." ) ); + pContext->connectStatus = MQTTConnected; + } + else + { + LogError( ( "MQTT connection failed with status = %s.", + MQTT_Status_strerror( status ) ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId ) +{ + size_t remainingLength = 0UL, packetSize = 0UL; + int32_t bytesSent = 0; + + /* Validate arguments. */ + MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext, + pSubscriptionList, + subscriptionCount, + packetId ); + + if( status == MQTTSuccess ) + { + /* Get the remaining length and packet size.*/ + status = MQTT_GetSubscribePacketSize( pSubscriptionList, + subscriptionCount, + &remainingLength, + &packetSize ); + LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.", + packetSize, + remainingLength ) ); + } + + if( status == MQTTSuccess ) + { + /* Serialize MQTT SUBSCRIBE packet. */ + status = MQTT_SerializeSubscribe( pSubscriptionList, + subscriptionCount, + packetId, + remainingLength, + &( pContext->networkBuffer ) ); + } + + if( status == MQTTSuccess ) + { + /* Send serialized MQTT SUBSCRIBE packet to transport layer. */ + bytesSent = sendPacket( pContext, + pContext->networkBuffer.pBuffer, + packetSize ); + + if( bytesSent < 0 ) + { + LogError( ( "Transport send failed for SUBSCRIBE packet." ) ); + status = MQTTSendFailed; + } + else + { + LogDebug( ( "Sent %d bytes of SUBSCRIBE packet.", + bytesSent ) ); + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, + const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId ) +{ + size_t headerSize = 0UL; + MQTTPublishState_t publishStatus = MQTTStateNull; + + /* Validate arguments. */ + MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId ); + + if( status == MQTTSuccess ) + { + /* Serialize PUBLISH packet. */ + status = serializePublish( pContext, + pPublishInfo, + packetId, + &headerSize ); + } + + if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) + { + /* Reserve state for publish message. Only to be done for QoS1 or QoS2. */ + status = MQTT_ReserveState( pContext, + packetId, + pPublishInfo->qos ); + + /* State already exists for a duplicate packet. + * If a state doesn't exist, it will be handled as a new publish in + * state engine. */ + if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) ) + { + status = MQTTSuccess; + } + } + + if( status == MQTTSuccess ) + { + /* Sends the serialized publish packet over network. */ + status = sendPublish( pContext, + pPublishInfo, + headerSize ); + } + + if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) + { + /* Update state machine after PUBLISH is sent. + * Only to be done for QoS1 or QoS2. */ + status = MQTT_UpdateStatePublish( pContext, + packetId, + MQTT_SEND, + pPublishInfo->qos, + &publishStatus ); + + if( status != MQTTSuccess ) + { + LogError( ( "Update state for publish failed with status %s." + " However PUBLISH packet was sent to the broker." + " Any further handling of ACKs for the packet Id" + " will fail.", + MQTT_Status_strerror( status ) ) ); + } + } + + if( status != MQTTSuccess ) + { + LogError( ( "MQTT PUBLISH failed with status %s.", + MQTT_Status_strerror( status ) ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) +{ + int32_t bytesSent = 0; + MQTTStatus_t status = MQTTSuccess; + size_t packetSize; + + if( pContext == NULL ) + { + LogError( ( "pContext is NULL." ) ); + status = MQTTBadParameter; + } + + if( status == MQTTSuccess ) + { + /* Get MQTT PINGREQ packet size. */ + status = MQTT_GetPingreqPacketSize( &packetSize ); + + if( status == MQTTSuccess ) + { + LogDebug( ( "MQTT PINGREQ packet size is %lu.", + packetSize ) ); + } + else + { + LogError( ( "Failed to get the PINGREQ packet size." ) ); + } + } + + if( status == MQTTSuccess ) + { + /* Serialize MQTT PINGREQ. */ + status = MQTT_SerializePingreq( &( pContext->networkBuffer ) ); + } + + if( status == MQTTSuccess ) + { + /* Send the serialized PINGREQ packet to transport layer. */ + bytesSent = sendPacket( pContext, + pContext->networkBuffer.pBuffer, + packetSize ); + + if( bytesSent < 0 ) + { + LogError( ( "Transport send failed for PINGREQ packet." ) ); + status = MQTTSendFailed; + } + else + { + pContext->pingReqSendTimeMs = pContext->lastPacketTime; + pContext->waitingForPingResp = true; + LogDebug( ( "Sent %d bytes of PINGREQ packet.", + bytesSent ) ); + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, + const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId ) +{ + size_t remainingLength = 0UL, packetSize = 0UL; + int32_t bytesSent = 0; + + /* Validate arguments. */ + MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext, + pSubscriptionList, + subscriptionCount, + packetId ); + + if( status == MQTTSuccess ) + { + /* Get the remaining length and packet size.*/ + status = MQTT_GetUnsubscribePacketSize( pSubscriptionList, + subscriptionCount, + &remainingLength, + &packetSize ); + LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.", + packetSize, + remainingLength ) ); + } + + if( status == MQTTSuccess ) + { + /* Serialize MQTT UNSUBSCRIBE packet. */ + status = MQTT_SerializeUnsubscribe( pSubscriptionList, + subscriptionCount, + packetId, + remainingLength, + &( pContext->networkBuffer ) ); + } + + if( status == MQTTSuccess ) + { + /* Send serialized MQTT UNSUBSCRIBE packet to transport layer. */ + bytesSent = sendPacket( pContext, + pContext->networkBuffer.pBuffer, + packetSize ); + + if( bytesSent < 0 ) + { + LogError( ( "Transport send failed for UNSUBSCRIBE packet." ) ); + status = MQTTSendFailed; + } + else + { + LogDebug( ( "Sent %d bytes of UNSUBSCRIBE packet.", + bytesSent ) ); + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) +{ + size_t packetSize; + int32_t bytesSent; + MQTTStatus_t status = MQTTSuccess; + + /* Validate arguments. */ + if( pContext == NULL ) + { + LogError( ( "pContext cannot be NULL." ) ); + status = MQTTBadParameter; + } + + if( status == MQTTSuccess ) + { + /* Get MQTT DISCONNECT packet size. */ + status = MQTT_GetDisconnectPacketSize( &packetSize ); + LogDebug( ( "MQTT DISCONNECT packet size is %lu.", + packetSize ) ); + } + + if( status == MQTTSuccess ) + { + /* Serialize MQTT DISCONNECT packet. */ + status = MQTT_SerializeDisconnect( &( pContext->networkBuffer ) ); + } + + if( status == MQTTSuccess ) + { + bytesSent = sendPacket( pContext, + pContext->networkBuffer.pBuffer, + packetSize ); + + if( bytesSent < 0 ) + { + LogError( ( "Transport send failed for DISCONNECT packet." ) ); + status = MQTTSendFailed; + } + else + { + LogDebug( ( "Sent %d bytes of DISCONNECT packet.", + bytesSent ) ); + } + } + + if( status == MQTTSuccess ) + { + LogInfo( ( "Disconnected from the broker." ) ); + pContext->connectStatus = MQTTNotConnected; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext, + uint32_t timeoutMs ) +{ + MQTTStatus_t status = MQTTBadParameter; + MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL; + uint32_t entryTimeMs = 0U, remainingTimeMs = timeoutMs, elapsedTimeMs = 0U; + + if( ( pContext != NULL ) && ( pContext->callbacks.getTime != NULL ) ) + { + getTimeStampMs = pContext->callbacks.getTime; + entryTimeMs = getTimeStampMs(); + status = MQTTSuccess; + pContext->controlPacketSent = false; + } + else if( pContext == NULL ) + { + LogError( ( "MQTT Context cannot be NULL." ) ); + } + else + { + LogError( ( "MQTT Context must set callbacks.getTime." ) ); + } + + while( status == MQTTSuccess ) + { + status = receiveSingleIteration( pContext, remainingTimeMs, true ); + + /* We don't need to break here since the status is already checked in + * the loop condition, and we do not want multiple breaks in a loop. */ + if( status != MQTTSuccess ) + { + LogError( ( "Exiting process loop. Error status=%s", + MQTT_Status_strerror( status ) ) ); + } + else + { + /* Recalculate remaining time and check if loop should exit. This is + * done at the end so the loop will run at least a single iteration. */ + elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs ); + + if( elapsedTimeMs > timeoutMs ) + { + break; + } + + remainingTimeMs = timeoutMs - elapsedTimeMs; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext, + uint32_t timeoutMs ) +{ + MQTTStatus_t status = MQTTBadParameter; + MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL; + uint32_t entryTimeMs = 0U, remainingTimeMs = timeoutMs, elapsedTimeMs = 0U; + + if( ( pContext != NULL ) && ( pContext->callbacks.getTime != NULL ) ) + { + getTimeStampMs = pContext->callbacks.getTime; + entryTimeMs = getTimeStampMs(); + status = MQTTSuccess; + } + else if( pContext == NULL ) + { + LogError( ( "MQTT Context cannot be NULL." ) ); + } + else + { + LogError( ( "MQTT Context must set callbacks.getTime." ) ); + } + + while( status == MQTTSuccess ) + { + status = receiveSingleIteration( pContext, remainingTimeMs, false ); + + /* We don't need to break here since the status is already checked in + * the loop condition, and we do not want multiple breaks in a loop. */ + if( status != MQTTSuccess ) + { + LogError( ( "Exiting receive loop. Error status=%s", + MQTT_Status_strerror( status ) ) ); + } + else + { + /* Recalculate remaining time and check if loop should exit. This is + * done at the end so the loop will run at least a single iteration. */ + elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs ); + + if( elapsedTimeMs >= timeoutMs ) + { + break; + } + + remainingTimeMs = timeoutMs - elapsedTimeMs; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +uint16_t MQTT_GetPacketId( MQTTContext_t * pContext ) +{ + uint16_t packetId = 0U; + + if( pContext != NULL ) + { + packetId = pContext->nextPacketId; + + if( pContext->nextPacketId == ( uint16_t ) UINT16_MAX ) + { + pContext->nextPacketId = 1; + } + else + { + pContext->nextPacketId++; + } + } + + return packetId; +} + +/*-----------------------------------------------------------*/ + +const char * MQTT_Status_strerror( MQTTStatus_t status ) +{ + const char * str = NULL; + + switch( status ) + { + case MQTTSuccess: + str = "MQTTSuccess"; + break; + + case MQTTBadParameter: + str = "MQTTBadParameter"; + break; + + case MQTTNoMemory: + str = "MQTTNoMemory"; + break; + + case MQTTSendFailed: + str = "MQTTSendFailed"; + break; + + case MQTTRecvFailed: + str = "MQTTRecvFailed"; + break; + + case MQTTBadResponse: + str = "MQTTBadResponse"; + break; + + case MQTTServerRefused: + str = "MQTTServerRefused"; + break; + + case MQTTNoDataAvailable: + str = "MQTTNoDataAvailable"; + break; + + case MQTTIllegalState: + str = "MQTTIllegalState"; + break; + + case MQTTStateCollision: + str = "MQTTStateCollision"; + break; + + case MQTTKeepAliveTimeout: + str = "MQTTKeepAliveTimeout"; + break; + + default: + str = "Invalid MQTT Status code"; + break; + } + + return str; +} + +/*-----------------------------------------------------------*/ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c new file mode 100644 index 000000000..302dd6f1b --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c @@ -0,0 +1,2094 @@ +/* + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include <string.h> +#include <assert.h> + +#include "mqtt_lightweight.h" +#include "private/mqtt_internal.h" + +/** + * @brief MQTT protocol version 3.1.1. + */ +#define MQTT_VERSION_3_1_1 ( ( uint8_t ) 4U ) + +/** + * @brief Size of the fixed and variable header of a CONNECT packet. + */ +#define MQTT_PACKET_CONNECT_HEADER_SIZE ( 10UL ) + +/** + * @brief Maximum size of an MQTT CONNECT packet, per MQTT spec. + */ +#define MQTT_PACKET_CONNECT_MAX_SIZE ( 327700UL ) + +/* MQTT CONNECT flags. */ +#define MQTT_CONNECT_FLAG_CLEAN ( 1 ) /**< @brief Clean session. */ +#define MQTT_CONNECT_FLAG_WILL ( 2 ) /**< @brief Will present. */ +#define MQTT_CONNECT_FLAG_WILL_QOS1 ( 3 ) /**< @brief Will QoS 1. */ +#define MQTT_CONNECT_FLAG_WILL_QOS2 ( 4 ) /**< @brief Will QoS 2. */ +#define MQTT_CONNECT_FLAG_WILL_RETAIN ( 5 ) /**< @brief Will retain. */ +#define MQTT_CONNECT_FLAG_PASSWORD ( 6 ) /**< @brief Password present. */ +#define MQTT_CONNECT_FLAG_USERNAME ( 7 ) /**< @brief User name present. */ + +/* + * Positions of each flag in the first byte of an MQTT PUBLISH packet's + * fixed header. + */ +#define MQTT_PUBLISH_FLAG_RETAIN ( 0 ) /**< @brief MQTT PUBLISH retain flag. */ +#define MQTT_PUBLISH_FLAG_QOS1 ( 1 ) /**< @brief MQTT PUBLISH QoS1 flag. */ +#define MQTT_PUBLISH_FLAG_QOS2 ( 2 ) /**< @brief MQTT PUBLISH QoS2 flag. */ +#define MQTT_PUBLISH_FLAG_DUP ( 3 ) /**< @brief MQTT PUBLISH duplicate flag. */ + +/** + * @brief The size of MQTT DISCONNECT packets, per MQTT spec. + */ +#define MQTT_DISCONNECT_PACKET_SIZE ( 2UL ) + +/* + * @brief A PINGREQ packet is always 2 bytes in size, defined by MQTT 3.1.1 spec. + */ +#define MQTT_PACKET_PINGREQ_SIZE ( 2U ) + +/** + * @brief The Remaining Length field of MQTT disconnect packets, per MQTT spec. + */ +#define MQTT_DISCONNECT_REMAINING_LENGTH ( ( uint8_t ) 0 ) + +/* + * Constants relating to CONNACK packets, defined by MQTT 3.1.1 spec. + */ +#define MQTT_PACKET_CONNACK_REMAINING_LENGTH ( ( uint8_t ) 2U ) /**< @brief A CONNACK packet always has a "Remaining length" of 2. */ +#define MQTT_PACKET_CONNACK_SESSION_PRESENT_MASK ( ( uint8_t ) 0x01U ) /**< @brief The "Session Present" bit is always the lowest bit. */ + +/* + * UNSUBACK, PUBACK, PUBREC, PUBREL, and PUBCOMP always have a remaining length + * of 2. + */ +#define MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH ( ( uint8_t ) 2 ) /**< @brief PUBACK, PUBREC, PUBREl, PUBCOMP, UNSUBACK Remaining length. */ +#define MQTT_PACKET_PINGRESP_REMAINING_LENGTH ( 0U ) /**< @brief A PINGRESP packet always has a "Remaining length" of 0. */ + +/** + * @brief Per the MQTT 3.1.1 spec, the largest "Remaining Length" of an MQTT + * packet is this value. + */ +#define MQTT_MAX_REMAINING_LENGTH ( 268435455UL ) + +/** + * @brief Set a bit in an 8-bit unsigned integer. + */ +#define UINT8_SET_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) | ( 0x01U << ( position ) ) ) ) + +/** + * @brief Macro for checking if a bit is set in a 1-byte unsigned int. + * + * @param[in] x The unsigned int to check. + * @param[in] position Which bit to check. + */ +#define UINT8_CHECK_BIT( x, position ) ( ( ( x ) & ( 0x01U << ( position ) ) ) == ( 0x01U << ( position ) ) ) + +/** + * @brief Get the high byte of a 16-bit unsigned integer. + */ +#define UINT16_HIGH_BYTE( x ) ( ( uint8_t ) ( ( x ) >> 8 ) ) + +/** + * @brief Get the low byte of a 16-bit unsigned integer. + */ +#define UINT16_LOW_BYTE( x ) ( ( uint8_t ) ( ( x ) & 0x00ffU ) ) + +/** + * @brief Macro for decoding a 2-byte unsigned int from a sequence of bytes. + * + * @param[in] ptr A uint8_t* that points to the high byte. + */ +#define UINT16_DECODE( ptr ) \ + ( uint16_t ) ( ( ( ( uint16_t ) ( *( ptr ) ) ) << 8 ) | \ + ( ( uint16_t ) ( *( ( ptr ) + 1 ) ) ) ) + +/** + * @brief A value that represents an invalid remaining length. + * + * This value is greater than what is allowed by the MQTT specification. + */ +#define MQTT_REMAINING_LENGTH_INVALID ( ( size_t ) 268435456 ) + +/** + * @brief The minimum remaining length for a QoS 0 PUBLISH. + * + * Includes two bytes for topic name length and one byte for topic name. + */ +#define MQTT_MIN_PUBLISH_REMAINING_LENGTH_QOS0 ( 3U ) + +/*-----------------------------------------------------------*/ + +/* MQTT Subscription packet types. */ +typedef enum MQTTSubscriptionType +{ + MQTT_SUBSCRIBE, + MQTT_UNSUBSCRIBE +} MQTTSubscriptionType_t; + +/*-----------------------------------------------------------*/ + +/** + * @brief Serializes MQTT PUBLISH packet into the buffer provided. + * + * This function serializes MQTT PUBLISH packet into #pFixedBuffer.pBuffer. + * Copy of the payload into the buffer is done as part of the serialization + * only if #serializePayload is true. + * + * @brief param[in] pPublishInfo Publish information. + * @brief param[in] remainingLength Remaining length of the PUBLISH packet. + * @brief param[in] packetIdentifier Packet identifier of PUBLISH packet. + * @brief param[in, out] pFixedBuffer Buffer to which PUBLISH packet will be + * serialized. + * @brief param[in] serializePayload Copy payload to the serialized buffer + * only if true. Only PUBLISH header will be serialized if false. + * + * @return Total number of bytes sent; -1 if there is an error. + */ +static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo, + size_t remainingLength, + uint16_t packetIdentifier, + const MQTTFixedBuffer_t * pFixedBuffer, + bool serializePayload ); + +/** + * @brief Calculates the packet size and remaining length of an MQTT + * PUBLISH packet. + * + * @param[in] pPublishInfo MQTT PUBLISH packet parameters. + * @param[out] pRemainingLength The Remaining Length of the MQTT PUBLISH packet. + * @param[out] pPacketSize The total size of the MQTT PUBLISH packet. + * + * @return false if the packet would exceed the size allowed by the + * MQTT spec; true otherwise. + */ +static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, + size_t * pRemainingLength, + size_t * pPacketSize ); + +/** + * @brief Calculates the packet size and remaining length of an MQTT + * SUBSCRIBE or UNSUBSCRIBE packet. + * + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[out] pRemainingLength The Remaining Length of the MQTT SUBSCRIBE or + * UNSUBSCRIBE packet. + * @param[out] pPacketSize The total size of the MQTT MQTT SUBSCRIBE or + * UNSUBSCRIBE packet. + * @param[in] subscriptionType #MQTT_SUBSCRIBE or #MQTT_UNSUBSCRIBE. + * + * #MQTTBadParameter if the packet would exceed the size allowed by the + * MQTT spec; #MQTTSuccess otherwise. + */ +static MQTTStatus_t calculateSubscriptionPacketSize( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + size_t * pRemainingLength, + size_t * pPacketSize, + MQTTSubscriptionType_t subscriptionType ); + +/** + * @brief Validates parameters of #MQTT_SerializeSubscribe or + * #MQTT_SerializeUnsubscribe. + * + * @param[in] pSubscriptionList List of MQTT subscription info. + * @param[in] subscriptionCount The number of elements in pSubscriptionList. + * @param[in] packetId Packet identifier. + * @param[in] remainingLength Remaining length of the packet. + * @param[in] pBuffer Buffer for packet serialization. + * + * @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet; + * #MQTTBadParameter if invalid parameters are passed; + * #MQTTSuccess otherwise. + */ +static MQTTStatus_t validateSubscriptionSerializeParams( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ); + +/** + * @brief Serialize an MQTT CONNECT packet in the given buffer. + * + * @param[in] pConnectInfo MQTT CONNECT packet parameters. + * @param[in] pWillInfo Last Will and Testament. Pass NULL if not used. + * @param[in] remainingLength Remaining Length of MQTT CONNECT packet. + * @param[out] pBuffer Buffer for packet serialization. + */ +static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ); + +/*-----------------------------------------------------------*/ + +static size_t remainingLengthEncodedSize( size_t length ) +{ + size_t encodedSize; + + /* Determine how many bytes are needed to encode length. + * The values below are taken from the MQTT 3.1.1 spec. */ + + /* 1 byte is needed to encode lengths between 0 and 127. */ + if( length < 128U ) + { + encodedSize = 1U; + } + /* 2 bytes are needed to encode lengths between 128 and 16,383. */ + else if( length < 16384U ) + { + encodedSize = 2U; + } + /* 3 bytes are needed to encode lengths between 16,384 and 2,097,151. */ + else if( length < 2097152U ) + { + encodedSize = 3U; + } + /* 4 bytes are needed to encode lengths between 2,097,152 and 268,435,455. */ + else + { + encodedSize = 4U; + } + + LogDebug( ( "Encoded size for length =%ul is %ul.", + length, + encodedSize ) ); + + return encodedSize; +} + +/*-----------------------------------------------------------*/ + +static uint8_t * encodeRemainingLength( uint8_t * pDestination, + size_t length ) +{ + uint8_t lengthByte; + uint8_t * pLengthEnd = NULL; + size_t remainingLength = length; + + assert( pDestination != NULL ); + + pLengthEnd = pDestination; + + /* This algorithm is copied from the MQTT v3.1.1 spec. */ + do + { + lengthByte = ( uint8_t ) ( remainingLength % 128U ); + remainingLength = remainingLength / 128U; + + /* Set the high bit of this byte, indicating that there's more data. */ + if( remainingLength > 0U ) + { + UINT8_SET_BIT( lengthByte, 7 ); + } + + /* Output a single encoded byte. */ + *pLengthEnd = lengthByte; + pLengthEnd++; + } while( remainingLength > 0U ); + + return pLengthEnd; +} + +/*-----------------------------------------------------------*/ + +static uint8_t * encodeString( uint8_t * pDestination, + const char * source, + uint16_t sourceLength ) +{ + uint8_t * pBuffer = NULL; + + /* Typecast const char * typed source buffer to const uint8_t *. + * This is to use same type buffers in memcpy. */ + const uint8_t * pSourceBuffer = ( const uint8_t * ) source; + + assert( pDestination != NULL ); + + pBuffer = pDestination; + + /* The first byte of a UTF-8 string is the high byte of the string length. */ + *pBuffer = UINT16_HIGH_BYTE( sourceLength ); + pBuffer++; + + /* The second byte of a UTF-8 string is the low byte of the string length. */ + *pBuffer = UINT16_LOW_BYTE( sourceLength ); + pBuffer++; + + /* Copy the string into pBuffer. */ + if( pSourceBuffer != NULL ) + { + ( void ) memcpy( pBuffer, pSourceBuffer, sourceLength ); + } + + /* Return the pointer to the end of the encoded string. */ + pBuffer += sourceLength; + + return pBuffer; +} + +/*-----------------------------------------------------------*/ + +static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, + size_t * pRemainingLength, + size_t * pPacketSize ) +{ + bool status = true; + size_t packetSize = 0, payloadLimit = 0; + + assert( pPublishInfo != NULL ); + assert( pRemainingLength != NULL ); + assert( pPacketSize != NULL ); + + /* The variable header of a PUBLISH packet always contains the topic name. + * The first 2 bytes of UTF-8 string contains length of the string. + */ + packetSize += pPublishInfo->topicNameLength + sizeof( uint16_t ); + + /* The variable header of a QoS 1 or 2 PUBLISH packet contains a 2-byte + * packet identifier. */ + if( pPublishInfo->qos > MQTTQoS0 ) + { + packetSize += sizeof( uint16_t ); + } + + /* Calculate the maximum allowed size of the payload for the given parameters. + * This calculation excludes the "Remaining length" encoding, whose size is not + * yet known. */ + payloadLimit = MQTT_MAX_REMAINING_LENGTH - packetSize - 1U; + + /* Ensure that the given payload fits within the calculated limit. */ + if( pPublishInfo->payloadLength > payloadLimit ) + { + LogError( ( "PUBLISH payload length of %lu cannot exceed " + "%lu so as not to exceed the maximum " + "remaining length of MQTT 3.1.1 packet( %lu ).", + pPublishInfo->payloadLength, + payloadLimit, + MQTT_MAX_REMAINING_LENGTH ) ); + status = false; + } + else + { + /* Add the length of the PUBLISH payload. At this point, the "Remaining length" + * has been calculated. */ + packetSize += pPublishInfo->payloadLength; + + /* Now that the "Remaining length" is known, recalculate the payload limit + * based on the size of its encoding. */ + payloadLimit -= remainingLengthEncodedSize( packetSize ); + + /* Check that the given payload fits within the size allowed by MQTT spec. */ + if( pPublishInfo->payloadLength > payloadLimit ) + { + LogError( ( "PUBLISH payload length of %lu cannot exceed " + "%lu so as not to exceed the maximum " + "remaining length of MQTT 3.1.1 packet( %lu ).", + pPublishInfo->payloadLength, + payloadLimit, + MQTT_MAX_REMAINING_LENGTH ) ); + status = false; + } + else + { + /* Set the "Remaining length" output parameter and calculate the full + * size of the PUBLISH packet. */ + *pRemainingLength = packetSize; + + packetSize += 1U + remainingLengthEncodedSize( packetSize ); + *pPacketSize = packetSize; + } + } + + LogDebug( ( "PUBLISH packet remaining length=%lu and packet size=%lu.", + *pRemainingLength, + *pPacketSize ) ); + return status; +} + +/*-----------------------------------------------------------*/ + +static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo, + size_t remainingLength, + uint16_t packetIdentifier, + const MQTTFixedBuffer_t * pFixedBuffer, + bool serializePayload ) +{ + uint8_t * pIndex = NULL; + const uint8_t * pPayloadBuffer = NULL; + + /* The first byte of a PUBLISH packet contains the packet type and flags. */ + uint8_t publishFlags = MQTT_PACKET_TYPE_PUBLISH; + + assert( pPublishInfo != NULL ); + assert( pFixedBuffer != NULL ); + /* Packet Id should be non zero for QoS1 and QoS2. */ + assert( pPublishInfo->qos == MQTTQoS0 || packetIdentifier != 0U ); + /* Duplicate flag should be set only for Qos1 or Qos2. */ + assert( ( !pPublishInfo->dup ) || ( pPublishInfo->qos > MQTTQoS0 ) ); + + /* Get the start address of the buffer. */ + pIndex = pFixedBuffer->pBuffer; + + if( pPublishInfo->qos == MQTTQoS1 ) + { + LogDebug( ( "Adding QoS as QoS1 in PUBLISH flags." ) ); + UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS1 ); + } + else if( pPublishInfo->qos == MQTTQoS2 ) + { + LogDebug( ( "Adding QoS as QoS2 in PUBLISH flags." ) ); + UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS2 ); + } + else + { + /* Empty else MISRA 15.7 */ + } + + if( pPublishInfo->retain == true ) + { + LogDebug( ( "Adding retain bit in PUBLISH flags." ) ); + UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_RETAIN ); + } + + if( pPublishInfo->dup == true ) + { + LogDebug( ( "Adding dup bit in PUBLISH flags." ) ); + UINT8_SET_BIT( publishFlags, MQTT_PUBLISH_FLAG_DUP ); + } + + *pIndex = publishFlags; + pIndex++; + + /* The "Remaining length" is encoded from the second byte. */ + pIndex = encodeRemainingLength( pIndex, remainingLength ); + + /* The topic name is placed after the "Remaining length". */ + pIndex = encodeString( pIndex, + pPublishInfo->pTopicName, + pPublishInfo->topicNameLength ); + + /* A packet identifier is required for QoS 1 and 2 messages. */ + if( pPublishInfo->qos > MQTTQoS0 ) + { + LogDebug( ( "Adding packet Id in PUBLISH packet." ) ); + /* Place the packet identifier into the PUBLISH packet. */ + *pIndex = UINT16_HIGH_BYTE( packetIdentifier ); + *( pIndex + 1 ) = UINT16_LOW_BYTE( packetIdentifier ); + pIndex += 2; + } + + /* The payload is placed after the packet identifier. + * Payload is copied over only if required by the flag serializePayload. + * This will help reduce an unnecessary copy of the payload into the buffer. + */ + if( ( pPublishInfo->payloadLength > 0U ) && + ( serializePayload == true ) ) + { + LogDebug( ( "Copying PUBLISH payload of length =%lu to buffer", + pPublishInfo->payloadLength ) ); + + /* Typecast const void * typed payload buffer to const uint8_t *. + * This is to use same type buffers in memcpy. */ + pPayloadBuffer = ( const uint8_t * ) pPublishInfo->pPayload; + + ( void ) memcpy( pIndex, pPayloadBuffer, pPublishInfo->payloadLength ); + pIndex += pPublishInfo->payloadLength; + } + + /* Ensure that the difference between the end and beginning of the buffer + * is less than the buffer size. */ + assert( ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) <= pFixedBuffer->size ); +} + +static size_t getRemainingLength( MQTTTransportRecvFunc_t recvFunc, + NetworkContext_t networkContext ) +{ + size_t remainingLength = 0, multiplier = 1, bytesDecoded = 0, expectedSize = 0; + uint8_t encodedByte = 0; + int32_t bytesReceived = 0; + + /* This algorithm is copied from the MQTT v3.1.1 spec. */ + do + { + if( multiplier > 2097152U ) /* 128 ^ 3 */ + { + remainingLength = MQTT_REMAINING_LENGTH_INVALID; + } + else + { + bytesReceived = recvFunc( networkContext, &encodedByte, 1U ); + + if( bytesReceived == 1 ) + { + remainingLength += ( ( size_t ) encodedByte & 0x7FU ) * multiplier; + multiplier *= 128U; + bytesDecoded++; + } + else + { + remainingLength = MQTT_REMAINING_LENGTH_INVALID; + } + } + + if( remainingLength == MQTT_REMAINING_LENGTH_INVALID ) + { + break; + } + } while( ( encodedByte & 0x80U ) != 0U ); + + /* Check that the decoded remaining length conforms to the MQTT specification. */ + if( remainingLength != MQTT_REMAINING_LENGTH_INVALID ) + { + expectedSize = remainingLengthEncodedSize( remainingLength ); + + if( bytesDecoded != expectedSize ) + { + remainingLength = MQTT_REMAINING_LENGTH_INVALID; + } + } + + return remainingLength; +} + +/*-----------------------------------------------------------*/ + +static bool incomingPacketValid( uint8_t packetType ) +{ + bool status = false; + + /* Check packet type. Mask out lower bits to ignore flags. */ + switch( packetType & 0xF0U ) + { + /* Valid incoming packet types. */ + case MQTT_PACKET_TYPE_CONNACK: + case MQTT_PACKET_TYPE_PUBLISH: + case MQTT_PACKET_TYPE_PUBACK: + case MQTT_PACKET_TYPE_PUBREC: + case MQTT_PACKET_TYPE_PUBCOMP: + case MQTT_PACKET_TYPE_SUBACK: + case MQTT_PACKET_TYPE_UNSUBACK: + case MQTT_PACKET_TYPE_PINGRESP: + status = true; + break; + + case ( MQTT_PACKET_TYPE_PUBREL & 0xF0U ): + + /* The second bit of a PUBREL must be set. */ + if( ( packetType & 0x02U ) > 0U ) + { + status = true; + } + + break; + + /* Any other packet type is invalid. */ + default: + LogWarn( ( "Incoming packet invalid: Packet type=%u", + packetType ) ); + break; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t checkPublishRemainingLength( size_t remainingLength, + MQTTQoS_t qos, + size_t qos0Minimum ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Sanity checks for "Remaining length". */ + if( qos == MQTTQoS0 ) + { + /* Check that the "Remaining length" is greater than the minimum. */ + if( remainingLength < qos0Minimum ) + { + LogDebug( ( "QoS 0 PUBLISH cannot have a remaining length less than %lu.", + qos0Minimum ) ); + + status = MQTTBadResponse; + } + } + else + { + /* Check that the "Remaining length" is greater than the minimum. For + * QoS 1 or 2, this will be two bytes greater than for QoS 0 due to the + * packet identifier. */ + if( remainingLength < ( qos0Minimum + 2U ) ) + { + LogDebug( ( "QoS 1 or 2 PUBLISH cannot have a remaining length less than %lu.", + qos0Minimum + 2U ) ); + + status = MQTTBadResponse; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t processPublishFlags( uint8_t publishFlags, + MQTTPublishInfo_t * pPublishInfo ) +{ + MQTTStatus_t status = MQTTSuccess; + + assert( pPublishInfo != NULL ); + + /* Check for QoS 2. */ + if( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS2 ) ) + { + /* PUBLISH packet is invalid if both QoS 1 and QoS 2 bits are set. */ + if( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS1 ) ) + { + LogDebug( ( "Bad QoS: 3." ) ); + + status = MQTTBadResponse; + } + else + { + pPublishInfo->qos = MQTTQoS2; + } + } + /* Check for QoS 1. */ + else if( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_QOS1 ) ) + { + pPublishInfo->qos = MQTTQoS1; + } + /* If the PUBLISH isn't QoS 1 or 2, then it's QoS 0. */ + else + { + pPublishInfo->qos = MQTTQoS0; + } + + if( status == MQTTSuccess ) + { + LogDebug( ( "QoS is %d.", pPublishInfo->qos ) ); + + /* Parse the Retain bit. */ + pPublishInfo->retain = ( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_RETAIN ) ) ? true : false; + + LogDebug( ( "Retain bit is %d.", pPublishInfo->retain ) ); + + /* Parse the DUP bit. */ + pPublishInfo->dup = ( UINT8_CHECK_BIT( publishFlags, MQTT_PUBLISH_FLAG_DUP ) ) ? true : false; + + LogDebug( ( "DUP bit is %d.", pPublishInfo->dup ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static void logConnackResponse( uint8_t responseCode ) +{ + const char * const pConnackResponses[ 6 ] = + { + "Connection accepted.", /* 0 */ + "Connection refused: unacceptable protocol version.", /* 1 */ + "Connection refused: identifier rejected.", /* 2 */ + "Connection refused: server unavailable", /* 3 */ + "Connection refused: bad user name or password.", /* 4 */ + "Connection refused: not authorized." /* 5 */ + }; + + /* Avoid unused parameter warning when assert and logs are disabled. */ + ( void ) responseCode; + ( void ) pConnackResponses; + + assert( responseCode <= 5 ); + + if( responseCode == 0u ) + { + /* Log at Info level for a success CONNACK response. */ + LogInfo( ( "%s", pConnackResponses[ 0 ] ) ); + } + else + { + /* Log an error based on the CONNACK response code. */ + LogError( ( "%s", pConnackResponses[ responseCode ] ) ); + } +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t deserializeConnack( const MQTTPacketInfo_t * pConnack, + bool * pSessionPresent ) +{ + MQTTStatus_t status = MQTTSuccess; + const uint8_t * pRemainingData = NULL; + + assert( pConnack != NULL ); + assert( pSessionPresent != NULL ); + pRemainingData = pConnack->pRemainingData; + + /* According to MQTT 3.1.1, the second byte of CONNACK must specify a + * "Remaining length" of 2. */ + if( pConnack->remainingLength != MQTT_PACKET_CONNACK_REMAINING_LENGTH ) + { + LogError( ( "CONNACK does not have remaining length of %d.", + MQTT_PACKET_CONNACK_REMAINING_LENGTH ) ); + + status = MQTTBadResponse; + } + + /* Check the reserved bits in CONNACK. The high 7 bits of the second byte + * in CONNACK must be 0. */ + else if( ( pRemainingData[ 0 ] | 0x01U ) != 0x01U ) + { + LogError( ( "Reserved bits in CONNACK incorrect." ) ); + + status = MQTTBadResponse; + } + else + { + /* Determine if the "Session Present" bit is set. This is the lowest bit of + * the second byte in CONNACK. */ + if( ( pRemainingData[ 0 ] & MQTT_PACKET_CONNACK_SESSION_PRESENT_MASK ) + == MQTT_PACKET_CONNACK_SESSION_PRESENT_MASK ) + { + LogWarn( ( "CONNACK session present bit set." ) ); + *pSessionPresent = true; + + /* MQTT 3.1.1 specifies that the fourth byte in CONNACK must be 0 if the + * "Session Present" bit is set. */ + if( pRemainingData[ 1 ] != 0U ) + { + status = MQTTBadResponse; + } + } + else + { + LogInfo( ( "CONNACK session present bit not set." ) ); + } + } + + if( status == MQTTSuccess ) + { + /* In MQTT 3.1.1, only values 0 through 5 are valid CONNACK response codes. */ + if( pRemainingData[ 1 ] > 5U ) + { + LogError( ( "CONNACK response %hhu is not valid.", + pRemainingData[ 1 ] ) ); + + status = MQTTBadResponse; + } + else + { + /* Print the appropriate message for the CONNACK response code if logs are + * enabled. */ + logConnackResponse( pRemainingData[ 1 ] ); + + /* A nonzero CONNACK response code means the connection was refused. */ + if( pRemainingData[ 1 ] > 0U ) + { + status = MQTTServerRefused; + } + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t calculateSubscriptionPacketSize( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + size_t * pRemainingLength, + size_t * pPacketSize, + MQTTSubscriptionType_t subscriptionType ) +{ + MQTTStatus_t status = MQTTSuccess; + size_t i = 0, packetSize = 0; + + assert( pSubscriptionList != NULL ); + assert( subscriptionCount != 0U ); + assert( pRemainingLength != NULL ); + assert( pPacketSize != NULL ); + + /* The variable header of a subscription packet consists of a 2-byte packet + * identifier. */ + packetSize += sizeof( uint16_t ); + + /* Sum the lengths of all subscription topic filters; add 1 byte for each + * subscription's QoS if type is MQTT_SUBSCRIBE. */ + for( i = 0; i < subscriptionCount; i++ ) + { + /* Add the length of the topic filter. MQTT strings are prepended + * with 2 byte string length field. Hence 2 bytes are added to size. */ + packetSize += pSubscriptionList[ i ].topicFilterLength + sizeof( uint16_t ); + + /* Only SUBSCRIBE packets include the QoS. */ + if( subscriptionType == MQTT_SUBSCRIBE ) + { + packetSize += 1U; + } + } + + /* At this point, the "Remaining length" has been calculated. Return error + * if the "Remaining length" exceeds what is allowed by MQTT 3.1.1. Otherwise, + * set the output parameter.*/ + if( packetSize > MQTT_MAX_REMAINING_LENGTH ) + { + LogError( ( "Subscription packet length of %lu exceeds" + "the MQTT 3.1.1 maximum packet length of %lu.", + packetSize, + MQTT_MAX_REMAINING_LENGTH ) ); + status = MQTTBadParameter; + } + else + { + *pRemainingLength = packetSize; + + /* Calculate the full size of the subscription packet by adding + * number of bytes required to encode the "Remaining length" field + * plus 1 byte for the "Packet type" field. */ + packetSize += 1U + remainingLengthEncodedSize( packetSize ); + + /*Set the pPacketSize output parameter. */ + *pPacketSize = packetSize; + } + + LogDebug( ( "Subscription packet remaining length=%lu and packet size=%lu.", + *pRemainingLength, + *pPacketSize ) ); + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t readSubackStatus( size_t statusCount, + const uint8_t * pStatusStart ) +{ + MQTTStatus_t status = MQTTSuccess; + uint8_t subscriptionStatus = 0; + size_t i = 0; + + assert( pStatusStart != NULL ); + + /* Iterate through each status byte in the SUBACK packet. */ + for( i = 0; i < statusCount; i++ ) + { + /* Read a single status byte in SUBACK. */ + subscriptionStatus = pStatusStart[ i ]; + + /* MQTT 3.1.1 defines the following values as status codes. */ + switch( subscriptionStatus ) + { + case 0x00: + case 0x01: + case 0x02: + + LogDebug( ( "Topic filter %lu accepted, max QoS %hhu.", + ( unsigned long ) i, subscriptionStatus ) ); + break; + + case 0x80: + + LogDebug( ( "Topic filter %lu refused.", ( unsigned long ) i ) ); + + /* Application should remove subscription from the list */ + status = MQTTServerRefused; + + break; + + default: + LogDebug( ( "Bad SUBSCRIBE status %hhu.", subscriptionStatus ) ); + + status = MQTTBadResponse; + + break; + } + + /* Stop parsing the subscription statuses if a bad response was received. */ + if( status == MQTTBadResponse ) + { + break; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t deserializeSuback( const MQTTPacketInfo_t * pSuback, + uint16_t * pPacketIdentifier ) +{ + MQTTStatus_t status = MQTTSuccess; + size_t remainingLength; + const uint8_t * pVariableHeader = NULL; + + assert( pSuback != NULL ); + assert( pPacketIdentifier != NULL ); + + remainingLength = pSuback->remainingLength; + pVariableHeader = pSuback->pRemainingData; + + /* A SUBACK must have a remaining length of at least 3 to accommodate the + * packet identifier and at least 1 return code. */ + if( remainingLength < 3U ) + { + LogDebug( ( "SUBACK cannot have a remaining length less than 3." ) ); + status = MQTTBadResponse; + } + else + { + /* Extract the packet identifier (first 2 bytes of variable header) from SUBACK. */ + *pPacketIdentifier = UINT16_DECODE( pVariableHeader ); + + LogDebug( ( "Packet identifier %hu.", *pPacketIdentifier ) ); + + status = readSubackStatus( remainingLength - sizeof( uint16_t ), + pVariableHeader + sizeof( uint16_t ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t validateSubscriptionSerializeParams( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* The serialized packet size = First byte + * + length of encoded size of remaining length + * + remaining length. */ + size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength ) + + remainingLength; + + /* Validate all the parameters. */ + if( ( pBuffer == NULL ) || ( pSubscriptionList == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pBuffer=%p, " + "pSubscriptionList=%p.", + pBuffer, + pSubscriptionList ) ); + status = MQTTBadParameter; + } + else if( subscriptionCount == 0U ) + { + LogError( ( "Subscription count is 0." ) ); + status = MQTTBadParameter; + } + else if( packetId == 0U ) + { + LogError( ( "Packet Id for subscription packet is 0." ) ); + status = MQTTBadParameter; + } + else if( packetSize > pBuffer->size ) + { + LogError( ( "Buffer size of %lu is not sufficient to hold " + "serialized packet of size of %lu.", + pBuffer->size, + packetSize ) ); + status = MQTTNoMemory; + } + else + { + /* Empty else MISRA 15.7 */ + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t deserializePublish( const MQTTPacketInfo_t * pIncomingPacket, + uint16_t * pPacketId, + MQTTPublishInfo_t * pPublishInfo ) +{ + MQTTStatus_t status = MQTTSuccess; + const uint8_t * pVariableHeader, * pPacketIdentifierHigh; + + assert( pIncomingPacket != NULL ); + assert( pPacketId != NULL ); + assert( pPublishInfo != NULL ); + pVariableHeader = pIncomingPacket->pRemainingData; + /* The flags are the lower 4 bits of the first byte in PUBLISH. */ + status = processPublishFlags( ( pIncomingPacket->type & 0x0FU ), pPublishInfo ); + + if( status == MQTTSuccess ) + { + /* Sanity checks for "Remaining length". A QoS 0 PUBLISH must have a remaining + * length of at least 3 to accommodate topic name length (2 bytes) and topic + * name (at least 1 byte). A QoS 1 or 2 PUBLISH must have a remaining length of + * at least 5 for the packet identifier in addition to the topic name length and + * topic name. */ + status = checkPublishRemainingLength( pIncomingPacket->remainingLength, + pPublishInfo->qos, + MQTT_MIN_PUBLISH_REMAINING_LENGTH_QOS0 ); + } + + if( status == MQTTSuccess ) + { + /* Extract the topic name starting from the first byte of the variable header. + * The topic name string starts at byte 3 in the variable header. */ + pPublishInfo->topicNameLength = UINT16_DECODE( pVariableHeader ); + + /* Sanity checks for topic name length and "Remaining length". The remaining + * length must be at least as large as the variable length header. */ + status = checkPublishRemainingLength( pIncomingPacket->remainingLength, + pPublishInfo->qos, + pPublishInfo->topicNameLength + sizeof( uint16_t ) ); + } + + if( status == MQTTSuccess ) + { + /* Parse the topic. */ + pPublishInfo->pTopicName = ( const char * ) ( pVariableHeader + sizeof( uint16_t ) ); + LogDebug( ( "Topic name length %hu: %.*s", + pPublishInfo->topicNameLength, + pPublishInfo->topicNameLength, + pPublishInfo->pTopicName ) ); + + /* Extract the packet identifier for QoS 1 or 2 PUBLISH packets. Packet + * identifier starts immediately after the topic name. */ + pPacketIdentifierHigh = ( const uint8_t * ) ( pPublishInfo->pTopicName + pPublishInfo->topicNameLength ); + + if( pPublishInfo->qos > MQTTQoS0 ) + { + *pPacketId = UINT16_DECODE( pPacketIdentifierHigh ); + + LogDebug( ( "Packet identifier %hu.", *pPacketId ) ); + + /* Packet identifier cannot be 0. */ + if( *pPacketId == 0U ) + { + status = MQTTBadResponse; + } + } + } + + if( status == MQTTSuccess ) + { + /* Calculate the length of the payload. QoS 1 or 2 PUBLISH packets contain + * a packet identifier, but QoS 0 PUBLISH packets do not. */ + if( pPublishInfo->qos == MQTTQoS0 ) + { + pPublishInfo->payloadLength = ( pIncomingPacket->remainingLength - pPublishInfo->topicNameLength - sizeof( uint16_t ) ); + pPublishInfo->pPayload = pPacketIdentifierHigh; + } + else + { + pPublishInfo->payloadLength = ( pIncomingPacket->remainingLength - pPublishInfo->topicNameLength - 2U * sizeof( uint16_t ) ); + pPublishInfo->pPayload = pPacketIdentifierHigh + sizeof( uint16_t ); + } + + LogDebug( ( "Payload length %hu.", pPublishInfo->payloadLength ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t deserializeSimpleAck( const MQTTPacketInfo_t * pAck, + uint16_t * pPacketIdentifier ) +{ + MQTTStatus_t status = MQTTSuccess; + + assert( pAck != NULL ); + assert( pPacketIdentifier != NULL ); + + /* Check that the "Remaining length" of the received ACK is 2. */ + if( pAck->remainingLength != MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH ) + { + LogError( ( "ACK does not have remaining length of %d.", + MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH ) ); + + status = MQTTBadResponse; + } + else + { + /* Extract the packet identifier (third and fourth bytes) from ACK. */ + *pPacketIdentifier = UINT16_DECODE( pAck->pRemainingData ); + + LogDebug( ( "Packet identifier %hu.", *pPacketIdentifier ) ); + + /* Packet identifier cannot be 0. */ + if( *pPacketIdentifier == 0U ) + { + status = MQTTBadResponse; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t deserializePingresp( const MQTTPacketInfo_t * pPingresp ) +{ + MQTTStatus_t status = MQTTSuccess; + + assert( pPingresp != NULL ); + + /* Check the "Remaining length" (second byte) of the received PINGRESP is 0. */ + if( pPingresp->remainingLength != MQTT_PACKET_PINGRESP_REMAINING_LENGTH ) + { + LogError( ( "PINGRESP does not have remaining length of %d.", + MQTT_PACKET_PINGRESP_REMAINING_LENGTH ) ); + + status = MQTTBadResponse; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ) +{ + uint8_t connectFlags = 0U; + uint8_t * pIndex = NULL; + + assert( pConnectInfo != NULL ); + assert( pBuffer != NULL ); + + pIndex = pBuffer->pBuffer; + /* The first byte in the CONNECT packet is the control packet type. */ + *pIndex = MQTT_PACKET_TYPE_CONNECT; + pIndex++; + + /* The remaining length of the CONNECT packet is encoded starting from the + * second byte. The remaining length does not include the length of the fixed + * header or the encoding of the remaining length. */ + pIndex = encodeRemainingLength( pIndex, remainingLength ); + + /* The string "MQTT" is placed at the beginning of the CONNECT packet's variable + * header. This string is 4 bytes long. */ + pIndex = encodeString( pIndex, "MQTT", 4 ); + + /* The MQTT protocol version is the second field of the variable header. */ + *pIndex = MQTT_VERSION_3_1_1; + pIndex++; + + /* Set the clean session flag if needed. */ + if( pConnectInfo->cleanSession == true ) + { + UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_CLEAN ); + } + + /* Set the flags for username and password if provided. */ + if( pConnectInfo->pUserName != NULL ) + { + UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_USERNAME ); + } + + if( pConnectInfo->pPassword != NULL ) + { + UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_PASSWORD ); + } + + /* Set will flag if a Last Will and Testament is provided. */ + if( pWillInfo != NULL ) + { + UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_WILL ); + + /* Flags only need to be changed for Will QoS 1 or 2. */ + if( pWillInfo->qos == MQTTQoS1 ) + { + UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_WILL_QOS1 ); + } + else if( pWillInfo->qos == MQTTQoS2 ) + { + UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_WILL_QOS2 ); + } + else + { + /* Empty else MISRA 15.7 */ + } + + if( pWillInfo->retain == true ) + { + UINT8_SET_BIT( connectFlags, MQTT_CONNECT_FLAG_WILL_RETAIN ); + } + } + + *pIndex = connectFlags; + pIndex++; + + /* Write the 2 bytes of the keep alive interval into the CONNECT packet. */ + *pIndex = UINT16_HIGH_BYTE( pConnectInfo->keepAliveSeconds ); + *( pIndex + 1 ) = UINT16_LOW_BYTE( pConnectInfo->keepAliveSeconds ); + pIndex += 2; + + /* Write the client identifier into the CONNECT packet. */ + pIndex = encodeString( pIndex, + pConnectInfo->pClientIdentifier, + pConnectInfo->clientIdentifierLength ); + + /* Write the will topic name and message into the CONNECT packet if provided. */ + if( pWillInfo != NULL ) + { + pIndex = encodeString( pIndex, + pWillInfo->pTopicName, + pWillInfo->topicNameLength ); + + pIndex = encodeString( pIndex, + pWillInfo->pPayload, + ( uint16_t ) pWillInfo->payloadLength ); + } + + /* Encode the user name if provided. */ + if( pConnectInfo->pUserName != NULL ) + { + pIndex = encodeString( pIndex, pConnectInfo->pUserName, pConnectInfo->userNameLength ); + } + + /* Encode the password if provided. */ + if( pConnectInfo->pPassword != NULL ) + { + pIndex = encodeString( pIndex, pConnectInfo->pPassword, pConnectInfo->passwordLength ); + } + + LogDebug( ( "Length of serialized CONNECT packet is %lu.", + ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) ); + + /* Ensure that the difference between the end and beginning of the buffer + * is less than the buffer size. */ + assert( ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) <= pBuffer->size ); +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t * pRemainingLength, + size_t * pPacketSize ) +{ + MQTTStatus_t status = MQTTSuccess; + size_t remainingLength; + + /* The CONNECT packet will always include a 10-byte variable header. */ + size_t connectPacketSize = MQTT_PACKET_CONNECT_HEADER_SIZE; + + /* Validate arguments. */ + if( ( pConnectInfo == NULL ) || ( pRemainingLength == NULL ) || + ( pPacketSize == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pConnectInfo=%p, " + "pRemainingLength=%p, pPacketSize=%p.", + pConnectInfo, + pRemainingLength, + pPacketSize ) ); + status = MQTTBadParameter; + } + else if( ( pConnectInfo->clientIdentifierLength == 0U ) || ( pConnectInfo->pClientIdentifier == NULL ) ) + { + LogError( ( "Mqtt_GetConnectPacketSize() client identifier must be set." ) ); + status = MQTTBadParameter; + } + else + { + /* Add the length of the client identifier. */ + connectPacketSize += pConnectInfo->clientIdentifierLength + sizeof( uint16_t ); + + /* Add the lengths of the will message and topic name if provided. */ + if( pWillInfo != NULL ) + { + connectPacketSize += pWillInfo->topicNameLength + sizeof( uint16_t ) + + pWillInfo->payloadLength + sizeof( uint16_t ); + } + + /* Add the lengths of the user name and password if provided. */ + if( pConnectInfo->pUserName != NULL ) + { + connectPacketSize += pConnectInfo->userNameLength + sizeof( uint16_t ); + } + + if( pConnectInfo->pPassword != NULL ) + { + connectPacketSize += pConnectInfo->passwordLength + sizeof( uint16_t ); + } + + /* At this point, the "Remaining Length" field of the MQTT CONNECT packet has + * been calculated. */ + remainingLength = connectPacketSize; + + /* Calculate the full size of the MQTT CONNECT packet by adding the size of + * the "Remaining Length" field plus 1 byte for the "Packet Type" field. */ + connectPacketSize += 1U + remainingLengthEncodedSize( connectPacketSize ); + + /* Check that the CONNECT packet is within the bounds of the MQTT spec. */ + if( connectPacketSize > MQTT_PACKET_CONNECT_MAX_SIZE ) + { + status = MQTTBadParameter; + } + else + { + *pRemainingLength = remainingLength; + *pPacketSize = connectPacketSize; + } + + LogDebug( ( "CONNECT packet remaining length=%lu and packet size=%lu.", + *pRemainingLength, + *pPacketSize ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_SerializeConnect( const MQTTConnectInfo_t * pConnectInfo, + const MQTTPublishInfo_t * pWillInfo, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Calculate CONNECT packet size. */ + size_t connectPacketSize = remainingLength + remainingLengthEncodedSize( remainingLength ) + 1U; + + /* Validate arguments. */ + if( ( pConnectInfo == NULL ) || ( pBuffer == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pConnectInfo=%p, " + "pBuffer=%p.", + pConnectInfo, + pBuffer ) ); + status = MQTTBadParameter; + } + /* Check that the full packet size fits within the given buffer. */ + else if( connectPacketSize > pBuffer->size ) + { + LogError( ( "Buffer size of %lu is not sufficient to hold " + "serialized CONNECT packet of size of %lu.", + pBuffer->size, + connectPacketSize ) ); + status = MQTTNoMemory; + } + else if( ( pWillInfo != NULL ) && ( pWillInfo->pTopicName == NULL ) ) + { + LogError( ( "pWillInfo->pTopicName cannot be NULL if Will is present." ) ); + status = MQTTBadParameter; + } + else + { + serializeConnectPacket( pConnectInfo, + pWillInfo, + remainingLength, + pBuffer ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + size_t * pRemainingLength, + size_t * pPacketSize ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Validate parameters. */ + if( ( pSubscriptionList == NULL ) || ( pRemainingLength == NULL ) || + ( pPacketSize == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pSubscriptionList=%p, " + "pRemainingLength=%p, pPacketSize=%p.", + pSubscriptionList, + pRemainingLength, + pPacketSize ) ); + status = MQTTBadParameter; + } + else if( subscriptionCount == 0U ) + { + LogError( ( " subscriptionCount is 0." ) ); + status = MQTTBadParameter; + } + else + { + /* Calculate the MQTT UNSUBSCRIBE packet size. */ + status = calculateSubscriptionPacketSize( pSubscriptionList, + subscriptionCount, + pRemainingLength, + pPacketSize, + MQTT_SUBSCRIBE ); + + if( status == MQTTBadParameter ) + { + LogError( ( "SUBSCRIBE packet remaining length exceeds %lu, which is the " + "maximum size allowed by MQTT 3.1.1.", + MQTT_MAX_REMAINING_LENGTH ) ); + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ) +{ + size_t i = 0; + uint8_t * pIndex = NULL; + + /* Validate all the parameters. */ + MQTTStatus_t status = + validateSubscriptionSerializeParams( pSubscriptionList, + subscriptionCount, + packetId, + remainingLength, + pBuffer ); + + if( status == MQTTSuccess ) + { + pIndex = pBuffer->pBuffer; + + /* The first byte in SUBSCRIBE is the packet type. */ + *pIndex = MQTT_PACKET_TYPE_SUBSCRIBE; + pIndex++; + + /* Encode the "Remaining length" starting from the second byte. */ + pIndex = encodeRemainingLength( pIndex, remainingLength ); + + /* Place the packet identifier into the SUBSCRIBE packet. */ + *pIndex = UINT16_HIGH_BYTE( packetId ); + *( pIndex + 1 ) = UINT16_LOW_BYTE( packetId ); + pIndex += 2; + + /* Serialize each subscription topic filter and QoS. */ + for( i = 0; i < subscriptionCount; i++ ) + { + pIndex = encodeString( pIndex, + pSubscriptionList[ i ].pTopicFilter, + pSubscriptionList[ i ].topicFilterLength ); + + /* Place the QoS in the SUBSCRIBE packet. */ + *pIndex = ( uint8_t ) ( pSubscriptionList[ i ].qos ); + pIndex++; + } + + LogDebug( ( "Length of serialized SUBSCRIBE packet is %lu.", + ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_GetUnsubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + size_t * pRemainingLength, + size_t * pPacketSize ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Validate parameters. */ + if( ( pSubscriptionList == NULL ) || ( pRemainingLength == NULL ) || + ( pPacketSize == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pSubscriptionList=%p, " + "pRemainingLength=%p, pPacketSize=%p.", + pSubscriptionList, + pRemainingLength, + pPacketSize ) ); + status = MQTTBadParameter; + } + else if( subscriptionCount == 0U ) + { + LogError( ( "Subscription count is 0." ) ); + status = MQTTBadParameter; + } + else + { + /* Calculate the MQTT SUBSCRIBE packet size. */ + status = calculateSubscriptionPacketSize( pSubscriptionList, + subscriptionCount, + pRemainingLength, + pPacketSize, + MQTT_UNSUBSCRIBE ); + + if( status == MQTTBadParameter ) + { + LogError( ( "UNSUBSCRIBE packet remaining length exceeds %lu, which is the " + "maximum size allowed by MQTT 3.1.1.", + MQTT_MAX_REMAINING_LENGTH ) ); + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptionList, + size_t subscriptionCount, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ) +{ + MQTTStatus_t status = MQTTSuccess; + size_t i = 0; + uint8_t * pIndex = NULL; + + /* Validate all the parameters. */ + status = validateSubscriptionSerializeParams( pSubscriptionList, + subscriptionCount, + packetId, + remainingLength, + pBuffer ); + + if( status == MQTTSuccess ) + { + /* Get the start of the buffer to the iterator variable. */ + pIndex = pBuffer->pBuffer; + + /* The first byte in UNSUBSCRIBE is the packet type. */ + *pIndex = MQTT_PACKET_TYPE_UNSUBSCRIBE; + pIndex++; + + /* Encode the "Remaining length" starting from the second byte. */ + pIndex = encodeRemainingLength( pIndex, remainingLength ); + + /* Place the packet identifier into the UNSUBSCRIBE packet. */ + *pIndex = UINT16_HIGH_BYTE( packetId ); + *( pIndex + 1 ) = UINT16_LOW_BYTE( packetId ); + pIndex += 2; + + /* Serialize each subscription topic filter. */ + for( i = 0; i < subscriptionCount; i++ ) + { + pIndex = encodeString( pIndex, + pSubscriptionList[ i ].pTopicFilter, + pSubscriptionList[ i ].topicFilterLength ); + } + + LogDebug( ( "Length of serialized UNSUBSCRIBE packet is %lu.", + ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_GetPublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, + size_t * pRemainingLength, + size_t * pPacketSize ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( ( pPublishInfo == NULL ) || ( pRemainingLength == NULL ) || ( pPacketSize == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pPublishInfo=%p, " + "pRemainingLength=%p, pPacketSize=%p.", + pPublishInfo, + pRemainingLength, + pPacketSize ) ); + status = MQTTBadParameter; + } + else if( ( pPublishInfo->pTopicName == NULL ) || ( pPublishInfo->topicNameLength == 0U ) ) + { + LogError( ( "Invalid topic name for PUBLISH: pTopicName=%p, " + "topicNameLength=%u.", + pPublishInfo->pTopicName, + pPublishInfo->topicNameLength ) ); + status = MQTTBadParameter; + } + else + { + /* Calculate the "Remaining length" field and total packet size. If it exceeds + * what is allowed in the MQTT standard, return an error. */ + if( calculatePublishPacketSize( pPublishInfo, pRemainingLength, pPacketSize ) == false ) + { + LogError( ( "PUBLISH packet remaining length exceeds %lu, which is the " + "maximum size allowed by MQTT 3.1.1.", + MQTT_MAX_REMAINING_LENGTH ) ); + status = MQTTBadParameter; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Length of serialized packet = First byte + * + Length of encoded remaining length + * + Remaining length. */ + size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength ) + + remainingLength; + + if( ( pBuffer == NULL ) || ( pPublishInfo == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pBuffer=%p, " + "pPublishInfo=%p.", + pBuffer, + pPublishInfo ) ); + status = MQTTBadParameter; + } + else if( ( pPublishInfo->pTopicName == NULL ) || ( pPublishInfo->topicNameLength == 0U ) ) + { + LogError( ( "Invalid topic name for PUBLISH: pTopicName=%p, " + "topicNameLength=%u.", + pPublishInfo->pTopicName, + pPublishInfo->topicNameLength ) ); + status = MQTTBadParameter; + } + else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) ) + { + LogError( ( "Packet Id is 0 for PUBLISH with QoS=%u.", + pPublishInfo->qos ) ); + status = MQTTBadParameter; + } + else if( packetSize > pBuffer->size ) + { + LogError( ( "Buffer size of %lu is not sufficient to hold " + "serialized PUBLISH packet of size of %lu.", + pBuffer->size, + packetSize ) ); + status = MQTTNoMemory; + } + else + { + /* Serialize publish with header and payload. */ + serializePublishCommon( pPublishInfo, + remainingLength, + packetId, + pBuffer, + true ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo, + uint16_t packetId, + size_t remainingLength, + const MQTTFixedBuffer_t * pBuffer, + size_t * pHeaderSize ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Length of serialized packet = First byte + * + Length of encoded remaining length + * + Remaining length + * - Payload Length. + * Payload length will be subtracted after verifying pPublishInfo parameter. + */ + size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength ) + + remainingLength; + + if( ( pBuffer == NULL ) || ( pPublishInfo == NULL ) || + ( pHeaderSize == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pBuffer=%p, " + "pPublishInfo=%p, pHeaderSize=%p.", + pBuffer, + pPublishInfo, + pHeaderSize ) ); + status = MQTTBadParameter; + } + else if( ( pPublishInfo->pTopicName == NULL ) || ( pPublishInfo->topicNameLength == 0U ) ) + { + LogError( ( "Invalid topic name for publish: pTopicName=%p, " + "topicNameLength=%u.", + pPublishInfo->pTopicName, + pPublishInfo->topicNameLength ) ); + status = MQTTBadParameter; + } + else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) ) + { + LogError( ( "Packet Id is 0 for publish with QoS=%u.", + pPublishInfo->qos ) ); + status = MQTTBadParameter; + } + + + else if( ( packetSize - pPublishInfo->payloadLength ) > pBuffer->size ) + { + LogError( ( "Buffer size of %lu is not sufficient to hold " + "serialized PUBLISH header packet of size of %lu.", + pBuffer->size, + ( packetSize - pPublishInfo->payloadLength ) ) ); + status = MQTTNoMemory; + } + else + { + /* Serialize publish without copying the payload. */ + serializePublishCommon( pPublishInfo, + remainingLength, + packetId, + pBuffer, + false ); + + /* Header size is the same as calculated packet size. */ + *pHeaderSize = ( packetSize - pPublishInfo->payloadLength ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_SerializeAck( const MQTTFixedBuffer_t * pBuffer, + uint8_t packetType, + uint16_t packetId ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pBuffer == NULL ) + { + LogError( ( "Provided buffer is NULL." ) ); + status = MQTTBadParameter; + } + /* The buffer must be able to fit 4 bytes for the packet. */ + else if( pBuffer->size < MQTT_PUBLISH_ACK_PACKET_SIZE ) + { + LogError( ( "Insufficient memory for packet." ) ); + status = MQTTNoMemory; + } + else if( packetId == 0U ) + { + LogError( ( "Packet ID cannot be 0." ) ); + status = MQTTBadParameter; + } + else + { + switch( packetType ) + { + /* Only publish acks are serialized by the client. */ + case MQTT_PACKET_TYPE_PUBACK: + case MQTT_PACKET_TYPE_PUBREC: + case MQTT_PACKET_TYPE_PUBREL: + case MQTT_PACKET_TYPE_PUBCOMP: + pBuffer->pBuffer[ 0 ] = packetType; + pBuffer->pBuffer[ 1 ] = MQTT_PACKET_SIMPLE_ACK_REMAINING_LENGTH; + pBuffer->pBuffer[ 2 ] = UINT16_HIGH_BYTE( packetId ); + pBuffer->pBuffer[ 3 ] = UINT16_LOW_BYTE( packetId ); + break; + + default: + LogError( ( "Packet type is not a publish ACK: Packet type=%02x", + packetType ) ); + status = MQTTBadParameter; + break; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_GetDisconnectPacketSize( size_t * pPacketSize ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pPacketSize == NULL ) + { + LogError( ( "pPacketSize is NULL." ) ); + status = MQTTBadParameter; + } + else + { + /* MQTT DISCONNECT packets always have the same size. */ + *pPacketSize = MQTT_DISCONNECT_PACKET_SIZE; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_SerializeDisconnect( const MQTTFixedBuffer_t * pBuffer ) +{ + MQTTStatus_t status = MQTTSuccess; + + /* Validate arguments. */ + if( pBuffer == NULL ) + { + LogError( ( "pBuffer cannot be NULL." ) ); + status = MQTTBadParameter; + } + + if( status == MQTTSuccess ) + { + if( pBuffer->size < MQTT_DISCONNECT_PACKET_SIZE ) + { + LogError( ( "Buffer size of %lu is not sufficient to hold " + "serialized DISCONNECT packet of size of %lu.", + pBuffer->size, + MQTT_DISCONNECT_PACKET_SIZE ) ); + status = MQTTNoMemory; + } + } + + if( status == MQTTSuccess ) + { + pBuffer->pBuffer[ 0 ] = MQTT_PACKET_TYPE_DISCONNECT; + pBuffer->pBuffer[ 1 ] = MQTT_DISCONNECT_REMAINING_LENGTH; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_GetPingreqPacketSize( size_t * pPacketSize ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pPacketSize == NULL ) + { + LogError( ( "pPacketSize is NULL." ) ); + status = MQTTBadParameter; + } + else + { + /* MQTT PINGREQ packets always have the same size. */ + *pPacketSize = MQTT_PACKET_PINGREQ_SIZE; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_SerializePingreq( const MQTTFixedBuffer_t * pBuffer ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pBuffer == NULL ) + { + LogError( ( "pBuffer is NULL." ) ); + status = MQTTBadParameter; + } + + if( status == MQTTSuccess ) + { + if( pBuffer->size < MQTT_PACKET_PINGREQ_SIZE ) + { + LogError( ( "Buffer size of %lu is not sufficient to hold " + "serialized PINGREQ packet of size of %u.", + pBuffer->size, + MQTT_PACKET_PINGREQ_SIZE ) ); + status = MQTTNoMemory; + } + } + + if( status == MQTTSuccess ) + { + /* Ping request packets are always the same. */ + pBuffer->pBuffer[ 0 ] = MQTT_PACKET_TYPE_PINGREQ; + pBuffer->pBuffer[ 1 ] = 0x00; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_DeserializePublish( const MQTTPacketInfo_t * pIncomingPacket, + uint16_t * pPacketId, + MQTTPublishInfo_t * pPublishInfo ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( ( pIncomingPacket == NULL ) || ( pPacketId == NULL ) || ( pPublishInfo == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pIncomingPacket=%p, " + "pPacketId=%p, pPublishInfo=%p", + pIncomingPacket, + pPacketId, + pPublishInfo ) ); + status = MQTTBadParameter; + } + else if( ( pIncomingPacket->type & 0xF0U ) != MQTT_PACKET_TYPE_PUBLISH ) + { + LogError( ( "Packet is not publish. Packet type: %hu.", + pIncomingPacket->type ) ); + status = MQTTBadParameter; + } + else + { + status = deserializePublish( pIncomingPacket, pPacketId, pPublishInfo ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_DeserializeAck( const MQTTPacketInfo_t * pIncomingPacket, + uint16_t * pPacketId, + bool * pSessionPresent ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( ( pIncomingPacket == NULL ) ) + { + LogError( ( "pIncomingPacket cannot be NULL." ) ); + status = MQTTBadParameter; + } + + /* Pointer for packet identifier cannot be NULL for packets other than + * CONNACK and PINGRESP. */ + else if( ( pPacketId == NULL ) && + ( ( pIncomingPacket->type != MQTT_PACKET_TYPE_CONNACK ) && + ( pIncomingPacket->type != MQTT_PACKET_TYPE_PINGRESP ) ) ) + { + LogError( ( "pPacketId cannot be NULL for packet type %02x.", + pIncomingPacket->type ) ); + status = MQTTBadParameter; + } + /* Pointer for session present cannot be NULL for CONNACK. */ + else if( ( pSessionPresent == NULL ) && + ( pIncomingPacket->type == MQTT_PACKET_TYPE_CONNACK ) ) + { + LogError( ( "pSessionPresent cannot be NULL for CONNACK packet." ) ); + status = MQTTBadParameter; + } + else if( pIncomingPacket->pRemainingData == NULL ) + { + LogError( ( "Remaining data of incoming packet is NULL." ) ); + status = MQTTBadParameter; + } + else + { + /* Make sure response packet is a valid ack. */ + switch( pIncomingPacket->type ) + { + case MQTT_PACKET_TYPE_CONNACK: + status = deserializeConnack( pIncomingPacket, pSessionPresent ); + break; + + case MQTT_PACKET_TYPE_SUBACK: + status = deserializeSuback( pIncomingPacket, pPacketId ); + break; + + case MQTT_PACKET_TYPE_PINGRESP: + status = deserializePingresp( pIncomingPacket ); + break; + + case MQTT_PACKET_TYPE_UNSUBACK: + case MQTT_PACKET_TYPE_PUBACK: + case MQTT_PACKET_TYPE_PUBREC: + case MQTT_PACKET_TYPE_PUBREL: + case MQTT_PACKET_TYPE_PUBCOMP: + status = deserializeSimpleAck( pIncomingPacket, pPacketId ); + break; + + /* Any other packet type is invalid. */ + default: + LogError( ( "IotMqtt_DeserializeResponse() called with unknown packet type:(%02x).", pIncomingPacket->type ) ); + status = MQTTBadResponse; + break; + } + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( MQTTTransportRecvFunc_t readFunc, + NetworkContext_t networkContext, + MQTTPacketInfo_t * pIncomingPacket ) +{ + MQTTStatus_t status = MQTTSuccess; + int32_t bytesReceived = 0; + + if( pIncomingPacket == NULL ) + { + LogError( ( "Invalid parameter: pIncomingPacket is NULL." ) ); + status = MQTTBadParameter; + } + else + { + /* Read a single byte. */ + bytesReceived = readFunc( networkContext, &( pIncomingPacket->type ), 1U ); + } + + if( bytesReceived == 1 ) + { + /* Check validity. */ + if( incomingPacketValid( pIncomingPacket->type ) == true ) + { + pIncomingPacket->remainingLength = getRemainingLength( readFunc, networkContext ); + + if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID ) + { + status = MQTTBadResponse; + } + } + else + { + LogError( ( "Incoming packet invalid: Packet type=%u", + pIncomingPacket->type ) ); + status = MQTTBadResponse; + } + } + else if( ( status != MQTTBadParameter ) && ( bytesReceived == 0 ) ) + { + status = MQTTNoDataAvailable; + } + + /* If the input packet was valid, then any other number of bytes received is + * a failure. */ + else if( status != MQTTBadParameter ) + { + status = MQTTRecvFailed; + } + else + { + /* Empty else MISRA 15.7 */ + } + + return status; +} + +/*-----------------------------------------------------------*/ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c new file mode 100644 index 000000000..8afd93050 --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c @@ -0,0 +1,1100 @@ +/* + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include <assert.h> +#include <string.h> +#include "mqtt_state.h" + +/*-----------------------------------------------------------*/ + +/** + * @brief Create a 16-bit bitmap with bit set at specified position. + * + * @param[in] position The position at which the bit need to be set. + */ +#define UINT16_BITMAP_BIT_SET_AT( position ) ( ( uint16_t ) 0x01U << ( ( uint16_t ) position ) ) + +/** + * @brief Set a bit in an 16-bit unsigned integer. + * + * @param[in] x The 16-bit unsigned integer to set a bit. + * @param[in] position The position at which the bit need to be set. + */ +#define UINT16_SET_BIT( x, position ) ( ( x ) = ( uint16_t ) ( ( x ) | ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) ) + +/** + * @brief Macro for checking if a bit is set in a 16-bit unsigned integer. + * + * @param[in] x The unsigned 16-bit integer to check. + * @param[in] position Which bit to check. + */ +#define UINT16_CHECK_BIT( x, position ) ( ( ( x ) & ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) == ( UINT16_BITMAP_BIT_SET_AT( position ) ) ) + +/*-----------------------------------------------------------*/ + +/** + * @brief Test if a transition to new state is possible, when dealing with PUBLISHes. + * + * @param[in] currentState The current state. + * @param[in] newState State to transition to. + * @param[in] opType Reserve, Send, or Receive. + * @param[in] qos 0, 1, or 2. + * + * @note This function does not validate the current state, or the new state + * based on either the operation type or QoS. It assumes the new state is valid + * given the opType and QoS, which will be the case if calculated by + * MQTT_CalculateStatePublish(). + * + * @return `true` if transition is possible, else `false` + */ +static bool validateTransitionPublish( MQTTPublishState_t currentState, + MQTTPublishState_t newState, + MQTTStateOperation_t opType, + MQTTQoS_t qos ); + +/** + * @brief Test if a transition to a new state is possible, when dealing with acks. + * + * @param[in] currentState The current state. + * @param[in] newState State to transition to. + * + * @return `true` if transition is possible, else `false`. + */ +static bool validateTransitionAck( MQTTPublishState_t currentState, + MQTTPublishState_t newState ); + +/** + * @brief Test if the publish corresponding to an ack is outgoing or incoming. + * + * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP. + * @param[in] opType Send, or Receive. + * + * @return `true` if corresponds to outgoing publish, else `false`. + */ +static bool isPublishOutgoing( MQTTPubAckType_t packetType, + MQTTStateOperation_t opType ); + +/** + * @brief Find a packet ID in the state record. + * + * @param[in] records State record array. + * @param[in] recordCount Length of record array. + * @param[in] packetId packet ID to search for. + * @param[out] pQos QoS retrieved from record. + * @param[out] pCurrentState state retrieved from record. + * + * @return index of the packet id in the record if it exists, else the record length. + */ +static size_t findInRecord( const MQTTPubAckInfo_t * records, + size_t recordCount, + uint16_t packetId, + MQTTQoS_t * pQos, + MQTTPublishState_t * pCurrentState ); + +/** + * @brief Compact records. + * + * Records are arranged in the relative order to maintain message ordering. + * This will lead to fragmentation and this function will help in defragmenting + * the records array. + * + * @param[in] records State record array. + * @param[in] recordCount Length of record array. + */ +static void compactRecords( MQTTPubAckInfo_t * records, + size_t recordCount ); + +/** + * @brief Store a new entry in the state record. + * + * @param[in] records State record array. + * @param[in] recordCount Length of record array. + * @param[in] packetId, packet ID of new entry. + * @param[in] qos QoS of new entry. + * @param[in] publishState state of new entry. + * + * @return MQTTSuccess, MQTTNoMemory, MQTTStateCollision. + */ +static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records, + size_t recordCount, + uint16_t packetId, + MQTTQoS_t qos, + MQTTPublishState_t publishState ); + +/** + * @brief Update and possibly delete an entry in the state record. + * + * @param[in] records State record array. + * @param[in] recordIndex index of record to update. + * @param[in] newState New state to update. + * @param[in] shouldDelete Whether an existing entry should be deleted. + */ +static void updateRecord( MQTTPubAckInfo_t * records, + size_t recordIndex, + MQTTPublishState_t newState, + bool shouldDelete ); + +/** + * @brief Get the packet ID and index of an outgoing publish in specified + * states. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in] searchStates The states to search for in 2-byte bit map. + * @param[in,out] pCursor Index at which to start searching. + */ +static uint16_t stateSelect( const MQTTContext_t * pMqttContext, + uint16_t searchStates, + MQTTStateCursor_t * pCursor ); + +/** + * @brief Update the state records for an ACK after state transition + * validations. + * + * @param[in] records State records pointer. + * @param[in] recordIndex Index at which the record is stored. + * @param[in] packetId Packet id of the packet. + * @param[in] currentState Current state of the publish record. + * @param[in] newState New state of the publish. + * + * @return #MQTTIllegalState, or #MQTTSuccess. + */ +static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records, + size_t recordIndex, + uint16_t packetId, + MQTTPublishState_t currentState, + MQTTPublishState_t newState ); + +/** + * @brief Update the state record for a PUBLISH packet after validating + * the state transitions. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in] recordIndex Index in state records at which publish record exists. + * @param[in] packetId ID of the PUBLISH packet. + * @param[in] opType Send or Receive. + * @param[in] qos 0, 1, or 2. + * @param[in] currentState Current state of the publish record. + * @param[in] newState New state of the publish record. + * + * @return #MQTTIllegalState, #MQTTStateCollision or #MQTTSuccess. + */ +static MQTTStatus_t updateStatePublish( MQTTContext_t * pMqttContext, + size_t recordIndex, + uint16_t packetId, + MQTTStateOperation_t opType, + MQTTQoS_t qos, + MQTTPublishState_t currentState, + MQTTPublishState_t newState ); + +/*-----------------------------------------------------------*/ + +static bool validateTransitionPublish( MQTTPublishState_t currentState, + MQTTPublishState_t newState, + MQTTStateOperation_t opType, + MQTTQoS_t qos ) +{ + bool isValid = false; + + switch( currentState ) + { + case MQTTStateNull: + + /* Transitions from null occur when storing a new entry into the record. */ + if( opType == MQTT_RECEIVE ) + { + isValid = ( ( newState == MQTTPubAckSend ) || ( newState == MQTTPubRecSend ) ) ? true : false; + } + + break; + + case MQTTPublishSend: + + /* Outgoing publish. All such publishes start in this state due to + * the reserve operation. */ + switch( qos ) + { + case MQTTQoS1: + isValid = ( newState == MQTTPubAckPending ) ? true : false; + break; + + case MQTTQoS2: + isValid = ( newState == MQTTPubRecPending ) ? true : false; + break; + + default: + /* QoS 0 is checked before calling this function. */ + break; + } + + break; + + /* Below cases are for validating the resends of publish when a session is + * reestablished. */ + case MQTTPubAckPending: + + /* When a session is reestablished, outgoing QoS1 publishes in state + * #MQTTPubAckPending can be resent. The state remains the same. */ + isValid = ( newState == MQTTPubAckPending ) ? true : false; + + break; + + case MQTTPubRecPending: + + /* When a session is reestablished, outgoing QoS2 publishes in state + * #MQTTPubRecPending can be resent. The state remains the same. */ + isValid = ( newState == MQTTPubRecPending ) ? true : false; + + break; + + default: + /* For a PUBLISH, we should not start from any other state. */ + break; + } + + return isValid; +} + +/*-----------------------------------------------------------*/ + +static bool validateTransitionAck( MQTTPublishState_t currentState, + MQTTPublishState_t newState ) +{ + bool isValid = false; + + switch( currentState ) + { + case MQTTPubAckSend: + /* Incoming publish, QoS 1. */ + case MQTTPubAckPending: + /* Outgoing publish, QoS 1. */ + isValid = ( newState == MQTTPublishDone ) ? true : false; + break; + + case MQTTPubRecSend: + /* Incoming publish, QoS 2. */ + isValid = ( newState == MQTTPubRelPending ) ? true : false; + break; + + case MQTTPubRelPending: + + /* Incoming publish, QoS 2. + * There are 2 valid transitions possible. + * 1. MQTTPubRelPending -> MQTTPubCompSend : A PUBREL ack is received + * when publish record state is MQTTPubRelPending. This is the + * normal state transition without any connection interuptions. + * 2. MQTTPubRelPending -> MQTTPubRelPending : Receiving a duplicate + * QoS2 publish can result in a transition to the same state. + * This can happen in the below state transition. + * 1. Incoming publish received. + * 2. PUBREC ack sent and state is now MQTTPubRelPending. + * 3. TCP connection failure and broker didn't receive the PUBREC. + * 4. Reestablished MQTT session. + * 5. MQTT broker resent the un-acked publish. + * 6. Publish is received when publish record state is in + * MQTTPubRelPending. + * 7. Sending out a PUBREC will result in this transition + * to the same state. */ + isValid = ( ( newState == MQTTPubCompSend ) || + ( newState == MQTTPubRelPending ) ) ? true : false; + break; + + case MQTTPubCompSend: + + /* Incoming publish, QoS 2. + * There are 2 valid transitions possible. + * 1. MQTTPubCompSend -> MQTTPublishDone : A PUBCOMP ack is sent + * after receiving a PUBREL from broker. This is the + * normal state transition without any connection interuptions. + * 2. MQTTPubCompSend -> MQTTPubCompSend : Receiving a duplicate PUBREL + * can result in a transition to the same state. + * This can happen in the below state transition. + * 1. A TCP connection failure happened before sending a PUBCOMP + * for an incoming PUBREL. + * 2. Reestablished an MQTT session. + * 3. MQTT broker resent the un-acked PUBREL. + * 4. Receiving the PUBREL again will result in this transition + * to the same state. */ + isValid = ( ( newState == MQTTPublishDone ) || + ( newState == MQTTPubCompSend ) ) ? true : false; + break; + + case MQTTPubRecPending: + /* Outgoing publish, Qos 2. */ + isValid = ( newState == MQTTPubRelSend ) ? true : false; + break; + + case MQTTPubRelSend: + /* Outgoing publish, Qos 2. */ + isValid = ( newState == MQTTPubCompPending ) ? true : false; + break; + + case MQTTPubCompPending: + + /* Outgoing publish, Qos 2. + * There are 2 valid transitions possible. + * 1. MQTTPubCompPending -> MQTTPublishDone : A PUBCOMP is received. + * This marks the complete state transistion for the publish packet. + * This is the normal state transition without any connection + * interuptions. + * 2. MQTTPubCompPending -> MQTTPubCompPending : Resending a PUBREL for + * packets in state #MQTTPubCompPending can result in this + * transition to the same state. + * This can happen in the below state transition. + * 1. A TCP connection failure happened before receiving a PUBCOMP + * for an outgoing PUBREL. + * 2. An MQTT session is reestablished. + * 3. Resending the un-acked PUBREL results in this transition + * to the same state. */ + isValid = ( ( newState == MQTTPublishDone ) || + ( newState == MQTTPubCompPending ) ) ? true : false; + break; + + case MQTTPublishDone: + /* Done state should transition to invalid since it will be removed from the record. */ + case MQTTPublishSend: + /* If an ack was sent/received we shouldn't have been in this state. */ + case MQTTStateNull: + /* If an ack was sent/received the record should exist. */ + default: + /* Invalid. */ + break; + } + + return isValid; +} + +/*-----------------------------------------------------------*/ + +static bool isPublishOutgoing( MQTTPubAckType_t packetType, + MQTTStateOperation_t opType ) +{ + bool isOutgoing = false; + + switch( packetType ) + { + case MQTTPuback: + case MQTTPubrec: + case MQTTPubcomp: + isOutgoing = ( opType == MQTT_RECEIVE ) ? true : false; + break; + + case MQTTPubrel: + isOutgoing = ( opType == MQTT_SEND ) ? true : false; + break; + + default: + /* No other ack type. */ + break; + } + + return isOutgoing; +} + +/*-----------------------------------------------------------*/ + +static size_t findInRecord( const MQTTPubAckInfo_t * records, + size_t recordCount, + uint16_t packetId, + MQTTQoS_t * pQos, + MQTTPublishState_t * pCurrentState ) +{ + size_t index = 0; + + *pCurrentState = MQTTStateNull; + + if( packetId == MQTT_PACKET_ID_INVALID ) + { + index = recordCount; + } + else + { + for( index = 0; index < recordCount; index++ ) + { + if( records[ index ].packetId == packetId ) + { + *pQos = records[ index ].qos; + *pCurrentState = records[ index ].publishState; + break; + } + } + } + + return index; +} + +/*-----------------------------------------------------------*/ + +static void compactRecords( MQTTPubAckInfo_t * records, + size_t recordCount ) +{ + size_t index = 0; + size_t emptyIndex = MQTT_STATE_ARRAY_MAX_COUNT; + + assert( records != NULL ); + + /* Find the empty spots and fill those with non empty values. */ + for( ; index < recordCount; index++ ) + { + /* Find the first empty spot. */ + if( records[ index ].packetId == MQTT_PACKET_ID_INVALID ) + { + if( emptyIndex == MQTT_STATE_ARRAY_MAX_COUNT ) + { + emptyIndex = index; + } + } + else + { + if( emptyIndex != MQTT_STATE_ARRAY_MAX_COUNT ) + { + /* Copy over the contents at non empty index to empty index. */ + records[ emptyIndex ].packetId = records[ index ].packetId; + records[ emptyIndex ].qos = records[ index ].qos; + records[ emptyIndex ].publishState = records[ index ].publishState; + + /* Mark the record at current non empty index as invalid. */ + records[ index ].packetId = MQTT_PACKET_ID_INVALID; + + /* Advance the emptyIndex. */ + emptyIndex++; + } + } + } +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records, + size_t recordCount, + uint16_t packetId, + MQTTQoS_t qos, + MQTTPublishState_t publishState ) +{ + MQTTStatus_t status = MQTTNoMemory; + int32_t index = 0; + size_t availableIndex = recordCount; + bool validEntryFound = false; + + assert( packetId != MQTT_PACKET_ID_INVALID ); + assert( qos != MQTTQoS0 ); + + /* Check if we have to compact the records. This is known by checking if + * the last spot in the array is filled. */ + if( records[ recordCount - 1U ].packetId != MQTT_PACKET_ID_INVALID ) + { + compactRecords( records, recordCount ); + } + + /* Start from end so first available index will be populated. + * Available index is always found after the last element in the records. + * This is to make sure the relative order of the records in order to meet + * the message ordering requirement of MQTT spec 3.1.1. */ + for( index = ( ( int32_t ) recordCount - 1 ); index >= 0; index-- ) + { + /* Available index is only found after packet at the highest index. */ + if( records[ index ].packetId == MQTT_PACKET_ID_INVALID ) + { + if( validEntryFound == false ) + { + availableIndex = ( size_t ) index; + } + } + else + { + /* A non-empty spot found in the records. */ + validEntryFound = true; + + if( records[ index ].packetId == packetId ) + { + /* Collision. */ + LogError( ( "Collision when adding PacketID=%u at index=%u", + packetId, + index ) ); + + status = MQTTStateCollision; + availableIndex = recordCount; + break; + } + } + } + + if( availableIndex < recordCount ) + { + records[ availableIndex ].packetId = packetId; + records[ availableIndex ].qos = qos; + records[ availableIndex ].publishState = publishState; + status = MQTTSuccess; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static void updateRecord( MQTTPubAckInfo_t * records, + size_t recordIndex, + MQTTPublishState_t newState, + bool shouldDelete ) +{ + assert( records != NULL ); + + if( shouldDelete == true ) + { + /* Mark the record as invalid. */ + records[ recordIndex ].packetId = MQTT_PACKET_ID_INVALID; + } + else + { + records[ recordIndex ].publishState = newState; + } +} + +/*-----------------------------------------------------------*/ + +static uint16_t stateSelect( const MQTTContext_t * pMqttContext, + uint16_t searchStates, + MQTTStateCursor_t * pCursor ) +{ + uint16_t packetId = MQTT_PACKET_ID_INVALID; + uint16_t outgoingStates = 0U; + const MQTTPubAckInfo_t * records = NULL; + bool stateCheck = false; + + assert( pMqttContext != NULL ); + assert( searchStates != 0U ); + assert( pCursor != NULL ); + + /* Create a bit map with all the outgoing publish states. */ + UINT16_SET_BIT( outgoingStates, MQTTPublishSend ); + UINT16_SET_BIT( outgoingStates, MQTTPubAckPending ); + UINT16_SET_BIT( outgoingStates, MQTTPubRecPending ); + UINT16_SET_BIT( outgoingStates, MQTTPubRelSend ); + UINT16_SET_BIT( outgoingStates, MQTTPubCompPending ); + + /* Only outgoing publish records need to be searched. */ + assert( ( outgoingStates & searchStates ) > 0U ); + assert( ( ~outgoingStates & searchStates ) == 0 ); + + records = pMqttContext->outgoingPublishRecords; + + while( *pCursor < MQTT_STATE_ARRAY_MAX_COUNT ) + { + /* Check if any of the search states are present. */ + stateCheck = UINT16_CHECK_BIT( searchStates, records[ *pCursor ].publishState ) ? true : false; + + if( stateCheck == true ) + { + packetId = records[ *pCursor ].packetId; + ( *pCursor )++; + break; + } + + ( *pCursor )++; + } + + return packetId; +} + +/*-----------------------------------------------------------*/ + +MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType, + MQTTStateOperation_t opType, + MQTTQoS_t qos ) +{ + MQTTPublishState_t calculatedState = MQTTStateNull; + /* There are more QoS2 cases than QoS1, so initialize to that. */ + bool qosValid = ( qos == MQTTQoS2 ) ? true : false; + + switch( packetType ) + { + case MQTTPuback: + qosValid = ( qos == MQTTQoS1 ) ? true : false; + calculatedState = MQTTPublishDone; + break; + + case MQTTPubrec: + + /* Incoming publish: send PUBREC, PUBREL pending. + * Outgoing publish: receive PUBREC, send PUBREL. */ + calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRelPending : MQTTPubRelSend; + break; + + case MQTTPubrel: + + /* Incoming publish: receive PUBREL, send PUBCOMP. + * Outgoing publish: send PUBREL, PUBCOMP pending. */ + calculatedState = ( opType == MQTT_SEND ) ? MQTTPubCompPending : MQTTPubCompSend; + break; + + case MQTTPubcomp: + calculatedState = MQTTPublishDone; + break; + + default: + /* No other ack type. */ + break; + } + + /* Sanity check, make sure ack and QoS agree. */ + if( qosValid == false ) + { + calculatedState = MQTTStateNull; + } + + return calculatedState; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t updateStateAck( MQTTPubAckInfo_t * records, + size_t recordIndex, + uint16_t packetId, + MQTTPublishState_t currentState, + MQTTPublishState_t newState ) +{ + MQTTStatus_t status = MQTTIllegalState; + bool shouldDeleteRecord = false; + bool isTransitionValid = false; + + assert( records != NULL ); + + /* Record to be deleted if the state transition is completed or if a PUBREC + * is received for an outgoing QoS2 publish. When a PUBREC is received, + * record is deleted and added back to the end of the records to maintain + * ordering for PUBRELs. */ + shouldDeleteRecord = ( ( newState == MQTTPublishDone ) || ( newState == MQTTPubRelSend ) ) ? true : false; + isTransitionValid = validateTransitionAck( currentState, newState ); + + if( isTransitionValid == true ) + { + status = MQTTSuccess; + + /* Update record for acks. When sending or receiving acks for packets that + * are resent during a session reestablishment, the new state and + * current state can be the same. No update of record required in that case. */ + if( currentState != newState ) + { + updateRecord( records, + recordIndex, + newState, + shouldDeleteRecord ); + + /* For QoS2 messages, in order to preserve the message ordering, when + * a PUBREC is received for an outgoing publish, the record should be + * moved to the last. This move will help preserve the order in which + * a PUBREL needs to be resent in case of a session reestablishment. */ + if( newState == MQTTPubRelSend ) + { + status = addRecord( records, + MQTT_STATE_ARRAY_MAX_COUNT, + packetId, + MQTTQoS2, + MQTTPubRelSend ); + } + } + } + else + { + /* Invalid state transition. */ + LogError( ( "Invalid transition from state %s to state %s.", + MQTT_State_strerror( currentState ), + MQTT_State_strerror( newState ) ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t updateStatePublish( MQTTContext_t * pMqttContext, + size_t recordIndex, + uint16_t packetId, + MQTTStateOperation_t opType, + MQTTQoS_t qos, + MQTTPublishState_t currentState, + MQTTPublishState_t newState ) +{ + MQTTStatus_t status = MQTTSuccess; + bool isTransitionValid = false; + + assert( pMqttContext != NULL ); + assert( packetId != MQTT_PACKET_ID_INVALID ); + assert( qos != MQTTQoS0 ); + + /* This will always succeed for an incoming publish. This is due to the fact + * that the passed in currentState must be MQTTStateNull, since + * #MQTT_UpdateStatePublish does not perform a lookup for receives. */ + isTransitionValid = validateTransitionPublish( currentState, newState, opType, qos ); + + if( isTransitionValid == true ) + { + /* addRecord will check for collisions. */ + if( opType == MQTT_RECEIVE ) + { + status = addRecord( pMqttContext->incomingPublishRecords, + MQTT_STATE_ARRAY_MAX_COUNT, + packetId, + qos, + newState ); + } + /* Send operation. */ + else + { + /* Skip updating record when publish is resend and no state + * update is required. */ + if( currentState != newState ) + { + updateRecord( pMqttContext->outgoingPublishRecords, recordIndex, newState, false ); + } + } + } + else + { + status = MQTTIllegalState; + LogError( ( "Invalid transition from state %s to state %s.", + MQTT_State_strerror( currentState ), + MQTT_State_strerror( newState ) ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_ReserveState( MQTTContext_t * pMqttContext, + uint16_t packetId, + MQTTQoS_t qos ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( qos == MQTTQoS0 ) + { + status = MQTTSuccess; + } + else if( ( packetId == MQTT_PACKET_ID_INVALID ) || ( pMqttContext == NULL ) ) + { + status = MQTTBadParameter; + } + else + { + /* Collisions are detected when adding the record. */ + status = addRecord( pMqttContext->outgoingPublishRecords, + MQTT_STATE_ARRAY_MAX_COUNT, + packetId, + qos, + MQTTPublishSend ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType, + MQTTQoS_t qos ) +{ + MQTTPublishState_t calculatedState = MQTTStateNull; + + switch( qos ) + { + case MQTTQoS0: + calculatedState = MQTTPublishDone; + break; + + case MQTTQoS1: + calculatedState = ( opType == MQTT_SEND ) ? MQTTPubAckPending : MQTTPubAckSend; + break; + + case MQTTQoS2: + calculatedState = ( opType == MQTT_SEND ) ? MQTTPubRecPending : MQTTPubRecSend; + break; + + default: + /* No other QoS values. */ + break; + } + + return calculatedState; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext, + uint16_t packetId, + MQTTStateOperation_t opType, + MQTTQoS_t qos, + MQTTPublishState_t * pNewState ) +{ + MQTTPublishState_t newState = MQTTStateNull; + MQTTPublishState_t currentState = MQTTStateNull; + MQTTStatus_t mqttStatus = MQTTSuccess; + size_t recordIndex = MQTT_STATE_ARRAY_MAX_COUNT; + MQTTQoS_t foundQoS = MQTTQoS0; + + if( ( pMqttContext == NULL ) || ( pNewState == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p", + pMqttContext, + pNewState ) ); + + mqttStatus = MQTTBadParameter; + } + else if( qos == MQTTQoS0 ) + { + /* QoS 0 publish. Do nothing. */ + *pNewState = MQTTPublishDone; + } + else if( packetId == MQTT_PACKET_ID_INVALID ) + { + /* Publishes > QoS 0 need a valid packet ID. */ + mqttStatus = MQTTBadParameter; + } + else if( opType == MQTT_SEND ) + { + /* Search record for entry so we can check QoS. */ + recordIndex = findInRecord( pMqttContext->outgoingPublishRecords, + MQTT_STATE_ARRAY_MAX_COUNT, + packetId, + &foundQoS, + ¤tState ); + + if( ( recordIndex == MQTT_STATE_ARRAY_MAX_COUNT ) || ( foundQoS != qos ) ) + { + /* Entry should match with supplied QoS. */ + mqttStatus = MQTTBadParameter; + } + } + else + { + /* QoS 1 or 2 receive. Nothing to be done. */ + } + + if( ( qos != MQTTQoS0 ) && ( mqttStatus == MQTTSuccess ) ) + { + newState = MQTT_CalculateStatePublish( opType, qos ); + /* Validate state transition and update state records. */ + mqttStatus = updateStatePublish( pMqttContext, + recordIndex, + packetId, + opType, + qos, + currentState, + newState ); + + /* Update output parameter on success. */ + if( mqttStatus == MQTTSuccess ) + { + *pNewState = newState; + } + } + + return mqttStatus; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext, + uint16_t packetId, + MQTTPubAckType_t packetType, + MQTTStateOperation_t opType, + MQTTPublishState_t * pNewState ) +{ + MQTTPublishState_t newState = MQTTStateNull; + MQTTPublishState_t currentState = MQTTStateNull; + bool isOutgoingPublish = isPublishOutgoing( packetType, opType ); + MQTTQoS_t qos = MQTTQoS0; + size_t recordIndex = MQTT_STATE_ARRAY_MAX_COUNT; + MQTTPubAckInfo_t * records = NULL; + MQTTStatus_t status = MQTTBadParameter; + + if( ( pMqttContext == NULL ) || ( pNewState == NULL ) ) + { + LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p.", + pMqttContext, + pNewState ) ); + } + else + { + if( isOutgoingPublish == true ) + { + records = pMqttContext->outgoingPublishRecords; + } + else + { + records = pMqttContext->incomingPublishRecords; + } + + recordIndex = findInRecord( records, + MQTT_STATE_ARRAY_MAX_COUNT, + packetId, + &qos, + ¤tState ); + } + + if( recordIndex < MQTT_STATE_ARRAY_MAX_COUNT ) + { + newState = MQTT_CalculateStateAck( packetType, opType, qos ); + + /* Validate state transition and update state record. */ + status = updateStateAck( records, recordIndex, packetId, currentState, newState ); + + /* Update the output parameter. */ + if( status == MQTTSuccess ) + { + *pNewState = newState; + } + } + else + { + LogError( ( "No matching record found for publish %u.", packetId ) ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + +uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext, + MQTTStateCursor_t * pCursor, + MQTTPublishState_t * pState ) +{ + uint16_t packetId = MQTT_PACKET_ID_INVALID; + uint16_t searchStates = 0U; + + /* Validate arguments. */ + if( ( pMqttContext == NULL ) || ( pCursor == NULL ) || ( pState == NULL ) ) + { + LogError( ( "Arguments cannot be NULL pMqttContext =%p, pCursor=%p" + " pState=%p.", + pMqttContext, + pCursor, + pState ) ); + } + else + { + /* PUBREL for packets in state #MQTTPubCompPending and #MQTTPubRelSend + * would need to be resent when a session is reestablished.*/ + UINT16_SET_BIT( searchStates, MQTTPubCompPending ); + UINT16_SET_BIT( searchStates, MQTTPubRelSend ); + packetId = stateSelect( pMqttContext, searchStates, pCursor ); + + /* The state needs to be in #MQTTPubRelSend for sending PUBREL. */ + if( packetId != MQTT_PACKET_ID_INVALID ) + { + *pState = MQTTPubRelSend; + } + } + + return packetId; +} + +/*-----------------------------------------------------------*/ + +uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext, + MQTTStateCursor_t * pCursor ) +{ + uint16_t packetId = MQTT_PACKET_ID_INVALID; + uint16_t searchStates = 0U; + + /* Validate arguments. */ + if( ( pMqttContext == NULL ) || ( pCursor == NULL ) ) + { + LogError( ( "Arguments cannot be NULL pMqttContext =%p, pCursor=%p", + pMqttContext, + pCursor ) ); + } + else + { + /* Packets in state #MQTTPublishSend, #MQTTPubAckPending and + * #MQTTPubRecPending would need to be resent when a session is + * reestablished. */ + UINT16_SET_BIT( searchStates, MQTTPublishSend ); + UINT16_SET_BIT( searchStates, MQTTPubAckPending ); + UINT16_SET_BIT( searchStates, MQTTPubRecPending ); + + packetId = stateSelect( pMqttContext, searchStates, pCursor ); + } + + return packetId; +} + +/*-----------------------------------------------------------*/ + +const char * MQTT_State_strerror( MQTTPublishState_t state ) +{ + const char * str = NULL; + + switch( state ) + { + case MQTTStateNull: + str = "MQTTStateNull"; + break; + + case MQTTPublishSend: + str = "MQTTPublishSend"; + break; + + case MQTTPubAckSend: + str = "MQTTPubAckSend"; + break; + + case MQTTPubRecSend: + str = "MQTTPubRecSend"; + break; + + case MQTTPubRelSend: + str = "MQTTPubRelSend"; + break; + + case MQTTPubCompSend: + str = "MQTTPubCompSend"; + break; + + case MQTTPubAckPending: + str = "MQTTPubAckPending"; + break; + + case MQTTPubRecPending: + str = "MQTTPubRecPending"; + break; + + case MQTTPubRelPending: + str = "MQTTPubRelPending"; + break; + + case MQTTPubCompPending: + str = "MQTTPubCompPending"; + break; + + case MQTTPublishDone: + str = "MQTTPublishDone"; + break; + + default: + /* Invalid state received. */ + str = "Invalid MQTT State"; + break; + } + + return str; +} + +/*-----------------------------------------------------------*/ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h new file mode 100644 index 000000000..c62489573 --- /dev/null +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h @@ -0,0 +1,23 @@ +#ifndef MQTT_INTERNAL_H_ +#define MQTT_INTERNAL_H_ + +/* Include config file before other headers. */ +#include "mqtt_config.h" + +#ifndef LogError + #define LogError( message ) +#endif + +#ifndef LogWarn + #define LogWarn( message ) +#endif + +#ifndef LogInfo + #define LogInfo( message ) +#endif + +#ifndef LogDebug + #define LogDebug( message ) +#endif + +#endif /* ifndef MQTT_INTERNAL_H_ */ |