summaryrefslogtreecommitdiff
path: root/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
diff options
context:
space:
mode:
Diffstat (limited to 'FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c')
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c1373
1 files changed, 0 insertions, 1373 deletions
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
deleted file mode 100644
index af343f637..000000000
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
+++ /dev/null
@@ -1,1373 +0,0 @@
-/*
- * Amazon FreeRTOS MQTT V2.0.0
- * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- *
- * http://aws.amazon.com/freertos
- * http://www.FreeRTOS.org
- */
-
-/**
- * @file iot_mqtt_operation.c
- * @brief Implements functions that process MQTT operations.
- */
-
-/* The config header is always included first. */
-#include "iot_config.h"
-
-/* Standard includes. */
-#include <string.h>
-
-/* Error handling include. */
-#include "private/iot_error.h"
-
-/* MQTT internal include. */
-#include "private/iot_mqtt_internal.h"
-
-/* Platform layer includes. */
-#include "platform/iot_clock.h"
-#include "platform/iot_threads.h"
-
-/* Atomics include. */
-#include "iot_atomic.h"
-
-/*-----------------------------------------------------------*/
-
-/**
- * @brief First parameter to #_mqttOperation_match.
- */
-typedef struct _operationMatchParam
-{
- IotMqttOperationType_t type; /**< @brief The type of operation to look for. */
- const uint16_t * pPacketIdentifier; /**< @brief The packet identifier associated with the operation.
- * Set to `NULL` to ignore packet identifier. */
-} _operationMatchParam_t;
-
-/*-----------------------------------------------------------*/
-
-/**
- * @brief Match an MQTT operation by type and packet identifier.
- *
- * @param[in] pOperationLink Pointer to the link member of an #_mqttOperation_t.
- * @param[in] pMatch Pointer to an #_operationMatchParam_t.
- *
- * @return `true` if the operation matches the parameters in `pArgument`; `false`
- * otherwise.
- */
-static bool _mqttOperation_match( const IotLink_t * pOperationLink,
- void * pMatch );
-
-/**
- * @brief Check if an operation with retry has exceeded its retry limit.
- *
- * If a PUBLISH operation is available for retry, this function also sets any
- * necessary DUP flags.
- *
- * @param[in] pOperation The operation to check.
- *
- * @return `true` if the operation may be retried; `false` otherwise.
- */
-static bool _checkRetryLimit( _mqttOperation_t * pOperation );
-
-/**
- * @brief Schedule the next send of an operation with retry.
- *
- * @param[in] pOperation The operation to schedule.
- *
- * @return `true` if the reschedule succeeded; `false` otherwise.
- */
-static bool _scheduleNextRetry( _mqttOperation_t * pOperation );
-
-/*-----------------------------------------------------------*/
-
-static bool _mqttOperation_match( const IotLink_t * pOperationLink,
- void * pMatch )
-{
- bool match = false;
-
- /* Because this function is called from a container function, the given link
- * must never be NULL. */
- IotMqtt_Assert( pOperationLink != NULL );
-
- _mqttOperation_t * pOperation = IotLink_Container( _mqttOperation_t,
- pOperationLink,
- link );
- _operationMatchParam_t * pParam = ( _operationMatchParam_t * ) pMatch;
-
- /* Check for matching operations. */
- if( pParam->type == pOperation->u.operation.type )
- {
- /* Check for matching packet identifiers. */
- if( pParam->pPacketIdentifier == NULL )
- {
- match = true;
- }
- else
- {
- match = ( *( pParam->pPacketIdentifier ) == pOperation->u.operation.packetIdentifier );
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- return match;
-}
-
-/*-----------------------------------------------------------*/
-
-static bool _checkRetryLimit( _mqttOperation_t * pOperation )
-{
- _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
- bool status = true, setDup = false;
-
- /* Choose a set DUP function. */
- void ( * publishSetDup )( uint8_t *,
- uint8_t *,
- uint16_t * ) = _IotMqtt_PublishSetDup;
-
- #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
- if( pMqttConnection->pSerializer != NULL )
- {
- if( pMqttConnection->pSerializer->serialize.publishSetDup != NULL )
- {
- publishSetDup = pMqttConnection->pSerializer->serialize.publishSetDup;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
-
- /* Only PUBLISH may be retried. */
- IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );
-
- /* Check if the retry limit is exhausted. */
- if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit )
- {
- /* The retry count may be at most one more than the retry limit, which
- * accounts for the final check for a PUBACK. */
- IotMqtt_Assert( pOperation->u.operation.periodic.retry.count ==
- pOperation->u.operation.periodic.retry.limit + 1 );
-
- IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",
- pMqttConnection,
- pOperation,
- pOperation->u.operation.periodic.retry.limit );
-
- status = false;
- }
- else
- {
- if( pOperation->u.operation.periodic.retry.count == 1 )
- {
- /* The DUP flag should always be set on the first retry. */
- setDup = true;
- }
- else if( pMqttConnection->awsIotMqttMode == true )
- {
- /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet
- * identifier) must be reset on every retry. */
- setDup = true;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- if( setDup == true )
- {
- /* In AWS IoT MQTT mode, the references mutex must be locked to
- * prevent the packet identifier from being read while it is being
- * changed. */
- if( pMqttConnection->awsIotMqttMode == true )
- {
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Always set the DUP flag on the first retry. */
- publishSetDup( pOperation->u.operation.pMqttPacket,
- pOperation->u.operation.pPacketIdentifierHigh,
- &( pOperation->u.operation.packetIdentifier ) );
-
- if( pMqttConnection->awsIotMqttMode == true )
- {
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
-
- return status;
-}
-
-/*-----------------------------------------------------------*/
-
-static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
-{
- bool firstRetry = false;
- uint32_t scheduleDelay = 0;
- IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
- _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
-
- /* This function should never be called with retry count greater than
- * retry limit. */
- IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <=
- pOperation->u.operation.periodic.retry.limit );
-
- /* Increment the retry count. */
- ( pOperation->u.operation.periodic.retry.count )++;
-
- /* Check for a response shortly for the final retry. Otherwise, calculate the
- * next retry period. */
- if( pOperation->u.operation.periodic.retry.count >
- pOperation->u.operation.periodic.retry.limit )
- {
- scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;
-
- IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Final retry was sent. Will check "
- "for response in %d ms.",
- pMqttConnection,
- pOperation,
- IOT_MQTT_RESPONSE_WAIT_MS );
- }
- else
- {
- scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs;
-
- /* Double the retry period, subject to a ceiling value. */
- pOperation->u.operation.periodic.retry.nextPeriodMs *= 2;
-
- if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING )
- {
- pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",
- pMqttConnection,
- pOperation,
- ( unsigned long ) pOperation->u.operation.periodic.retry.count,
- ( unsigned long ) pOperation->u.operation.periodic.retry.limit,
- ( unsigned long ) scheduleDelay );
-
- /* Check if this is the first retry. */
- firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 );
-
- /* On the first retry, the PUBLISH will be moved from the pending processing
- * list to the pending responses list. Lock the connection references mutex
- * to manipulate the lists. */
- if( firstRetry == true )
- {
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
-
- /* Reschedule the PUBLISH for another send. */
- status = _IotMqtt_ScheduleOperation( pOperation,
- _IotMqtt_ProcessSend,
- scheduleDelay );
-
- /* Check for successful reschedule. */
- if( status == IOT_MQTT_SUCCESS )
- {
- /* Move a successfully rescheduled PUBLISH from the pending processing
- * list to the pending responses list on the first retry. */
- if( firstRetry == true )
- {
- /* Operation must be linked. */
- IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) == true );
-
- /* Transfer to pending response list. */
- IotListDouble_Remove( &( pOperation->link ) );
- IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),
- &( pOperation->link ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* The references mutex only needs to be unlocked on the first retry, since
- * only the first retry manipulates the connection lists. */
- if( firstRetry == true )
- {
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- return( status == IOT_MQTT_SUCCESS );
-}
-
-/*-----------------------------------------------------------*/
-
-IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
- uint32_t flags,
- const IotMqttCallbackInfo_t * pCallbackInfo,
- _mqttOperation_t ** pNewOperation )
-{
- IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
- bool decrementOnError = false;
- _mqttOperation_t * pOperation = NULL;
- bool waitable = ( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE );
-
- /* If the waitable flag is set, make sure that there's no callback. */
- if( waitable == true )
- {
- if( pCallbackInfo != NULL )
- {
- IotLogError( "Callback should not be set for a waitable operation." );
-
- IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- IotLogDebug( "(MQTT connection %p) Creating new operation record.",
- pMqttConnection );
-
- /* Increment the reference count for the MQTT connection when creating a new
- * operation. */
- if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == false )
- {
- IotLogError( "(MQTT connection %p) New operation record cannot be created"
- " for a closed connection",
- pMqttConnection );
-
- IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NETWORK_ERROR );
- }
- else
- {
- /* Reference count will need to be decremented on error. */
- decrementOnError = true;
- }
-
- /* Allocate memory for a new operation. */
- pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );
-
- if( pOperation == NULL )
- {
- IotLogError( "(MQTT connection %p) Failed to allocate memory for new "
- "operation record.",
- pMqttConnection );
-
- IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
- }
- else
- {
- /* Clear the operation data. */
- ( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );
-
- /* Initialize some members of the new operation. */
- pOperation->pMqttConnection = pMqttConnection;
- pOperation->u.operation.jobReference = 1;
- pOperation->u.operation.flags = flags;
- pOperation->u.operation.status = IOT_MQTT_STATUS_PENDING;
- }
-
- /* Check if the waitable flag is set. If it is, create a semaphore to
- * wait on. */
- if( waitable == true )
- {
- /* Create a semaphore to wait on for a waitable operation. */
- if( IotSemaphore_Create( &( pOperation->u.operation.notify.waitSemaphore ), 0, 1 ) == false )
- {
- IotLogError( "(MQTT connection %p) Failed to create semaphore for "
- "waitable operation.",
- pMqttConnection );
-
- IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
- }
- else
- {
- /* A waitable operation is created with an additional reference for the
- * Wait function. */
- ( pOperation->u.operation.jobReference )++;
- }
- }
- else
- {
- /* If the waitable flag isn't set but a callback is, copy the callback
- * information. */
- if( pCallbackInfo != NULL )
- {
- pOperation->u.operation.notify.callback = *pCallbackInfo;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
-
- /* Add this operation to the MQTT connection's operation list. */
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
- IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
- &( pOperation->link ) );
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
-
- /* Set the output parameter. */
- *pNewOperation = pOperation;
-
- /* Clean up operation and decrement reference count if this function failed. */
- IOT_FUNCTION_CLEANUP_BEGIN();
-
- if( status != IOT_MQTT_SUCCESS )
- {
- if( decrementOnError == true )
- {
- _IotMqtt_DecrementConnectionReferences( pMqttConnection );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- if( pOperation != NULL )
- {
- IotMqtt_FreeOperation( pOperation );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- IOT_FUNCTION_CLEANUP_END();
-}
-
-/*-----------------------------------------------------------*/
-
-bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
- bool cancelJob )
-{
- bool destroyOperation = false;
- IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
- _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
-
- /* Attempt to cancel the operation's job. */
- if( cancelJob == true )
- {
- taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
- pOperation->job,
- NULL );
-
- /* If the operation's job was not canceled, it must be already executing.
- * Any other return value is invalid. */
- IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
- ( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
-
- if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
- {
- IotLogDebug( "(MQTT connection %p, %s operation %p) Job canceled.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Decrement job reference count. */
- if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
- {
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
- pOperation->u.operation.jobReference--;
-
- IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed"
- " from %ld to %ld.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation,
- ( long ) ( pOperation->u.operation.jobReference + 1 ),
- ( long ) ( pOperation->u.operation.jobReference ) );
-
- /* The job reference count must be 0 or 1 after the decrement. */
- IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||
- ( pOperation->u.operation.jobReference == 1 ) );
-
- /* This operation may be destroyed if its reference count is 0. */
- if( pOperation->u.operation.jobReference == 0 )
- {
- destroyOperation = true;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- return destroyOperation;
-}
-
-/*-----------------------------------------------------------*/
-
-void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation )
-{
- _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
-
- /* Default free packet function. */
- void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
-
- IotLogDebug( "(MQTT connection %p, %s operation %p) Destroying operation.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
-
- /* The job reference count must be between 0 and 2. */
- IotMqtt_Assert( ( pOperation->u.operation.jobReference >= 0 ) &&
- ( pOperation->u.operation.jobReference <= 2 ) );
-
- /* Jobs to be destroyed should be removed from the MQTT connection's
- * lists. */
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
-
- if( IotLink_IsLinked( &( pOperation->link ) ) == true )
- {
- IotLogDebug( "(MQTT connection %p, %s operation %p) Removed operation from connection lists.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation,
- pMqttConnection );
-
- IotListDouble_Remove( &( pOperation->link ) );
- }
- else
- {
- IotLogDebug( "(MQTT connection %p, %s operation %p) Operation was not present in connection lists.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
- }
-
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
-
- /* Free any allocated MQTT packet. */
- if( pOperation->u.operation.pMqttPacket != NULL )
- {
- #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
- if( pMqttConnection->pSerializer != NULL )
- {
- if( pMqttConnection->pSerializer->freePacket != NULL )
- {
- freePacket = pMqttConnection->pSerializer->freePacket;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
-
- freePacket( pOperation->u.operation.pMqttPacket );
-
- IotLogDebug( "(MQTT connection %p, %s operation %p) MQTT packet freed.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
- }
- else
- {
- IotLogDebug( "(MQTT connection %p, %s operation %p) Operation has no allocated MQTT packet.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
- }
-
- /* Check if a wait semaphore was created for this operation. */
- if( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
- {
- IotSemaphore_Destroy( &( pOperation->u.operation.notify.waitSemaphore ) );
-
- IotLogDebug( "(MQTT connection %p, %s operation %p) Wait semaphore destroyed.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- IotLogDebug( "(MQTT connection %p, %s operation %p) Operation record destroyed.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
-
- /* Free the memory used to hold operation data. */
- IotMqtt_FreeOperation( pOperation );
-
- /* Decrement the MQTT connection's reference count after destroying an
- * operation. */
- _IotMqtt_DecrementConnectionReferences( pMqttConnection );
-}
-
-/*-----------------------------------------------------------*/
-
-void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
- IotTaskPoolJob_t pKeepAliveJob,
- void * pContext )
-{
- bool status = true;
- uint32_t swapStatus = 0;
- IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
- size_t bytesSent = 0;
-
- /* Swap status is not checked when asserts are disabled. */
- ( void ) swapStatus;
-
- /* Retrieve the MQTT connection from the context. */
- _mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;
- _mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq );
-
- /* Check parameters. */
- /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
- IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job );
-
- /* Check that keep-alive interval is valid. The MQTT spec states its maximum
- * value is 65,535 seconds. */
- IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 );
-
- /* Only two values are valid for the next keep alive job delay. */
- IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
- pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) ||
- ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs
- == IOT_MQTT_RESPONSE_WAIT_MS ) );
-
- IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );
-
- /* Re-create the keep-alive job for rescheduling. This should never fail. */
- taskPoolStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
- pContext,
- IotTaskPool_GetJobStorageFromHandle( pKeepAliveJob ),
- &pKeepAliveJob );
- IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );
-
- /* Determine whether to send a PINGREQ or check for PINGRESP. */
- if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
- pPingreqOperation->u.operation.periodic.ping.keepAliveMs )
- {
- IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );
-
- /* Because PINGREQ may be used to keep the MQTT connection alive, it is
- * more important than other operations. Bypass the queue of jobs for
- * operations by directly sending the PINGREQ in this job. */
- bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
- pPingreqOperation->u.operation.pMqttPacket,
- pPingreqOperation->u.operation.packetSize );
-
- if( bytesSent != pPingreqOperation->u.operation.packetSize )
- {
- IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );
- status = false;
- }
- else
- {
- /* Assume the keep-alive will fail. The network receive callback will
- * clear the failure flag upon receiving a PINGRESP. */
- swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ),
- 1,
- 0 );
- IotMqtt_Assert( swapStatus == 1 );
-
- /* Schedule a check for PINGRESP. */
- pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS;
-
- IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",
- pMqttConnection,
- IOT_MQTT_RESPONSE_WAIT_MS );
- }
- }
- else
- {
- IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );
-
- if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 )
- {
- IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );
-
- /* PINGRESP was received. Schedule the next PINGREQ transmission. */
- pPingreqOperation->u.operation.periodic.ping.nextPeriodMs =
- pPingreqOperation->u.operation.periodic.ping.keepAliveMs;
- }
- else
- {
- IotLogError( "(MQTT connection %p) Failed to receive PINGRESP within %d ms.",
- pMqttConnection,
- IOT_MQTT_RESPONSE_WAIT_MS );
-
- /* The network receive callback did not clear the failure flag. */
- status = false;
- }
- }
-
- /* When a PINGREQ is successfully sent, reschedule this job to check for a
- * response shortly. */
- if( status == true )
- {
- taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,
- pKeepAliveJob,
- pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
-
- if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
- {
- IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.",
- pMqttConnection,
- ( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
- }
- else
- {
- IotLogError( "(MQTT connection %p) Failed to reschedule keep-alive job, error %s.",
- pMqttConnection,
- IotTaskPool_strerror( taskPoolStatus ) );
-
- status = false;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Close the connection on failures. */
- if( status == false )
- {
- _IotMqtt_CloseNetworkConnection( IOT_MQTT_KEEP_ALIVE_TIMEOUT,
- pMqttConnection );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-}
-
-/*-----------------------------------------------------------*/
-
-void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
- IotTaskPoolJob_t pPublishJob,
- void * pContext )
-{
- _mqttOperation_t * pOperation = pContext;
- IotMqttCallbackParam_t callbackParam = { .mqttConnection = NULL };
-
- /* Check parameters. The task pool and job parameter is not used when asserts
- * are disabled. */
- ( void ) pTaskPool;
- ( void ) pPublishJob;
- /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
- IotMqtt_Assert( pOperation->incomingPublish == true );
- IotMqtt_Assert( pPublishJob == pOperation->job );
-
- /* Remove the operation from the pending processing list. */
- IotMutex_Lock( &( pOperation->pMqttConnection->referencesMutex ) );
-
- if( IotLink_IsLinked( &( pOperation->link ) ) == true )
- {
- IotListDouble_Remove( &( pOperation->link ) );
- }
- else
- {
- /* This operation may have already been removed by cleanup of pending
- * operations (called from Disconnect). In that case, do nothing here. */
- EMPTY_ELSE_MARKER;
- }
-
- IotMutex_Unlock( &( pOperation->pMqttConnection->referencesMutex ) );
-
- /* Process the current PUBLISH. */
- callbackParam.u.message.info = pOperation->u.publish.publishInfo;
-
- _IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,
- &callbackParam );
-
- /* Free buffers associated with the current PUBLISH message. */
- IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL );
- IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );
-
- /* Free the incoming PUBLISH operation. */
- IotMqtt_FreeOperation( pOperation );
-}
-
-/*-----------------------------------------------------------*/
-
-void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
- IotTaskPoolJob_t pSendJob,
- void * pContext )
-{
- size_t bytesSent = 0;
- bool destroyOperation = false, waitable = false, networkPending = false;
- _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
- _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
-
- /* Check parameters. The task pool and job parameter is not used when asserts
- * are disabled. */
- ( void ) pTaskPool;
- ( void ) pSendJob;
- /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
- IotMqtt_Assert( pSendJob == pOperation->job );
-
- /* The given operation must have an allocated packet and be waiting for a status. */
- IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
- IotMqtt_Assert( pOperation->u.operation.packetSize != 0 );
- IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
-
- /* Check if this operation is waitable. */
- waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
-
- /* Check PUBLISH retry counts and limits. */
- if( pOperation->u.operation.periodic.retry.limit > 0 )
- {
- if( _checkRetryLimit( pOperation ) == false )
- {
- pOperation->u.operation.status = IOT_MQTT_RETRY_NO_RESPONSE;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Send an operation that is waiting for a response. */
- if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
- {
- IotLogDebug( "(MQTT connection %p, %s operation %p) Sending MQTT packet.",
- pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
-
- /* Transmit the MQTT packet from the operation over the network. */
- bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
- pOperation->u.operation.pMqttPacket,
- pOperation->u.operation.packetSize );
-
- /* Check transmission status. */
- if( bytesSent != pOperation->u.operation.packetSize )
- {
- pOperation->u.operation.status = IOT_MQTT_NETWORK_ERROR;
- }
- else
- {
- /* DISCONNECT operations are considered successful upon successful
- * transmission. In addition, non-waitable operations with no callback
- * may also be considered successful. */
- if( pOperation->u.operation.type == IOT_MQTT_DISCONNECT )
- {
- /* DISCONNECT operations are always waitable. */
- IotMqtt_Assert( waitable == true );
-
- pOperation->u.operation.status = IOT_MQTT_SUCCESS;
- }
- else if( waitable == false )
- {
- if( pOperation->u.operation.notify.callback.function == NULL )
- {
- pOperation->u.operation.status = IOT_MQTT_SUCCESS;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Check if this operation requires further processing. */
- if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
- {
- /* Check if this operation should be scheduled for retransmission. */
- if( pOperation->u.operation.periodic.retry.limit > 0 )
- {
- if( _scheduleNextRetry( pOperation ) == false )
- {
- pOperation->u.operation.status = IOT_MQTT_SCHEDULING_ERROR;
- }
- else
- {
- /* A successfully scheduled PUBLISH retry is awaiting a response
- * from the network. */
- networkPending = true;
- }
- }
- else
- {
- /* Decrement reference count to signal completion of send job. Check
- * if the operation should be destroyed. */
- if( waitable == true )
- {
- destroyOperation = _IotMqtt_DecrementOperationReferences( pOperation, false );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* If the operation should not be destroyed, transfer it from the
- * pending processing to the pending response list. */
- if( destroyOperation == false )
- {
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
-
- /* Operation must be linked. */
- IotMqtt_Assert( IotLink_IsLinked( &( pOperation->link ) ) );
-
- /* Transfer to pending response list. */
- IotListDouble_Remove( &( pOperation->link ) );
- IotListDouble_InsertHead( &( pMqttConnection->pendingResponse ),
- &( pOperation->link ) );
-
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
-
- /* This operation is now awaiting a response from the network. */
- networkPending = true;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Destroy the operation or notify of completion if necessary. */
- if( destroyOperation == true )
- {
- _IotMqtt_DestroyOperation( pOperation );
- }
- else
- {
- /* Do not check the operation status if a network response is pending,
- * since a network response could modify the status. */
- if( networkPending == false )
- {
- /* Notify of operation completion if this job set a status. */
- if( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING )
- {
- _IotMqtt_Notify( pOperation );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
-}
-
-/*-----------------------------------------------------------*/
-
-void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,
- IotTaskPoolJob_t pOperationJob,
- void * pContext )
-{
- _mqttOperation_t * pOperation = ( _mqttOperation_t * ) pContext;
- IotMqttCallbackParam_t callbackParam = { 0 };
-
- /* Check parameters. The task pool and job parameter is not used when asserts
- * are disabled. */
- ( void ) pTaskPool;
- ( void ) pOperationJob;
- /* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
- IotMqtt_Assert( pOperationJob == pOperation->job );
-
- /* The operation's callback function and status must be set. */
- IotMqtt_Assert( pOperation->u.operation.notify.callback.function != NULL );
- IotMqtt_Assert( pOperation->u.operation.status != IOT_MQTT_STATUS_PENDING );
-
- callbackParam.mqttConnection = pOperation->pMqttConnection;
- callbackParam.u.operation.type = pOperation->u.operation.type;
- callbackParam.u.operation.reference = pOperation;
- callbackParam.u.operation.result = pOperation->u.operation.status;
-
- /* Invoke the user callback function. */
- pOperation->u.operation.notify.callback.function( pOperation->u.operation.notify.callback.pCallbackContext,
- &callbackParam );
-
- /* Attempt to destroy the operation once the user callback returns. */
- if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
- {
- _IotMqtt_DestroyOperation( pOperation );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-}
-
-/*-----------------------------------------------------------*/
-
-IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,
- IotTaskPoolRoutine_t jobRoutine,
- uint32_t delay )
-{
- IotMqttError_t status = IOT_MQTT_SUCCESS;
- IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
-
- /* Check that job routine is valid. */
- IotMqtt_Assert( ( jobRoutine == _IotMqtt_ProcessSend ) ||
- ( jobRoutine == _IotMqtt_ProcessCompletedOperation ) ||
- ( jobRoutine == _IotMqtt_ProcessIncomingPublish ) );
-
- /* Creating a new job should never fail when parameters are valid. */
- taskPoolStatus = IotTaskPool_CreateJob( jobRoutine,
- pOperation,
- &( pOperation->jobStorage ),
- &( pOperation->job ) );
- IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );
-
- /* Schedule the new job with a delay. */
- taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
- pOperation->job,
- delay );
-
- if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
- {
- /* Scheduling a newly-created job should never be invalid or illegal. */
- IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_BAD_PARAMETER );
- IotMqtt_Assert( taskPoolStatus != IOT_TASKPOOL_ILLEGAL_OPERATION );
-
- IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule operation job, error %s.",
- pOperation->pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation,
- IotTaskPool_strerror( taskPoolStatus ) );
-
- status = IOT_MQTT_SCHEDULING_ERROR;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- return status;
-}
-
-/*-----------------------------------------------------------*/
-
-_mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
- IotMqttOperationType_t type,
- const uint16_t * pPacketIdentifier )
-{
- bool waitable = false;
- IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
- _mqttOperation_t * pResult = NULL;
- IotLink_t * pResultLink = NULL;
- _operationMatchParam_t param = { 0 };
-
- param.type = type;
- param.pPacketIdentifier = pPacketIdentifier;
-
- if( pPacketIdentifier != NULL )
- {
- IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response "
- "with packet identifier %hu.",
- pMqttConnection,
- IotMqtt_OperationType( type ),
- *pPacketIdentifier );
- }
- else
- {
- IotLogDebug( "(MQTT connection %p) Searching for operation %s pending response.",
- pMqttConnection,
- IotMqtt_OperationType( type ) );
- }
-
- /* Find and remove the first matching element in the list. */
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
- pResultLink = IotListDouble_FindFirstMatch( &( pMqttConnection->pendingResponse ),
- NULL,
- _mqttOperation_match,
- &param );
-
- /* Check if a match was found. */
- if( pResultLink != NULL )
- {
- /* Get operation pointer and check if it is waitable. */
- pResult = IotLink_Container( _mqttOperation_t, pResultLink, link );
- waitable = ( pResult->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
-
- /* Check if the matched operation is a PUBLISH with retry. If it is, cancel
- * the retry job. */
- if( pResult->u.operation.periodic.retry.limit > 0 )
- {
- taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
- pResult->job,
- NULL );
-
- /* If the retry job could not be canceled, then it is currently
- * executing. Ignore the operation. */
- if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
- {
- pResult = NULL;
- }
- else
- {
- /* Check job reference counts. A waitable operation should have a
- * count of 2; a non-waitable operation should have a count of 1. */
- IotMqtt_Assert( pResult->u.operation.jobReference == ( 1 + ( waitable == true ) ) );
- }
- }
- else
- {
- /* An operation with no retry in the pending responses list should
- * always have a job reference of 1. */
- IotMqtt_Assert( pResult->u.operation.jobReference == 1 );
-
- /* Increment job references of a waitable operation to prevent Wait from
- * destroying this operation if it times out. */
- if( waitable == true )
- {
- ( pResult->u.operation.jobReference )++;
-
- IotLogDebug( "(MQTT connection %p, %s operation %p) Job reference changed from %ld to %ld.",
- pMqttConnection,
- IotMqtt_OperationType( type ),
- pResult,
- ( long int ) ( pResult->u.operation.jobReference - 1 ),
- ( long int ) ( pResult->u.operation.jobReference ) );
- }
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- if( pResult != NULL )
- {
- IotLogDebug( "(MQTT connection %p) Found operation %s.",
- pMqttConnection,
- IotMqtt_OperationType( type ) );
-
- /* Remove the matched operation from the list. */
- IotListDouble_Remove( &( pResult->link ) );
- }
- else
- {
- IotLogDebug( "(MQTT connection %p) Operation %s not found.",
- pMqttConnection,
- IotMqtt_OperationType( type ) );
- }
-
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
-
- return pResult;
-}
-
-/*-----------------------------------------------------------*/
-
-void _IotMqtt_Notify( _mqttOperation_t * pOperation )
-{
- IotMqttError_t status = IOT_MQTT_SCHEDULING_ERROR;
- _mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
-
- /* Check if operation is waitable. */
- bool waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
-
- /* Remove any lingering subscriptions if a SUBSCRIBE failed. Rejected
- * subscriptions are removed by the deserializer, so not removed here. */
- if( pOperation->u.operation.type == IOT_MQTT_SUBSCRIBE )
- {
- switch( pOperation->u.operation.status )
- {
- case IOT_MQTT_SUCCESS:
- break;
-
- case IOT_MQTT_SERVER_REFUSED:
- break;
-
- default:
- _IotMqtt_RemoveSubscriptionByPacket( pOperation->pMqttConnection,
- pOperation->u.operation.packetIdentifier,
- -1 );
- break;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Schedule callback invocation for non-waitable operation. */
- if( waitable == false )
- {
- /* Non-waitable operation should have job reference of 1. */
- IotMqtt_Assert( pOperation->u.operation.jobReference == 1 );
-
- /* Schedule an invocation of the callback. */
- if( pOperation->u.operation.notify.callback.function != NULL )
- {
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
-
- status = _IotMqtt_ScheduleOperation( pOperation,
- _IotMqtt_ProcessCompletedOperation,
- 0 );
-
- if( status == IOT_MQTT_SUCCESS )
- {
- IotLogDebug( "(MQTT connection %p, %s operation %p) Callback scheduled.",
- pOperation->pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
-
- /* Place the scheduled operation back in the list of operations pending
- * processing. */
- if( IotLink_IsLinked( &( pOperation->link ) ) == true )
- {
- IotListDouble_Remove( &( pOperation->link ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
- &( pOperation->link ) );
- }
- else
- {
- IotLogWarn( "(MQTT connection %p, %s operation %p) Failed to schedule callback.",
- pOperation->pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
- }
-
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Operations that weren't scheduled may be destroyed. */
- if( status == IOT_MQTT_SCHEDULING_ERROR )
- {
- /* Decrement reference count of operations not scheduled. */
- if( _IotMqtt_DecrementOperationReferences( pOperation, false ) == true )
- {
- _IotMqtt_DestroyOperation( pOperation );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
-
- /* Post to a waitable operation's semaphore. */
- if( waitable == true )
- {
- IotLogDebug( "(MQTT connection %p, %s operation %p) Waitable operation "
- "notified of completion.",
- pOperation->pMqttConnection,
- IotMqtt_OperationType( pOperation->u.operation.type ),
- pOperation );
-
- IotSemaphore_Post( &( pOperation->u.operation.notify.waitSemaphore ) );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- IotMqtt_Assert( status == IOT_MQTT_SUCCESS );
- }
-}
-
-/*-----------------------------------------------------------*/