summaryrefslogtreecommitdiff
path: root/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
diff options
context:
space:
mode:
Diffstat (limited to 'FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c')
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c210
1 files changed, 142 insertions, 68 deletions
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
index 169a292df..42843d28c 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
@@ -43,6 +43,9 @@
/* Platform layer includes. */
#include "platform/iot_threads.h"
+/* Atomics include. */
+#include "iot_atomic.h"
+
/*-----------------------------------------------------------*/
/**
@@ -89,6 +92,22 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier );
+/**
+ * @brief Flush a packet from the stream of incoming data.
+ *
+ * This function is called when memory for a packet cannot be allocated. The
+ * packet is flushed from the stream of incoming data so that the next packet
+ * may be read.
+ *
+ * @param[in] pNetworkConnection Network connection to use for receive, which
+ * may be different from the network connection associated with the MQTT connection.
+ * @param[in] pMqttConnection The associated MQTT connection.
+ * @param[in] length The length of the packet to flush.
+ */
+static void _flushPacket( void * pNetworkConnection,
+ const _mqttConnection_t * pMqttConnection,
+ size_t length );
+
/*-----------------------------------------------------------*/
static bool _incomingPacketValid( uint8_t packetType )
@@ -201,6 +220,14 @@ static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
if( pIncomingPacket->pRemainingData == NULL )
{
+ IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "
+ "%lu for incoming packet type %lu.",
+ pMqttConnection,
+ ( unsigned long ) pIncomingPacket->remainingLength,
+ ( unsigned long ) pIncomingPacket->type );
+
+ _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );
+
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
}
else
@@ -593,22 +620,18 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
if( status == IOT_MQTT_SUCCESS )
{
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
-
- if( pMqttConnection->keepAliveFailure == false )
+ if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),
+ 0,
+ 1 ) == 1 )
{
- IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
- pMqttConnection );
+ IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
+ pMqttConnection );
}
else
{
- IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
- pMqttConnection );
-
- pMqttConnection->keepAliveFailure = false;
+ IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
+ pMqttConnection );
}
-
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
}
else
{
@@ -637,15 +660,13 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier )
{
- IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
- uint8_t * pPuback = NULL;
- size_t pubackSize = 0, bytesSent = 0;
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
+ _mqttOperation_t * pPubackOperation = NULL;
- /* Default PUBACK serializer and free packet functions. */
+ /* Default PUBACK serializer function. */
IotMqttError_t ( * serializePuback )( uint16_t,
uint8_t **,
size_t * ) = _IotMqtt_SerializePuback;
- void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",
pMqttConnection,
@@ -669,57 +690,82 @@ static void _sendPuback( _mqttConnection_t * pMqttConnection,
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
- #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
- if( pMqttConnection->pSerializer != NULL )
- {
- if( pMqttConnection->pSerializer->freePacket != NULL )
- {
- freePacket = pMqttConnection->pSerializer->freePacket;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
+
+ /* Create a PUBACK operation. */
+ status = _IotMqtt_CreateOperation( pMqttConnection,
+ 0,
+ NULL,
+ &pPubackOperation );
+
+ if( status != IOT_MQTT_SUCCESS )
+ {
+ IOT_GOTO_CLEANUP();
+ }
+
+ /* Set the operation type. */
+ pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;
/* Generate a PUBACK packet from the packet identifier. */
- serializeStatus = serializePuback( packetIdentifier,
- &pPuback,
- &pubackSize );
+ status = serializePuback( packetIdentifier,
+ &( pPubackOperation->u.operation.pMqttPacket ),
+ &( pPubackOperation->u.operation.packetSize ) );
+
+ if( status != IOT_MQTT_SUCCESS )
+ {
+ IOT_GOTO_CLEANUP();
+ }
+
+ /* Add the PUBACK operation to the send queue for network transmission. */
+ status = _IotMqtt_ScheduleOperation( pPubackOperation,
+ _IotMqtt_ProcessSend,
+ 0 );
- if( serializeStatus != IOT_MQTT_SUCCESS )
+ if( status != IOT_MQTT_SUCCESS )
{
- IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "
- "received PUBLISH %hu.",
- pMqttConnection,
- packetIdentifier );
+ IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",
+ pMqttConnection );
+
+ IOT_GOTO_CLEANUP();
}
else
{
- bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
- pPuback,
- pubackSize );
+ EMPTY_ELSE_MARKER;
+ }
+
+ /* Clean up on error. */
+ IOT_FUNCTION_CLEANUP_BEGIN();
- if( bytesSent != pubackSize )
+ if( status != IOT_MQTT_SUCCESS )
+ {
+ if( pPubackOperation != NULL )
{
- IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"
- " PUBLISH %hu.",
- pMqttConnection,
- packetIdentifier );
+ _IotMqtt_DestroyOperation( pPubackOperation );
}
else
{
- IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",
- pMqttConnection,
- packetIdentifier );
+ EMPTY_ELSE_MARKER;
}
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+}
+
+/*-----------------------------------------------------------*/
- freePacket( pPuback );
+static void _flushPacket( void * pNetworkConnection,
+ const _mqttConnection_t * pMqttConnection,
+ size_t length )
+{
+ size_t bytesFlushed = 0;
+ uint8_t receivedByte = 0;
+
+ for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )
+ {
+ ( void ) _IotMqtt_GetNextByte( pNetworkConnection,
+ pMqttConnection->pNetworkInterface,
+ &receivedByte );
}
}
@@ -761,17 +807,27 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;
IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };
+ void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;
+
+ /* Disconnect callback function. */
+ void ( * disconnectCallback )( void *,
+ IotMqttCallbackParam_t * ) = NULL;
+
+ /* Network close function. */
+ IotNetworkError_t ( * closeConnection) ( void * ) = NULL;
+
+ /* Default free packet function. */
+ void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
/* Mark the MQTT connection as disconnected and the keep-alive as failed. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
pMqttConnection->disconnected = true;
- pMqttConnection->keepAliveFailure = true;
- if( pMqttConnection->keepAliveMs != 0 )
+ if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
/* Keep-alive must have a PINGREQ allocated. */
- IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );
- IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );
/* PINGREQ provides a reference to the connection, so reference count must
* be nonzero. */
@@ -779,7 +835,7 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
/* Attempt to cancel the keep-alive job. */
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
- pMqttConnection->keepAliveJob,
+ pMqttConnection->pingreq.job,
NULL );
/* If the keep-alive job was not canceled, it must be already executing.
@@ -791,13 +847,23 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
* the executing keep-alive job will clean up itself. */
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
{
- /* Clean up PINGREQ packet and job. */
- _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
+ /* Choose a function to free the packet. */
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
+ if( pMqttConnection->pSerializer != NULL )
+ {
+ if( pMqttConnection->pSerializer->freePacket != NULL )
+ {
+ freePacket = pMqttConnection->pSerializer->freePacket;
+ }
+ }
+ #endif
+
+ freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );
/* Clear data about the keep-alive. */
- pMqttConnection->keepAliveMs = 0;
- pMqttConnection->pPingreqPacket = NULL;
- pMqttConnection->pingreqPacketSize = 0;
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
+ pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
+ pMqttConnection->pingreq.u.operation.packetSize = 0;
/* Keep-alive is cleaned up; decrement reference count. Since this
* function must be followed with a call to DISCONNECT, a check to
@@ -817,12 +883,20 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
EMPTY_ELSE_MARKER;
}
+ /* Copy the function pointers and contexts, as the MQTT connection may be
+ * modified after the mutex is released. */
+ disconnectCallback = pMqttConnection->disconnectCallback.function;
+ pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;
+
+ closeConnection = pMqttConnection->pNetworkInterface->close;
+ pNetworkConnection = pMqttConnection->pNetworkConnection;
+
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
/* Close the network connection. */
- if( pMqttConnection->pNetworkInterface->close != NULL )
+ if( closeConnection != NULL )
{
- closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );
+ closeStatus = closeConnection( pNetworkConnection );
if( closeStatus == IOT_NETWORK_SUCCESS )
{
@@ -842,14 +916,14 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
}
/* Invoke the disconnect callback. */
- if( pMqttConnection->disconnectCallback.function != NULL )
+ if( disconnectCallback != NULL )
{
/* Set the members of the callback parameter. */
callbackParam.mqttConnection = pMqttConnection;
callbackParam.u.disconnectReason = disconnectReason;
- pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,
- &callbackParam );
+ disconnectCallback( pDisconnectCallbackContext,
+ &callbackParam );
}
else
{