summaryrefslogtreecommitdiff
path: root/FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
diff options
context:
space:
mode:
Diffstat (limited to 'FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c')
-rw-r--r--FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c645
1 files changed, 645 insertions, 0 deletions
diff --git a/FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c b/FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
new file mode 100644
index 000000000..00b241a7b
--- /dev/null
+++ b/FreeRTOS-Labs/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
@@ -0,0 +1,645 @@
+/*
+ * IoT MQTT V2.1.0
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+/**
+ * @file iot_mqtt_subscription.c
+ * @brief Implements functions that manage subscriptions for an MQTT connection.
+ */
+
+/* The config header is always included first. */
+#include "iot_config.h"
+
+/* Standard includes. */
+#include <stdbool.h>
+#include <string.h>
+
+/* Error handling include. */
+#include "iot_error.h"
+
+/* MQTT internal include. */
+#include "private/iot_mqtt_internal.h"
+
+/* Platform layer includes. */
+#include "platform/iot_threads.h"
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief First parameter to #_topicMatch.
+ */
+typedef struct _topicMatchParams
+{
+ const char * pTopicName; /**< @brief The topic name to parse. */
+ uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */
+ bool exactMatchOnly; /**< @brief Whether to allow wildcards or require exact matches. */
+} _topicMatchParams_t;
+
+/**
+ * @brief First parameter to #_packetMatch.
+ */
+typedef struct _packetMatchParams
+{
+ uint16_t packetIdentifier; /**< Packet identifier to match. */
+ int32_t order; /**< Order to match. Set to #MQTT_REMOVE_ALL_SUBSCRIPTIONS to ignore. */
+} _packetMatchParams_t;
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Matches a topic name (from a publish) with a topic filter (from a
+ * subscription).
+ *
+ * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
+ * @param[in] pMatch Pointer to a #_topicMatchParams_t.
+ *
+ * @return `true` if the arguments match the subscription topic filter; `false`
+ * otherwise.
+ */
+static bool _topicMatch( const IotLink_t * pSubscriptionLink,
+ void * pMatch );
+
+/**
+ * @brief Matches a packet identifier and order.
+ *
+ * @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
+ * @param[in] pMatch Pointer to a #_packetMatchParams_t.
+ *
+ * @return `true` if the arguments match the subscription's packet info; `false`
+ * otherwise.
+ */
+static bool _packetMatch( const IotLink_t * pSubscriptionLink,
+ void * pMatch );
+
+/*-----------------------------------------------------------*/
+
+static bool _topicMatch( const IotLink_t * pSubscriptionLink,
+ void * pMatch )
+{
+ IOT_FUNCTION_ENTRY( bool, false );
+ uint16_t nameIndex = 0, filterIndex = 0;
+
+ /* Because this function is called from a container function, the given link
+ * must never be NULL. */
+ IotMqtt_Assert( pSubscriptionLink != NULL );
+
+ _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
+ pSubscriptionLink,
+ link );
+ _topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;
+
+ /* Extract the relevant strings and lengths from parameters. */
+ const char * pTopicName = pParam->pTopicName;
+ const char * pTopicFilter = pSubscription->pTopicFilter;
+ const uint16_t topicNameLength = pParam->topicNameLength;
+ const uint16_t topicFilterLength = pSubscription->topicFilterLength;
+
+ /* Check for an exact match. */
+ if( topicNameLength == topicFilterLength )
+ {
+ status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );
+
+ IOT_GOTO_CLEANUP();
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ /* If the topic lengths are different but an exact match is required, return
+ * false. */
+ if( pParam->exactMatchOnly == true )
+ {
+ IOT_SET_AND_GOTO_CLEANUP( false );
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
+ {
+ /* Check if the character in the topic name matches the corresponding
+ * character in the topic filter string. */
+ if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
+ {
+ /* Handle special corner cases as documented by the MQTT protocol spec. */
+
+ /* Filter "sport/#" also matches "sport" since # includes the parent level. */
+ if( nameIndex == topicNameLength - 1 )
+ {
+ if( filterIndex == topicFilterLength - 3 )
+ {
+ if( pTopicFilter[ filterIndex + 1 ] == '/' )
+ {
+ if( pTopicFilter[ filterIndex + 2 ] == '#' )
+ {
+ IOT_SET_AND_GOTO_CLEANUP( true );
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ /* Filter "sport/+" also matches the "sport/" but not "sport". */
+ if( nameIndex == topicNameLength - 1 )
+ {
+ if( filterIndex == topicFilterLength - 2 )
+ {
+ if( pTopicFilter[ filterIndex + 1 ] == '+' )
+ {
+ IOT_SET_AND_GOTO_CLEANUP( true );
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+ else
+ {
+ /* Check for wildcards. */
+ if( pTopicFilter[ filterIndex ] == '+' )
+ {
+ /* Move topic name index to the end of the current level.
+ * This is identified by '/'. */
+ while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )
+ {
+ nameIndex++;
+ }
+
+ /* Increment filter index to skip '/'. */
+ filterIndex++;
+ continue;
+ }
+ else if( pTopicFilter[ filterIndex ] == '#' )
+ {
+ /* Subsequent characters don't need to be checked if the for the
+ * multi-level wildcard. */
+ IOT_SET_AND_GOTO_CLEANUP( true );
+ }
+ else
+ {
+ /* Any character mismatch other than '+' or '#' means the topic
+ * name does not match the topic filter. */
+ IOT_SET_AND_GOTO_CLEANUP( false );
+ }
+ }
+
+ /* Increment indexes. */
+ nameIndex++;
+ filterIndex++;
+ }
+
+ /* If the end of both strings has been reached, they match. */
+ if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )
+ {
+ IOT_SET_AND_GOTO_CLEANUP( true );
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ IOT_FUNCTION_EXIT_NO_CLEANUP();
+}
+
+/*-----------------------------------------------------------*/
+
+static bool _packetMatch( const IotLink_t * pSubscriptionLink,
+ void * pMatch )
+{
+ bool match = false;
+
+ /* Because this function is called from a container function, the given link
+ * must never be NULL. */
+ IotMqtt_Assert( pSubscriptionLink != NULL );
+
+ _mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
+ pSubscriptionLink,
+ link );
+ _packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;
+
+ /* Compare packet identifiers. */
+ if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )
+ {
+ /* Compare orders if order is not MQTT_REMOVE_ALL_SUBSCRIPTIONS. */
+ if( pParam->order == MQTT_REMOVE_ALL_SUBSCRIPTIONS )
+ {
+ match = true;
+ }
+ else
+ {
+ match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;
+ }
+ }
+
+ /* If this subscription should be removed, check the reference count. */
+ if( match == true )
+ {
+ /* Reference count must not be negative. */
+ IotMqtt_Assert( pSubscription->references >= 0 );
+
+ /* If the reference count is positive, this subscription cannot be
+ * removed yet because there are subscription callbacks using it. */
+ if( pSubscription->references > 0 )
+ {
+ match = false;
+
+ /* Set the unsubscribed flag. The last active subscription callback
+ * will remove and clean up this subscription. */
+ pSubscription->unsubscribed = true;
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ return match;
+}
+
+/*-----------------------------------------------------------*/
+
+IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,
+ uint16_t subscribePacketIdentifier,
+ const IotMqttSubscription_t * pSubscriptionList,
+ size_t subscriptionCount )
+{
+ IotMqttError_t status = IOT_MQTT_SUCCESS;
+ size_t i = 0;
+ _mqttSubscription_t * pNewSubscription = NULL;
+ IotLink_t * pSubscriptionLink = NULL;
+ _topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };
+
+ IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
+
+ for( i = 0; i < subscriptionCount; i++ )
+ {
+ /* Check if this topic filter is already registered. */
+ topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
+ topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
+ pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
+ NULL,
+ _topicMatch,
+ &topicMatchParams );
+
+ if( pSubscriptionLink != NULL )
+ {
+ pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
+
+ /* The lengths of exactly matching topic filters must match. */
+ IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );
+
+ /* Replace the callback and packet info with the new parameters. */
+ pNewSubscription->callback = pSubscriptionList[ i ].callback;
+ pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
+ pNewSubscription->packetInfo.order = i;
+ }
+ else
+ {
+ /* Allocate memory for a new subscription. */
+ pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +
+ pSubscriptionList[ i ].topicFilterLength );
+
+ if( pNewSubscription == NULL )
+ {
+ status = IOT_MQTT_NO_MEMORY;
+ break;
+ }
+ else
+ {
+ /* Clear the new subscription. */
+ ( void ) memset( pNewSubscription,
+ 0x00,
+ sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );
+
+ /* Set the members of the new subscription and add it to the list. */
+ pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
+ pNewSubscription->packetInfo.order = i;
+ pNewSubscription->callback = pSubscriptionList[ i ].callback;
+ pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;
+ ( void ) memcpy( pNewSubscription->pTopicFilter,
+ pSubscriptionList[ i ].pTopicFilter,
+ ( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );
+
+ IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),
+ &( pNewSubscription->link ) );
+ }
+ }
+ }
+
+ IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
+
+ /* If memory allocation failed, remove all previously added subscriptions. */
+ if( status != IOT_MQTT_SUCCESS )
+ {
+ _IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,
+ pSubscriptionList,
+ i );
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,
+ IotMqttCallbackParam_t * pCallbackParam )
+{
+ _mqttSubscription_t * pSubscription = NULL;
+ IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;
+ void * pCallbackContext = NULL;
+
+ void ( * callbackFunction )( void *,
+ IotMqttCallbackParam_t * ) = NULL;
+ _topicMatchParams_t topicMatchParams = { 0 };
+
+ /* Set the members of the search parameter. */
+ topicMatchParams.pTopicName = pCallbackParam->u.message.info.pTopicName;
+ topicMatchParams.topicNameLength = pCallbackParam->u.message.info.topicNameLength;
+ topicMatchParams.exactMatchOnly = false;
+
+ /* Prevent any other thread from modifying the subscription list while this
+ * function is searching. */
+ IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
+
+ /* Search the subscription list for all matching subscriptions starting at
+ * the list head. */
+ while( true )
+ {
+ pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
+ pCurrentLink,
+ _topicMatch,
+ &topicMatchParams );
+
+ /* No subscription found. Exit loop. */
+ if( pCurrentLink == NULL )
+ {
+ break;
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ /* Subscription found. Calculate pointer to subscription object. */
+ pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );
+
+ /* Subscription validation should not have allowed a NULL callback function. */
+ IotMqtt_Assert( pSubscription->callback.function != NULL );
+
+ /* Increment the subscription's reference count. */
+ ( pSubscription->references )++;
+
+ /* Copy the necessary members of the subscription before releasing the
+ * subscription list mutex. */
+ pCallbackContext = pSubscription->callback.pCallbackContext;
+ callbackFunction = pSubscription->callback.function;
+
+ /* Unlock the subscription list mutex. */
+ IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
+
+ /* Set the members of the callback parameter. */
+ pCallbackParam->mqttConnection = pMqttConnection;
+ pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;
+ pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;
+
+ /* Invoke the subscription callback. */
+ callbackFunction( pCallbackContext, pCallbackParam );
+
+ /* Lock the subscription list mutex to decrement the reference count. */
+ IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
+
+ /* Decrement the reference count. It must still be positive. */
+ ( pSubscription->references )--;
+ IotMqtt_Assert( pSubscription->references >= 0 );
+
+ /* Save the pointer to the next link in case this subscription is freed. */
+ pNextLink = pCurrentLink->pNext;
+
+ /* Remove this subscription if it has no references and the unsubscribed
+ * flag is set. */
+ if( pSubscription->unsubscribed == true )
+ {
+ /* An unsubscribed subscription should have been removed from the list. */
+ IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );
+
+ /* Free subscriptions with no references. */
+ if( pSubscription->references == 0 )
+ {
+ IotMqtt_FreeSubscription( pSubscription );
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ /* Move current link pointer. */
+ pCurrentLink = pNextLink;
+ }
+
+ IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
+
+ _IotMqtt_DecrementConnectionReferences( pMqttConnection );
+}
+
+/*-----------------------------------------------------------*/
+
+void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,
+ uint16_t packetIdentifier,
+ int32_t order )
+{
+ _packetMatchParams_t packetMatchParams = { 0 };
+
+ /* Set the members of the search parameter. */
+ packetMatchParams.packetIdentifier = packetIdentifier;
+ packetMatchParams.order = order;
+
+ IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
+ IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
+ _packetMatch,
+ ( void * ) ( &packetMatchParams ),
+ IotMqtt_FreeSubscription,
+ offsetof( _mqttSubscription_t, link ) );
+ IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
+}
+
+/*-----------------------------------------------------------*/
+
+void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,
+ const IotMqttSubscription_t * pSubscriptionList,
+ size_t subscriptionCount )
+{
+ size_t i = 0;
+ _mqttSubscription_t * pSubscription = NULL;
+ IotLink_t * pSubscriptionLink = NULL;
+ _topicMatchParams_t topicMatchParams = { 0 };
+
+ /* Prevent any other thread from modifying the subscription list while this
+ * function is running. */
+ IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
+
+ /* Find and remove each topic filter from the list. */
+ for( i = 0; i < subscriptionCount; i++ )
+ {
+ topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
+ topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
+ topicMatchParams.exactMatchOnly = true;
+
+ pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
+ NULL,
+ _topicMatch,
+ &topicMatchParams );
+
+ if( pSubscriptionLink != NULL )
+ {
+ pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
+
+ /* Reference count must not be negative. */
+ IotMqtt_Assert( pSubscription->references >= 0 );
+
+ /* Remove subscription from list. */
+ IotListDouble_Remove( pSubscriptionLink );
+
+ /* Check the reference count. This subscription cannot be removed if
+ * there are subscription callbacks using it. */
+ if( pSubscription->references > 0 )
+ {
+ /* Set the unsubscribed flag. The last active subscription callback
+ * will remove and clean up this subscription. */
+ pSubscription->unsubscribed = true;
+ }
+ else
+ {
+ /* Free a subscription with no references. */
+ IotMqtt_FreeSubscription( pSubscription );
+ }
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+ }
+
+ IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
+}
+
+/*-----------------------------------------------------------*/
+
+bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
+ const char * pTopicFilter,
+ uint16_t topicFilterLength,
+ IotMqttSubscription_t * const pCurrentSubscription )
+{
+ bool status = false;
+ _mqttSubscription_t * pSubscription = NULL;
+ IotLink_t * pSubscriptionLink = NULL;
+ _topicMatchParams_t topicMatchParams = { 0 };
+
+ /* Set the members of the search parameter. */
+ topicMatchParams.pTopicName = pTopicFilter;
+ topicMatchParams.topicNameLength = topicFilterLength;
+ topicMatchParams.exactMatchOnly = true;
+
+ /* Prevent any other thread from modifying the subscription list while this
+ * function is running. */
+ IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );
+
+ /* Search for a matching subscription. */
+ pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),
+ NULL,
+ _topicMatch,
+ &topicMatchParams );
+
+ /* Check if a matching subscription was found. */
+ if( pSubscriptionLink != NULL )
+ {
+ pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
+
+ /* Copy the matching subscription to the output parameter. */
+ if( pCurrentSubscription != NULL )
+ {
+ pCurrentSubscription->pTopicFilter = pTopicFilter;
+ pCurrentSubscription->topicFilterLength = topicFilterLength;
+ pCurrentSubscription->qos = IOT_MQTT_QOS_0;
+ pCurrentSubscription->callback = pSubscription->callback;
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ status = true;
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );
+
+ return status;
+}
+
+/*-----------------------------------------------------------*/
+
+/* Provide access to internal functions and variables if testing. */
+#if IOT_BUILD_TESTS == 1
+ #include "iot_test_access_mqtt_subscription.c"
+#endif