summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArchit Aggarwal <architag@amazon.com>2020-07-29 13:52:36 -0700
committerGitHub <noreply@github.com>2020-07-29 13:52:36 -0700
commita007d6fc90d205f66e547c5b95cecb1ccd3507ff (patch)
treecd5ba567f816674e4f1a270d066291566a0c1611
parentfce6ec479dfb24688cfd9b7644ef17a1367833b7 (diff)
downloadfreertos-git-a007d6fc90d205f66e547c5b95cecb1ccd3507ff.tar.gz
Sync MQTT files from C-SDK repository (#174)
* Sync MQTT files with 156af10bae575ec500280e401b3d03d8759291f1 commit of C-SDK * Update plaintext demo with changes to MQTT_Init API
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c8
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h40
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c209
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c56
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c2
5 files changed, 182 insertions, 133 deletions
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c
index b70f1bbc0..4073230b9 100644
--- a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c
+++ b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_plain_text/DemoTasks/PlaintextMQTTExample.c
@@ -400,7 +400,6 @@ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
MQTTConnectInfo_t xConnectInfo;
bool xSessionPresent;
TransportInterface_t xTransport;
- MQTTApplicationCallbacks_t xCallbacks;
/***
* For readability, error handling in this function is restricted to the use of
@@ -412,13 +411,8 @@ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
xTransport.send = Plaintext_FreeRTOS_send;
xTransport.recv = Plaintext_FreeRTOS_recv;
- /* Application callbacks for receiving incoming published and incoming acks
- * from MQTT library. */
- xCallbacks.appCallback = prvEventCallback;
- xCallbacks.getTime = prvGetTimeMs;
-
/* Initialize MQTT library. */
- xResult = MQTT_Init( pxMQTTContext, &xTransport, &xCallbacks, &xBuffer );
+ xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer );
configASSERT( xResult == MQTTSuccess );
/* Many fields not used in this demo so start with everything at 0. */
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h
index 8db419707..2e5c15c0d 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/include/mqtt.h
@@ -35,17 +35,29 @@
*/
#define MQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U )
-struct MQTTApplicationCallbacks;
-typedef struct MQTTApplicationCallbacks MQTTApplicationCallbacks_t;
-
struct MQTTPubAckInfo;
-typedef struct MQTTPubAckInfo MQTTPubAckInfo_t;
+typedef struct MQTTPubAckInfo MQTTPubAckInfo_t;
struct MQTTContext;
-typedef struct MQTTContext MQTTContext_t;
+typedef struct MQTTContext MQTTContext_t;
+/**
+ * @brief Application provided callback to retrieve the current time in
+ * milliseconds.
+ *
+ * @return The current time in milliseconds.
+ */
typedef uint32_t (* MQTTGetCurrentTimeFunc_t )( void );
+/**
+ * @brief Application callback for receiving incoming publishes and incoming
+ * acks.
+ *
+ * @param[in] pContext Initialized MQTT context.
+ * @param[in] pPacketInfo Information on the type of incoming MQTT packet.
+ * @param[in] packetIdentifier Packet identifier of incoming PUBLISH packet.
+ * @param[in] pPublishInfo Incoming PUBLISH packet parameters.
+ */
typedef void (* MQTTEventCallback_t )( MQTTContext_t * pContext,
MQTTPacketInfo_t * pPacketInfo,
uint16_t packetIdentifier,
@@ -80,12 +92,6 @@ typedef enum MQTTPubAckType
MQTTPubcomp
} MQTTPubAckType_t;
-struct MQTTApplicationCallbacks
-{
- MQTTGetCurrentTimeFunc_t getTime;
- MQTTEventCallback_t appCallback;
-};
-
struct MQTTPubAckInfo
{
uint16_t packetId;
@@ -105,7 +111,8 @@ struct MQTTContext
uint16_t nextPacketId;
MQTTConnectionStatus_t connectStatus;
- MQTTApplicationCallbacks_t callbacks;
+ MQTTGetCurrentTimeFunc_t getTime;
+ MQTTEventCallback_t appCallback;
uint32_t lastPacketTime;
bool controlPacketSent;
@@ -128,7 +135,9 @@ struct MQTTContext
*
* @brief param[in] pContext The context to initialize.
* @brief param[in] pTransportInterface The transport interface to use with the context.
- * @brief param[in] pCallbacks Callbacks to use with the context.
+ * @brief param[in] getTimeFunction The time utility function to use with the context.
+ * @brief param[in] userCallback The user callback to use with the context to notify about
+ * incoming packet events.
* @brief param[in] pNetworkBuffer Network buffer provided for the context.
*
* @return #MQTTBadParameter if invalid parameters are passed;
@@ -136,7 +145,8 @@ struct MQTTContext
*/
MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
const TransportInterface_t * pTransportInterface,
- const MQTTApplicationCallbacks_t * pCallbacks,
+ MQTTGetCurrentTimeFunc_t getTimeFunction,
+ MQTTEventCallback_t userCallback,
const MQTTFixedBuffer_t * pNetworkBuffer );
/**
@@ -160,7 +170,7 @@ MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
* Testament is not used.
* @param[in] timeoutMs Maximum time in milliseconds to wait for a CONNACK packet.
* A zero timeout makes use of the retries for receiving CONNACK as configured with
- * #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT .
+ * #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT.
* @param[out] pSessionPresent Whether a previous session was present.
* Only relevant if not establishing a clean session.
*
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c
index 1eeebf120..0d961a457 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt.c
@@ -41,11 +41,6 @@
#define MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT ( 5U )
#endif
-/**
- * @brief A return code indicating an error from the transport interface.
- */
-#define TRANSPORT_ERROR ( -1 )
-
/*-----------------------------------------------------------*/
/**
@@ -55,7 +50,7 @@
* @brief param[in] pBufferToSend Buffer to be sent to network.
* @brief param[in] bytesToSend Number of bytes to be sent.
*
- * @return Total number of bytes sent; -1 if there is an error.
+ * @return Total number of bytes sent, or negative number on network error.
*/
static int32_t sendPacket( MQTTContext_t * pContext,
const uint8_t * pBufferToSend,
@@ -316,39 +311,48 @@ static int32_t sendPacket( MQTTContext_t * pContext,
size_t bytesRemaining = bytesToSend;
int32_t totalBytesSent = 0, bytesSent;
uint32_t sendTime = 0U;
+ bool sendError = false;
assert( pContext != NULL );
- assert( pContext->callbacks.getTime != NULL );
+ assert( pContext->getTime != NULL );
+ assert( pContext->transportInterface.send != NULL );
+ assert( pIndex != NULL );
bytesRemaining = bytesToSend;
/* Record the time of transmission. */
- sendTime = pContext->callbacks.getTime();
+ sendTime = pContext->getTime();
/* Loop until the entire packet is sent. */
- while( bytesRemaining > 0UL )
+ while( ( bytesRemaining > 0UL ) && ( sendError == false ) )
{
bytesSent = pContext->transportInterface.send( pContext->transportInterface.pNetworkContext,
pIndex,
bytesRemaining );
- if( bytesSent > 0 )
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed. Error code=%d.", bytesSent ) );
+ totalBytesSent = bytesSent;
+ sendError = true;
+ }
+ else
{
+ /* It is a bug in the application's transport send implementation if
+ * more bytes than expected are sent. To avoid a possible overflow
+ * in converting bytesRemaining from unsigned to signed, this assert
+ * must exist after the check for bytesSent being negative. */
+ assert( ( size_t ) bytesSent <= bytesRemaining );
+
bytesRemaining -= ( size_t ) bytesSent;
totalBytesSent += bytesSent;
pIndex += bytesSent;
- LogDebug( ( "Bytes sent=%d, bytes remaining=%ul,"
- "total bytes sent=%d.",
+ LogDebug( ( "BytesSent=%d, BytesRemaining=%lu,"
+ " TotalBytesSent=%d.",
bytesSent,
bytesRemaining,
totalBytesSent ) );
}
- else
- {
- LogError( ( "Transport send failed. Error code=%d.", bytesSent ) );
- totalBytesSent = TRANSPORT_ERROR;
- break;
- }
}
/* Update time of last transmission if the entire packet is successfully sent. */
@@ -419,11 +423,13 @@ static int32_t recvExact( const MQTTContext_t * pContext,
assert( pContext != NULL );
assert( bytesToRecv <= pContext->networkBuffer.size );
- assert( pContext->callbacks.getTime != NULL );
+ assert( pContext->getTime != NULL );
+ assert( pContext->transportInterface.recv != NULL );
+ assert( pContext->networkBuffer.pBuffer != NULL );
pIndex = pContext->networkBuffer.pBuffer;
recvFunc = pContext->transportInterface.recv;
- getTimeStampMs = pContext->callbacks.getTime;
+ getTimeStampMs = pContext->getTime;
entryTimeMs = getTimeStampMs();
@@ -433,19 +439,31 @@ static int32_t recvExact( const MQTTContext_t * pContext,
pIndex,
bytesRemaining );
- if( bytesRecvd >= 0 )
- {
- bytesRemaining -= ( size_t ) bytesRecvd;
- totalBytesRecvd += ( int32_t ) bytesRecvd;
- pIndex += bytesRecvd;
- }
- else
+ if( bytesRecvd < 0 )
{
- LogError( ( "Network error while receiving packet: ReturnCode=%d",
+ LogError( ( "Network error while receiving packet: ReturnCode=%d.",
bytesRecvd ) );
totalBytesRecvd = bytesRecvd;
receiveError = true;
}
+ else
+ {
+ /* It is a bug in the application's transport receive implementation
+ * if more bytes than expected are received. To avoid a possible
+ * overflow in converting bytesRemaining from unsigned to signed,
+ * this assert must exist after the check for bytesRecvd being
+ * negative. */
+ assert( ( size_t ) bytesRecvd <= bytesRemaining );
+
+ bytesRemaining -= ( size_t ) bytesRecvd;
+ totalBytesRecvd += ( int32_t ) bytesRecvd;
+ pIndex += bytesRecvd;
+ LogDebug( ( "BytesReceived=%d, BytesRemaining=%lu, "
+ "TotalBytesReceived=%d.",
+ bytesRecvd,
+ bytesRemaining,
+ totalBytesRecvd ) );
+ }
elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
@@ -474,9 +492,9 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
bool receiveError = false;
assert( pContext != NULL );
- assert( pContext->callbacks.getTime != NULL );
+ assert( pContext->getTime != NULL );
bytesToReceive = pContext->networkBuffer.size;
- getTimeStampMs = pContext->callbacks.getTime;
+ getTimeStampMs = pContext->getTime;
entryTimeMs = getTimeStampMs();
@@ -538,12 +556,13 @@ static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
size_t bytesToReceive = 0U;
assert( pContext != NULL );
+ assert( pContext->networkBuffer.pBuffer != NULL );
if( incomingPacket.remainingLength > pContext->networkBuffer.size )
{
LogError( ( "Incoming packet will be dumped: "
"Packet length exceeds network buffer size."
- "PacketSize=%lu, NetworkBufferSize=%lu",
+ "PacketSize=%lu, NetworkBufferSize=%lu.",
incomingPacket.remainingLength,
pContext->networkBuffer.size ) );
status = discardPacket( pContext,
@@ -674,7 +693,7 @@ static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext )
uint32_t now = 0U, keepAliveMs = 0U;
assert( pContext != NULL );
- now = pContext->callbacks.getTime();
+ now = pContext->getTime();
keepAliveMs = 1000U * ( uint32_t ) pContext->keepAliveIntervalSec;
/* If keep alive interval is 0, it is disabled. */
@@ -714,7 +733,7 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
assert( pIncomingPacket != NULL );
status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo );
- LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%d", status ) );
+ LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%d.", status ) );
if( status == MQTTSuccess )
{
@@ -785,10 +804,10 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
* duplicate incoming publishes. */
if( duplicatePublish == false )
{
- pContext->callbacks.appCallback( pContext,
- pIncomingPacket,
- packetIdentifier,
- &publishInfo );
+ pContext->appCallback( pContext,
+ pIncomingPacket,
+ packetIdentifier,
+ &publishInfo );
}
/* Send PUBACK or PUBREC if necessary. */
@@ -813,9 +832,9 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
assert( pContext != NULL );
assert( pIncomingPacket != NULL );
- assert( pContext->callbacks.appCallback != NULL );
+ assert( pContext->appCallback != NULL );
- appCallback = pContext->callbacks.appCallback;
+ appCallback = pContext->appCallback;
ackType = getAckFromPacketType( pIncomingPacket->type );
status = MQTT_DeserializeAck( pIncomingPacket, &packetIdentifier, NULL );
@@ -879,7 +898,7 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
assert( pContext != NULL );
assert( pIncomingPacket != NULL );
- appCallback = pContext->callbacks.appCallback;
+ appCallback = pContext->appCallback;
switch( pIncomingPacket->type )
{
@@ -937,6 +956,7 @@ static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
MQTTPacketInfo_t incomingPacket;
assert( pContext != NULL );
+ assert( pContext->networkBuffer.pBuffer != NULL );
status = MQTT_GetIncomingPacketTypeAndLength( pContext->transportInterface.recv,
pContext->transportInterface.pNetworkContext,
@@ -1045,6 +1065,8 @@ static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
assert( pContext != NULL );
assert( pPublishInfo != NULL );
assert( headerSize > 0 );
+ assert( pContext->networkBuffer.pBuffer != NULL );
+ assert( !( pPublishInfo->payloadLength > 0 ) || ( pPublishInfo->pPayload != NULL ) );
/* Send header first. */
bytesSent = sendPacket( pContext,
@@ -1061,20 +1083,28 @@ static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
LogDebug( ( "Sent %d bytes of PUBLISH header.",
bytesSent ) );
- /* Send Payload. */
- bytesSent = sendPacket( pContext,
- pPublishInfo->pPayload,
- pPublishInfo->payloadLength );
-
- if( bytesSent < 0 )
+ /* Send Payload if there is one to send. It is valid for a PUBLISH
+ * Packet to contain a zero length payload.*/
+ if( pPublishInfo->payloadLength > 0U )
{
- LogError( ( "Transport send failed for PUBLISH payload." ) );
- status = MQTTSendFailed;
+ bytesSent = sendPacket( pContext,
+ pPublishInfo->pPayload,
+ pPublishInfo->payloadLength );
+
+ if( bytesSent < 0 )
+ {
+ LogError( ( "Transport send failed for PUBLISH payload." ) );
+ status = MQTTSendFailed;
+ }
+ else
+ {
+ LogDebug( ( "Sent %d bytes of PUBLISH payload.",
+ bytesSent ) );
+ }
}
else
{
- LogDebug( ( "Sent %d bytes of PUBLISH payload.",
- bytesSent ) );
+ LogDebug( "PUBLISH payload was not sent. Payload length was zero." );
}
}
@@ -1097,9 +1127,9 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
assert( pContext != NULL );
assert( pIncomingPacket != NULL );
- assert( pContext->callbacks.getTime != NULL );
+ assert( pContext->getTime != NULL );
- getTimeStamp = pContext->callbacks.getTime;
+ getTimeStamp = pContext->getTime;
/* Get the entry time for the function. */
entryTimeMs = getTimeStamp();
@@ -1290,6 +1320,14 @@ static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
pPublishInfo->qos ) );
status = MQTTBadParameter;
}
+ else if( ( pPublishInfo->payloadLength > 0U ) && ( pPublishInfo->pPayload == NULL ) )
+ {
+ LogError( ( "A nonzero payload length requires a non-NULL payload: "
+ "payloadLength=%lu, pPayload=%p.",
+ ( unsigned long ) pPublishInfo->payloadLength,
+ pPublishInfo->pPayload ) );
+ status = MQTTBadParameter;
+ }
else
{
/* Empty else MISRA 15.7 */
@@ -1302,31 +1340,31 @@ static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
const TransportInterface_t * pTransportInterface,
- const MQTTApplicationCallbacks_t * pCallbacks,
+ MQTTGetCurrentTimeFunc_t getTimeFunction,
+ MQTTEventCallback_t userCallback,
const MQTTFixedBuffer_t * pNetworkBuffer )
{
MQTTStatus_t status = MQTTSuccess;
/* Validate arguments. */
if( ( pContext == NULL ) || ( pTransportInterface == NULL ) ||
- ( pCallbacks == NULL ) || ( pNetworkBuffer == NULL ) )
+ ( pNetworkBuffer == NULL ) )
{
LogError( ( "Argument cannot be NULL: pContext=%p, "
"pTransportInterface=%p, "
- "pCallbacks=%p, "
- "pNetworkBuffer=%p.",
+ "pNetworkBuffer=%p",
pContext,
pTransportInterface,
- pCallbacks,
pNetworkBuffer ) );
status = MQTTBadParameter;
}
- else if( ( pCallbacks->getTime == NULL ) || ( pCallbacks->appCallback == NULL ) ||
+ else if( ( getTimeFunction == NULL ) || ( userCallback == NULL ) ||
( pTransportInterface->recv == NULL ) || ( pTransportInterface->send == NULL ) )
{
- LogError( ( "Functions cannot be NULL: getTime=%p, appCallback=%p, recv=%p, send=%p.",
- pCallbacks->getTime,
- pCallbacks->appCallback,
+ LogError( ( "Function pointers cannot be NULL: getTimeFunction=%p, userCallback=%p, "
+ "transportRecv=%p, transportRecvSend=%p",
+ getTimeFunction,
+ userCallback,
pTransportInterface->recv,
pTransportInterface->send ) );
status = MQTTBadParameter;
@@ -1337,7 +1375,8 @@ MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
pContext->connectStatus = MQTTNotConnected;
pContext->transportInterface = *pTransportInterface;
- pContext->callbacks = *pCallbacks;
+ pContext->getTime = getTimeFunction;
+ pContext->appCallback = userCallback;
pContext->networkBuffer = *pNetworkBuffer;
/* Zero is not a valid packet ID per MQTT spec. Start from 1. */
@@ -1618,6 +1657,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
pContext->networkBuffer.pBuffer,
packetSize );
+ /* It is an error to not send the entire PINGREQ packet. */
if( bytesSent < 0 )
{
LogError( ( "Transport send failed for PINGREQ packet." ) );
@@ -1757,23 +1797,25 @@ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext,
uint32_t timeoutMs )
{
MQTTStatus_t status = MQTTBadParameter;
- MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
uint32_t entryTimeMs = 0U, remainingTimeMs = timeoutMs, elapsedTimeMs = 0U;
- if( ( pContext != NULL ) && ( pContext->callbacks.getTime != NULL ) )
+ if( pContext == NULL )
{
- getTimeStampMs = pContext->callbacks.getTime;
- entryTimeMs = getTimeStampMs();
- status = MQTTSuccess;
- pContext->controlPacketSent = false;
+ LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
+ }
+ else if( pContext->getTime == NULL )
+ {
+ LogError( ( "Invalid input parameter: MQTT Context must have valid getTime." ) );
}
- else if( pContext == NULL )
+ else if( pContext->networkBuffer.pBuffer == NULL )
{
- LogError( ( "MQTT Context cannot be NULL." ) );
+ LogError( ( "Invalid input parameter: The MQTT context's networkBuffer must not be NULL." ) );
}
else
{
- LogError( ( "MQTT Context must set callbacks.getTime." ) );
+ entryTimeMs = pContext->getTime();
+ pContext->controlPacketSent = false;
+ status = MQTTSuccess;
}
while( status == MQTTSuccess )
@@ -1784,14 +1826,15 @@ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext,
* the loop condition, and we do not want multiple breaks in a loop. */
if( status != MQTTSuccess )
{
- LogError( ( "Exiting process loop. Error status=%s",
+ LogError( ( "Exiting process loop due to failure: ErrorStatus=%s",
MQTT_Status_strerror( status ) ) );
}
else
{
/* Recalculate remaining time and check if loop should exit. This is
* done at the end so the loop will run at least a single iteration. */
- elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
+ elapsedTimeMs = calculateElapsedTime( pContext->getTime(),
+ entryTimeMs );
if( elapsedTimeMs > timeoutMs )
{
@@ -1811,22 +1854,24 @@ MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext,
uint32_t timeoutMs )
{
MQTTStatus_t status = MQTTBadParameter;
- MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
uint32_t entryTimeMs = 0U, remainingTimeMs = timeoutMs, elapsedTimeMs = 0U;
- if( ( pContext != NULL ) && ( pContext->callbacks.getTime != NULL ) )
+ if( pContext == NULL )
{
- getTimeStampMs = pContext->callbacks.getTime;
- entryTimeMs = getTimeStampMs();
- status = MQTTSuccess;
+ LogError( ( "Invalid input parameter: MQTT Context cannot be NULL." ) );
}
- else if( pContext == NULL )
+ else if( pContext->getTime == NULL )
{
- LogError( ( "MQTT Context cannot be NULL." ) );
+ LogError( ( "Invalid input parameter: MQTT Context must have a valid getTime function." ) );
+ }
+ else if( pContext->networkBuffer.pBuffer == NULL )
+ {
+ LogError( ( "Invalid input parameter: MQTT context's networkBuffer must not be NULL." ) );
}
else
{
- LogError( ( "MQTT Context must set callbacks.getTime." ) );
+ entryTimeMs = pContext->getTime();
+ status = MQTTSuccess;
}
while( status == MQTTSuccess )
@@ -1844,7 +1889,7 @@ MQTTStatus_t MQTT_ReceiveLoop( MQTTContext_t * pContext,
{
/* Recalculate remaining time and check if loop should exit. This is
* done at the end so the loop will run at least a single iteration. */
- elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
+ elapsedTimeMs = calculateElapsedTime( pContext->getTime(), entryTimeMs );
if( elapsedTimeMs >= timeoutMs )
{
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c
index e3b9ba6f1..7ea35e3e4 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_lightweight.c
@@ -211,7 +211,7 @@ static MQTTStatus_t calculateSubscriptionPacketSize( const MQTTSubscribeInfo_t *
* @param[in] subscriptionCount The number of elements in pSubscriptionList.
* @param[in] packetId Packet identifier.
* @param[in] remainingLength Remaining length of the packet.
- * @param[in] pBuffer Buffer for packet serialization.
+ * @param[in] pFixedBuffer Buffer for packet serialization.
*
* @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
* #MQTTBadParameter if invalid parameters are passed;
@@ -221,7 +221,7 @@ static MQTTStatus_t validateSubscriptionSerializeParams( const MQTTSubscribeInfo
size_t subscriptionCount,
uint16_t packetId,
size_t remainingLength,
- const MQTTFixedBuffer_t * pBuffer );
+ const MQTTFixedBuffer_t * pFixedBuffer );
/**
* @brief Serialize an MQTT CONNECT packet in the given buffer.
@@ -229,12 +229,12 @@ static MQTTStatus_t validateSubscriptionSerializeParams( const MQTTSubscribeInfo
* @param[in] pConnectInfo MQTT CONNECT packet parameters.
* @param[in] pWillInfo Last Will and Testament. Pass NULL if not used.
* @param[in] remainingLength Remaining Length of MQTT CONNECT packet.
- * @param[out] pBuffer Buffer for packet serialization.
+ * @param[out] pFixedBuffer Buffer for packet serialization.
*/
static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo,
const MQTTPublishInfo_t * pWillInfo,
size_t remainingLength,
- const MQTTFixedBuffer_t * pBuffer );
+ const MQTTFixedBuffer_t * pFixedBuffer );
/**
* Prints the appropriate message for the CONNACK response code if logs are
@@ -321,7 +321,7 @@ static size_t remainingLengthEncodedSize( size_t length )
encodedSize = 4U;
}
- LogDebug( ( "Encoded size for length =%ul is %ul.",
+ LogDebug( ( "Encoded size for length %lu is %lu bytes.",
length,
encodedSize ) );
@@ -496,7 +496,7 @@ static void serializePublishCommon( const MQTTPublishInfo_t * pPublishInfo,
/* Packet Id should be non zero for QoS1 and QoS2. */
assert( ( pPublishInfo->qos == MQTTQoS0 ) || ( packetIdentifier != 0U ) );
/* Duplicate flag should be set only for Qos1 or Qos2. */
- assert( !( pPublishInfo->dup ) || ( pPublishInfo->qos != MQTTQoS0 ) );
+ assert( ( pPublishInfo->dup != true ) || ( pPublishInfo->qos != MQTTQoS0 ) );
/* Get the start address of the buffer. */
pIndex = pFixedBuffer->pBuffer;
@@ -655,7 +655,7 @@ static bool incomingPacketValid( uint8_t packetType )
/* Any other packet type is invalid. */
default:
- LogWarn( ( "Incoming packet invalid: Packet type=%u",
+ LogWarn( ( "Incoming packet invalid: Packet type=%u.",
packetType ) );
break;
}
@@ -1230,16 +1230,16 @@ static MQTTStatus_t deserializePingresp( const MQTTPacketInfo_t * pPingresp )
static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo,
const MQTTPublishInfo_t * pWillInfo,
size_t remainingLength,
- const MQTTFixedBuffer_t * pBuffer )
+ const MQTTFixedBuffer_t * pFixedBuffer )
{
uint8_t connectFlags = 0U;
uint8_t * pIndex = NULL;
assert( pConnectInfo != NULL );
- assert( pBuffer != NULL );
- assert( pBuffer->pBuffer != NULL );
+ assert( pFixedBuffer != NULL );
+ assert( pFixedBuffer->pBuffer != NULL );
- pIndex = pBuffer->pBuffer;
+ pIndex = pFixedBuffer->pBuffer;
/* The first byte in the CONNECT packet is the control packet type. */
*pIndex = MQTT_PACKET_TYPE_CONNECT;
pIndex++;
@@ -1337,11 +1337,11 @@ static void serializeConnectPacket( const MQTTConnectInfo_t * pConnectInfo,
}
LogDebug( ( "Length of serialized CONNECT packet is %lu.",
- ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) );
+ ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) ) );
/* Ensure that the difference between the end and beginning of the buffer
* is less than the buffer size. */
- assert( ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) <= pBuffer->size );
+ assert( ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) <= pFixedBuffer->size );
}
/*-----------------------------------------------------------*/
@@ -1381,7 +1381,7 @@ MQTTStatus_t MQTT_GetConnectPacketSize( const MQTTConnectInfo_t * pConnectInfo,
* By bounding the payloadLength of the will message, the CONNECT
* packet will never be larger than 327699 bytes. */
LogError( ( "The Will Message length must not exceed %d. "
- "pWillInfo->payloadLength=%lu",
+ "pWillInfo->payloadLength=%lu.",
UINT16_MAX,
pWillInfo->payloadLength ) );
status = MQTTBadParameter;
@@ -1553,7 +1553,7 @@ MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionL
size_t subscriptionCount,
uint16_t packetId,
size_t remainingLength,
- const MQTTFixedBuffer_t * pBuffer )
+ const MQTTFixedBuffer_t * pFixedBuffer )
{
size_t i = 0;
uint8_t * pIndex = NULL;
@@ -1564,11 +1564,11 @@ MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionL
subscriptionCount,
packetId,
remainingLength,
- pBuffer );
+ pFixedBuffer );
if( status == MQTTSuccess )
{
- pIndex = pBuffer->pBuffer;
+ pIndex = pFixedBuffer->pBuffer;
/* The first byte in SUBSCRIBE is the packet type. */
*pIndex = MQTT_PACKET_TYPE_SUBSCRIBE;
@@ -1595,7 +1595,7 @@ MQTTStatus_t MQTT_SerializeSubscribe( const MQTTSubscribeInfo_t * pSubscriptionL
}
LogDebug( ( "Length of serialized SUBSCRIBE packet is %lu.",
- ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) );
+ ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) ) );
}
return status;
@@ -1652,7 +1652,7 @@ MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptio
size_t subscriptionCount,
uint16_t packetId,
size_t remainingLength,
- const MQTTFixedBuffer_t * pBuffer )
+ const MQTTFixedBuffer_t * pFixedBuffer )
{
MQTTStatus_t status = MQTTSuccess;
size_t i = 0;
@@ -1663,12 +1663,12 @@ MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptio
subscriptionCount,
packetId,
remainingLength,
- pBuffer );
+ pFixedBuffer );
if( status == MQTTSuccess )
{
/* Get the start of the buffer to the iterator variable. */
- pIndex = pBuffer->pBuffer;
+ pIndex = pFixedBuffer->pBuffer;
/* The first byte in UNSUBSCRIBE is the packet type. */
*pIndex = MQTT_PACKET_TYPE_UNSUBSCRIBE;
@@ -1691,7 +1691,7 @@ MQTTStatus_t MQTT_SerializeUnsubscribe( const MQTTSubscribeInfo_t * pSubscriptio
}
LogDebug( ( "Length of serialized UNSUBSCRIBE packet is %lu.",
- ( ( size_t ) ( pIndex - pBuffer->pBuffer ) ) ) );
+ ( ( size_t ) ( pIndex - pFixedBuffer->pBuffer ) ) ) );
}
return status;
@@ -1770,7 +1770,7 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo,
/* For serializing a publish, if there exists a payload, then the buffer
* cannot be NULL. */
- else if( ( pPublishInfo->payloadLength > 0 ) && ( pPublishInfo->pPayload == NULL ) )
+ else if( ( pPublishInfo->payloadLength > 0U ) && ( pPublishInfo->pPayload == NULL ) )
{
LogError( ( "A nonzero payload length requires a non-NULL payload: "
"payloadLength=%lu, pPayload=%p.",
@@ -1792,7 +1792,7 @@ MQTTStatus_t MQTT_SerializePublish( const MQTTPublishInfo_t * pPublishInfo,
pPublishInfo->qos ) );
status = MQTTBadParameter;
}
- else if( ( pPublishInfo->dup ) && ( pPublishInfo->qos == MQTTQoS0 ) )
+ else if( ( pPublishInfo->dup == true ) && ( pPublishInfo->qos == MQTTQoS0 ) )
{
LogError( ( "Duplicate flag is set for PUBLISH with Qos 0," ) );
status = MQTTBadParameter;
@@ -1867,7 +1867,7 @@ MQTTStatus_t MQTT_SerializePublishHeader( const MQTTPublishInfo_t * pPublishInfo
pPublishInfo->qos ) );
status = MQTTBadParameter;
}
- else if( ( pPublishInfo->dup ) && ( pPublishInfo->qos == MQTTQoS0 ) )
+ else if( ( pPublishInfo->dup == true ) && ( pPublishInfo->qos == MQTTQoS0 ) )
{
LogError( ( "Duplicate flag is set for PUBLISH with Qos 0," ) );
status = MQTTBadParameter;
@@ -2096,7 +2096,7 @@ MQTTStatus_t MQTT_DeserializePublish( const MQTTPacketInfo_t * pIncomingPacket,
}
else if( ( pIncomingPacket->type & 0xF0U ) != MQTT_PACKET_TYPE_PUBLISH )
{
- LogError( ( "Packet is not publish. Packet type: %hu.",
+ LogError( ( "Packet is not publish. Packet type: %02x.",
pIncomingPacket->type ) );
status = MQTTBadParameter;
}
@@ -2227,7 +2227,7 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc,
}
else
{
- LogError( ( "Incoming packet invalid: Packet type=%u",
+ LogError( ( "Incoming packet invalid: Packet type=%u.",
pIncomingPacket->type ) );
status = MQTTBadResponse;
}
@@ -2243,7 +2243,7 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc,
else if( status != MQTTBadParameter )
{
LogError( ( "A single byte was not read from the transport: "
- "transportStatus=%d",
+ "transportStatus=%d.",
bytesReceived ) );
status = MQTTRecvFailed;
}
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c
index 42fbda686..37b23e094 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries-LTS-Beta2/c_sdk/standard/mqtt/src/mqtt_state.c
@@ -525,7 +525,7 @@ static MQTTStatus_t addRecord( MQTTPubAckInfo_t * records,
if( records[ index ].packetId == packetId )
{
/* Collision. */
- LogError( ( "Collision when adding PacketID=%u at index=%u",
+ LogError( ( "Collision when adding PacketID=%u at index=%u.",
packetId,
index ) );