summaryrefslogtreecommitdiff
path: root/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c')
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c101
1 files changed, 59 insertions, 42 deletions
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
index 23133f46b..0d6a259ed 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
@@ -94,7 +94,7 @@ static void _mqttSubscription_tryDestroy( void * pData );
static void _mqttOperation_tryDestroy( void * pData );
/**
- * @brief Create a keep-alive job for an MQTT connection.
+ * @brief Initialize the keep-alive operation for an MQTT connection.
*
* @param[in] pNetworkInfo User-provided network information for the new
* connection.
@@ -103,9 +103,9 @@ static void _mqttOperation_tryDestroy( void * pData );
*
* @return `true` if the keep-alive job was successfully created; `false` otherwise.
*/
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
- uint16_t keepAliveSeconds,
- _mqttConnection_t * pMqttConnection );
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
+ uint16_t keepAliveSeconds,
+ _mqttConnection_t * pMqttConnection );
/**
* @brief Creates a new MQTT connection and initializes its members.
@@ -141,7 +141,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pOperationReference );
+ IotMqttOperation_t * const pOperationReference );
/*-----------------------------------------------------------*/
@@ -238,9 +238,9 @@ static void _mqttOperation_tryDestroy( void * pData )
/*-----------------------------------------------------------*/
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
- uint16_t keepAliveSeconds,
- _mqttConnection_t * pMqttConnection )
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
+ uint16_t keepAliveSeconds,
+ _mqttConnection_t * pMqttConnection )
{
bool status = true;
IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
@@ -253,9 +253,12 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqttError_t ( * serializePingreq )( uint8_t **,
size_t * ) = _IotMqtt_SerializePingreq;
+ /* Set PINGREQ operation members. */
+ pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;
+
/* Convert the keep-alive interval to milliseconds. */
- pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;
- pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;
+ pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;
/* Choose a PINGREQ serializer function. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
@@ -277,8 +280,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Generate a PINGREQ packet. */
- serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),
- &( pMqttConnection->pingreqPacketSize ) );
+ serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),
+ &( pMqttConnection->pingreq.u.operation.packetSize ) );
if( serializeStatus != IOT_MQTT_SUCCESS )
{
@@ -291,8 +294,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
/* Create the task pool job that processes keep-alive. */
jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
pMqttConnection,
- &( pMqttConnection->keepAliveJobStorage ),
- &( pMqttConnection->keepAliveJob ) );
+ &( pMqttConnection->pingreq.jobStorage ),
+ &( pMqttConnection->pingreq.job ) );
/* Task pool job creation for a pre-allocated job should never fail.
* Abort the program if it does. */
@@ -408,9 +411,9 @@ static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
/* Check if keep-alive is active for this connection. */
if( keepAliveSeconds != 0 )
{
- if( _createKeepAliveJob( pNetworkInfo,
- keepAliveSeconds,
- pMqttConnection ) == false )
+ if( _createKeepAliveOperation( pNetworkInfo,
+ keepAliveSeconds,
+ pMqttConnection ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( false );
}
@@ -471,17 +474,31 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
{
IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
+ /* Default free packet function. */
+ void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;
+
/* Clean up keep-alive if still allocated. */
- if( pMqttConnection->keepAliveMs != 0 )
+ if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );
- _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
+ /* Choose a function to free the packet. */
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
+ if( pMqttConnection->pSerializer != NULL )
+ {
+ if( pMqttConnection->pSerializer->freePacket != NULL )
+ {
+ freePacket = pMqttConnection->pSerializer->freePacket;
+ }
+ }
+ #endif
+
+ freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );
/* Clear data about the keep-alive. */
- pMqttConnection->keepAliveMs = 0;
- pMqttConnection->pPingreqPacket = NULL;
- pMqttConnection->pingreqPacketSize = 0;
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
+ pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
+ pMqttConnection->pingreq.u.operation.packetSize = 0;
/* Decrement reference count. */
pMqttConnection->references--;
@@ -494,9 +511,9 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
/* A connection to be destroyed should have no keep-alive and at most 1
* reference. */
IotMqtt_Assert( pMqttConnection->references <= 1 );
- IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );
- IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );
- IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );
/* Remove all subscriptions. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
@@ -546,7 +563,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pOperationReference )
+ IotMqttOperation_t * const pOperationReference )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
_mqttOperation_t * pSubscriptionOperation = NULL;
@@ -666,7 +683,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
/* Check the subscription operation data and set the operation type. */
IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
- IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );
+ IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );
pSubscriptionOperation->u.operation.type = operation;
/* Generate a subscription packet from the subscription list. */
@@ -853,7 +870,7 @@ IotMqttError_t IotMqtt_Init( void )
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Log initialization status. */
- if( status != IOT_MQTT_SUCCESS ) //_RB_ This will generate compiler warnings if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0
+ if( status != IOT_MQTT_SUCCESS )
{
IotLogError( "Failed to initialize MQTT library serializer. " );
}
@@ -896,7 +913,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
_mqttConnection_t * pNewMqttConnection = NULL;
/* Default CONNECT serializer function. */
- IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, //_RB_ Needs to be a typedef to make it easier to rease and more maintainable should the prototype change.
+ IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,
uint8_t **,
size_t * ) = _IotMqtt_SerializeConnect;
@@ -911,7 +928,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
}
/* Validate network interface and connect info. */
- if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) //_RB_ A lot of code in here that could be replaced by asserts().
+ if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
@@ -1002,7 +1019,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotLogInfo( "Establishing new MQTT connection." );
- /* Initialize a new MQTT connection object. *///_RB_ Initialise, as per the comment, or create, as per the function name? I don't think this does create a connection as that happens below.
+ /* Initialize a new MQTT connection object. */
pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,
pNetworkInfo,
pConnectInfo->keepAliveSeconds );
@@ -1059,7 +1076,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
== IOT_MQTT_FLAG_WAITABLE );
- IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
/* Set the operation type. */
pOperation->u.operation.type = IOT_MQTT_CONNECT;
@@ -1127,7 +1144,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
/* Add the CONNECT operation to the send queue for network transmission. */
- status = _IotMqtt_ScheduleOperation( pOperation, // Why schedule a job if going to wait for comletion?
+ status = _IotMqtt_ScheduleOperation( pOperation,
_IotMqtt_ProcessSend,
0 );
@@ -1150,13 +1167,13 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
if( status == IOT_MQTT_SUCCESS )
{
/* Check if a keep-alive job should be scheduled. */
- if( pNewMqttConnection->keepAliveMs != 0 )
+ if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
IotLogDebug( "Scheduling first MQTT keep-alive job." );
taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
- pNewMqttConnection->keepAliveJob,
- pNewMqttConnection->nextKeepAliveMs );
+ pNewMqttConnection->pingreq.job,
+ pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );
if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
{
@@ -1268,7 +1285,7 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
== IOT_MQTT_FLAG_WAITABLE );
- IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
/* Set the operation type. */
pOperation->u.operation.type = IOT_MQTT_DISCONNECT;
@@ -1389,7 +1406,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pSubscribeOperation )
+ IotMqttOperation_t * const pSubscribeOperation )
{
return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,
mqttConnection,
@@ -1445,7 +1462,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pUnsubscribeOperation )
+ IotMqttOperation_t * const pUnsubscribeOperation )
{
return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,
mqttConnection,
@@ -1500,7 +1517,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
const IotMqttPublishInfo_t * pPublishInfo,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pPublishOperation )
+ IotMqttOperation_t * const pPublishOperation )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
_mqttOperation_t * pOperation = NULL;
@@ -1651,8 +1668,8 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
/* A QoS 0 PUBLISH may not be retried. */
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
{
- pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;
- pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;
+ pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;
+ pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;
}
else
{