summaryrefslogtreecommitdiff
path: root/FreeRTOS-Plus
diff options
context:
space:
mode:
authorrtel <rtel@1d2547de-c912-0410-9cb9-b8ca96c0e9e2>2019-07-25 20:20:24 +0000
committerrtel <rtel@1d2547de-c912-0410-9cb9-b8ca96c0e9e2>2019-07-25 20:20:24 +0000
commitb8c2a4c7a4ac68999b8b272965a9b27b849f4e2b (patch)
treeab0e3cc06f2289fb1ad9ea81a181b5f2cf93c72b /FreeRTOS-Plus
parent375e8520cf192205f6103c424e9e4e9e3adafa24 (diff)
downloadfreertos-b8c2a4c7a4ac68999b8b272965a9b27b849f4e2b.tar.gz
Files as per 190725_FreeRTOS_IoT_Libs_Task_Pool_and_MQTT_Preview interim release.
git-svn-id: http://svn.code.sf.net/p/freertos/code/trunk@2708 1d2547de-c912-0410-9cb9-b8ca96c0e9e2
Diffstat (limited to 'FreeRTOS-Plus')
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url5
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj4
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters12
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c13
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c1
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url6
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj2
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters6
-rw-r--r--FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c12
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h547
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h8
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c101
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c210
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c158
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c4
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c4
-rw-r--r--FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h157
17 files changed, 968 insertions, 282 deletions
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url
new file mode 100644
index 000000000..8a5a8617e
--- /dev/null
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/READ_ME_INSTRUCTIONS.url
@@ -0,0 +1,5 @@
+[{000214A0-0000-0000-C000-000000000046}]
+Prop3=19,11
+[InternetShortcut]
+IDList=
+URL=https://www.freertos.org/mqtt/
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj
index dda52fb5b..b92278318 100644
--- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj
@@ -198,10 +198,6 @@
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\private\iot_taskpool_internal.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\types\iot_taskpool_types.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt.h" />
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent.h" />
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent_config_defaults.h" />
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_config_defaults.h" />
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_lib.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\types\iot_mqtt_types.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOSIPConfigDefaults.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOS_ARP.h" />
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters
index 4edf58197..98cdff0ad 100644
--- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/WIN32.vcxproj.filters
@@ -286,18 +286,6 @@
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
</ClInclude>
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent.h">
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
- </ClInclude>
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_agent_config_defaults.h">
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
- </ClInclude>
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_config_defaults.h">
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
- </ClInclude>
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\iot_mqtt_lib.h">
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include</Filter>
- </ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\mqtt\include\types\iot_mqtt_types.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\mqtt\include\types</Filter>
</ClInclude>
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c
index 213defd03..bedfd3e85 100644
--- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/mqtt/main.c
@@ -25,9 +25,9 @@
* 1 tab == 4 spaces!
*/
-/*
- * TBD
- */
+ /***
+ * See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions.
+ ***/
/* Standard includes. */
#include <stdio.h>
@@ -102,10 +102,9 @@ static UBaseType_t ulNextRand;
int main( void )
{
- /*
- * Instructions for using this project are provided on:
- * TBD
- */
+ /***
+ * See https://www.FreeRTOS.org/mqtt/index.html for configuration and usage instructions.
+ ***/
/* Miscellaneous initialisation including preparing the logging and seeding
the random number generator. */
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c
index f1c7a42a4..fb2eaca26 100644
--- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/DemoTasks/SimpleTaskPoolExamples.c
@@ -25,7 +25,6 @@
* 1 tab == 4 spaces!
*/
-//_RB_ Add link to docs here.
/* Kernel includes. */
#include "FreeRTOS.h"
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url
new file mode 100644
index 000000000..c00147b27
--- /dev/null
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/READ_ME_INSTRUCTIONS.url
@@ -0,0 +1,6 @@
+[{000214A0-0000-0000-C000-000000000046}]
+Prop3=19,11
+[InternetShortcut]
+IDList=
+URL=https://www.freertos.org/task-pool/
+HotKey=0
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj
index 4c7704c70..7f9596ad3 100644
--- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj
@@ -177,7 +177,7 @@
<ClInclude Include="..\..\..\..\FreeRTOS\Source\include\task.h" />
<ClInclude Include="..\..\..\..\FreeRTOS\Source\include\timers.h" />
<ClInclude Include="..\..\..\..\FreeRTOS\Source\portable\MSVC-MingW\portmacro.h" />
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_afr.h" />
+ <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_freertos.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\include\types\iot_platform_types.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\iot_taskpool.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\c_sdk\standard\common\include\private\iot_error.h" />
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters
index 4f426c818..3653a6cbe 100644
--- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/WIN32.vcxproj.filters
@@ -250,11 +250,11 @@
</ClInclude>
<ClInclude Include="iot_config.h" />
<ClInclude Include="iot_config_common.h" />
- <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_afr.h">
- <Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform</Filter>
- </ClInclude>
<ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\include\types\iot_platform_types.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\include\types</Filter>
</ClInclude>
+ <ClInclude Include="..\..\..\Source\FreeRTOS-IoT-Libraries\abstractions\platform\freertos\include\platform\iot_platform_types_freertos.h">
+ <Filter>FreeRTOS+\FreeRTOS IoT Libraries\abstractions\platform\freertos\include\platform\types</Filter>
+ </ClInclude>
</ItemGroup>
</Project> \ No newline at end of file
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c
index 7f42f9ab2..2cf6b3829 100644
--- a/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c
+++ b/FreeRTOS-Plus/Demo/FreeRTOS_IoT_Libraries/task_pool/main.c
@@ -25,6 +25,11 @@
* 1 tab == 4 spaces!
*/
+ /***
+ * See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions.
+ ***/
+
+
/* Standard includes. */
#include <stdio.h>
#include <time.h>
@@ -55,10 +60,9 @@ const uint8_t ucMACAddress[ 6 ] = { configMAC_ADDR0, configMAC_ADDR1, configMAC_
int main( void )
{
- /*
- * Instructions for using this project are provided on:
- * TBD
- */
+ /***
+ * See https://www.FreeRTOS.org/task-pool/ for configuration and usage instructions.
+ ***/
/* Create the example that demonstrates task pool functionality. Examples
that demonstrate networking connectivity will be added in future projects
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h
new file mode 100644
index 000000000..d44593d8f
--- /dev/null
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/common/include/atomic.h
@@ -0,0 +1,547 @@
+/*
+ * FreeRTOS Kernel V10.2.0
+ * Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ * http://www.FreeRTOS.org
+ * http://aws.amazon.com/freertos
+ *
+ * 1 tab == 4 spaces!
+ */
+
+/**
+ * @file atomic.h
+ * @brief FreeRTOS atomic operation support.
+ *
+ * Two implementations of atomic are given in this header file:
+ * 1. Disabling interrupt globally.
+ * 2. ISA native atomic support.
+ * The former is available to all ports (compiler-architecture combination),
+ * while the latter is only available to ports compiling with GCC (version at
+ * least 4.7.0), which also have ISA atomic support.
+ *
+ * User can select which implementation to use by:
+ * setting/clearing configUSE_ATOMIC_INSTRUCTION in FreeRTOSConfig.h.
+ * Define AND set configUSE_ATOMIC_INSTRUCTION to 1 for ISA native atomic support.
+ * Undefine OR clear configUSE_ATOMIC_INSTRUCTION for disabling global interrupt
+ * implementation.
+ *
+ * @see GCC Built-in Functions for Memory Model Aware Atomic Operations
+ * https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
+ */
+
+#ifndef ATOMIC_H
+#define ATOMIC_H
+
+#ifndef INC_FREERTOS_H
+ #error "include FreeRTOS.h must appear in source files before include atomic.h"
+#endif
+
+/* Standard includes. */
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ /* Needed for __atomic_compare_exchange() weak=false. */
+ #include <stdbool.h>
+
+ /* This branch is for GCC compiler and GCC compiler only. */
+ #ifndef portFORCE_INLINE
+ #define portFORCE_INLINE inline __attribute__((always_inline))
+ #endif
+
+#else
+
+ /* Port specific definitions -- entering/exiting critical section.
+ * Refer template -- ./lib/FreeRTOS/portable/Compiler/Arch/portmacro.h
+ *
+ * Every call to ATOMIC_EXIT_CRITICAL() must be closely paired with
+ * ATOMIC_ENTER_CRITICAL().
+ */
+ #if defined( portSET_INTERRUPT_MASK_FROM_ISR )
+
+ /* Nested interrupt scheme is supported in this port. */
+ #define ATOMIC_ENTER_CRITICAL() \
+ UBaseType_t uxCriticalSectionType = portSET_INTERRUPT_MASK_FROM_ISR()
+
+ #define ATOMIC_EXIT_CRITICAL() \
+ portCLEAR_INTERRUPT_MASK_FROM_ISR( uxCriticalSectionType )
+
+ #else
+
+ /* Nested interrupt scheme is NOT supported in this port. */
+ #define ATOMIC_ENTER_CRITICAL() portENTER_CRITICAL()
+ #define ATOMIC_EXIT_CRITICAL() portEXIT_CRITICAL()
+
+ #endif /* portSET_INTERRUPT_MASK_FROM_ISR() */
+
+ /* Port specific definition -- "always inline".
+ * Inline is compiler specific, and may not always get inlined depending on your optimization level.
+ * Also, inline is considerred as performance optimization for atomic.
+ * Thus, if portFORCE_INLINE is not provided by portmacro.h, instead of resulting error,
+ * simply define it.
+ */
+ #ifndef portFORCE_INLINE
+ #define portFORCE_INLINE
+ #endif
+
+#endif /* configUSE_GCC_BUILTIN_ATOMICS */
+
+#define ATOMIC_COMPARE_AND_SWAP_SUCCESS 0x1U /**< Compare and swap succeeded, swapped. */
+#define ATOMIC_COMPARE_AND_SWAP_FAILURE 0x0U /**< Compare and swap failed, did not swap. */
+
+/*----------------------------- Swap && CAS ------------------------------*/
+
+/**
+ * Atomic compare-and-swap
+ *
+ * @brief Performs an atomic compare-and-swap operation on the specified values.
+ *
+ * @param[in, out] pDestination Pointer to memory location from where value is
+ * to be loaded and checked.
+ * @param[in] ulExchange If condition meets, write this value to memory.
+ * @param[in] ulComparand Swap condition.
+ *
+ * @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped.
+ *
+ * @note This function only swaps *pDestination with ulExchange, if previous
+ * *pDestination value equals ulComparand.
+ */
+static portFORCE_INLINE uint32_t Atomic_CompareAndSwap_u32(
+ uint32_t volatile * pDestination,
+ uint32_t ulExchange,
+ uint32_t ulComparand )
+{
+
+ uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE;
+
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ if ( __atomic_compare_exchange( pDestination,
+ &ulComparand,
+ &ulExchange,
+ false,
+ __ATOMIC_SEQ_CST,
+ __ATOMIC_SEQ_CST ) )
+ {
+ ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;
+ }
+
+#else
+
+ ATOMIC_ENTER_CRITICAL();
+
+ if ( *pDestination == ulComparand )
+ {
+ *pDestination = ulExchange;
+ ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;
+ }
+
+ ATOMIC_EXIT_CRITICAL();
+
+#endif
+
+ return ulReturnValue;
+
+}
+
+/**
+ * Atomic swap (pointers)
+ *
+ * @brief Atomically sets the address pointed to by *ppDestination to the value
+ * of *pExchange.
+ *
+ * @param[in, out] ppDestination Pointer to memory location from where a pointer
+ * value is to be loaded and written back to.
+ * @param[in] pExchange Pointer value to be written to *ppDestination.
+ *
+ * @return The initial value of *ppDestination.
+ */
+static portFORCE_INLINE void * Atomic_SwapPointers_p32(
+ void * volatile * ppDestination,
+ void * pExchange )
+{
+ void * pReturnValue;
+
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ __atomic_exchange( ppDestination, &pExchange, &pReturnValue, __ATOMIC_SEQ_CST );
+
+#else
+
+ ATOMIC_ENTER_CRITICAL();
+
+ pReturnValue = *ppDestination;
+
+ *ppDestination = pExchange;
+
+ ATOMIC_EXIT_CRITICAL();
+
+#endif
+
+ return pReturnValue;
+}
+
+/**
+ * Atomic compare-and-swap (pointers)
+ *
+ * @brief Performs an atomic compare-and-swap operation on the specified pointer
+ * values.
+ *
+ * @param[in, out] ppDestination Pointer to memory location from where a pointer
+ * value is to be loaded and checked.
+ * @param[in] pExchange If condition meets, write this value to memory.
+ * @param[in] pComparand Swap condition.
+ *
+ * @return Unsigned integer of value 1 or 0. 1 for swapped, 0 for not swapped.
+ *
+ * @note This function only swaps *ppDestination with pExchange, if previous
+ * *ppDestination value equals pComparand.
+ */
+static portFORCE_INLINE uint32_t Atomic_CompareAndSwapPointers_p32(
+ void * volatile * ppDestination,
+ void * pExchange, void * pComparand )
+{
+ uint32_t ulReturnValue = ATOMIC_COMPARE_AND_SWAP_FAILURE;
+
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+ if ( __atomic_compare_exchange( ppDestination,
+ &pComparand,
+ &pExchange,
+ false,
+ __ATOMIC_SEQ_CST,
+ __ATOMIC_SEQ_CST ) )
+ {
+ ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;
+ }
+
+#else
+
+ ATOMIC_ENTER_CRITICAL();
+
+ if ( *ppDestination == pComparand )
+ {
+ *ppDestination = pExchange;
+ ulReturnValue = ATOMIC_COMPARE_AND_SWAP_SUCCESS;
+ }
+
+ ATOMIC_EXIT_CRITICAL();
+
+#endif
+
+ return ulReturnValue;
+}
+
+
+/*----------------------------- Arithmetic ------------------------------*/
+
+/**
+ * Atomic add
+ *
+ * @brief Atomically adds count to the value of the specified pointer points to.
+ *
+ * @param[in,out] pAddend Pointer to memory location from where value is to be
+ * loaded and written back to.
+ * @param[in] ulCount Value to be added to *pAddend.
+ *
+ * @return previous *pAddend value.
+ */
+static portFORCE_INLINE uint32_t Atomic_Add_u32(
+ uint32_t volatile * pAddend,
+ uint32_t ulCount )
+{
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ return __atomic_fetch_add(pAddend, ulCount, __ATOMIC_SEQ_CST);
+
+#else
+
+ uint32_t ulCurrent;
+
+ ATOMIC_ENTER_CRITICAL();
+
+ ulCurrent = *pAddend;
+
+ *pAddend += ulCount;
+
+ ATOMIC_EXIT_CRITICAL();
+
+ return ulCurrent;
+
+#endif
+}
+
+/**
+ * Atomic subtract
+ *
+ * @brief Atomically subtracts count from the value of the specified pointer
+ * pointers to.
+ *
+ * @param[in,out] pAddend Pointer to memory location from where value is to be
+ * loaded and written back to.
+ * @param[in] ulCount Value to be subtract from *pAddend.
+ *
+ * @return previous *pAddend value.
+ */
+static portFORCE_INLINE uint32_t Atomic_Subtract_u32(
+ uint32_t volatile * pAddend,
+ uint32_t ulCount )
+{
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ return __atomic_fetch_sub(pAddend, ulCount, __ATOMIC_SEQ_CST);
+
+#else
+
+ uint32_t ulCurrent;
+
+ ATOMIC_ENTER_CRITICAL();
+
+ ulCurrent = *pAddend;
+
+ *pAddend -= ulCount;
+
+ ATOMIC_EXIT_CRITICAL();
+
+ return ulCurrent;
+
+#endif
+}
+
+/**
+ * Atomic increment
+ *
+ * @brief Atomically increments the value of the specified pointer points to.
+ *
+ * @param[in,out] pAddend Pointer to memory location from where value is to be
+ * loaded and written back to.
+ *
+ * @return *pAddend value before increment.
+ */
+static portFORCE_INLINE uint32_t Atomic_Increment_u32( uint32_t volatile * pAddend )
+{
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ return __atomic_fetch_add(pAddend, 1, __ATOMIC_SEQ_CST);
+
+#else
+
+ uint32_t ulCurrent;
+
+ ATOMIC_ENTER_CRITICAL();
+
+ ulCurrent = *pAddend;
+
+ *pAddend += 1;
+
+ ATOMIC_EXIT_CRITICAL();
+
+ return ulCurrent;
+
+#endif
+}
+
+/**
+ * Atomic decrement
+ *
+ * @brief Atomically decrements the value of the specified pointer points to
+ *
+ * @param[in,out] pAddend Pointer to memory location from where value is to be
+ * loaded and written back to.
+ *
+ * @return *pAddend value before decrement.
+ */
+static portFORCE_INLINE uint32_t Atomic_Decrement_u32( uint32_t volatile * pAddend )
+{
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ return __atomic_fetch_sub(pAddend, 1, __ATOMIC_SEQ_CST);
+
+#else
+
+ uint32_t ulCurrent;
+
+ ATOMIC_ENTER_CRITICAL();
+
+ ulCurrent = *pAddend;
+
+ *pAddend -= 1;
+
+ ATOMIC_EXIT_CRITICAL();
+
+ return ulCurrent;
+
+#endif
+}
+
+/*----------------------------- Bitwise Logical ------------------------------*/
+
+/**
+ * Atomic OR
+ *
+ * @brief Performs an atomic OR operation on the specified values.
+ *
+ * @param [in, out] pDestination Pointer to memory location from where value is
+ * to be loaded and written back to.
+ * @param [in] ulValue Value to be ORed with *pDestination.
+ *
+ * @return The original value of *pDestination.
+ */
+static portFORCE_INLINE uint32_t Atomic_OR_u32(
+ uint32_t volatile * pDestination,
+ uint32_t ulValue )
+{
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ return __atomic_fetch_or(pDestination, ulValue, __ATOMIC_SEQ_CST);
+
+#else
+
+ uint32_t ulCurrent;
+
+ ATOMIC_ENTER_CRITICAL();
+
+ ulCurrent = *pDestination;
+
+ *pDestination |= ulValue;
+
+ ATOMIC_EXIT_CRITICAL();
+
+ return ulCurrent;
+
+#endif
+}
+
+/**
+ * Atomic AND
+ *
+ * @brief Performs an atomic AND operation on the specified values.
+ *
+ * @param [in, out] pDestination Pointer to memory location from where value is
+ * to be loaded and written back to.
+ * @param [in] ulValue Value to be ANDed with *pDestination.
+ *
+ * @return The original value of *pDestination.
+ */
+static portFORCE_INLINE uint32_t Atomic_AND_u32(
+ uint32_t volatile * pDestination,
+ uint32_t ulValue )
+{
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ return __atomic_fetch_and(pDestination, ulValue, __ATOMIC_SEQ_CST);
+
+#else
+
+ uint32_t ulCurrent;
+
+ ATOMIC_ENTER_CRITICAL();
+
+ ulCurrent = *pDestination;
+
+ *pDestination &= ulValue;
+
+ ATOMIC_EXIT_CRITICAL();
+
+ return ulCurrent;
+
+#endif
+}
+
+/**
+ * Atomic NAND
+ *
+ * @brief Performs an atomic NAND operation on the specified values.
+ *
+ * @param [in, out] pDestination Pointer to memory location from where value is
+ * to be loaded and written back to.
+ * @param [in] ulValue Value to be NANDed with *pDestination.
+ *
+ * @return The original value of *pDestination.
+ */
+static portFORCE_INLINE uint32_t Atomic_NAND_u32(
+ uint32_t volatile * pDestination,
+ uint32_t ulValue )
+{
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ return __atomic_fetch_nand(pDestination, ulValue, __ATOMIC_SEQ_CST);
+
+#else
+
+ uint32_t ulCurrent;
+
+ ATOMIC_ENTER_CRITICAL();
+
+ ulCurrent = *pDestination;
+
+ *pDestination = ~(ulCurrent & ulValue);
+
+ ATOMIC_EXIT_CRITICAL();
+
+ return ulCurrent;
+
+#endif
+}
+
+/**
+ * Atomic XOR
+ *
+ * @brief Performs an atomic XOR operation on the specified values.
+ *
+ * @param [in, out] pDestination Pointer to memory location from where value is
+ * to be loaded and written back to.
+ * @param [in] ulValue Value to be XORed with *pDestination.
+ *
+ * @return The original value of *pDestination.
+ */
+static portFORCE_INLINE uint32_t Atomic_XOR_u32(
+ uint32_t volatile * pDestination,
+ uint32_t ulValue )
+{
+#if defined ( configUSE_GCC_BUILTIN_ATOMICS ) && ( configUSE_GCC_BUILTIN_ATOMICS == 1 )
+
+ return __atomic_fetch_xor(pDestination, ulValue, __ATOMIC_SEQ_CST);
+
+#else
+
+ uint32_t ulCurrent;
+
+ ATOMIC_ENTER_CRITICAL();
+
+ ulCurrent = *pDestination;
+
+ *pDestination ^= ulValue;
+
+ ATOMIC_EXIT_CRITICAL();
+
+ return ulCurrent;
+
+#endif
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ATOMIC_H */
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h
index 327784a3e..dad106a68 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/include/iot_mqtt.h
@@ -427,7 +427,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pSubscribeOperation );
+ IotMqttOperation_t * const pSubscribeOperation );
/* @[declare_mqtt_subscribe] */
/**
@@ -512,7 +512,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pUnsubscribeOperation );
+ IotMqttOperation_t * const pUnsubscribeOperation );
/* @[declare_mqtt_unsubscribe] */
/**
@@ -642,7 +642,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
const IotMqttPublishInfo_t * pPublishInfo,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pPublishOperation );
+ IotMqttOperation_t * const pPublishOperation );
/* @[declare_mqtt_publish] */
/**
@@ -817,7 +817,7 @@ const char * IotMqtt_OperationType( IotMqttOperationType_t operation );
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
const char * pTopicFilter,
uint16_t topicFilterLength,
- IotMqttSubscription_t * pCurrentSubscription );
+ IotMqttSubscription_t * const pCurrentSubscription );
/* @[declare_mqtt_issubscribed] */
#endif /* ifndef IOT_MQTT_H_ */
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
index 23133f46b..0d6a259ed 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c
@@ -94,7 +94,7 @@ static void _mqttSubscription_tryDestroy( void * pData );
static void _mqttOperation_tryDestroy( void * pData );
/**
- * @brief Create a keep-alive job for an MQTT connection.
+ * @brief Initialize the keep-alive operation for an MQTT connection.
*
* @param[in] pNetworkInfo User-provided network information for the new
* connection.
@@ -103,9 +103,9 @@ static void _mqttOperation_tryDestroy( void * pData );
*
* @return `true` if the keep-alive job was successfully created; `false` otherwise.
*/
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
- uint16_t keepAliveSeconds,
- _mqttConnection_t * pMqttConnection );
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
+ uint16_t keepAliveSeconds,
+ _mqttConnection_t * pMqttConnection );
/**
* @brief Creates a new MQTT connection and initializes its members.
@@ -141,7 +141,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pOperationReference );
+ IotMqttOperation_t * const pOperationReference );
/*-----------------------------------------------------------*/
@@ -238,9 +238,9 @@ static void _mqttOperation_tryDestroy( void * pData )
/*-----------------------------------------------------------*/
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
- uint16_t keepAliveSeconds,
- _mqttConnection_t * pMqttConnection )
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,
+ uint16_t keepAliveSeconds,
+ _mqttConnection_t * pMqttConnection )
{
bool status = true;
IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
@@ -253,9 +253,12 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqttError_t ( * serializePingreq )( uint8_t **,
size_t * ) = _IotMqtt_SerializePingreq;
+ /* Set PINGREQ operation members. */
+ pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;
+
/* Convert the keep-alive interval to milliseconds. */
- pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;
- pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;
+ pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;
/* Choose a PINGREQ serializer function. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
@@ -277,8 +280,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Generate a PINGREQ packet. */
- serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),
- &( pMqttConnection->pingreqPacketSize ) );
+ serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),
+ &( pMqttConnection->pingreq.u.operation.packetSize ) );
if( serializeStatus != IOT_MQTT_SUCCESS )
{
@@ -291,8 +294,8 @@ static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,
/* Create the task pool job that processes keep-alive. */
jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,
pMqttConnection,
- &( pMqttConnection->keepAliveJobStorage ),
- &( pMqttConnection->keepAliveJob ) );
+ &( pMqttConnection->pingreq.jobStorage ),
+ &( pMqttConnection->pingreq.job ) );
/* Task pool job creation for a pre-allocated job should never fail.
* Abort the program if it does. */
@@ -408,9 +411,9 @@ static _mqttConnection_t * _createMqttConnection( bool awsIotMqttMode,
/* Check if keep-alive is active for this connection. */
if( keepAliveSeconds != 0 )
{
- if( _createKeepAliveJob( pNetworkInfo,
- keepAliveSeconds,
- pMqttConnection ) == false )
+ if( _createKeepAliveOperation( pNetworkInfo,
+ keepAliveSeconds,
+ pMqttConnection ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( false );
}
@@ -471,17 +474,31 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
{
IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;
+ /* Default free packet function. */
+ void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;
+
/* Clean up keep-alive if still allocated. */
- if( pMqttConnection->keepAliveMs != 0 )
+ if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );
- _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
+ /* Choose a function to free the packet. */
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
+ if( pMqttConnection->pSerializer != NULL )
+ {
+ if( pMqttConnection->pSerializer->freePacket != NULL )
+ {
+ freePacket = pMqttConnection->pSerializer->freePacket;
+ }
+ }
+ #endif
+
+ freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );
/* Clear data about the keep-alive. */
- pMqttConnection->keepAliveMs = 0;
- pMqttConnection->pPingreqPacket = NULL;
- pMqttConnection->pingreqPacketSize = 0;
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
+ pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
+ pMqttConnection->pingreq.u.operation.packetSize = 0;
/* Decrement reference count. */
pMqttConnection->references--;
@@ -494,9 +511,9 @@ static void _destroyMqttConnection( _mqttConnection_t * pMqttConnection )
/* A connection to be destroyed should have no keep-alive and at most 1
* reference. */
IotMqtt_Assert( pMqttConnection->references <= 1 );
- IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );
- IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );
- IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );
/* Remove all subscriptions. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
@@ -546,7 +563,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pOperationReference )
+ IotMqttOperation_t * const pOperationReference )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
_mqttOperation_t * pSubscriptionOperation = NULL;
@@ -666,7 +683,7 @@ static IotMqttError_t _subscriptionCommon( IotMqttOperationType_t operation,
/* Check the subscription operation data and set the operation type. */
IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
- IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );
+ IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );
pSubscriptionOperation->u.operation.type = operation;
/* Generate a subscription packet from the subscription list. */
@@ -853,7 +870,7 @@ IotMqttError_t IotMqtt_Init( void )
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Log initialization status. */
- if( status != IOT_MQTT_SUCCESS ) //_RB_ This will generate compiler warnings if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES != 0
+ if( status != IOT_MQTT_SUCCESS )
{
IotLogError( "Failed to initialize MQTT library serializer. " );
}
@@ -896,7 +913,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
_mqttConnection_t * pNewMqttConnection = NULL;
/* Default CONNECT serializer function. */
- IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *, //_RB_ Needs to be a typedef to make it easier to rease and more maintainable should the prototype change.
+ IotMqttError_t ( * serializeConnect )( const IotMqttConnectInfo_t *,
uint8_t **,
size_t * ) = _IotMqtt_SerializeConnect;
@@ -911,7 +928,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
}
/* Validate network interface and connect info. */
- if( _IotMqtt_ValidateConnect( pConnectInfo ) == false ) //_RB_ A lot of code in here that could be replaced by asserts().
+ if( _IotMqtt_ValidateConnect( pConnectInfo ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
@@ -1002,7 +1019,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotLogInfo( "Establishing new MQTT connection." );
- /* Initialize a new MQTT connection object. *///_RB_ Initialise, as per the comment, or create, as per the function name? I don't think this does create a connection as that happens below.
+ /* Initialize a new MQTT connection object. */
pNewMqttConnection = _createMqttConnection( pConnectInfo->awsIotMqttMode,
pNetworkInfo,
pConnectInfo->keepAliveSeconds );
@@ -1059,7 +1076,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
== IOT_MQTT_FLAG_WAITABLE );
- IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
/* Set the operation type. */
pOperation->u.operation.type = IOT_MQTT_CONNECT;
@@ -1127,7 +1144,7 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
/* Add the CONNECT operation to the send queue for network transmission. */
- status = _IotMqtt_ScheduleOperation( pOperation, // Why schedule a job if going to wait for comletion?
+ status = _IotMqtt_ScheduleOperation( pOperation,
_IotMqtt_ProcessSend,
0 );
@@ -1150,13 +1167,13 @@ IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
if( status == IOT_MQTT_SUCCESS )
{
/* Check if a keep-alive job should be scheduled. */
- if( pNewMqttConnection->keepAliveMs != 0 )
+ if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
IotLogDebug( "Scheduling first MQTT keep-alive job." );
taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,
- pNewMqttConnection->keepAliveJob,
- pNewMqttConnection->nextKeepAliveMs );
+ pNewMqttConnection->pingreq.job,
+ pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );
if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )
{
@@ -1268,7 +1285,7 @@ void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )
== IOT_MQTT_FLAG_WAITABLE );
- IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );
/* Set the operation type. */
pOperation->u.operation.type = IOT_MQTT_DISCONNECT;
@@ -1389,7 +1406,7 @@ IotMqttError_t IotMqtt_Subscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pSubscribeOperation )
+ IotMqttOperation_t * const pSubscribeOperation )
{
return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,
mqttConnection,
@@ -1445,7 +1462,7 @@ IotMqttError_t IotMqtt_Unsubscribe( IotMqttConnection_t mqttConnection,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pUnsubscribeOperation )
+ IotMqttOperation_t * const pUnsubscribeOperation )
{
return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,
mqttConnection,
@@ -1500,7 +1517,7 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
const IotMqttPublishInfo_t * pPublishInfo,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
- IotMqttOperation_t * pPublishOperation )
+ IotMqttOperation_t * const pPublishOperation )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
_mqttOperation_t * pOperation = NULL;
@@ -1651,8 +1668,8 @@ IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
/* A QoS 0 PUBLISH may not be retried. */
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
{
- pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;
- pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;
+ pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;
+ pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;
}
else
{
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
index 169a292df..42843d28c 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
@@ -43,6 +43,9 @@
/* Platform layer includes. */
#include "platform/iot_threads.h"
+/* Atomics include. */
+#include "iot_atomic.h"
+
/*-----------------------------------------------------------*/
/**
@@ -89,6 +92,22 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier );
+/**
+ * @brief Flush a packet from the stream of incoming data.
+ *
+ * This function is called when memory for a packet cannot be allocated. The
+ * packet is flushed from the stream of incoming data so that the next packet
+ * may be read.
+ *
+ * @param[in] pNetworkConnection Network connection to use for receive, which
+ * may be different from the network connection associated with the MQTT connection.
+ * @param[in] pMqttConnection The associated MQTT connection.
+ * @param[in] length The length of the packet to flush.
+ */
+static void _flushPacket( void * pNetworkConnection,
+ const _mqttConnection_t * pMqttConnection,
+ size_t length );
+
/*-----------------------------------------------------------*/
static bool _incomingPacketValid( uint8_t packetType )
@@ -201,6 +220,14 @@ static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
if( pIncomingPacket->pRemainingData == NULL )
{
+ IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "
+ "%lu for incoming packet type %lu.",
+ pMqttConnection,
+ ( unsigned long ) pIncomingPacket->remainingLength,
+ ( unsigned long ) pIncomingPacket->type );
+
+ _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );
+
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
}
else
@@ -593,22 +620,18 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
if( status == IOT_MQTT_SUCCESS )
{
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
-
- if( pMqttConnection->keepAliveFailure == false )
+ if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),
+ 0,
+ 1 ) == 1 )
{
- IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
- pMqttConnection );
+ IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
+ pMqttConnection );
}
else
{
- IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
- pMqttConnection );
-
- pMqttConnection->keepAliveFailure = false;
+ IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
+ pMqttConnection );
}
-
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
}
else
{
@@ -637,15 +660,13 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier )
{
- IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
- uint8_t * pPuback = NULL;
- size_t pubackSize = 0, bytesSent = 0;
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
+ _mqttOperation_t * pPubackOperation = NULL;
- /* Default PUBACK serializer and free packet functions. */
+ /* Default PUBACK serializer function. */
IotMqttError_t ( * serializePuback )( uint16_t,
uint8_t **,
size_t * ) = _IotMqtt_SerializePuback;
- void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",
pMqttConnection,
@@ -669,57 +690,82 @@ static void _sendPuback( _mqttConnection_t * pMqttConnection,
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
- #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
- if( pMqttConnection->pSerializer != NULL )
- {
- if( pMqttConnection->pSerializer->freePacket != NULL )
- {
- freePacket = pMqttConnection->pSerializer->freePacket;
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
- #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
+
+ /* Create a PUBACK operation. */
+ status = _IotMqtt_CreateOperation( pMqttConnection,
+ 0,
+ NULL,
+ &pPubackOperation );
+
+ if( status != IOT_MQTT_SUCCESS )
+ {
+ IOT_GOTO_CLEANUP();
+ }
+
+ /* Set the operation type. */
+ pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;
/* Generate a PUBACK packet from the packet identifier. */
- serializeStatus = serializePuback( packetIdentifier,
- &pPuback,
- &pubackSize );
+ status = serializePuback( packetIdentifier,
+ &( pPubackOperation->u.operation.pMqttPacket ),
+ &( pPubackOperation->u.operation.packetSize ) );
+
+ if( status != IOT_MQTT_SUCCESS )
+ {
+ IOT_GOTO_CLEANUP();
+ }
+
+ /* Add the PUBACK operation to the send queue for network transmission. */
+ status = _IotMqtt_ScheduleOperation( pPubackOperation,
+ _IotMqtt_ProcessSend,
+ 0 );
- if( serializeStatus != IOT_MQTT_SUCCESS )
+ if( status != IOT_MQTT_SUCCESS )
{
- IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "
- "received PUBLISH %hu.",
- pMqttConnection,
- packetIdentifier );
+ IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",
+ pMqttConnection );
+
+ IOT_GOTO_CLEANUP();
}
else
{
- bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
- pPuback,
- pubackSize );
+ EMPTY_ELSE_MARKER;
+ }
+
+ /* Clean up on error. */
+ IOT_FUNCTION_CLEANUP_BEGIN();
- if( bytesSent != pubackSize )
+ if( status != IOT_MQTT_SUCCESS )
+ {
+ if( pPubackOperation != NULL )
{
- IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"
- " PUBLISH %hu.",
- pMqttConnection,
- packetIdentifier );
+ _IotMqtt_DestroyOperation( pPubackOperation );
}
else
{
- IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",
- pMqttConnection,
- packetIdentifier );
+ EMPTY_ELSE_MARKER;
}
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+}
+
+/*-----------------------------------------------------------*/
- freePacket( pPuback );
+static void _flushPacket( void * pNetworkConnection,
+ const _mqttConnection_t * pMqttConnection,
+ size_t length )
+{
+ size_t bytesFlushed = 0;
+ uint8_t receivedByte = 0;
+
+ for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )
+ {
+ ( void ) _IotMqtt_GetNextByte( pNetworkConnection,
+ pMqttConnection->pNetworkInterface,
+ &receivedByte );
}
}
@@ -761,17 +807,27 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;
IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };
+ void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;
+
+ /* Disconnect callback function. */
+ void ( * disconnectCallback )( void *,
+ IotMqttCallbackParam_t * ) = NULL;
+
+ /* Network close function. */
+ IotNetworkError_t ( * closeConnection) ( void * ) = NULL;
+
+ /* Default free packet function. */
+ void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
/* Mark the MQTT connection as disconnected and the keep-alive as failed. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
pMqttConnection->disconnected = true;
- pMqttConnection->keepAliveFailure = true;
- if( pMqttConnection->keepAliveMs != 0 )
+ if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
/* Keep-alive must have a PINGREQ allocated. */
- IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );
- IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );
/* PINGREQ provides a reference to the connection, so reference count must
* be nonzero. */
@@ -779,7 +835,7 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
/* Attempt to cancel the keep-alive job. */
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
- pMqttConnection->keepAliveJob,
+ pMqttConnection->pingreq.job,
NULL );
/* If the keep-alive job was not canceled, it must be already executing.
@@ -791,13 +847,23 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
* the executing keep-alive job will clean up itself. */
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
{
- /* Clean up PINGREQ packet and job. */
- _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
+ /* Choose a function to free the packet. */
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
+ if( pMqttConnection->pSerializer != NULL )
+ {
+ if( pMqttConnection->pSerializer->freePacket != NULL )
+ {
+ freePacket = pMqttConnection->pSerializer->freePacket;
+ }
+ }
+ #endif
+
+ freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );
/* Clear data about the keep-alive. */
- pMqttConnection->keepAliveMs = 0;
- pMqttConnection->pPingreqPacket = NULL;
- pMqttConnection->pingreqPacketSize = 0;
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
+ pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
+ pMqttConnection->pingreq.u.operation.packetSize = 0;
/* Keep-alive is cleaned up; decrement reference count. Since this
* function must be followed with a call to DISCONNECT, a check to
@@ -817,12 +883,20 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
EMPTY_ELSE_MARKER;
}
+ /* Copy the function pointers and contexts, as the MQTT connection may be
+ * modified after the mutex is released. */
+ disconnectCallback = pMqttConnection->disconnectCallback.function;
+ pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;
+
+ closeConnection = pMqttConnection->pNetworkInterface->close;
+ pNetworkConnection = pMqttConnection->pNetworkConnection;
+
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
/* Close the network connection. */
- if( pMqttConnection->pNetworkInterface->close != NULL )
+ if( closeConnection != NULL )
{
- closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );
+ closeStatus = closeConnection( pNetworkConnection );
if( closeStatus == IOT_NETWORK_SUCCESS )
{
@@ -842,14 +916,14 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
}
/* Invoke the disconnect callback. */
- if( pMqttConnection->disconnectCallback.function != NULL )
+ if( disconnectCallback != NULL )
{
/* Set the members of the callback parameter. */
callbackParam.mqttConnection = pMqttConnection;
callbackParam.u.disconnectReason = disconnectReason;
- pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,
- &callbackParam );
+ disconnectCallback( pDisconnectCallbackContext,
+ &callbackParam );
}
else
{
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
index 9e4ec282e..af343f637 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_operation.c
@@ -44,6 +44,9 @@
#include "platform/iot_clock.h"
#include "platform/iot_threads.h"
+/* Atomics include. */
+#include "iot_atomic.h"
+
/*-----------------------------------------------------------*/
/**
@@ -133,7 +136,7 @@ static bool _mqttOperation_match( const IotLink_t * pOperationLink,
static bool _checkRetryLimit( _mqttOperation_t * pOperation )
{
_mqttConnection_t * pMqttConnection = pOperation->pMqttConnection;
- bool status = true;
+ bool status = true, setDup = false;
/* Choose a set DUP function. */
void ( * publishSetDup )( uint8_t *,
@@ -162,36 +165,65 @@ static bool _checkRetryLimit( _mqttOperation_t * pOperation )
IotMqtt_Assert( pOperation->u.operation.type == IOT_MQTT_PUBLISH_TO_SERVER );
/* Check if the retry limit is exhausted. */
- if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )
+ if( pOperation->u.operation.periodic.retry.count > pOperation->u.operation.periodic.retry.limit )
{
/* The retry count may be at most one more than the retry limit, which
* accounts for the final check for a PUBACK. */
- IotMqtt_Assert( pOperation->u.operation.retry.count == pOperation->u.operation.retry.limit + 1 );
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.count ==
+ pOperation->u.operation.periodic.retry.limit + 1 );
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) No response received after %lu retries.",
pMqttConnection,
pOperation,
- pOperation->u.operation.retry.limit );
+ pOperation->u.operation.periodic.retry.limit );
status = false;
}
- /* Check if this is the first retry. */
- else if( pOperation->u.operation.retry.count == 1 )
- {
- /* Always set the DUP flag on the first retry. */
- publishSetDup( pOperation->u.operation.pMqttPacket,
- pOperation->u.operation.pPacketIdentifierHigh,
- &( pOperation->u.operation.packetIdentifier ) );
- }
else
{
- /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet
- * identifier) must be reset on every retry. */
- if( pMqttConnection->awsIotMqttMode == true )
+ if( pOperation->u.operation.periodic.retry.count == 1 )
+ {
+ /* The DUP flag should always be set on the first retry. */
+ setDup = true;
+ }
+ else if( pMqttConnection->awsIotMqttMode == true )
{
+ /* In AWS IoT MQTT mode, the DUP flag (really a change to the packet
+ * identifier) must be reset on every retry. */
+ setDup = true;
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ if( setDup == true )
+ {
+ /* In AWS IoT MQTT mode, the references mutex must be locked to
+ * prevent the packet identifier from being read while it is being
+ * changed. */
+ if( pMqttConnection->awsIotMqttMode == true )
+ {
+ IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
+
+ /* Always set the DUP flag on the first retry. */
publishSetDup( pOperation->u.operation.pMqttPacket,
pOperation->u.operation.pPacketIdentifierHigh,
&( pOperation->u.operation.packetIdentifier ) );
+
+ if( pMqttConnection->awsIotMqttMode == true )
+ {
+ IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
+ }
+ else
+ {
+ EMPTY_ELSE_MARKER;
+ }
}
else
{
@@ -213,14 +245,16 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
/* This function should never be called with retry count greater than
* retry limit. */
- IotMqtt_Assert( pOperation->u.operation.retry.count <= pOperation->u.operation.retry.limit );
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.count <=
+ pOperation->u.operation.periodic.retry.limit );
/* Increment the retry count. */
- ( pOperation->u.operation.retry.count )++;
+ ( pOperation->u.operation.periodic.retry.count )++;
/* Check for a response shortly for the final retry. Otherwise, calculate the
* next retry period. */
- if( pOperation->u.operation.retry.count > pOperation->u.operation.retry.limit )
+ if( pOperation->u.operation.periodic.retry.count >
+ pOperation->u.operation.periodic.retry.limit )
{
scheduleDelay = IOT_MQTT_RESPONSE_WAIT_MS;
@@ -232,14 +266,14 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
}
else
{
- scheduleDelay = pOperation->u.operation.retry.nextPeriod;
+ scheduleDelay = pOperation->u.operation.periodic.retry.nextPeriodMs;
/* Double the retry period, subject to a ceiling value. */
- pOperation->u.operation.retry.nextPeriod *= 2;
+ pOperation->u.operation.periodic.retry.nextPeriodMs *= 2;
- if( pOperation->u.operation.retry.nextPeriod > IOT_MQTT_RETRY_MS_CEILING )
+ if( pOperation->u.operation.periodic.retry.nextPeriodMs > IOT_MQTT_RETRY_MS_CEILING )
{
- pOperation->u.operation.retry.nextPeriod = IOT_MQTT_RETRY_MS_CEILING;
+ pOperation->u.operation.periodic.retry.nextPeriodMs = IOT_MQTT_RETRY_MS_CEILING;
}
else
{
@@ -249,12 +283,12 @@ static bool _scheduleNextRetry( _mqttOperation_t * pOperation )
IotLogDebug( "(MQTT connection %p, PUBLISH operation %p) Scheduling retry %lu of %lu in %lu ms.",
pMqttConnection,
pOperation,
- ( unsigned long ) pOperation->u.operation.retry.count,
- ( unsigned long ) pOperation->u.operation.retry.limit,
+ ( unsigned long ) pOperation->u.operation.periodic.retry.count,
+ ( unsigned long ) pOperation->u.operation.periodic.retry.limit,
( unsigned long ) scheduleDelay );
/* Check if this is the first retry. */
- firstRetry = ( pOperation->u.operation.retry.count == 1 );
+ firstRetry = ( pOperation->u.operation.periodic.retry.count == 1 );
/* On the first retry, the PUBLISH will be moved from the pending processing
* list to the pending responses list. Lock the connection references mutex
@@ -332,7 +366,7 @@ IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
{
IotLogError( "Callback should not be set for a waitable operation." );
- return IOT_MQTT_BAD_PARAMETER;
+ IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
else
{
@@ -509,8 +543,8 @@ bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
pMqttConnection,
IotMqtt_OperationType( pOperation->u.operation.type ),
pOperation,
- pOperation->u.operation.jobReference + 1,
- pOperation->u.operation.jobReference );
+ ( long ) ( pOperation->u.operation.jobReference + 1 ),
+ ( long ) ( pOperation->u.operation.jobReference ) );
/* The job reference count must be 0 or 1 after the decrement. */
IotMqtt_Assert( ( pOperation->u.operation.jobReference == 0 ) ||
@@ -649,23 +683,30 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
void * pContext )
{
bool status = true;
+ uint32_t swapStatus = 0;
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
size_t bytesSent = 0;
+ /* Swap status is not checked when asserts are disabled. */
+ ( void ) swapStatus;
+
/* Retrieve the MQTT connection from the context. */
_mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pContext;
+ _mqttOperation_t * pPingreqOperation = &( pMqttConnection->pingreq );
/* Check parameters. */
/* IotMqtt_Assert( pTaskPool == IOT_SYSTEM_TASKPOOL ); */
- IotMqtt_Assert( pKeepAliveJob == pMqttConnection->keepAliveJob );
+ IotMqtt_Assert( pKeepAliveJob == pPingreqOperation->job );
/* Check that keep-alive interval is valid. The MQTT spec states its maximum
* value is 65,535 seconds. */
- IotMqtt_Assert( pMqttConnection->keepAliveMs <= 65535000 );
+ IotMqtt_Assert( pPingreqOperation->u.operation.periodic.ping.keepAliveMs <= 65535000 );
/* Only two values are valid for the next keep alive job delay. */
- IotMqtt_Assert( ( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs ) ||
- ( pMqttConnection->nextKeepAliveMs == IOT_MQTT_RESPONSE_WAIT_MS ) );
+ IotMqtt_Assert( ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
+ pPingreqOperation->u.operation.periodic.ping.keepAliveMs ) ||
+ ( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs
+ == IOT_MQTT_RESPONSE_WAIT_MS ) );
IotLogDebug( "(MQTT connection %p) Keep-alive job started.", pMqttConnection );
@@ -676,10 +717,9 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
&pKeepAliveJob );
IotMqtt_Assert( taskPoolStatus == IOT_TASKPOOL_SUCCESS );
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
-
/* Determine whether to send a PINGREQ or check for PINGRESP. */
- if( pMqttConnection->nextKeepAliveMs == pMqttConnection->keepAliveMs )
+ if( pPingreqOperation->u.operation.periodic.ping.nextPeriodMs ==
+ pPingreqOperation->u.operation.periodic.ping.keepAliveMs )
{
IotLogDebug( "(MQTT connection %p) Sending PINGREQ.", pMqttConnection );
@@ -687,10 +727,10 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
* more important than other operations. Bypass the queue of jobs for
* operations by directly sending the PINGREQ in this job. */
bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
- pMqttConnection->pPingreqPacket,
- pMqttConnection->pingreqPacketSize );
+ pPingreqOperation->u.operation.pMqttPacket,
+ pPingreqOperation->u.operation.packetSize );
- if( bytesSent != pMqttConnection->pingreqPacketSize )
+ if( bytesSent != pPingreqOperation->u.operation.packetSize )
{
IotLogError( "(MQTT connection %p) Failed to send PINGREQ.", pMqttConnection );
status = false;
@@ -699,10 +739,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
{
/* Assume the keep-alive will fail. The network receive callback will
* clear the failure flag upon receiving a PINGRESP. */
- pMqttConnection->keepAliveFailure = true;
+ swapStatus = Atomic_CompareAndSwap_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ),
+ 1,
+ 0 );
+ IotMqtt_Assert( swapStatus == 1 );
/* Schedule a check for PINGRESP. */
- pMqttConnection->nextKeepAliveMs = IOT_MQTT_RESPONSE_WAIT_MS;
+ pPingreqOperation->u.operation.periodic.ping.nextPeriodMs = IOT_MQTT_RESPONSE_WAIT_MS;
IotLogDebug( "(MQTT connection %p) PINGREQ sent. Scheduling check for PINGRESP in %d ms.",
pMqttConnection,
@@ -713,12 +756,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
{
IotLogDebug( "(MQTT connection %p) Checking for PINGRESP.", pMqttConnection );
- if( pMqttConnection->keepAliveFailure == false )
+ if( Atomic_Add_u32( &( pPingreqOperation->u.operation.periodic.ping.failure ), 0 ) == 0 )
{
IotLogDebug( "(MQTT connection %p) PINGRESP was received.", pMqttConnection );
/* PINGRESP was received. Schedule the next PINGREQ transmission. */
- pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;
+ pPingreqOperation->u.operation.periodic.ping.nextPeriodMs =
+ pPingreqOperation->u.operation.periodic.ping.keepAliveMs;
}
else
{
@@ -737,13 +781,13 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
{
taskPoolStatus = IotTaskPool_ScheduleDeferred( pTaskPool,
pKeepAliveJob,
- pMqttConnection->nextKeepAliveMs );
+ pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
{
- IotLogDebug( "(MQTT connection %p) Next keep-alive job in %d ms.",
+ IotLogDebug( "(MQTT connection %p) Next keep-alive job in %lu ms.",
pMqttConnection,
- IOT_MQTT_RESPONSE_WAIT_MS );
+ ( unsigned long ) pPingreqOperation->u.operation.periodic.ping.nextPeriodMs );
}
else
{
@@ -769,8 +813,6 @@ void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
{
EMPTY_ELSE_MARKER;
}
-
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
}
/*-----------------------------------------------------------*/
@@ -799,6 +841,8 @@ void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
}
else
{
+ /* This operation may have already been removed by cleanup of pending
+ * operations (called from Disconnect). In that case, do nothing here. */
EMPTY_ELSE_MARKER;
}
@@ -810,15 +854,9 @@ void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
_IotMqtt_InvokeSubscriptionCallback( pOperation->pMqttConnection,
&callbackParam );
- /* Free any buffers associated with the current PUBLISH message. */
- if( pOperation->u.publish.pReceivedData != NULL )
- {
- IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );
- }
- else
- {
- EMPTY_ELSE_MARKER;
- }
+ /* Free buffers associated with the current PUBLISH message. */
+ IotMqtt_Assert( pOperation->u.publish.pReceivedData != NULL );
+ IotMqtt_FreeMessage( ( void * ) pOperation->u.publish.pReceivedData );
/* Free the incoming PUBLISH operation. */
IotMqtt_FreeOperation( pOperation );
@@ -851,7 +889,7 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
waitable = ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE;
/* Check PUBLISH retry counts and limits. */
- if( pOperation->u.operation.retry.limit > 0 )
+ if( pOperation->u.operation.periodic.retry.limit > 0 )
{
if( _checkRetryLimit( pOperation ) == false )
{
@@ -923,7 +961,7 @@ void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
if( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING )
{
/* Check if this operation should be scheduled for retransmission. */
- if( pOperation->u.operation.retry.limit > 0 )
+ if( pOperation->u.operation.periodic.retry.limit > 0 )
{
if( _scheduleNextRetry( pOperation ) == false )
{
@@ -1141,7 +1179,7 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
/* Check if the matched operation is a PUBLISH with retry. If it is, cancel
* the retry job. */
- if( pResult->u.operation.retry.limit > 0 )
+ if( pResult->u.operation.periodic.retry.limit > 0 )
{
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
pResult->job,
@@ -1188,7 +1226,7 @@ _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
if( pResult != NULL )
{
- IotLogDebug( "(MQTT connection %p) Found operation %s." ,
+ IotLogDebug( "(MQTT connection %p) Found operation %s.",
pMqttConnection,
IotMqtt_OperationType( type ) );
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c
index f42c80c6a..d3447eefd 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_serialize.c
@@ -1375,12 +1375,12 @@ IotMqttError_t _IotMqtt_DeserializePublish( _mqttPacket_t * pPublish )
* a packet identifer, but QoS 0 PUBLISH packets do not. */
if( pOutput->qos == IOT_MQTT_QOS_0 )
{
- pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) );
+ pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - sizeof( uint16_t ) );
pOutput->pPayload = pPacketIdentifierHigh;
}
else
{
- pOutput->payloadLength = ( uint16_t ) ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) );
+ pOutput->payloadLength = ( pPublish->remainingLength - pOutput->topicNameLength - 2 * sizeof( uint16_t ) );
pOutput->pPayload = pPacketIdentifierHigh + sizeof( uint16_t );
}
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
index 59caccc7f..f4546b558 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_subscription.c
@@ -586,7 +586,7 @@ void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnecti
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
const char * pTopicFilter,
uint16_t topicFilterLength,
- IotMqttSubscription_t * pCurrentSubscription )
+ IotMqttSubscription_t * const pCurrentSubscription )
{
bool status = false;
_mqttSubscription_t * pSubscription = NULL;
@@ -595,7 +595,7 @@ bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
topicMatchParams.pTopicName = pTopicFilter;
topicMatchParams.topicNameLength = topicFilterLength;
- topicMatchParams.exactMatchOnly = false;
+ topicMatchParams.exactMatchOnly = true;
/* Prevent any other thread from modifying the subscription list while this
* function is running. */
diff --git a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h
index 80aef1c60..5ac1b9489 100644
--- a/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h
+++ b/FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/private/iot_mqtt_internal.h
@@ -248,69 +248,13 @@
/*---------------------- MQTT internal data structures ----------------------*/
/**
- * @brief Represents an MQTT connection.
- */
-typedef struct _mqttConnection
-{
- bool awsIotMqttMode; /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */
- bool ownNetworkConnection; /**< @brief Whether this MQTT connection owns its network connection. */
- void * pNetworkConnection; /**< @brief References the transport-layer network connection. */
- const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */
- IotMqttCallbackInfo_t disconnectCallback; /**< @brief A function to invoke when this connection is disconnected. */
-
- #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
- const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */
- #endif
-
- bool disconnected; /**< @brief Tracks if this connection has been disconnected. */
- IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */
- int32_t references; /**< @brief Counts callbacks and operations using this connection. */
- IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */
- IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */
-
- IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */
- IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */
-
- bool keepAliveFailure; /**< @brief Failure flag for keep-alive operation. */
- uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */
- uint32_t nextKeepAliveMs; /**< @brief Relative delay for next keep-alive job. */
- IotTaskPoolJobStorage_t keepAliveJobStorage; /**< @brief Task pool job for processing this connection's keep-alive. */
- IotTaskPoolJob_t keepAliveJob; /**< @brief Task pool job for processing this connection's keep-alive. */
- uint8_t * pPingreqPacket; /**< @brief An MQTT PINGREQ packet, allocated if keep-alive is active. */
- size_t pingreqPacketSize; /**< @brief The size of an allocated PINGREQ packet. */
-} _mqttConnection_t;
-
-/**
- * @brief Represents a subscription stored in an MQTT connection.
+ * @cond DOXYGEN_IGNORE
+ * Doxygen should ignore this section.
+ *
+ * Forward declaration of MQTT connection type.
*/
-typedef struct _mqttSubscription
-{
- IotLink_t link; /**< @brief List link member. */
-
- int32_t references; /**< @brief How many subscription callbacks are using this subscription. */
-
- /**
- * @brief Tracks whether @ref mqtt_function_unsubscribe has been called for
- * this subscription.
- *
- * If there are active subscription callbacks, @ref mqtt_function_unsubscribe
- * cannot remove this subscription. Instead, it will set this flag, which
- * schedules the removal of this subscription once all subscription callbacks
- * terminate.
- */
- bool unsubscribed;
-
- struct
- {
- uint16_t identifier; /**< @brief Packet identifier. */
- size_t order; /**< @brief Order in the packet's list of subscriptions. */
- } packetInfo; /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */
-
- IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */
-
- uint16_t topicFilterLength; /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */
- char pTopicFilter[]; /**< @brief The subscription topic filter. */
-} _mqttSubscription_t;
+struct _mqttConnection;
+/** @endcond */
/**
* @brief Internal structure representing a single MQTT operation, such as
@@ -321,13 +265,13 @@ typedef struct _mqttSubscription
typedef struct _mqttOperation
{
/* Pointers to neighboring queue elements. */
- IotLink_t link; /**< @brief List link member. */
+ IotLink_t link; /**< @brief List link member. */
- bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */
- _mqttConnection_t * pMqttConnection; /**< @brief MQTT connection associated with this operation. */
+ bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */
+ struct _mqttConnection * pMqttConnection; /**< @brief MQTT connection associated with this operation. */
- IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */
- IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */
+ IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */
+ IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */
union
{
@@ -353,12 +297,22 @@ typedef struct _mqttOperation
} notify; /**< @brief How to notify of this operation's completion. */
IotMqttError_t status; /**< @brief Result of this operation. This is reported once a response is received. */
- struct
+ union
{
- uint32_t count;
- uint32_t limit;
- uint32_t nextPeriod;
- } retry;
+ struct
+ {
+ uint32_t count; /**< @brief Current number of retries. */
+ uint32_t limit; /**< @brief Maximum number of retries allowed. */
+ uint32_t nextPeriodMs; /**< @brief Next retry period. */
+ } retry; /**< @brief Additional information for PUBLISH retry. */
+
+ struct
+ {
+ uint32_t failure; /**< @brief Flag tracking keep-alive status. */
+ uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */
+ uint32_t nextPeriodMs; /**< @brief Relative delay for next keep-alive job. */
+ } ping; /**< @brief Additional information for keep-alive pings. */
+ } periodic; /**< @brief Additional information for periodic operations. */
} operation;
/* If incomingPublish is true, this struct is valid. */
@@ -371,6 +325,65 @@ typedef struct _mqttOperation
} _mqttOperation_t;
/**
+ * @brief Represents an MQTT connection.
+ */
+typedef struct _mqttConnection
+{
+ bool awsIotMqttMode; /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */
+ bool ownNetworkConnection; /**< @brief Whether this MQTT connection owns its network connection. */
+ void * pNetworkConnection; /**< @brief References the transport-layer network connection. */
+ const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */
+ IotMqttCallbackInfo_t disconnectCallback; /**< @brief A function to invoke when this connection is disconnected. */
+
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
+ const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */
+ #endif
+
+ bool disconnected; /**< @brief Tracks if this connection has been disconnected. */
+ IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */
+ int32_t references; /**< @brief Counts callbacks and operations using this connection. */
+ IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */
+ IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */
+
+ IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */
+ IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */
+
+ _mqttOperation_t pingreq; /**< @brief Operation used for MQTT keep-alive. */
+} _mqttConnection_t;
+
+/**
+ * @brief Represents a subscription stored in an MQTT connection.
+ */
+typedef struct _mqttSubscription
+{
+ IotLink_t link; /**< @brief List link member. */
+
+ int32_t references; /**< @brief How many subscription callbacks are using this subscription. */
+
+ /**
+ * @brief Tracks whether @ref mqtt_function_unsubscribe has been called for
+ * this subscription.
+ *
+ * If there are active subscription callbacks, @ref mqtt_function_unsubscribe
+ * cannot remove this subscription. Instead, it will set this flag, which
+ * schedules the removal of this subscription once all subscription callbacks
+ * terminate.
+ */
+ bool unsubscribed;
+
+ struct
+ {
+ uint16_t identifier; /**< @brief Packet identifier. */
+ size_t order; /**< @brief Order in the packet's list of subscriptions. */
+ } packetInfo; /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */
+
+ IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */
+
+ uint16_t topicFilterLength; /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */
+ char pTopicFilter[]; /**< @brief The subscription topic filter. */
+} _mqttSubscription_t;
+
+/**
* @brief Represents an MQTT packet received from the network.
*
* This struct is used to hold parameters for the deserializers so that all