diff options
author | Muneeb Ahmed <54290492+muneebahmed10@users.noreply.github.com> | 2020-08-19 16:30:59 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-19 16:30:59 -0700 |
commit | 262cdd9ae5be111e1ba7b1640ee88bbda23f9007 (patch) | |
tree | 6852e9ef3480e4eea0314be3b3e03eaeb7f217a6 | |
parent | 6dda6d84f5d98a3435dc4eee830f1bd61de2e372 (diff) | |
download | freertos-git-262cdd9ae5be111e1ba7b1640ee88bbda23f9007.tar.gz |
Sync MQTT files with 71df018b of CSDK (#205)
* Sync MQTT files with CSDK commit 71df018badd91f92b51f8923b820af9cc3567a23
* Update demos to use new callback signature
9 files changed, 1888 insertions, 268 deletions
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_mutual_auth/DemoTasks/MutualAuthMQTTExample.c b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_mutual_auth/DemoTasks/MutualAuthMQTTExample.c index d5ac66a1c..7934171d3 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_mutual_auth/DemoTasks/MutualAuthMQTTExample.c +++ b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_mutual_auth/DemoTasks/MutualAuthMQTTExample.c @@ -239,14 +239,11 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); * * @param[in] pxMQTTContext MQTT context pointer. * @param[in] pxPacketInfo Packet Info pointer for the incoming packet. - * @param[in] usPacketIdentifier Packet identifier of the incoming packet. - * @param[in] pxPublishInfo Deserialized publish info for the incoming packet if - * there is an incoming PUBLISH; NULL otherwise + * @param[in] pxDeserializedInfo Deserialized information from the incoming packet. */ static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, - uint16_t usPacketIdentifier, - MQTTPublishInfo_t * pxPublishInfo ); + MQTTDeserializedInfo_t * pxDeserializedInfo ); /** * @brief TLS connect to endpoint democonfigMQTT_BROKER_ENDPOINT. @@ -673,19 +670,18 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, - uint16_t usPacketIdentifier, - MQTTPublishInfo_t * pxPublishInfo ) + MQTTDeserializedInfo_t * pxDeserializedInfo ) { /* The MQTT context is not used for this demo. */ ( void ) pxMQTTContext; if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { - prvMQTTProcessIncomingPublish( pxPublishInfo ); + prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo ); } else { - prvMQTTProcessResponse( pxPacketInfo, usPacketIdentifier ); + prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier ); } } diff --git a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c index 5709a323b..da314ac53 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c +++ b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c @@ -228,14 +228,11 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); * * @param pxMQTTContext MQTT context pointer. * @param pxPacketInfo Packet Info pointer for the incoming packet. - * @param usPacketIdentifier Packet identifier of the incoming packet. - * @param pxPublishInfo Deserialized publish info pointer for the incoming - * packet. + * @param pxDeserializedInfo Deserialized information from the incoming packet. */ static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, - uint16_t usPacketIdentifier, - MQTTPublishInfo_t * pxPublishInfo ); + MQTTDeserializedInfo_t * pxDeserializedInfo ); /*-----------------------------------------------------------*/ @@ -600,19 +597,18 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, - uint16_t usPacketIdentifier, - MQTTPublishInfo_t * pxPublishInfo ) + MQTTDeserializedInfo_t * pxDeserializedInfo ) { /* The MQTT context is not used for this demo. */ ( void ) pxMQTTContext; if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { - prvMQTTProcessIncomingPublish( pxPublishInfo ); + prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo ); } else { - prvMQTTProcessResponse( pxPacketInfo, usPacketIdentifier ); + prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier ); } } diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h index 2e5c15c0d..b2be8ce4c 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h @@ -19,6 +19,10 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/** + * @file mqtt.h + * @brief User-facing functions of the MQTT 3.1.1 library. + */ #ifndef MQTT_H #define MQTT_H @@ -35,11 +39,10 @@ */ #define MQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U ) +/* Structures defined in this file. */ struct MQTTPubAckInfo; -typedef struct MQTTPubAckInfo MQTTPubAckInfo_t; - struct MQTTContext; -typedef struct MQTTContext MQTTContext_t; +struct MQTTDeserializedInfo; /** * @brief Application provided callback to retrieve the current time in @@ -53,95 +56,214 @@ typedef uint32_t (* MQTTGetCurrentTimeFunc_t )( void ); * @brief Application callback for receiving incoming publishes and incoming * acks. * + * @note This callback will be called only if packets are deserialized with a + * result of #MQTTSuccess or #MQTTServerRefused. The latter can be obtained + * when deserializing a SUBACK, indicating a broker's rejection of a subscribe. + * * @param[in] pContext Initialized MQTT context. * @param[in] pPacketInfo Information on the type of incoming MQTT packet. - * @param[in] packetIdentifier Packet identifier of incoming PUBLISH packet. - * @param[in] pPublishInfo Incoming PUBLISH packet parameters. + * @param[in] pDeserializedInfo Deserialized information from incoming packet. */ -typedef void (* MQTTEventCallback_t )( MQTTContext_t * pContext, - MQTTPacketInfo_t * pPacketInfo, - uint16_t packetIdentifier, - MQTTPublishInfo_t * pPublishInfo ); +typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext, + struct MQTTPacketInfo * pPacketInfo, + struct MQTTDeserializedInfo * pDeserializedInfo ); +/** + * @brief Values indicating if an MQTT connection exists. + */ typedef enum MQTTConnectionStatus { - MQTTNotConnected, - MQTTConnected + MQTTNotConnected, /**< @brief MQTT Connection is inactive. */ + MQTTConnected /**< @brief MQTT Connection is active. */ } MQTTConnectionStatus_t; +/** + * @brief The state of QoS 1 or QoS 2 MQTT publishes, used in the state engine. + */ typedef enum MQTTPublishState { - MQTTStateNull = 0, - MQTTPublishSend, - MQTTPubAckSend, - MQTTPubRecSend, - MQTTPubRelSend, - MQTTPubCompSend, - MQTTPubAckPending, - MQTTPubRelPending, - MQTTPubRecPending, - MQTTPubCompPending, - MQTTPublishDone + MQTTStateNull = 0, /**< @brief An empty state with no corresponding PUBLISH. */ + MQTTPublishSend, /**< @brief The library will send an outgoing PUBLISH packet. */ + MQTTPubAckSend, /**< @brief The library will send a PUBACK for a received PUBLISH. */ + MQTTPubRecSend, /**< @brief The library will send a PUBREC for a received PUBLISH. */ + MQTTPubRelSend, /**< @brief The library will send a PUBREL for a received PUBREC. */ + MQTTPubCompSend, /**< @brief The library will send a PUBCOMP for a received PUBREL. */ + MQTTPubAckPending, /**< @brief The library is awaiting a PUBACK for an outgoing PUBLISH. */ + MQTTPubRecPending, /**< @brief The library is awaiting a PUBREC for an outgoing PUBLISH. */ + MQTTPubRelPending, /**< @brief The library is awaiting a PUBREL for an incoming PUBLISH. */ + MQTTPubCompPending, /**< @brief The library is awaiting a PUBCOMP for an outgoing PUBLISH. */ + MQTTPublishDone /**< @brief The PUBLISH has been completed. */ } MQTTPublishState_t; +/** + * @brief Packet types used in acknowledging QoS 1 or QoS 2 publishes. + */ typedef enum MQTTPubAckType { - MQTTPuback, - MQTTPubrec, - MQTTPubrel, - MQTTPubcomp + MQTTPuback, /**< @brief PUBACKs are sent in response to a QoS 1 PUBLISH. */ + MQTTPubrec, /**< @brief PUBRECs are sent in response to a QoS 2 PUBLISH. */ + MQTTPubrel, /**< @brief PUBRELs are sent in response to a PUBREC. */ + MQTTPubcomp /**< @brief PUBCOMPs are sent in response to a PUBREL. */ } MQTTPubAckType_t; -struct MQTTPubAckInfo +/** + * @brief The status codes in the SUBACK response to a subscription request. + */ +typedef enum MQTTSubAckStatus +{ + MQTTSubAckSuccessQos0 = 0x00, /**< @brief Success with a maximum delivery at QoS 0 . */ + MQTTSubAckSuccessQos1 = 0x01, /**< @brief Success with a maximum delivery at QoS 1. */ + MQTTSubAckSuccessQos2 = 0x02, /**< @brief Success with a maximum delivery at QoS 2. */ + MQTTSubAckFailure = 0x80 /**< @brief Failure. */ +} MQTTSubAckStatus_t; + +/** + * @brief An element of the state engine records for QoS 1 or Qos 2 publishes. + */ +typedef struct MQTTPubAckInfo { - uint16_t packetId; - MQTTQoS_t qos; - MQTTPublishState_t publishState; -}; + uint16_t packetId; /**< @brief The packet ID of the original PUBLISH. */ + MQTTQoS_t qos; /**< @brief The QoS of the original PUBLISH. */ + MQTTPublishState_t publishState; /**< @brief The current state of the publish process. */ +} MQTTPubAckInfo_t; -struct MQTTContext +/** + * @brief A struct representing an MQTT connection. + */ +typedef struct MQTTContext { + /** + * @brief State engine records for outgoing publishes. + */ MQTTPubAckInfo_t outgoingPublishRecords[ MQTT_STATE_ARRAY_MAX_COUNT ]; - size_t outgoingPublishCount; + + /** + * @brief State engine records for incoming publishes. + */ MQTTPubAckInfo_t incomingPublishRecords[ MQTT_STATE_ARRAY_MAX_COUNT ]; - size_t incomingPublishCount; + /** + * @brief The transport interface used by the MQTT connection. + */ TransportInterface_t transportInterface; + + /** + * @brief The buffer used in sending and receiving packets from the network. + */ MQTTFixedBuffer_t networkBuffer; + /** + * @brief The next available ID for outgoing MQTT packets. + */ uint16_t nextPacketId; + + /** + * @brief Whether the context currently has a connection to the broker. + */ MQTTConnectionStatus_t connectStatus; + + /** + * @brief Function used to get millisecond timestamps. + */ MQTTGetCurrentTimeFunc_t getTime; + + /** + * @brief Callback function used to give deserialized MQTT packets to the application. + */ MQTTEventCallback_t appCallback; + + /** + * @brief Timestamp of the last packet sent by the library. + */ uint32_t lastPacketTime; + + /** + * @brief Whether the library sent a packet during a call of #MQTT_ProcessLoop or + * #MQTT_ReceiveLoop. + */ bool controlPacketSent; /* Keep alive members. */ - uint16_t keepAliveIntervalSec; - uint32_t pingReqSendTimeMs; - uint32_t pingRespTimeoutMs; - bool waitingForPingResp; -}; + uint16_t keepAliveIntervalSec; /**< @brief Keep Alive interval. */ + uint32_t pingReqSendTimeMs; /**< @brief Timestamp of the last sent PINGREQ. */ + bool waitingForPingResp; /**< @brief If the library is currently awaiting a PINGRESP. */ +} MQTTContext_t; + +/** + * @brief Struct to hold deserialized packet information for an #MQTTEventCallback_t + * callback. + */ +typedef struct MQTTDeserializedInfo +{ + uint16_t packetIdentifier; /**< @brief Packet ID of deserialized packet. */ + MQTTPublishInfo_t * pPublishInfo; /**< @brief Pointer to deserialized publish info. */ + MQTTStatus_t deserializationResult; /**< @brief Return code of deserialization. */ +} MQTTDeserializedInfo_t; /** * @brief Initialize an MQTT context. * - * This function must be called on an MQTT context before any other function. + * This function must be called on a #MQTTContext_t before any other function. * - * @note The getTime callback function must be defined. If there is no time - * implementation, it is the responsibility of the application to provide a - * dummy function to always return 0, and provide 0 timeouts for functions. This - * will ensure all time based functions will run for a single iteration. + * @note The #MQTTGetCurrentTimeFunc_t callback function must be defined. If + * there is no time implementation, it is the responsibility of the application + * to provide a dummy function to always return 0, and provide 0 timeouts for + * functions. This will ensure all time based functions will run for a single + * iteration. * * @brief param[in] pContext The context to initialize. - * @brief param[in] pTransportInterface The transport interface to use with the context. - * @brief param[in] getTimeFunction The time utility function to use with the context. - * @brief param[in] userCallback The user callback to use with the context to notify about - * incoming packet events. + * @brief param[in] pTransportInterface The transport interface to use with the + * context. + * @brief param[in] getTimeFunction The time utility function to use with the + * context. + * @brief param[in] userCallback The user callback to use with the context to + * notify about incoming packet events. * @brief param[in] pNetworkBuffer Network buffer provided for the context. * * @return #MQTTBadParameter if invalid parameters are passed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Function for obtaining a timestamp. + * uint32_t getTimeStampMs(); + * // Callback function for receiving packets. + * void eventCallback( + * MQTTContext_t * pContext, + * MQTTPacketInfo_t * pPacketInfo, + * MQTTDeserializedInfo_t * pDeserializedInfo + * ); + * // Network send. + * int32_t networkSend( NetworkContext_t * pContext, const void * pBuffer, size_t bytes ); + * // Network receive. + * int32_t networkRecv( NetworkContext_t * pContext, void * pBuffer, size_t bytes ); + * + * MQTTContext_t mqttContext; + * TransportInterface_t transport; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ 1024 ]; + * + * // Clear context. + * memset( ( void * ) &mqttContext, 0x00, sizeof( MQTTContext_t ) ); + * + * // Set transport interface members. + * transport.pNetworkInterface = &someNetworkInterface; + * transport.send = networkSend; + * transport.recv = networkRecv; + * + * // Set buffer members. + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = 1024; + * + * status = MQTT_Init( &mqttContext, &transport, getTimeStampMs, eventCallback, &fixedBuffer ); + * + * if( status == MQTTSuccess ) + * { + * // Do something with mqttContext. The transport and fixedBuffer structs were + * // copied into the context, so the original structs do not need to stay in scope. + * } + * @endcode */ MQTTStatus_t MQTT_Init( MQTTContext_t * pContext, const TransportInterface_t * pTransportInterface, @@ -157,12 +279,12 @@ MQTTStatus_t MQTT_Init( MQTTContext_t * pContext, * * The maximum time this function waits for a CONNACK is decided in one of the * following ways: - * 1. If #timeoutMs is greater than 0: - * #getTime is used to ensure that the function does not wait more than #timeoutMs - * for CONNACK. - * 2. If #timeoutMs is 0: - * The network receive for CONNACK is retried up to the number of times configured - * by #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. + * 1. If @p timeoutMs is greater than 0: + * #MQTTContext_t.getTime is used to ensure that the function does not wait + * more than @p timeoutMs for CONNACK. + * 2. If @p timeoutMs is 0: + * The network receive for CONNACK is retried up to the number of times + * configured by #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT. * * @param[in] pContext Initialized MQTT context. * @param[in] pConnectInfo MQTT CONNECT packet information. @@ -180,7 +302,7 @@ MQTTStatus_t MQTT_Init( MQTTContext_t * pContext, * #MQTTSendFailed if transport send failed; * #MQTTRecvFailed if transport receive failed for CONNACK; * #MQTTNoDataAvailable if no data available to receive in transport until - * the #timeoutMs for CONNACK; + * the @p timeoutMs for CONNACK; * #MQTTSuccess otherwise. * * @note This API may spend more time than provided in the timeoutMS parameters in @@ -200,6 +322,52 @@ MQTTStatus_t MQTT_Init( MQTTContext_t * pContext, * the API makes one more network receive call in an attempt to receive the remaining * 2 bytes. In the worst case, it can happen that the remaining 2 bytes are never * received and this API will end up spending timeoutMs + transport receive timeout. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTConnectInfo_t connectInfo = { 0 }; + * MQTTPublishInfo_t willInfo = { 0 }; + * bool sessionPresent; + * // This is assumed to have been initialized before calling this function. + * MQTTContext_t * pContext; + * + * // True for creating a new session with broker, false if we want to resume an old one. + * connectInfo.cleanSession = true; + * // Client ID must be unique to broker. This field is required. + * connectInfo.pClientIdentifier = "someClientID"; + * connectInfo.clientIdentifierLength = strlen( connectInfo.pClientIdentifier ); + * + * // The following fields are optional. + * // Value for keep alive. + * connectInfo.keepAliveSeconds = 60; + * // Optional username and password. + * connectInfo.pUserName = "someUserName"; + * connectInfo.userNameLength = strlen( connectInfo.pUserName ); + * connectInfo.pPassword = "somePassword"; + * connectInfo.passwordLength = strlen( connectInfo.pPassword ); + * + * // The last will and testament is optional, it will be published by the broker + * // should this client disconnect without sending a DISCONNECT packet. + * willInfo.qos = MQTTQoS0; + * willInfo.pTopicName = "/lwt/topic/name"; + * willInfo.topicNameLength = strlen( willInfo.pTopicName ); + * willInfo.pPayload = "LWT Message"; + * willInfo.payloadLength = strlen( "LWT Message" ); + * + * // Send the connect packet. Use 100 ms as the timeout to wait for the CONNACK packet. + * status = MQTT_Connect( pContext, &connectInfo, &willInfo, 100, &sessionPresent ); + * + * if( status == MQTTSuccess ) + * { + * // Since we requested a clean session, this must be false + * assert( sessionPresent == false ); + * + * // Do something with the connection. + * } + * @endcode */ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, const MQTTConnectInfo_t * pConnectInfo, @@ -214,13 +382,47 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, * @param[in] pContext Initialized MQTT context. * @param[in] pSubscriptionList List of MQTT subscription info. * @param[in] subscriptionCount The number of elements in pSubscriptionList. - * @param[in] packetId packet ID generated by #MQTT_GetPacketId. + * @param[in] packetId Packet ID generated by #MQTT_GetPacketId. * * @return #MQTTNoMemory if the #MQTTContext_t.networkBuffer is too small to * hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTSubscribeInfo_t subscriptionList[ NUMBER_OF_SUBSCRIPTIONS ] = { 0 }; + * uint16_t packetId; + * // This context is assumed to be initialized and connected. + * MQTTContext_t * pContext; + * // This is assumed to be a list of filters we want to subscribe to. + * const char * filters[ NUMBER_OF_SUBSCRIPTIONS ]; + * + * // Set each subscription. + * for( int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++ ) + * { + * subscriptionList[ i ].qos = MQTTQoS0; + * // Each subscription needs a topic filter. + * subscriptionList[ i ].pTopicFilter = filters[ i ]; + * subscriptionList[ i ].topicFilterLength = strlen( filters[ i ] ); + * } + * + * // Obtain a new packet id for the subscription. + * packetId = MQTT_GetPacketId( pContext ); + * + * status = MQTT_Subscribe( pContext, &subscriptionList[ 0 ], NUMBER_OF_SUBSCRIPTIONS, packetId ); + * + * if( status == MQTTSuccess ) + * { + * // We must now call MQTT_ReceiveLoop() or MQTT_ProcessLoop() to receive the SUBACK. + * // If the broker accepts the subscription we can now receive publishes + * // on the requested topics. + * } + * @endcode */ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, const MQTTSubscribeInfo_t * pSubscriptionList, @@ -238,6 +440,35 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTPublishInfo_t publishInfo; + * uint16_t packetId; + * // This context is assumed to be initialized and connected. + * MQTTContext_t * pContext; + * + * // QoS of publish. + * publishInfo.qos = MQTTQoS1; + * publishInfo.pTopicName = "/some/topic/name"; + * publishInfo.topicNameLength = strlen( publishInfo.pTopicName ); + * publishInfo.pPayload = "Hello World!"; + * publishInfo.payloadLength = strlen( "Hello World!" ); + * + * // Packet ID is needed for QoS > 0. + * packetId = MQTT_GetPacketId( pContext ); + * + * status = MQTT_Publish( pContext, &publishInfo, packetId ); + * + * if( status == MQTTSuccess ) + * { + * // Since the QoS is > 0, we will need to call MQTT_ReceiveLoop() + * // or MQTT_ProcessLoop() to process the publish acknowledgments. + * } + * @endcode */ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, const MQTTPublishInfo_t * pPublishInfo, @@ -269,6 +500,39 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ); * #MQTTBadParameter if invalid parameters are passed; * #MQTTSendFailed if transport write failed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTSubscribeInfo_t unsubscribeList[ NUMBER_OF_SUBSCRIPTIONS ] = { 0 }; + * uint16_t packetId; + * // This context is assumed to be initialized and connected. + * MQTTContext_t * pContext; + * // This is assumed to be a list of filters we want to unsubscribe from. + * const char * filters[ NUMBER_OF_SUBSCRIPTIONS ]; + * + * // Set information for each unsubscribe request. + * for( int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++ ) + * { + * unsubscribeList[ i ].pTopicFilter = filters[ i ]; + * unsubscribeList[ i ].topicFilterLength = strlen( filters[ i ] ); + * + * // The QoS field of MQTT_SubscribeInfo_t is unused for unsubscribing. + * } + * + * // Obtain a new packet id for the unsubscribe request. + * packetId = MQTT_GetPacketId( pContext ); + * + * status = MQTT_Subscribe( pContext, &unsubscribeList[ 0 ], NUMBER_OF_SUBSCRIPTIONS, packetId ); + * + * if( status == MQTTSuccess ) + * { + * // We must now call MQTT_ReceiveLoop() or MQTT_ProcessLoop() to receive the UNSUBACK. + * // After this the broker should no longer send publishes for these topics. + * } + * @endcode */ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, const MQTTSubscribeInfo_t * pSubscriptionList, @@ -301,10 +565,35 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ); * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ; * #MQTTBadResponse if an invalid packet is received; * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before - * pContext->pingRespTimeoutMs milliseconds; + * #MQTT_PINGRESP_TIMEOUT_MS milliseconds; * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an * invalid transition for the internal state machine; * #MQTTSuccess on success. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * uint32_t timeoutMs = 100; + * // This context is assumed to be initialized and connected. + * MQTTContext_t * pContext; + * + * while( true ) + * { + * status = MQTT_ProcessLoop( pContext, timeoutMs ); + * + * if( status != MQTTSuccess ) + * { + * // Determine the error. It's possible we might need to disconnect + * // the underlying transport connection. + * } + * else + * { + * // Other application functions. + * } + * } + * @endcode */ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext, uint32_t timeoutMs ); @@ -326,6 +615,39 @@ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext, * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an * invalid transition for the internal state machine; * #MQTTSuccess on success. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * uint32_t timeoutMs = 100; + * uint32_t keepAliveMs = 60 * 1000; + * // This context is assumed to be initialized and connected. + * MQTTContext_t * pContext; + * + * while( true ) + * { + * status = MQTT_ReceiveLoop( pContext, timeoutMs ); + * + * if( status != MQTTSuccess ) + * { + * // Determine the error. It's possible we might need to disconnect + * // the underlying transport connection. + * } + * else + * { + * // Since this function does not send pings, the application may need + * // to in order to comply with keep alive. + * if( ( pContext->getTime() - pContext->lastPacketTime ) > keepAliveMs ) + * { + * status = MQTT_Ping( pContext ); + * } + * + * // Other application functions. + * } + * } + * @endcode */ MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext, uint32_t timeoutMs ); @@ -340,6 +662,62 @@ MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext, uint16_t MQTT_GetPacketId( MQTTContext_t * pContext ); /** + * @brief A utility function that determines whether the passed topic filter and + * topic name match according to the MQTT 3.1.1 protocol specification. + * + * @param[in] pTopicName The topic name to check. + * @param[in] topicNameLength Length of the topic name. + * @param[in] pTopicFilter The topic filter to check. + * @param[in] topicFilterLength Length of topic filter. + * @param[out] pIsMatch This is filled with the whether there + * exists a match or not. + * + * @note The API assumes that the passed topic name is valid to meet the + * requirements of the MQTT 3.1.1 specification. Invalid topic names (for example, + * containing wildcard characters) should not be passed to the function. + * Also, the API checks validity of topic filter for wildcard characters ONLY if + * the passed topic name and topic filter do not have an exact string match. + * + * @return Returns one of the following: + * - #MQTTBadParameter, if any of the input parameters is invalid. + * - #MQTTSuccess, if the matching operation was performed. + */ +MQTTStatus_t MQTT_MatchTopic( const char * pTopicName, + const uint16_t topicNameLength, + const char * pTopicFilter, + const uint16_t topicFilterLength, + bool * pIsMatch ); + +/** + * @brief Parses the payload of an MQTT SUBACK packet that contains status codes + * corresponding to topic filter subscription requests from the original + * subscribe packet. + * + * Each return code in the SUBACK packet corresponds to a topic filter in the + * SUBSCRIBE Packet being acknowledged. + * The status codes can be one of the following: + * - 0x00 - Success - Maximum QoS 0 + * - 0x01 - Success - Maximum QoS 1 + * - 0x02 - Success - Maximum QoS 2 + * - 0x80 - Failure + * Refer to #MQTTSubAckStatus_t for the status codes. + * + * @param[in] pSubackPacket The SUBACK packet whose payload is to be parsed. + * @param[out] pPayloadStart This is populated with the starting address + * of the payload (or return codes for topic filters) in the SUBACK packet. + * @param[out] pPayloadSize This is populated with the size of the payload + * in the SUBACK packet. It represents the number of topic filters whose + * SUBACK status is present in the packet. + * + * @return Returns one of the following: + * - #MQTTBadParameter if the input SUBACK packet is invalid. + * - #MQTTSuccess if parsing the payload was successful. + */ +MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket, + uint8_t ** pPayloadStart, + size_t * pPayloadSize ); + +/** * @brief Error code to string conversion for MQTT statuses. * * @param[in] status The status to convert to a string. diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h index 44ee296dd..048457c10 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_lightweight.h @@ -19,12 +19,23 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/** + * @file mqtt_lightweight.h + * @brief User-facing functions for serializing and deserializing MQTT 3.1.1 + * packets. This header should be included for building a lightweight MQTT + * client bypassing the managed CSDK MQTT library API in mqtt.h. + */ #ifndef MQTT_LIGHTWEIGHT_H #define MQTT_LIGHTWEIGHT_H #include <stddef.h> #include <stdint.h> +/** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section. + */ + /* bool is defined in only C99+. */ #if defined( __cplusplus ) || ( defined( __STDC_VERSION__ ) && ( __STDC_VERSION__ >= 199901L ) ) #include <stdbool.h> @@ -33,6 +44,7 @@ #define false ( int8_t ) 0 #define true ( int8_t ) 1 #endif +/** @endcond */ /* Include config file before other headers. */ #include "mqtt_config.h" @@ -55,26 +67,17 @@ #define MQTT_PACKET_TYPE_PINGRESP ( ( uint8_t ) 0xD0U ) /**< @brief PINGRESP (server-to-client). */ #define MQTT_PACKET_TYPE_DISCONNECT ( ( uint8_t ) 0xE0U ) /**< @brief DISCONNECT (client-to-server). */ - /** * @brief The size of MQTT PUBACK, PUBREC, PUBREL, and PUBCOMP packets, per MQTT spec. */ #define MQTT_PUBLISH_ACK_PACKET_SIZE ( 4UL ) +/* Structures defined in this file. */ struct MQTTFixedBuffer; -typedef struct MQTTFixedBuffer MQTTFixedBuffer_t; - struct MQTTConnectInfo; -typedef struct MQTTConnectInfo MQTTConnectInfo_t; - struct MQTTSubscribeInfo; -typedef struct MQTTSubscribeInfo MQTTSubscribeInfo_t; - -struct MqttPublishInfo; -typedef struct MqttPublishInfo MQTTPublishInfo_t; - +struct MQTTPublishInfo; struct MQTTPacketInfo; -typedef struct MQTTPacketInfo MQTTPacketInfo_t; /** * @brief Return codes from MQTT functions. @@ -110,16 +113,16 @@ typedef enum MQTTQoS * These buffers are not copied and must remain in scope for the duration of the * MQTT operation. */ -struct MQTTFixedBuffer +typedef struct MQTTFixedBuffer { uint8_t * pBuffer; /**< @brief Pointer to buffer. */ size_t size; /**< @brief Size of buffer. */ -}; +} MQTTFixedBuffer_t; /** * @brief MQTT CONNECT packet parameters. */ -struct MQTTConnectInfo +typedef struct MQTTConnectInfo { /** * @brief Whether to establish a new, clean session or resume a previous session. @@ -160,12 +163,12 @@ struct MQTTConnectInfo * @brief Length of MQTT password. Set to 0 if not used. */ uint16_t passwordLength; -}; +} MQTTConnectInfo_t; /** * @brief MQTT SUBSCRIBE packet parameters. */ -struct MQTTSubscribeInfo +typedef struct MQTTSubscribeInfo { /** * @brief Quality of Service for subscription. @@ -181,12 +184,12 @@ struct MQTTSubscribeInfo * @brief Length of subscription topic filter. */ uint16_t topicFilterLength; -}; +} MQTTSubscribeInfo_t; /** * @brief MQTT PUBLISH packet parameters. */ -struct MqttPublishInfo +typedef struct MQTTPublishInfo { /** * @brief Quality of Service for message. @@ -222,12 +225,12 @@ struct MqttPublishInfo * @brief Message payload length. */ size_t payloadLength; -}; +} MQTTPublishInfo_t; /** * @brief MQTT incoming packet parameters. */ -struct MQTTPacketInfo +typedef struct MQTTPacketInfo { /** * @brief Type of incoming MQTT packet. @@ -243,7 +246,7 @@ struct MQTTPacketInfo * @brief Length of remaining serialized data. */ size_t remainingLength; -}; +} MQTTPacketInfo_t; /** * @brief Get the size and Remaining Length of an MQTT CONNECT packet. @@ -264,6 +267,33 @@ struct MQTTPacketInfo * * @return #MQTTBadParameter if the packet would exceed the size allowed by the * MQTT spec; #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTConnectInfo_t connectInfo = { 0 }; + * MQTTPublishInfo_t willInfo = { 0 }; + * size_t remainingLength = 0, packetSize = 0; + * + * // Initialize the connection info, the details are out of scope for this example. + * initializeConnectInfo( &connectInfo ); + * + * // Initialize the optional will info, the details are out of scope for this example. + * initializeWillInfo( &willInfo ); + * + * // Get the size requirement for the connect packet. + * status = MQTT_GetConnectPacketSize( + * &connectInfo, &willInfo, &remainingLength, &packetSize + * ); + * + * if( status == MQTTSuccess ) + * { + * // The application should allocate or use a static #MQTTFixedBuffer_t + * // of size >= packetSize to serialize the connect request. + * } + * @endcode */ MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, @@ -287,6 +317,37 @@ MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo, * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTConnectInfo_t connectInfo = { 0 }; + * MQTTPublishInfo_t willInfo = { 0 }; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ BUFFER_SIZE ]; + * size_t remainingLength = 0, packetSize = 0; + * + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = BUFFER_SIZE; + * + * // Assume connectInfo and willInfo are initialized. Get the size requirement for + * // the connect packet. + * status = MQTT_GetConnectPacketSize( + * &connectInfo, &willInfo, &remainingLength, &packetSize + * ); + * assert( status == MQTTSuccess ); + * assert( packetSize <= BUFFER_SIZE ); + * + * // Serialize the connect packet into the fixed buffer. + * status = MQTT_SerializeConnect( &connectInfo, &willInfo, remainingLength, &fixedBuffer ); + * + * if( status == MQTTSuccess ) + * { + * // The connect packet can now be sent to the broker. + * } + * @endcode */ MQTTStatus_t MQTT_SerializeConnect( const MQTTConnectInfo_t * pConnectInfo, const MQTTPublishInfo_t * pWillInfo, @@ -312,6 +373,37 @@ MQTTStatus_t MQTT_SerializeConnect( const MQTTConnectInfo_t * pConnectInfo, * * @return #MQTTBadParameter if the packet would exceed the size allowed by the * MQTT spec; #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTSubscribeInfo_t subscriptionList[ NUMBER_OF_SUBSCRIPTIONS ] = { 0 }; + * size_t remainingLength = 0, packetSize = 0; + * // This is assumed to be a list of filters we want to subscribe to. + * const char * filters[ NUMBER_OF_SUBSCRIPTIONS ]; + * + * // Set each subscription. + * for( int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++ ) + * { + * subscriptionList[ i ].qos = MQTTQoS0; + * // Each subscription needs a topic filter. + * subscriptionList[ i ].pTopicFilter = filters[ i ]; + * subscriptionList[ i ].topicFilterLength = strlen( filters[ i ] ); + * } + * + * // Get the size requirement for the subscribe packet. + * status = MQTT_GetSubscribePacketSize( + * &subscriptionList[ 0 ], NUMBER_OF_SUBSCRIPTIONS, &remainingLength, &packetSize + * ); + * + * if( status == MQTTSuccess ) + * { + * // The application should allocate or use a static #MQTTFixedBuffer_t + * // of size >= packetSize to serialize the subscribe request. + * } + * @endcode */ MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList, size_t subscriptionCount, @@ -336,6 +428,46 @@ MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscript * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTSubscribeInfo_t subscriptionList[ NUMBER_OF_SUBSCRIPTIONS ] = { 0 }; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ BUFFER_SIZE ]; + * size_t remainingLength = 0, packetSize = 0; + * uint16_t packetId; + * + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = BUFFER_SIZE; + * + * // Function to return a valid, unused packet identifier. The details are out of + * // scope for this example. + * packetId = getNewPacketId(); + * + * // Assume subscriptionList has been initialized. Get the subscribe packet size. + * status = MQTT_GetSubscribePacketSize( + * &subscriptionList[ 0 ], NUMBER_OF_SUBSCRIPTIONS, &remainingLength, &packetSize + * ); + * assert( status == MQTTSuccess ); + * assert( packetSize <= BUFFER_SIZE ); + * + * // Serialize the subscribe packet into the fixed buffer. + * status = MQTT_SerializeSubscribe( + * &subscriptionList[ 0 ], + * NUMBER_OF_SUBSCRIPTIONS, + * packetId, + * remainingLength, + * &fixedBuffer + * ); + * + * if( status == MQTTSuccess ) + * { + * // The subscribe packet can now be sent to the broker. + * } + * @endcode */ MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionList, size_t subscriptionCount, @@ -362,6 +494,29 @@ MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionL * * @return #MQTTBadParameter if the packet would exceed the size allowed by the * MQTT spec; #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTSubscribeInfo_t subscriptionList[ NUMBER_OF_SUBSCRIPTIONS ] = { 0 }; + * size_t remainingLength = 0, packetSize = 0; + * + * // Initialize the subscribe info. The details are out of scope for this example. + * initializeSubscribeInfo( &subscriptionList[ 0 ] ); + * + * // Get the size requirement for the unsubscribe packet. + * status = MQTT_GetUnsubscribePacketSize( + * &subscriptionList[ 0 ], NUMBER_OF_SUBSCRIPTIONS, &remainingLength, &packetSize + * ); + * + * if( status == MQTTSuccess ) + * { + * // The application should allocate or use a static #MQTTFixedBuffer_t + * // of size >= packetSize to serialize the unsubscribe request. + * } + * @endcode */ MQTTStatus_t MQTT_GetUnsubscribePacketSize( const MQTTSubscribeInfo_t * pSubscriptionList, size_t subscriptionCount, @@ -386,6 +541,46 @@ MQTTStatus_t MQTT_GetUnsubscribePacketSize( const MQTTSubscribeInfo_t * pSubscri * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTSubscribeInfo_t subscriptionList[ NUMBER_OF_SUBSCRIPTIONS ] = { 0 }; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ BUFFER_SIZE ]; + * size_t remainingLength = 0, packetSize = 0; + * uint16_t packetId; + * + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = BUFFER_SIZE; + * + * // Function to return a valid, unused packet identifier. The details are out of + * // scope for this example. + * packetId = getNewPacketId(); + * + * // Assume subscriptionList has been initialized. Get the unsubscribe packet size. + * status = MQTT_GetUnsubscribePacketSize( + * &subscriptionList[ 0 ], NUMBER_OF_SUBSCRIPTIONS, &remainingLength, &packetSize + * ); + * assert( status == MQTTSuccess ); + * assert( packetSize <= BUFFER_SIZE ); + * + * // Serialize the unsubscribe packet into the fixed buffer. + * status = MQTT_SerializeUnsubscribe( + * &subscriptionList[ 0 ], + * NUMBER_OF_SUBSCRIPTIONS, + * packetId, + * remainingLength, + * &fixedBuffer + * ); + * + * if( status == MQTTSuccess ) + * { + * // The unsubscribe packet can now be sent to the broker. + * } + * @endcode */ MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptionList, size_t subscriptionCount, @@ -411,6 +606,33 @@ MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptio * * @return #MQTTBadParameter if the packet would exceed the size allowed by the * MQTT spec or if invalid parameters are passed; #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTPublishInfo_t publishInfo = { 0 }; + * size_t remainingLength = 0, packetSize = 0; + * + * // Initialize the publish info. + * publishInfo.qos = MQTTQoS0; + * publishInfo.pTopicName = "/some/topic/name"; + * publishInfo.topicNameLength = strlen( publishInfo.pTopicName ); + * publishInfo.pPayload = "Hello World!"; + * publishInfo.payloadLength = strlen( "Hello World!" ); + * + * // Get the size requirement for the publish packet. + * status = MQTT_GetPublishPacketSize( + * &publishInfo, &remainingLength, &packetSize + * ); + * + * if( status == MQTTSuccess ) + * { + * // The application should allocate or use a static #MQTTFixedBuffer_t + * // of size >= packetSize to serialize the publish. + * } + * @endcode */ MQTTStatus_t MQTT_GetPublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, size_t * pRemainingLength, @@ -438,6 +660,45 @@ MQTTStatus_t MQTT_GetPublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTPublishInfo_t publishInfo = { 0 }; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ BUFFER_SIZE ]; + * size_t remainingLength = 0, packetSize = 0; + * uint16_t packetId; + * + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = BUFFER_SIZE; + * + * // A packet identifier is unused for QoS 0 publishes. Otherwise, a valid, unused packet + * // identifier must be used. + * packetId = 0; + * + * // Assume publishInfo has been initialized. Get publish packet size. + * status = MQTT_GetPublishPacketSize( + * &publishInfo, &remainingLength, &packetSize + * ); + * assert( status == MQTTSuccess ); + * assert( packetSize <= BUFFER_SIZE ); + * + * // Serialize the publish packet into the fixed buffer. + * status = MQTT_SerializePublish( + * &publishInfo, + * packetId, + * remainingLength, + * &fixedBuffer + * ); + * + * if( status == MQTTSuccess ) + * { + * // The publish packet can now be sent to the broker. + * } + * @endcode */ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, uint16_t packetId, @@ -468,6 +729,54 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTPublishInfo_t publishInfo = { 0 }; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ BUFFER_SIZE ]; + * size_t remainingLength = 0, packetSize = 0, headerSize = 0; + * uint16_t packetId; + * int32_t bytesSent; + * + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = BUFFER_SIZE; + * + * // A packet identifier is unused for QoS 0 publishes. Otherwise, a valid, unused packet + * // identifier must be used. + * packetId = 0; + * + * // Assume publishInfo has been initialized. Get the publish packet size. + * status = MQTT_GetPublishPacketSize( + * &publishInfo, &remainingLength, &packetSize + * ); + * assert( status == MQTTSuccess ); + * // The payload will not be serialized, so the the fixed buffer does not need to hold it. + * assert( ( packetSize - publishInfo.payloadLength ) <= BUFFER_SIZE ); + * + * // Serialize the publish packet header into the fixed buffer. + * status = MQTT_SerializePublishHeader( + * &publishInfo, + * packetId, + * remainingLength, + * &fixedBuffer, + * &headerSize + * ); + * + * if( status == MQTTSuccess ) + * { + * // The publish header and payload can now be sent to the broker. + * // mqttSocket here is a socket descriptor created and connected to the MQTT + * // broker outside of this function. + * bytesSent = send( mqttSocket, ( void * ) fixedBuffer.pBuffer, headerSize, 0 ); + * assert( bytesSent == headerSize ); + * bytesSent = send( mqttSocket, publishInfo.pPayload, publishInfo.payloadLength, 0 ); + * assert( bytesSent == publishInfo.payloadLength ); + * } + * @endcode */ MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo, uint16_t packetId, @@ -485,6 +794,36 @@ MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo * @param[in] packetId Packet ID of the publish. * * @return #MQTTBadParameter, #MQTTNoMemory, or #MQTTSuccess. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ BUFFER_SIZE ]; + * uint16_t packetId; + * uint8_t packetType; + * + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = BUFFER_SIZE; + * // The fixed buffer must be large enough to hold 4 bytes. + * assert( BUFFER_SIZE >= MQTT_PUBLISH_ACK_PACKET_SIZE ); + * + * // The packet ID must be the same as the original publish packet. + * packetId = publishPacketId; + * + * // The byte representing a packet of type ACK. This function accepts PUBACK, PUBREC, PUBREL, or PUBCOMP. + * packetType = MQTT_PACKET_TYPE_PUBACK; + * + * // Serialize the publish acknowledgment into the fixed buffer. + * status = MQTT_SerializeAck( &fixedBuffer, packetType, packetId ); + * + * if( status == MQTTSuccess ) + * { + * // The publish acknowledgment can now be sent to the broker. + * } + * @endcode */ MQTTStatus_t MQTT_SerializeAck( const MQTTFixedBuffer_t * pFixedBuffer, uint8_t packetType, @@ -495,7 +834,24 @@ MQTTStatus_t MQTT_SerializeAck( const MQTTFixedBuffer_t * pFixedBuffer, * * @param[out] pPacketSize The size of the MQTT DISCONNECT packet. * - * @return Always returns #MQTTSuccess. + * @return #MQTTSuccess, or #MQTTBadParameter if @p pPacketSize is NULL. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * size_t packetSize = 0; + * + * // Get the size requirement for the disconnect packet. + * status = MQTT_GetDisconnectPacketSize( &packetSize ); + * assert( status == MQTTSuccess ); + * assert( packetSize == 2 ); + * + * // The application should allocate or use a static #MQTTFixedBuffer_t of + * // size >= 2 to serialize the disconnect packet. + * + * @endcode */ MQTTStatus_t MQTT_GetDisconnectPacketSize( size_t * pPacketSize ); @@ -510,6 +866,31 @@ MQTTStatus_t MQTT_GetDisconnectPacketSize( size_t * pPacketSize ); * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ BUFFER_SIZE ]; + * + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = BUFFER_SIZE; + * + * // Get the disconnect packet size. + * status = MQTT_GetDisconnectPacketSize( &packetSize ); + * assert( status == MQTTSuccess ); + * assert( packetSize <= BUFFER_SIZE ); + * + * // Serialize the disconnect into the fixed buffer. + * status = MQTT_SerializeDisconnect( &fixedBuffer ); + * + * if( status == MQTTSuccess ) + * { + * // The disconnect packet can now be sent to the broker. + * } + * @endcode */ MQTTStatus_t MQTT_SerializeDisconnect( const MQTTFixedBuffer_t * pFixedBuffer ); @@ -519,6 +900,23 @@ MQTTStatus_t MQTT_SerializeDisconnect( const MQTTFixedBuffer_t * pFixedBuffer ); * @param[out] pPacketSize The size of the MQTT PINGREQ packet. * * @return #MQTTSuccess or #MQTTBadParameter if pPacketSize is NULL. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * size_t packetSize = 0; + * + * // Get the size requirement for the ping request packet. + * status = MQTT_GetPingreqPacketSize( &packetSize ); + * assert( status == MQTTSuccess ); + * assert( packetSize == 2 ); + * + * // The application should allocate or use a static #MQTTFixedBuffer_t of + * // size >= 2 to serialize the ping request. + * + * @endcode */ MQTTStatus_t MQTT_GetPingreqPacketSize( size_t * pPacketSize ); @@ -533,6 +931,31 @@ MQTTStatus_t MQTT_GetPingreqPacketSize( size_t * pPacketSize ); * @return #MQTTNoMemory if pFixedBuffer is too small to hold the MQTT packet; * #MQTTBadParameter if invalid parameters are passed; * #MQTTSuccess otherwise. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTFixedBuffer_t fixedBuffer; + * uint8_t buffer[ BUFFER_SIZE ]; + * + * fixedBuffer.pBuffer = buffer; + * fixedBuffer.size = BUFFER_SIZE; + * + * // Get the ping request packet size. + * status = MQTT_GetPingreqPacketSize( &packetSize ); + * assert( status == MQTTSuccess ); + * assert( packetSize <= BUFFER_SIZE ); + * + * // Serialize the ping request into the fixed buffer. + * status = MQTT_SerializePingreq( &fixedBuffer ); + * + * if( status == MQTTSuccess ) + * { + * // The ping request can now be sent to the broker. + * } + * @endcode */ MQTTStatus_t MQTT_SerializePingreq( const MQTTFixedBuffer_t * pFixedBuffer ); @@ -544,6 +967,54 @@ MQTTStatus_t MQTT_SerializePingreq( const MQTTFixedBuffer_t * pFixedBuffer ); * @param[out] pPublishInfo Struct containing information about the publish. * * @return #MQTTBadParameter, #MQTTBadResponse, or #MQTTSuccess. + * + * <b>Example</b> + * @code{c} + * + * // TransportRecv_t function for reading from the network. + * int32_t socket_recv( + * NetworkContext_t * pNetworkContext, + * void * pBuffer, + * size_t bytesToRecv + * ); + * // Some context to be used with the above transport receive function. + * NetworkContext_t networkContext; + * + * // Other variables used in this example. + * MQTTStatus_t status; + * MQTTPacketInfo_t incomingPacket; + * MQTTPublishInfo_t publishInfo = { 0 }; + * uint16_t packetId; + * + * int32_t bytesRecvd; + * // A buffer to hold remaining data of the incoming packet. + * uint8_t buffer[ BUFFER_SIZE ]; + * + * // Populate all fields of the incoming packet. + * status = MQTT_GetIncomingPacketTypeAndLength( + * socket_recv, + * &networkContext, + * &incomingPacket + * ); + * assert( status == MQTTSuccess ); + * assert( incomingPacket.remainingLength <= BUFFER_SIZE ); + * bytesRecvd = socket_recv( + * &networkContext, + * ( void * ) buffer, + * incomingPacket.remainingLength + * ); + * incomingPacket.pRemainingData = buffer; + * + * // Deserialize the publish information if the incoming packet is a publish. + * if( ( incomingPacket.type & 0xF0 ) == MQTT_PACKET_TYPE_PUBLISH ) + * { + * status = MQTT_DeserializePublish( &incomingPacket, &packetId, &publishInfo ); + * if( status == MQTTSuccess ) + * { + * // The deserialized publish information can now be used from `publishInfo`. + * } + * } + * @endcode */ MQTTStatus_t MQTT_DeserializePublish( const MQTTPacketInfo_t * pIncomingPacket, uint16_t * pPacketId, @@ -558,7 +1029,34 @@ MQTTStatus_t MQTT_DeserializePublish( const MQTTPacketInfo_t * pIncomingPacket, * in CONNACK or PINGRESP. * @param[out] pSessionPresent Boolean flag from a CONNACK indicating present session. * - * @return #MQTTBadParameter, #MQTTBadResponse, or #MQTTSuccess. + * @return #MQTTBadParameter, #MQTTBadResponse, #MQTTServerRefused, or #MQTTSuccess. + * + * <b>Example</b> + * @code{c} + * + * // Variables used in this example. + * MQTTStatus_t status; + * MQTTPacketInfo_t incomingPacket; + * // Used for SUBACK, UNSUBACK, PUBACK, PUBREC, PUBREL, and PUBCOMP. + * uint16_t packetId; + * // Used for CONNACK. + * bool sessionPresent; + * + * // Receive an incoming packet and populate all fields. The details are out of scope + * // for this example. + * receiveIncomingPacket( &incomingPacket ); + * + * // Deserialize ack information if the incoming packet is not a publish. + * if( ( incomingPacket.type & 0xF0 ) != MQTT_PACKET_TYPE_PUBLISH ) + * { + * status = MQTT_DeserializeAck( &incomingPacket, &packetId, &sessionPresent ); + * if( status == MQTTSuccess ) + * { + * // The packet ID or session present flag information is available. For + * // ping response packets, the only information is the status code. + * } + * } + * @endcode */ MQTTStatus_t MQTT_DeserializeAck( const MQTTPacketInfo_t * pIncomingPacket, uint16_t * pPacketId, @@ -581,6 +1079,48 @@ MQTTStatus_t MQTT_DeserializeAck( const MQTTPacketInfo_t * pIncomingPacket, * #MQTTRecvFailed on transport receive failure, * #MQTTBadResponse if an invalid packet is read, and * #MQTTNoDataAvailable if there is nothing to read. + * + * <b>Example</b> + * @code{c} + * + * // TransportRecv_t function for reading from the network. + * int32_t socket_recv( + * NetworkContext_t * pNetworkContext, + * void * pBuffer, + * size_t bytesToRecv + * ); + * // Some context to be used with above transport receive function. + * NetworkContext_t networkContext; + * + * // Struct to hold the incoming packet information. + * MQTTPacketInfo_t incomingPacket; + * MQTTStatus_t status = MQTTSuccess; + * int32_t bytesRecvd; + * // Buffer to hold the remaining data of the incoming packet. + * uint8_t buffer[ BUFFER_SIZE ]; + * + * // Loop until data is available to be received. + * do{ + * status = MQTT_GetIncomingPacketTypeAndLength( + * socket_recv, + * &networkContext, + * &incomingPacket + * ); + * } while( status == MQTTNoDataAvailable ); + * + * assert( status == MQTTSuccess ); + * + * // Receive the rest of the incoming packet. + * assert( incomingPacket.remainingLength <= BUFFER_SIZE ); + * bytesRecvd = socket_recv( + * &networkContext, + * ( void * ) buffer, + * incomingPacket.remainingLength + * ); + * + * // Set the remaining data field. + * incomingPacket.pRemainingData = buffer; + * @endcode */ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc, NetworkContext_t * pNetworkContext, diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h index 75a54de7d..af5e26169 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt_state.h @@ -19,14 +19,25 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/** + * @file mqtt_state.h + * @brief Function to keep state of MQTT PUBLISH packet deliveries. + */ #ifndef MQTT_STATE_H #define MQTT_STATE_H #include "mqtt.h" +/** + * @brief Initializer value for an #MQTTStateCursor_t, indicating a search + * should start at the beginning of a state record array + */ #define MQTT_STATE_CURSOR_INITIALIZER ( size_t ) 0 /** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section, this enum is private. + * * @brief Value indicating either send or receive. */ typedef enum MQTTStateOperation @@ -34,6 +45,7 @@ typedef enum MQTTStateOperation MQTT_SEND, MQTT_RECEIVE } MQTTStateOperation_t; +/** @endcond */ /** * @brief Cursor for iterating through state records. @@ -41,7 +53,10 @@ typedef enum MQTTStateOperation typedef size_t MQTTStateCursor_t; /** - * @brief Reserve an entry for an outgoing QoS 1/2 publish. + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section, this function is private. + * + * @brief Reserve an entry for an outgoing QoS 1 or Qos 2 publish. * * @param[in] pMqttContext Initialized MQTT context. * @param[in] packetId The ID of the publish packet. @@ -52,8 +67,12 @@ typedef size_t MQTTStateCursor_t; MQTTStatus_t MQTT_ReserveState( MQTTContext_t * pMqttContext, uint16_t packetId, MQTTQoS_t qos ); +/** @endcond */ /** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section, this function is private. + * * @brief Calculate the new state for a publish from its qos and operation type. * * @param[in] opType Send or Receive. @@ -63,8 +82,12 @@ MQTTStatus_t MQTT_ReserveState( MQTTContext_t * pMqttContext, */ MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType, MQTTQoS_t qos ); +/** @endcond */ /** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section, this function is private. + * * @brief Update the state record for a PUBLISH packet. * * @param[in] pMqttContext Initialized MQTT context. @@ -81,8 +104,12 @@ MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext, MQTTStateOperation_t opType, MQTTQoS_t qos, MQTTPublishState_t * pNewState ); +/** @endcond */ /** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section, this function is private. + * * @brief Calculate the state from a PUBACK, PUBREC, PUBREL, or PUBCOMP. * * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP. @@ -94,8 +121,12 @@ MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext, MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType, MQTTStateOperation_t opType, MQTTQoS_t qos ); +/** @endcond */ /** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section, this function is private. + * * @brief Update the state record for an ACKed publish. * * @param[in] pMqttContext Initialized MQTT context. @@ -111,8 +142,12 @@ MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext, MQTTPubAckType_t packetType, MQTTStateOperation_t opType, MQTTPublishState_t * pNewState ); +/** @endcond */ /** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section, this function is private. + * * @brief Get the packet ID of next pending PUBREL ack to be resent. * * This function will need to be called to get the packet for which a PUBREL @@ -127,6 +162,7 @@ MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext, uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext, MQTTStateCursor_t * pCursor, MQTTPublishState_t * pState ); +/** @endcond */ /** * @brief Get the packet ID of next pending publish to be resent. @@ -143,6 +179,9 @@ uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext, MQTTStateCursor_t * pCursor ); /** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section, this function is private. + * * @brief State to string conversion for state engine. * * @param[in] state The state to convert to a string. @@ -150,5 +189,6 @@ uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext, * @return The string representation of the state. */ const char * MQTT_State_strerror( MQTTPublishState_t state ); +/** @endcond */ #endif /* ifndef MQTT_STATE_H */ diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c index 0d961a457..dcae57ecb 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c @@ -19,6 +19,10 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/** + * @file mqtt.c + * @brief Implements the user-facing functions in mqtt.h. + */ #include <string.h> #include <assert.h> @@ -41,6 +45,18 @@ #define MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT ( 5U ) #endif +/** + * @brief Number of milliseconds to wait for a ping response to a ping + * request as part of the keep-alive mechanism. + * + * If a ping response is not received before this timeout, then + * #MQTT_ProcessLoop will return #MQTTKeepAliveTimeout. + */ +#ifndef MQTT_PINGRESP_TIMEOUT_MS + /* Wait 0.5 seconds by default for a ping response. */ + #define MQTT_PINGRESP_TIMEOUT_MS ( 500U ) +#endif + /*-----------------------------------------------------------*/ /** @@ -97,7 +113,7 @@ static int32_t recvExact( const MQTTContext_t * pContext, /** * @brief Discard a packet from the transport interface. * - * @param[in] PContext MQTT Connection context. + * @param[in] pContext MQTT Connection context. * @param[in] remainingLength Remaining length of the packet to dump. * @param[in] timeoutMs Time remaining to discard the packet. * @@ -200,7 +216,7 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, * #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ; * #MQTTBadResponse if an invalid packet is received; * #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before - * pContext->pingRespTimeoutMs milliseconds; + * #MQTT_PINGRESP_TIMEOUT_MS milliseconds; * #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an * invalid transition for the internal state machine; * #MQTTSuccess on success. @@ -301,6 +317,251 @@ static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, const MQTTPublishInfo_t * pPublishInfo, uint16_t packetId ); +/** + * topic filter, this function handles the following 2 cases: + * - When the topic filter ends with "/+" or "/#" characters, but the topic + * name only ends with '/'. + * - When the topic filter ends with "/#" characters, but the topic name + * ends at the parent level. + * + * @note This function ASSUMES that the topic name been consumed in linear + * matching with the topic filer, but the topic filter has remaining characters + * to be matched. + * + * @param[in] pTopicFilter The topic filter containing the wildcard. + * @param[in] topicFilterLength Length of the topic filter being examined. + * @param[in] filterIndex Index of the topic filter being examined. + * + * @return Returns whether the topic filter and the topic name match. + */ +static bool matchEndWildcardsSpecialCases( const char * pTopicFilter, + uint16_t topicFilterLength, + uint16_t filterIndex ); + +/** + * @brief Attempt to match topic name with a topic filter starting with a wildcard. + * + * If the topic filter starts with a '+' (single-level) wildcard, the function + * advances the @a pNameIndex by a level in the topic name. + * If the topic filter starts with a '#' (multi-level) wildcard, the function + * concludes that both the topic name and topic filter match. + * + * @param[in] pTopicName The topic name to match. + * @param[in] topicNameLength Length of the topic name. + * @param[in] pTopicFilter The topic filter to match. + * @param[in] topicFilterLength Length of the topic filter. + * @param[in,out] pNameIndex Current index in topic name being examined.. It is + * advanced by one level for `+` wildcards. + * @param[in] filterIndex Current index in the topic filter being examined.. + * @param[out] pMatch Whether the topic filter and topic name match. + * + * @return `true` if the caller of this function should exit; `false` if the + * caller should continue parsing the topics. + */ +static bool matchWildcards( const char * pTopicName, + uint16_t topicNameLength, + const char * pTopicFilter, + uint16_t topicFilterLength, + uint16_t * pNameIndex, + uint16_t filterIndex, + bool * pMatch ); + +/** + * @brief Match a topic name and topic filter allowing the use of wildcards. + * + * @param[in] pTopicName The topic name to check. + * @param[in] topicNameLength Length of the topic name. + * @param[in] pTopicFilter The topic filter to check. + * @param[in] topicFilterLength Length of topic filter. + * + * @return `true` if the topic name and topic filter match; `false` otherwise. + */ +static bool matchTopicFilter( const char * pTopicName, + uint16_t topicNameLength, + const char * pTopicFilter, + uint16_t topicFilterLength ); + +/*-----------------------------------------------------------*/ + +static bool matchEndWildcardsSpecialCases( const char * pTopicFilter, + uint16_t topicFilterLength, + uint16_t filterIndex ) +{ + bool matchFound = false; + + assert( pTopicFilter != NULL ); + assert( topicFilterLength != 0 ); + + /* Check if the topic filter has 2 remaining characters and it ends in + * "/#". This check handles the case to match filter "sport/#" with topic + * "sport". The reason is that the '#' wildcard represents the parent and + * any number of child levels in the topic name.*/ + if( ( filterIndex == ( topicFilterLength - 3U ) ) && + ( pTopicFilter[ filterIndex + 1U ] == '/' ) && + ( pTopicFilter[ filterIndex + 2U ] == '#' ) ) + + { + matchFound = true; + } + + /* Check if the next character is "#" or "+" and the topic filter ends in + * "/#" or "/+". This check handles the cases to match: + * + * - Topic filter "sport/+" with topic "sport/". + * - Topic filter "sport/#" with topic "sport/". + */ + if( ( filterIndex == ( topicFilterLength - 2U ) ) && + ( pTopicFilter[ filterIndex ] == '/' ) ) + { + /* Check that the last character is a wildcard. */ + matchFound = ( ( pTopicFilter[ filterIndex + 1U ] == '+' ) || + ( pTopicFilter[ filterIndex + 1U ] == '#' ) ) ? true : false; + } + + return matchFound; +} + +/*-----------------------------------------------------------*/ + +static bool matchWildcards( const char * pTopicName, + uint16_t topicNameLength, + const char * pTopicFilter, + uint16_t topicFilterLength, + uint16_t * pNameIndex, + uint16_t filterIndex, + bool * pMatch ) +{ + bool shouldStopMatching = false; + bool locationIsValidForWildcard; + + assert( pTopicName != NULL ); + assert( topicNameLength != 0 ); + assert( pTopicFilter != NULL ); + assert( topicFilterLength != 0 ); + assert( pNameIndex != NULL ); + assert( pMatch != NULL ); + + /* Wild card in a topic filter is only valid either at the starting position + * or when it is preceded by a '/'.*/ + locationIsValidForWildcard = ( ( filterIndex == 0u ) || + ( pTopicFilter[ filterIndex - 1U ] == '/' ) + ) ? true : false; + + if( locationIsValidForWildcard == true ) + { + if( pTopicFilter[ filterIndex ] == '+' ) + { + /* Move topic name index to the end of the current level. The end of the + * current level is identified by '/'. */ + while( ( *pNameIndex < topicNameLength ) && ( pTopicName[ *pNameIndex ] != '/' ) ) + { + ( *pNameIndex )++; + } + + /* Decrement the topic name index for 2 different cases: + * - If the break condition is ( *pNameIndex < topicNameLength ), then + * we have reached past the end of the topic name and we move back the + * the index on the last character. + * - If the break condition is ( pTopicName[ *pNameIndex ] != '/' ), we + * move back the index on the '/' character. */ + ( *pNameIndex )--; + } + + /* '#' matches everything remaining in the topic name. It must be the + * last character in a topic filter. */ + else if( ( pTopicFilter[ filterIndex ] == '#' ) && + ( filterIndex == ( topicFilterLength - 1U ) ) ) + { + /* Subsequent characters don't need to be checked for the + * multi-level wildcard. */ + *pMatch = true; + shouldStopMatching = true; + } + else + { + /* Any character mismatch other than '+' or '#' means the topic + * name does not match the topic filter. */ + *pMatch = false; + shouldStopMatching = true; + } + } + else + { + /* If the location is not valid for a wildcard, the topic name does not + * match the topic filter. */ + *pMatch = false; + shouldStopMatching = true; + } + + return shouldStopMatching; +} + +/*-----------------------------------------------------------*/ + +static bool matchTopicFilter( const char * pTopicName, + uint16_t topicNameLength, + const char * pTopicFilter, + uint16_t topicFilterLength ) +{ + bool matchFound = false, shouldStopMatching = false; + uint16_t nameIndex = 0, filterIndex = 0; + + assert( pTopicName != NULL ); + assert( topicNameLength != 0 ); + assert( pTopicFilter != NULL ); + assert( topicFilterLength != 0 ); + + while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) ) + { + /* Check if the character in the topic name matches the corresponding + * character in the topic filter string. */ + if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] ) + { + /* If the topic name has been consumed but the topic filter has not + * been consumed, match for special cases when the topic filter ends + * with wildcard character. */ + if( nameIndex == ( topicNameLength - 1U ) ) + { + matchFound = matchEndWildcardsSpecialCases( pTopicFilter, + topicFilterLength, + filterIndex ); + } + } + else + { + /* Check for matching wildcards. */ + shouldStopMatching = matchWildcards( pTopicName, + topicNameLength, + pTopicFilter, + topicFilterLength, + &nameIndex, + filterIndex, + &matchFound ); + } + + if( ( matchFound == true ) || ( shouldStopMatching == true ) ) + { + break; + } + + /* Increment indexes. */ + nameIndex++; + filterIndex++; + } + + if( matchFound == false ) + { + /* If the end of both strings has been reached, they match. This represents the + * case when the topic filter contains the '+' wildcard at a non-starting position. + * For example, when matching either of "sport/+/player" OR "sport/hockey/+" topic + * filters with "sport/hockey/player" topic name. */ + matchFound = ( ( nameIndex == topicNameLength ) && + ( filterIndex == topicFilterLength ) ) ? true : false; + } + + return matchFound; +} + /*-----------------------------------------------------------*/ static int32_t sendPacket( MQTTContext_t * pContext, @@ -350,7 +611,7 @@ static int32_t sendPacket( MQTTContext_t * pContext, LogDebug( ( "BytesSent=%d, BytesRemaining=%lu," " TotalBytesSent=%d.", bytesSent, - bytesRemaining, + ( unsigned long ) bytesRemaining, totalBytesSent ) ); } } @@ -461,7 +722,7 @@ static int32_t recvExact( const MQTTContext_t * pContext, LogDebug( ( "BytesReceived=%d, BytesRemaining=%lu, " "TotalBytesReceived=%d.", bytesRecvd, - bytesRemaining, + ( unsigned long ) bytesRemaining, totalBytesRecvd ) ); } @@ -493,6 +754,7 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, assert( pContext != NULL ); assert( pContext->getTime != NULL ); + bytesToReceive = pContext->networkBuffer.size; getTimeStampMs = pContext->getTime; @@ -512,7 +774,7 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, LogError( ( "Receive error while discarding packet." "ReceivedBytes=%d, ExpectedBytes=%lu.", bytesReceived, - bytesToReceive ) ); + ( unsigned long ) bytesToReceive ) ); receiveError = true; } else @@ -563,8 +825,8 @@ static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, LogError( ( "Incoming packet will be dumped: " "Packet length exceeds network buffer size." "PacketSize=%lu, NetworkBufferSize=%lu.", - incomingPacket.remainingLength, - pContext->networkBuffer.size ) ); + ( unsigned long ) incomingPacket.remainingLength, + ( unsigned long ) pContext->networkBuffer.size ) ); status = discardPacket( pContext, incomingPacket.remainingLength, remainingTimeMs ); @@ -585,7 +847,7 @@ static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, LogError( ( "Packet reception failed. ReceivedBytes=%d, " "ExpectedBytes=%lu.", bytesReceived, - bytesToReceive ) ); + ( unsigned long ) bytesToReceive ) ); status = MQTTRecvFailed; } } @@ -693,6 +955,8 @@ static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext ) uint32_t now = 0U, keepAliveMs = 0U; assert( pContext != NULL ); + assert( pContext->getTime != NULL ); + now = pContext->getTime(); keepAliveMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec; @@ -704,7 +968,7 @@ static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext ) { /* Has time expired? */ if( calculateElapsedTime( now, pContext->pingReqSendTimeMs ) > - pContext->pingRespTimeoutMs ) + MQTT_PINGRESP_TIMEOUT_MS ) { status = MQTTKeepAliveTimeout; } @@ -727,13 +991,16 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext, MQTTPublishState_t publishRecordState = MQTTStateNull; uint16_t packetIdentifier = 0U; MQTTPublishInfo_t publishInfo; + MQTTDeserializedInfo_t deserializedInfo; bool duplicatePublish = false; assert( pContext != NULL ); assert( pIncomingPacket != NULL ); + assert( pContext->appCallback != NULL ); status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo ); - LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%d.", status ) ); + LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%s.", + MQTT_Status_strerror( status ) ) ); if( status == MQTTSuccess ) { @@ -798,6 +1065,11 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext, if( status == MQTTSuccess ) { + /* Set fields of deserialized struct. */ + deserializedInfo.packetIdentifier = packetIdentifier; + deserializedInfo.pPublishInfo = &publishInfo; + deserializedInfo.deserializationResult = status; + /* Invoke application callback to hand the buffer over to application * before sending acks. * Application callback will be invoked for all publishes, except for @@ -806,8 +1078,7 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext, { pContext->appCallback( pContext, pIncomingPacket, - packetIdentifier, - &publishInfo ); + &deserializedInfo ); } /* Send PUBACK or PUBREC if necessary. */ @@ -829,6 +1100,7 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext, uint16_t packetIdentifier; MQTTPubAckType_t ackType; MQTTEventCallback_t appCallback; + MQTTDeserializedInfo_t deserializedInfo; assert( pContext != NULL ); assert( pIncomingPacket != NULL ); @@ -865,9 +1137,14 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext, if( status == MQTTSuccess ) { + /* Set fields of deserialized struct. */ + deserializedInfo.packetIdentifier = packetIdentifier; + deserializedInfo.deserializationResult = status; + deserializedInfo.pPublishInfo = NULL; + /* Invoke application callback to hand the buffer over to application * before sending acks. */ - appCallback( pContext, pIncomingPacket, packetIdentifier, NULL ); + appCallback( pContext, pIncomingPacket, &deserializedInfo ); /* Send PUBREL or PUBCOMP if necessary. */ status = sendPublishAcks( pContext, @@ -886,6 +1163,7 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, { MQTTStatus_t status = MQTTBadResponse; uint16_t packetIdentifier = MQTT_PACKET_ID_INVALID; + MQTTDeserializedInfo_t deserializedInfo; /* We should always invoke the app callback unless we receive a PINGRESP * and are managing keep alive, or if we receive an unknown packet. We @@ -897,6 +1175,7 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, assert( pContext != NULL ); assert( pIncomingPacket != NULL ); + assert( pContext->appCallback != NULL ); appCallback = pContext->appCallback; @@ -914,7 +1193,7 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, case MQTT_PACKET_TYPE_PINGRESP: status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL ); - invokeAppCallback = ( manageKeepAlive == true ) ? false : true; + invokeAppCallback = ( ( status == MQTTSuccess ) && ( manageKeepAlive == false ) ) ? true : false; if( ( status == MQTTSuccess ) && ( manageKeepAlive == true ) ) { @@ -927,7 +1206,7 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, case MQTT_PACKET_TYPE_UNSUBACK: /* Deserialize and give these to the app provided callback. */ status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL ); - invokeAppCallback = true; + invokeAppCallback = ( ( status == MQTTSuccess ) || ( status == MQTTServerRefused ) ) ? true : false; break; default: @@ -938,9 +1217,15 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext, break; } - if( ( status == MQTTSuccess ) && ( invokeAppCallback == true ) ) + if( invokeAppCallback == true ) { - appCallback( pContext, pIncomingPacket, packetIdentifier, NULL ); + /* Set fields of deserialized struct. */ + deserializedInfo.packetIdentifier = packetIdentifier; + deserializedInfo.deserializationResult = status; + deserializedInfo.pPublishInfo = NULL; + appCallback( pContext, pIncomingPacket, &deserializedInfo ); + /* In case a SUBACK indicated refusal, reset the status to continue the loop. */ + status = MQTTSuccess; } return status; @@ -1031,8 +1316,8 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC { LogError( ( "Argument cannot be NULL: pContext=%p, " "pSubscriptionList=%p.", - pContext, - pSubscriptionList ) ); + ( void * ) pContext, + ( void * ) pSubscriptionList ) ); status = MQTTBadParameter; } else if( subscriptionCount == 0UL ) @@ -1104,7 +1389,7 @@ static MQTTStatus_t sendPublish( MQTTContext_t * pContext, } else { - LogDebug( "PUBLISH payload was not sent. Payload length was zero." ); + LogDebug( ( "PUBLISH payload was not sent. Payload length was zero." ) ); } } @@ -1280,8 +1565,8 @@ static MQTTStatus_t serializePublish( const MQTTContext_t * pContext, &remainingLength, &packetSize ); LogDebug( ( "PUBLISH packet size is %lu and remaining length is %lu.", - packetSize, - remainingLength ) ); + ( unsigned long ) packetSize, + ( unsigned long ) remainingLength ) ); if( status == MQTTSuccess ) { @@ -1291,7 +1576,7 @@ static MQTTStatus_t serializePublish( const MQTTContext_t * pContext, &( pContext->networkBuffer ), pHeaderSize ); LogDebug( ( "Serialized PUBLISH header size is %lu.", - *pHeaderSize ) ); + ( unsigned long ) *pHeaderSize ) ); } return status; @@ -1310,8 +1595,8 @@ static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext, { LogError( ( "Argument cannot be NULL: pContext=%p, " "pPublishInfo=%p.", - pContext, - pPublishInfo ) ); + ( void * ) pContext, + ( void * ) pPublishInfo ) ); status = MQTTBadParameter; } else if( ( pPublishInfo->qos != MQTTQoS0 ) && ( packetId == 0U ) ) @@ -1353,20 +1638,29 @@ MQTTStatus_t MQTT_Init( MQTTContext_t * pContext, LogError( ( "Argument cannot be NULL: pContext=%p, " "pTransportInterface=%p, " "pNetworkBuffer=%p", - pContext, - pTransportInterface, - pNetworkBuffer ) ); + ( void * ) pContext, + ( void * ) pTransportInterface, + ( void * ) pNetworkBuffer ) ); + status = MQTTBadParameter; + } + else if( getTimeFunction == NULL ) + { + LogError( ( "Invalid parameter: getTimeFunction is NULL" ) ); + status = MQTTBadParameter; + } + else if( userCallback == NULL ) + { + LogError( ( "Invalid parameter: userCallback is NULL" ) ); + status = MQTTBadParameter; + } + else if( pTransportInterface->recv == NULL ) + { + LogError( ( "Invalid parameter: pTransportInterface->recv is NULL" ) ); status = MQTTBadParameter; } - else if( ( getTimeFunction == NULL ) || ( userCallback == NULL ) || - ( pTransportInterface->recv == NULL ) || ( pTransportInterface->send == NULL ) ) + else if( pTransportInterface->send == NULL ) { - LogError( ( "Function pointers cannot be NULL: getTimeFunction=%p, userCallback=%p, " - "transportRecv=%p, transportRecvSend=%p", - getTimeFunction, - userCallback, - pTransportInterface->recv, - pTransportInterface->send ) ); + LogError( ( "Invalid parameter: pTransportInterface->send is NULL" ) ); status = MQTTBadParameter; } else @@ -1405,9 +1699,9 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, { LogError( ( "Argument cannot be NULL: pContext=%p, " "pConnectInfo=%p, pSessionPresent=%p.", - pContext, - pConnectInfo, - pSessionPresent ) ); + ( void * ) pContext, + ( void * ) pConnectInfo, + ( void * ) pSessionPresent ) ); status = MQTTBadParameter; } @@ -1419,8 +1713,8 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, &remainingLength, &packetSize ); LogDebug( ( "CONNECT packet size is %lu and remaining length is %lu.", - packetSize, - remainingLength ) ); + ( unsigned long ) packetSize, + ( unsigned long ) remainingLength ) ); } if( status == MQTTSuccess ) @@ -1504,8 +1798,8 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, &remainingLength, &packetSize ); LogDebug( ( "SUBSCRIBE packet size is %lu and remaining length is %lu.", - packetSize, - remainingLength ) ); + ( unsigned long ) packetSize, + ( unsigned long ) remainingLength ) ); } if( status == MQTTSuccess ) @@ -1636,7 +1930,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) if( status == MQTTSuccess ) { LogDebug( ( "MQTT PINGREQ packet size is %lu.", - packetSize ) ); + ( unsigned long ) packetSize ) ); } else { @@ -1699,8 +1993,8 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, &remainingLength, &packetSize ); LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.", - packetSize, - remainingLength ) ); + ( unsigned long ) packetSize, + ( unsigned long ) remainingLength ) ); } if( status == MQTTSuccess ) @@ -1755,7 +2049,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) /* Get MQTT DISCONNECT packet size. */ status = MQTT_GetDisconnectPacketSize( &packetSize ); LogDebug( ( "MQTT DISCONNECT packet size is %lu.", - packetSize ) ); + ( unsigned long ) packetSize ) ); } if( status == MQTTSuccess ) @@ -1930,6 +2224,133 @@ uint16_t MQTT_GetPacketId( MQTTContext_t * pContext ) /*-----------------------------------------------------------*/ +MQTTStatus_t MQTT_MatchTopic( const char * pTopicName, + const uint16_t topicNameLength, + const char * pTopicFilter, + const uint16_t topicFilterLength, + bool * pIsMatch ) +{ + MQTTStatus_t status = MQTTSuccess; + bool topicFilterStartsWithWildcard = false; + bool matchStatus = false; + + if( ( pTopicName == NULL ) || ( topicNameLength == 0u ) ) + { + LogError( ( "Invalid paramater: Topic name should be non-NULL and its " + "length should be > 0: TopicName=%p, TopicNameLength=%u", + ( void * ) pTopicName, + topicNameLength ) ); + + status = MQTTBadParameter; + } + else if( ( pTopicFilter == NULL ) || ( topicFilterLength == 0u ) ) + { + LogError( ( "Invalid paramater: Topic filter should be non-NULL and " + "its length should be > 0: TopicName=%p, TopicFilterLength=%u", + ( void * ) pTopicFilter, + topicFilterLength ) ); + status = MQTTBadParameter; + } + else if( pIsMatch == NULL ) + { + LogError( ( "Invalid paramater: Output parameter, pIsMatch, is NULL" ) ); + status = MQTTBadParameter; + } + else + { + /* Check for an exact match if the incoming topic name and the registered + * topic filter length match. */ + if( topicNameLength == topicFilterLength ) + { + matchStatus = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 ) ? true : false; + } + + if( matchStatus == false ) + { + /* If an exact match was not found, match against wildcard characters in + * topic filter.*/ + + /* Determine if topic filter starts with a wildcard. */ + topicFilterStartsWithWildcard = ( ( pTopicFilter[ 0 ] == '+' ) || + ( pTopicFilter[ 0 ] == '#' ) ) ? true : false; + + /* Note: According to the MQTT 3.1.1 specification, incoming PUBLISH topic names + * starting with "$" character cannot be matched against topic filter starting with + * a wildcard, i.e. for example, "$SYS/sport" cannot be matched with "#" or + * "+/sport" topic filters. */ + if( !( ( pTopicName[ 0 ] == '$' ) && ( topicFilterStartsWithWildcard == true ) ) ) + { + matchStatus = matchTopicFilter( pTopicName, topicNameLength, pTopicFilter, topicFilterLength ); + } + } + + /* Update the output parameter with the match result. */ + *pIsMatch = matchStatus; + } + + return status; +} + +/*-----------------------------------------------------------*/ + +MQTTStatus_t MQTT_GetSubAckStatusCodes( const MQTTPacketInfo_t * pSubackPacket, + uint8_t ** pPayloadStart, + size_t * pPayloadSize ) +{ + MQTTStatus_t status = MQTTSuccess; + + if( pSubackPacket == NULL ) + { + LogError( ( "Invalid parameter: pSubackPacket is NULL." ) ); + status = MQTTBadParameter; + } + else if( pPayloadStart == NULL ) + { + LogError( ( "Invalid parameter: pPayloadStart is NULL." ) ); + status = MQTTBadParameter; + } + else if( pPayloadSize == NULL ) + { + LogError( ( "Invalid parameter: pPayloadSize is NULL." ) ); + status = MQTTBadParameter; + } + else if( pSubackPacket->type != MQTT_PACKET_TYPE_SUBACK ) + { + LogError( ( "Invalid parameter: Input packet is not a SUBACK packet: " + "ExpectedType=%02x, InputType=%02x", + MQTT_PACKET_TYPE_SUBACK, pSubackPacket->type ) ); + status = MQTTBadParameter; + } + else if( pSubackPacket->pRemainingData == NULL ) + { + LogError( ( "Invalid parameter: pSubackPacket->pRemainingData is NULL" ) ); + status = MQTTBadParameter; + } + + /* A SUBACK must have a remaining length of at least 3 to accommodate the + * packet identifier and at least 1 return code. */ + else if( pSubackPacket->remainingLength < 3U ) + { + LogError( ( "Invalid parameter: Packet remaining length is invalid: " + "Should be greater than 2 for SUBACK packet: InputRemainingLength=%lu", + ( unsigned long ) pSubackPacket->remainingLength ) ); + status = MQTTBadParameter; + } + else + { + /* According to the MQTT 3.1.1 protocol specification, the "Remaining Length" field is a + * length of the variable header (2 bytes) plus the length of the payload. + * Therefore, we add 2 positions for the starting address of the payload, and + * subtract 2 bytes from the remaining length for the length of the payload.*/ + *pPayloadStart = pSubackPacket->pRemainingData + ( ( uint16_t ) sizeof( uint16_t ) ); + *pPayloadSize = pSubackPacket->remainingLength - sizeof( uint16_t ); + } + + return status; +} + +/*-----------------------------------------------------------*/ + const char * MQTT_Status_strerror( MQTTStatus_t status ) { const char * str = NULL; diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c index 7ea35e3e4..783de8691 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c @@ -19,6 +19,10 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/** + * @file mqtt_lightweight.c + * @brief Implements the user-facing functions in mqtt_lightweight.h. + */ #include <string.h> #include <assert.h> @@ -58,7 +62,7 @@ */ #define MQTT_DISCONNECT_PACKET_SIZE ( 2UL ) -/* +/** * @brief A PINGREQ packet is always 2 bytes in size, defined by MQTT 3.1.1 spec. */ #define MQTT_PACKET_PINGREQ_SIZE ( 2U ) @@ -135,11 +139,13 @@ /*-----------------------------------------------------------*/ -/* MQTT Subscription packet types. */ +/** + * @brief MQTT Subscription packet types. + */ typedef enum MQTTSubscriptionType { - MQTT_SUBSCRIBE, - MQTT_UNSUBSCRIBE + MQTT_SUBSCRIBE, /**< @brief The type is a SUBSCRIBE packet. */ + MQTT_UNSUBSCRIBE /**< @brief The type is a UNSUBSCRIBE packet. */ } MQTTSubscriptionType_t; /*-----------------------------------------------------------*/ @@ -147,9 +153,9 @@ typedef enum MQTTSubscriptionType /** * @brief Serializes MQTT PUBLISH packet into the buffer provided. * - * This function serializes MQTT PUBLISH packet into #pFixedBuffer.pBuffer. + * This function serializes MQTT PUBLISH packet into #MQTTFixedBuffer_t.pBuffer. * Copy of the payload into the buffer is done as part of the serialization - * only if #serializePayload is true. + * only if @p serializePayload is true. * * @brief param[in] pPublishInfo Publish information. * @brief param[in] remainingLength Remaining length of the PUBLISH packet. @@ -237,16 +243,16 @@ static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo, const MQTTFixedBuffer_t * pFixedBuffer ); /** - * Prints the appropriate message for the CONNACK response code if logs are - * enabled. + * @brief Prints the appropriate message for the CONNACK response code if logs + * are enabled. * * @param[in] responseCode MQTT standard CONNACK response code. */ static void logConnackResponse( uint8_t responseCode ); /** - * Encodes the remaining length of the packet using the variable length encoding - * scheme provided in the MQTT v3.1.1 specification. + * @brief Encodes the remaining length of the packet using the variable length + * encoding scheme provided in the MQTT v3.1.1 specification. * * @param[out] pDestination The destination buffer to store the encoded remaining * length. @@ -258,7 +264,7 @@ static uint8_t * encodeRemainingLength( uint8_t * pDestination, size_t length ); /** - * Retrieve the size of the remaining length if it were to be encoded. + * @brief Retrieve the size of the remaining length if it were to be encoded. * * @param[in] length The remaining length to be encoded. * @@ -267,7 +273,7 @@ static uint8_t * encodeRemainingLength( uint8_t * pDestination, static size_t remainingLengthEncodedSize( size_t length ); /** - * Encode a string whose size is at maximum 16 bits in length. + * @brief Encode a string whose size is at maximum 16 bits in length. * * @param[out] pDestination Destination buffer for the encoding. * @param[in] pSource The source string to encode. @@ -280,8 +286,8 @@ static uint8_t * encodeString( uint8_t * pDestination, uint16_t sourceLength ); /** - * Retrieves and decodes the Remaining Length from the network interface by - * reading a single byte at a time. + * @brief Retrieves and decodes the Remaining Length from the network interface + * by reading a single byte at a time. * * @param[in] recvFunc Network interface receive function. * @param[in] pNetworkContext Network interface context to the receive function. @@ -291,6 +297,133 @@ static uint8_t * encodeString( uint8_t * pDestination, static size_t getRemainingLength( TransportRecv_t recvFunc, NetworkContext_t * pNetworkContext ); +/** + * @brief Check if an incoming packet type is valid. + * + * @param[in] packetType The packet type to check. + * + * @return `true` if the packet type is valid; `false` otherwise. + */ +static bool incomingPacketValid( uint8_t packetType ); + +/** + * @brief Check the remaining length of an incoming PUBLISH packet against some + * value for QoS 0, or for QoS 1 and 2. + * + * The remaining length for a QoS 1 and 2 packet will always be two greater than + * for a QoS 0. + * + * @param[in] remainingLength Remaining length of the PUBLISH packet. + * @param[in] qos The QoS of the PUBLISH. + * @param[in] qos0Minimum Minimum possible remaining length for a QoS 0 PUBLISH. + * + * @return #MQTTSuccess or #MQTTBadResponse. + */ +static MQTTStatus_t checkPublishRemainingLength( size_t remainingLength, + MQTTQoS_t qos, + size_t qos0Minimum ); + +/** + * @brief Process the flags of an incoming PUBLISH packet. + * + * @param[in] publishFlags Flags of an incoming PUBLISH. + * @param[in, out] pPublishInfo Pointer to #MQTTPublishInfo_t struct where + * output will be written. + * + * @return #MQTTSuccess or #MQTTBadResponse. + */ +static MQTTStatus_t processPublishFlags( uint8_t publishFlags, + MQTTPublishInfo_t * pPublishInfo ); + +/** + * @brief Deserialize a CONNACK packet. + * + * Converts the packet from a stream of bytes to an #MQTTStatus_t. + * + * @param[in] pConnack Pointer to an MQTT packet struct representing a + * CONNACK. + * @param[out] pSessionPresent Whether a previous session was present. + * + * @return #MQTTSuccess if CONNACK specifies that CONNECT was accepted; + * #MQTTServerRefused if CONNACK specifies that CONNECT was rejected; + * #MQTTBadResponse if the CONNACK packet doesn't follow MQTT spec. + */ +static MQTTStatus_t deserializeConnack( const MQTTPacketInfo_t * pConnack, + bool * pSessionPresent ); + +/** + * @brief Decode the status bytes of a SUBACK packet to a #MQTTStatus_t. + * + * @param[in] statusCount Number of status bytes in the SUBACK. + * @param[in] pStatusStart The first status byte in the SUBACK. + * + * @return #MQTTSuccess, #MQTTServerRefused, or #MQTTBadResponse. + */ +static MQTTStatus_t readSubackStatus( size_t statusCount, + const uint8_t * pStatusStart ); + +/** + * @brief Deserialize a SUBACK packet. + * + * Converts the packet from a stream of bytes to an #MQTTStatus_t and extracts + * the packet identifier. + * + * @param[in] pSuback Pointer to an MQTT packet struct representing a SUBACK. + * @param[out] pPacketIdentifier Packet ID of the SUBACK. + * + * @return #MQTTSuccess if SUBACK is valid; #MQTTBadResponse if SUBACK packet + * doesn't follow the MQTT spec. + */ +static MQTTStatus_t deserializeSuback( const MQTTPacketInfo_t * pSuback, + uint16_t * pPacketIdentifier ); + +/** + * @brief Deserialize a PUBLISH packet received from the server. + * + * Converts the packet from a stream of bytes to an #MQTTPublishInfo_t and + * extracts the packet identifier. Also prints out debug log messages about the + * packet. + * + * @param[in] pIncomingPacket Pointer to an MQTT packet struct representing a + * PUBLISH. + * @param[out] pPacketId Packet identifier of the PUBLISH. + * @param[out] pPublishInfo Pointer to #MQTTPublishInfo_t where output is + * written. + * + * @return #MQTTSuccess if PUBLISH is valid; #MQTTBadResponse + * if the PUBLISH packet doesn't follow MQTT spec. + */ +static MQTTStatus_t deserializePublish( const MQTTPacketInfo_t * pIncomingPacket, + uint16_t * pPacketId, + MQTTPublishInfo_t * pPublishInfo ); + +/** + * @brief Deserialize an UNSUBACK, PUBACK, PUBREC, PUBREL, or PUBCOMP packet. + * + * Converts the packet from a stream of bytes to an #MQTTStatus_t and extracts + * the packet identifier. + * + * @param[in] pAck Pointer to the MQTT packet structure representing the packet. + * @param[out] pPacketIdentifier Packet ID of the ack type packet. + * + * @return #MQTTSuccess if UNSUBACK, PUBACK, PUBREC, PUBREL, or PUBCOMP is valid; + * #MQTTBadResponse if the packet doesn't follow the MQTT spec. + */ +static MQTTStatus_t deserializeSimpleAck( const MQTTPacketInfo_t * pAck, + uint16_t * pPacketIdentifier ); + +/** + * @brief Deserialize a PINGRESP packet. + * + * Converts the packet from a stream of bytes to an #MQTTStatus_t. + * + * @param[in] pPingresp Pointer to an MQTT packet struct representing a PINGRESP. + * + * @return #MQTTSuccess if PINGRESP is valid; #MQTTBadResponse if the PINGRESP + * packet doesn't follow MQTT spec. + */ +static MQTTStatus_t deserializePingresp( const MQTTPacketInfo_t * pPingresp ); + /*-----------------------------------------------------------*/ static size_t remainingLengthEncodedSize( size_t length ) @@ -322,8 +455,8 @@ static size_t remainingLengthEncodedSize( size_t length ) } LogDebug( ( "Encoded size for length %lu is %lu bytes.", - length, - encodedSize ) ); + ( unsigned long ) length, + ( unsigned long ) encodedSize ) ); return encodedSize; } @@ -433,8 +566,8 @@ static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, LogError( ( "PUBLISH payload length of %lu cannot exceed " "%lu so as not to exceed the maximum " "remaining length of MQTT 3.1.1 packet( %lu ).", - pPublishInfo->payloadLength, - payloadLimit, + ( unsigned long ) pPublishInfo->payloadLength, + ( unsigned long ) payloadLimit, MQTT_MAX_REMAINING_LENGTH ) ); status = false; } @@ -454,8 +587,8 @@ static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, LogError( ( "PUBLISH payload length of %lu cannot exceed " "%lu so as not to exceed the maximum " "remaining length of MQTT 3.1.1 packet( %lu ).", - pPublishInfo->payloadLength, - payloadLimit, + ( unsigned long ) pPublishInfo->payloadLength, + ( unsigned long ) payloadLimit, MQTT_MAX_REMAINING_LENGTH ) ); status = false; } @@ -471,8 +604,8 @@ static bool calculatePublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, } LogDebug( ( "PUBLISH packet remaining length=%lu and packet size=%lu.", - *pRemainingLength, - *pPacketSize ) ); + ( unsigned long ) *pRemainingLength, + ( unsigned long ) *pPacketSize ) ); return status; } @@ -493,9 +626,9 @@ static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo, assert( pPublishInfo != NULL ); assert( pFixedBuffer != NULL ); assert( pFixedBuffer->pBuffer != NULL ); - /* Packet Id should be non zero for QoS1 and QoS2. */ + /* Packet Id should be non zero for Qos 1 and Qos 2. */ assert( ( pPublishInfo->qos == MQTTQoS0 ) || ( packetIdentifier != 0U ) ); - /* Duplicate flag should be set only for Qos1 or Qos2. */ + /* Duplicate flag should be set only for Qos 1 or Qos 2. */ assert( ( pPublishInfo->dup != true ) || ( pPublishInfo->qos != MQTTQoS0 ) ); /* Get the start address of the buffer. */ @@ -557,7 +690,7 @@ static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo, ( serializePayload == true ) ) { LogDebug( ( "Copying PUBLISH payload of length =%lu to buffer", - pPublishInfo->payloadLength ) ); + ( unsigned long ) pPublishInfo->payloadLength ) ); /* Typecast const void * typed payload buffer to const uint8_t *. * This is to use same type buffers in memcpy. */ @@ -678,7 +811,7 @@ static MQTTStatus_t checkPublishRemainingLength( size_t remainingLength, if( remainingLength < qos0Minimum ) { LogDebug( ( "QoS 0 PUBLISH cannot have a remaining length less than %lu.", - qos0Minimum ) ); + ( unsigned long ) qos0Minimum ) ); status = MQTTBadResponse; } @@ -691,7 +824,7 @@ static MQTTStatus_t checkPublishRemainingLength( size_t remainingLength, if( remainingLength < ( qos0Minimum + 2U ) ) { LogDebug( ( "QoS 1 or 2 PUBLISH cannot have a remaining length less than %lu.", - qos0Minimum + 2U ) ); + ( unsigned long ) ( qos0Minimum + 2U ) ) ); status = MQTTBadResponse; } @@ -843,8 +976,7 @@ static MQTTStatus_t deserializeConnack( const MQTTPacketInfo_t * pConnack, /* In MQTT 3.1.1, only values 0 through 5 are valid CONNACK response codes. */ if( pRemainingData[ 1 ] > 5U ) { - LogError( ( "CONNACK response %hhu is not valid.", - pRemainingData[ 1 ] ) ); + LogError( ( "CONNACK response %u is invalid.", pRemainingData[ 1 ] ) ); status = MQTTBadResponse; } @@ -907,7 +1039,7 @@ static MQTTStatus_t calculateSubscriptionPacketSize( const MQTTSubscribeInfo_t * { LogError( ( "Subscription packet length of %lu exceeds" "the MQTT 3.1.1 maximum packet length of %lu.", - packetSize, + ( unsigned long ) packetSize, MQTT_MAX_REMAINING_LENGTH ) ); status = MQTTBadParameter; } @@ -925,8 +1057,8 @@ static MQTTStatus_t calculateSubscriptionPacketSize( const MQTTSubscribeInfo_t * } LogDebug( ( "Subscription packet remaining length=%lu and packet size=%lu.", - *pRemainingLength, - *pPacketSize ) ); + ( unsigned long ) *pRemainingLength, + ( unsigned long ) *pPacketSize ) ); return status; } @@ -955,13 +1087,13 @@ static MQTTStatus_t readSubackStatus( size_t statusCount, case 0x01: case 0x02: - LogDebug( ( "Topic filter %lu accepted, max QoS %hhu.", + LogDebug( ( "Topic filter %lu accepted, max QoS %u.", ( unsigned long ) i, subscriptionStatus ) ); break; case 0x80: - LogDebug( ( "Topic filter %lu refused.", ( unsigned long ) i ) ); + LogWarn( ( "Topic filter %lu refused.", ( unsigned long ) i ) ); /* Application should remove subscription from the list */ status = MQTTServerRefused; @@ -969,7 +1101,7 @@ static MQTTStatus_t readSubackStatus( size_t statusCount, break; default: - LogDebug( ( "Bad SUBSCRIBE status %hhu.", subscriptionStatus ) ); + LogDebug( ( "Bad SUBSCRIBE status %u.", subscriptionStatus ) ); status = MQTTBadResponse; @@ -1031,20 +1163,15 @@ static MQTTStatus_t validateSubscriptionSerializeParams( const MQTTSubscribeInfo const MQTTFixedBuffer_t * pFixedBuffer ) { MQTTStatus_t status = MQTTSuccess; - - /* The serialized packet size = First byte - * + length of encoded size of remaining length - * + remaining length. */ - size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength ) - + remainingLength; + size_t packetSize = 0; /* Validate all the parameters. */ if( ( pFixedBuffer == NULL ) || ( pSubscriptionList == NULL ) ) { LogError( ( "Argument cannot be NULL: pFixedBuffer=%p, " "pSubscriptionList=%p.", - pFixedBuffer, - pSubscriptionList ) ); + ( void * ) pFixedBuffer, + ( void * ) pSubscriptionList ) ); status = MQTTBadParameter; } /* A buffer must be configured for serialization. */ @@ -1063,17 +1190,22 @@ static MQTTStatus_t validateSubscriptionSerializeParams( const MQTTSubscribeInfo LogError( ( "Packet Id for subscription packet is 0." ) ); status = MQTTBadParameter; } - else if( packetSize > pFixedBuffer->size ) - { - LogError( ( "Buffer size of %lu is not sufficient to hold " - "serialized packet of size of %lu.", - pFixedBuffer->size, - packetSize ) ); - status = MQTTNoMemory; - } else { - /* Empty else MISRA 15.7 */ + /* The serialized packet size = First byte + * + length of encoded size of remaining length + * + remaining length. */ + packetSize = 1U + remainingLengthEncodedSize( remainingLength ) + + remainingLength; + + if( packetSize > pFixedBuffer->size ) + { + LogError( ( "Buffer size of %lu is not sufficient to hold " + "serialized packet of size of %lu.", + ( unsigned long ) pFixedBuffer->size, + ( unsigned long ) packetSize ) ); + status = MQTTNoMemory; + } } return status; @@ -1164,7 +1296,7 @@ static MQTTStatus_t deserializePublish( const MQTTPacketInfo_t * pIncomingPacket pPublishInfo->pPayload = pPacketIdentifierHigh + sizeof( uint16_t ); } - LogDebug( ( "Payload length %hu.", pPublishInfo->payloadLength ) ); + LogDebug( ( "Payload length %lu.", ( unsigned long ) pPublishInfo->payloadLength ) ); } return status; @@ -1337,7 +1469,7 @@ static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo, } LogDebug( ( "Length of serialized CONNECT packet is %lu.", - ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) ) ); + ( ( unsigned long ) ( pIndex - pFixedBuffer->pBuffer ) ) ) ); /* Ensure that the difference between the end and beginning of the buffer * is less than the buffer size. */ @@ -1363,9 +1495,9 @@ MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo, { LogError( ( "Argument cannot be NULL: pConnectInfo=%p, " "pRemainingLength=%p, pPacketSize=%p.", - pConnectInfo, - pRemainingLength, - pPacketSize ) ); + ( void * ) pConnectInfo, + ( void * ) pRemainingLength, + ( void * ) pPacketSize ) ); status = MQTTBadParameter; } else if( ( pConnectInfo->clientIdentifierLength == 0U ) || ( pConnectInfo->pClientIdentifier == NULL ) ) @@ -1383,7 +1515,7 @@ MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo, LogError( ( "The Will Message length must not exceed %d. " "pWillInfo->payloadLength=%lu.", UINT16_MAX, - pWillInfo->payloadLength ) ); + ( unsigned long ) pWillInfo->payloadLength ) ); status = MQTTBadParameter; } else @@ -1437,8 +1569,8 @@ MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo, *pPacketSize = connectPacketSize; LogDebug( ( "CONNECT packet remaining length=%lu and packet size=%lu.", - *pRemainingLength, - *pPacketSize ) ); + ( unsigned long ) *pRemainingLength, + ( unsigned long ) *pPacketSize ) ); } return status; @@ -1459,8 +1591,8 @@ MQTTStatus_t MQTT_SerializeConnect( const MQTTConnectInfo_t * pConnectInfo, { LogError( ( "Argument cannot be NULL: pConnectInfo=%p, " "pFixedBuffer=%p.", - pConnectInfo, - pFixedBuffer ) ); + ( void * ) pConnectInfo, + ( void * ) pFixedBuffer ) ); status = MQTTBadParameter; } /* A buffer must be configured for serialization. */ @@ -1486,8 +1618,8 @@ MQTTStatus_t MQTT_SerializeConnect( const MQTTConnectInfo_t * pConnectInfo, { LogError( ( "Buffer size of %lu is not sufficient to hold " "serialized CONNECT packet of size of %lu.", - pFixedBuffer->size, - connectPacketSize ) ); + ( unsigned long ) pFixedBuffer->size, + ( unsigned long ) connectPacketSize ) ); status = MQTTNoMemory; } else @@ -1517,9 +1649,9 @@ MQTTStatus_t MQTT_GetSubscribePacketSize( const MQTTSubscribeInfo_t * pSubscript { LogError( ( "Argument cannot be NULL: pSubscriptionList=%p, " "pRemainingLength=%p, pPacketSize=%p.", - pSubscriptionList, - pRemainingLength, - pPacketSize ) ); + ( void * ) pSubscriptionList, + ( void * ) pRemainingLength, + ( void * ) pPacketSize ) ); status = MQTTBadParameter; } else if( subscriptionCount == 0U ) @@ -1595,7 +1727,7 @@ MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionL } LogDebug( ( "Length of serialized SUBSCRIBE packet is %lu.", - ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) ) ); + ( ( unsigned long ) ( pIndex - pFixedBuffer->pBuffer ) ) ) ); } return status; @@ -1616,9 +1748,9 @@ MQTTStatus_t MQTT_GetUnsubscribePacketSize( const MQTTSubscribeInfo_t * pSubscri { LogError( ( "Argument cannot be NULL: pSubscriptionList=%p, " "pRemainingLength=%p, pPacketSize=%p.", - pSubscriptionList, - pRemainingLength, - pPacketSize ) ); + ( void * ) pSubscriptionList, + ( void * ) pRemainingLength, + ( void * ) pPacketSize ) ); status = MQTTBadParameter; } else if( subscriptionCount == 0U ) @@ -1691,7 +1823,7 @@ MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptio } LogDebug( ( "Length of serialized UNSUBSCRIBE packet is %lu.", - ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) ) ); + ( ( unsigned long ) ( pIndex - pFixedBuffer->pBuffer ) ) ) ); } return status; @@ -1709,9 +1841,9 @@ MQTTStatus_t MQTT_GetPublishPacketSize( const MQTTPublishInfo_t * pPublishInfo, { LogError( ( "Argument cannot be NULL: pPublishInfo=%p, " "pRemainingLength=%p, pPacketSize=%p.", - pPublishInfo, - pRemainingLength, - pPacketSize ) ); + ( void * ) pPublishInfo, + ( void * ) pRemainingLength, + ( void * ) pPacketSize ) ); status = MQTTBadParameter; } else if( ( pPublishInfo->pTopicName == NULL ) || ( pPublishInfo->topicNameLength == 0U ) ) @@ -1746,19 +1878,14 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, const MQTTFixedBuffer_t * pFixedBuffer ) { MQTTStatus_t status = MQTTSuccess; - - /* Length of serialized packet = First byte - * + Length of encoded remaining length - * + Remaining length. */ - size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength ) - + remainingLength; + size_t packetSize = 0; if( ( pFixedBuffer == NULL ) || ( pPublishInfo == NULL ) ) { LogError( ( "Argument cannot be NULL: pFixedBuffer=%p, " "pPublishInfo=%p.", - pFixedBuffer, - pPublishInfo ) ); + ( void * ) pFixedBuffer, + ( void * ) pPublishInfo ) ); status = MQTTBadParameter; } /* A buffer must be configured for serialization. */ @@ -1797,15 +1924,25 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo, LogError( ( "Duplicate flag is set for PUBLISH with Qos 0," ) ); status = MQTTBadParameter; } - else if( packetSize > pFixedBuffer->size ) + else + { + /* Length of serialized packet = First byte + * + Length of encoded remaining length + * + Remaining length. */ + packetSize = 1U + remainingLengthEncodedSize( remainingLength ) + + remainingLength; + } + + if( ( status == MQTTSuccess ) && ( packetSize > pFixedBuffer->size ) ) { LogError( ( "Buffer size of %lu is not sufficient to hold " "serialized PUBLISH packet of size of %lu.", - pFixedBuffer->size, - packetSize ) ); + ( unsigned long ) pFixedBuffer->size, + ( unsigned long ) packetSize ) ); status = MQTTNoMemory; } - else + + if( status == MQTTSuccess ) { /* Serialize publish with header and payload. */ serializePublishCommon( pPublishInfo, @@ -1827,24 +1964,16 @@ MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo size_t * pHeaderSize ) { MQTTStatus_t status = MQTTSuccess; - - /* Length of serialized packet = First byte - * + Length of encoded remaining length - * + Remaining length - * - Payload Length. - * Payload length will be subtracted after verifying pPublishInfo parameter. - */ - size_t packetSize = 1U + remainingLengthEncodedSize( remainingLength ) - + remainingLength; + size_t packetSize = 0; if( ( pFixedBuffer == NULL ) || ( pPublishInfo == NULL ) || ( pHeaderSize == NULL ) ) { LogError( ( "Argument cannot be NULL: pFixedBuffer=%p, " "pPublishInfo=%p, pHeaderSize=%p.", - pFixedBuffer, - pPublishInfo, - pHeaderSize ) ); + ( void * ) pFixedBuffer, + ( void * ) pPublishInfo, + ( void * ) pHeaderSize ) ); status = MQTTBadParameter; } /* A buffer must be configured for serialization. */ @@ -1872,15 +2001,28 @@ MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo LogError( ( "Duplicate flag is set for PUBLISH with Qos 0," ) ); status = MQTTBadParameter; } - else if( ( packetSize - pPublishInfo->payloadLength ) > pFixedBuffer->size ) + else + { + /* Length of serialized packet = First byte + * + Length of encoded remaining length + * + Remaining length + * - Payload Length. + */ + packetSize = 1U + remainingLengthEncodedSize( remainingLength ) + + remainingLength + - pPublishInfo->payloadLength; + } + + if( ( status == MQTTSuccess ) && ( packetSize > pFixedBuffer->size ) ) { LogError( ( "Buffer size of %lu is not sufficient to hold " "serialized PUBLISH header packet of size of %lu.", - pFixedBuffer->size, - ( packetSize - pPublishInfo->payloadLength ) ) ); + ( unsigned long ) pFixedBuffer->size, + ( unsigned long ) ( packetSize - pPublishInfo->payloadLength ) ) ); status = MQTTNoMemory; } - else + + if( status == MQTTSuccess ) { /* Serialize publish without copying the payload. */ serializePublishCommon( pPublishInfo, @@ -1890,7 +2032,7 @@ MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo false ); /* Header size is the same as calculated packet size. */ - *pHeaderSize = ( packetSize - pPublishInfo->payloadLength ); + *pHeaderSize = packetSize; } return status; @@ -1999,7 +2141,7 @@ MQTTStatus_t MQTT_SerializeDisconnect( const MQTTFixedBuffer_t * pFixedBuffer ) { LogError( ( "Buffer size of %lu is not sufficient to hold " "serialized DISCONNECT packet of size of %lu.", - pFixedBuffer->size, + ( unsigned long ) pFixedBuffer->size, MQTT_DISCONNECT_PACKET_SIZE ) ); status = MQTTNoMemory; } @@ -2061,7 +2203,7 @@ MQTTStatus_t MQTT_SerializePingreq( const MQTTFixedBuffer_t * pFixedBuffer ) { LogError( ( "Buffer size of %lu is not sufficient to hold " "serialized PINGREQ packet of size of %u.", - pFixedBuffer->size, + ( unsigned long ) pFixedBuffer->size, MQTT_PACKET_PINGREQ_SIZE ) ); status = MQTTNoMemory; } @@ -2089,9 +2231,9 @@ MQTTStatus_t MQTT_DeserializePublish( const MQTTPacketInfo_t * pIncomingPacket, { LogError( ( "Argument cannot be NULL: pIncomingPacket=%p, " "pPacketId=%p, pPublishInfo=%p", - pIncomingPacket, - pPacketId, - pPublishInfo ) ); + ( void * ) pIncomingPacket, + ( void * ) pPacketId, + ( void * ) pPublishInfo ) ); status = MQTTBadParameter; } else if( ( pIncomingPacket->type & 0xF0U ) != MQTT_PACKET_TYPE_PUBLISH ) diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c index 37b23e094..fad14e3b5 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c @@ -19,6 +19,10 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/** + * @file mqtt_state.c + * @brief Implements the functions in mqtt_state.h. + */ #include <assert.h> #include <string.h> #include "mqtt_state.h" @@ -127,11 +131,11 @@ static void compactRecords( MQTTPubAckInfo_t * records, * * @param[in] records State record array. * @param[in] recordCount Length of record array. - * @param[in] packetId, packet ID of new entry. + * @param[in] packetId Packet ID of new entry. * @param[in] qos QoS of new entry. - * @param[in] publishState state of new entry. + * @param[in] publishState State of new entry. * - * @return MQTTSuccess, MQTTNoMemory, MQTTStateCollision. + * @return #MQTTSuccess, #MQTTNoMemory, or #MQTTStateCollision. */ static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records, size_t recordCount, @@ -159,6 +163,8 @@ static void updateRecord( MQTTPubAckInfo_t * records, * @param[in] pMqttContext Initialized MQTT context. * @param[in] searchStates The states to search for in 2-byte bit map. * @param[in,out] pCursor Index at which to start searching. + * + * @return Packet ID of the outgoing publish. */ static uint16_t stateSelect( const MQTTContext_t * pMqttContext, uint16_t searchStates, @@ -615,6 +621,15 @@ static uint16_t stateSelect( const MQTTContext_t * pMqttContext, /*-----------------------------------------------------------*/ +/** + * @brief Calculate the state from a PUBACK, PUBREC, PUBREL, or PUBCOMP. + * + * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP. + * @param[in] opType Send or Receive. + * @param[in] qos 1 or 2. + * + * @return The calculated state. + */ MQTTPublishState_t MQTT_CalculateStateAck( MQTTPubAckType_t packetType, MQTTStateOperation_t opType, MQTTQoS_t qos ) @@ -779,6 +794,15 @@ static MQTTStatus_t updateStatePublish( MQTTContext_t * pMqttContext, /*-----------------------------------------------------------*/ +/** + * @brief Reserve an entry for an outgoing QoS 1 or Qos 2 publish. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in] packetId The ID of the publish packet. + * @param[in] qos 1 or 2. + * + * @return MQTTSuccess, MQTTNoMemory, or MQTTStateCollision. + */ MQTTStatus_t MQTT_ReserveState( MQTTContext_t * pMqttContext, uint16_t packetId, MQTTQoS_t qos ) @@ -808,6 +832,14 @@ MQTTStatus_t MQTT_ReserveState( MQTTContext_t * pMqttContext, /*-----------------------------------------------------------*/ +/** + * @brief Calculate the new state for a publish from its qos and operation type. + * + * @param[in] opType Send or Receive. + * @param[in] qos 0, 1, or 2. + * + * @return The calculated state. + */ MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType, MQTTQoS_t qos ) { @@ -837,6 +869,18 @@ MQTTPublishState_t MQTT_CalculateStatePublish( MQTTStateOperation_t opType, /*-----------------------------------------------------------*/ +/** + * @brief Update the state record for a PUBLISH packet. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in] packetId ID of the PUBLISH packet. + * @param[in] opType Send or Receive. + * @param[in] qos 0, 1, or 2. + * @param[out] pNewState Updated state of the publish. + * + * @return #MQTTBadParameter, #MQTTIllegalState, #MQTTStateCollision or + * #MQTTSuccess. + */ MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext, uint16_t packetId, MQTTStateOperation_t opType, @@ -852,8 +896,8 @@ MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext, if( ( pMqttContext == NULL ) || ( pNewState == NULL ) ) { LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p", - pMqttContext, - pNewState ) ); + ( void * ) pMqttContext, + ( void * ) pNewState ) ); mqttStatus = MQTTBadParameter; } @@ -911,6 +955,17 @@ MQTTStatus_t MQTT_UpdateStatePublish( MQTTContext_t * pMqttContext, /*-----------------------------------------------------------*/ +/** + * @brief Update the state record for an ACKed publish. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in] packetId ID of the ack packet. + * @param[in] packetType PUBACK, PUBREC, PUBREL, or PUBCOMP. + * @param[in] opType Send or Receive. + * @param[out] pNewState Updated state of the publish. + * + * @return #MQTTBadParameter, #MQTTIllegalState, or #MQTTSuccess. + */ MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext, uint16_t packetId, MQTTPubAckType_t packetType, @@ -928,8 +983,8 @@ MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext, if( ( pMqttContext == NULL ) || ( pNewState == NULL ) ) { LogError( ( "Argument cannot be NULL: pMqttContext=%p, pNewState=%p.", - pMqttContext, - pNewState ) ); + ( void * ) pMqttContext, + ( void * ) pNewState ) ); } else { @@ -972,6 +1027,18 @@ MQTTStatus_t MQTT_UpdateStateAck( MQTTContext_t * pMqttContext, /*-----------------------------------------------------------*/ +/** + * @brief Get the packet ID of next pending PUBREL ack to be resent. + * + * This function will need to be called to get the packet for which a PUBREL + * need to be sent when a session is reestablished. Calling this function + * repeatedly until packet id is 0 will give all the packets for which + * a PUBREL need to be resent in the correct order. + * + * @param[in] pMqttContext Initialized MQTT context. + * @param[in,out] pCursor Index at which to start searching. + * @param[out] pState State indicating that PUBREL packet need to be sent. + */ uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext, MQTTStateCursor_t * pCursor, MQTTPublishState_t * pState ) @@ -982,11 +1049,11 @@ uint16_t MQTT_PubrelToResend( const MQTTContext_t * pMqttContext, /* Validate arguments. */ if( ( pMqttContext == NULL ) || ( pCursor == NULL ) || ( pState == NULL ) ) { - LogError( ( "Arguments cannot be NULL pMqttContext =%p, pCursor=%p" + LogError( ( "Arguments cannot be NULL pMqttContext=%p, pCursor=%p" " pState=%p.", - pMqttContext, - pCursor, - pState ) ); + ( void * ) pMqttContext, + ( void * ) pCursor, + ( void * ) pState ) ); } else { @@ -1018,8 +1085,8 @@ uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext, if( ( pMqttContext == NULL ) || ( pCursor == NULL ) ) { LogError( ( "Arguments cannot be NULL pMqttContext =%p, pCursor=%p", - pMqttContext, - pCursor ) ); + ( void * ) pMqttContext, + ( void * ) pCursor ) ); } else { @@ -1038,6 +1105,13 @@ uint16_t MQTT_PublishToResend( const MQTTContext_t * pMqttContext, /*-----------------------------------------------------------*/ +/** + * @brief State to string conversion for state engine. + * + * @param[in] state The state to convert to a string. + * + * @return The string representation of the state. + */ const char * MQTT_State_strerror( MQTTPublishState_t state ) { const char * str = NULL; diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h index c62489573..cc7a6f61b 100644 --- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h +++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/private/mqtt_internal.h @@ -1,9 +1,41 @@ +/* + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +/** + * @file mqtt_internal.h + * @brief Internal header of the MQTT library. This header should not be + * included in typical application code. + */ #ifndef MQTT_INTERNAL_H_ #define MQTT_INTERNAL_H_ /* Include config file before other headers. */ #include "mqtt_config.h" +/** + * @cond DOXYGEN_IGNORE + * Doxygen should ignore this section. + * + * Configure logs for MQTT functions. + */ #ifndef LogError #define LogError( message ) #endif @@ -19,5 +51,6 @@ #ifndef LogDebug #define LogDebug( message ) #endif +/** @endcond */ #endif /* ifndef MQTT_INTERNAL_H_ */ |