diff options
author | leegeth <51681119+leegeth@users.noreply.github.com> | 2020-07-13 17:00:12 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-13 17:00:12 -0700 |
commit | 6a8cdcdf3cf1e95e5295718796dfb807e6a4fef7 (patch) | |
tree | 04c813e0cffba404deb9e669761d0a6f0300d52f | |
parent | 28e0db8b3bcf9a0e5f95b1c7b31c7e814441be67 (diff) | |
download | freertos-git-6a8cdcdf3cf1e95e5295718796dfb807e6a4fef7.tar.gz |
MQTT Lightweight demo (#119)
3 files changed, 986 insertions, 13 deletions
diff --git a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/DemoTasks/LightWeightMQTTExample.c b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/DemoTasks/LightWeightMQTTExample.c index ebe37bbd5..3e333df33 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/DemoTasks/LightWeightMQTTExample.c +++ b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/DemoTasks/LightWeightMQTTExample.c @@ -26,11 +26,24 @@ */ /* - * Light weight MQTT demo. - * TODO - To be implemented. + * Demo for showing use of the MQTT light weight serializer API. + * The Light weight serializer API lets user to serialize and + * deserialize MQTT messages into a user provided buffer. + * This API allows use of a statically allocated buffer. + * + * The Example shown below uses this API to create MQTT messages and + * send them over the connection established using FreeRTOS sockets. + * The example is single threaded and uses statically allocated memory; + * it uses QOS0 and therefore does not implement any retransmission + * mechanism for Publish messages. + * + * !!! NOTE !!! + * This MQTT demo does not authenticate the server or the client. + * Hence, this demo code is not recommended to be used in production + * systems requiring secure connections. */ -/* Demo Specific configs. */ +/* Demo specific configs. */ #include "demo_config.h" /* Standard includes. */ @@ -41,41 +54,938 @@ #include "FreeRTOS.h" #include "task.h" +/* FreeRTOS+TCP includes. */ +#include "FreeRTOS_IP.h" +#include "FreeRTOS_Sockets.h" + +/* MQTT library includes. */ +#include "mqtt_lightweight.h" + /*-----------------------------------------------------------*/ +/* Compile time error for undefined configs. */ +#ifndef democonfigMQTT_BROKER_ENDPOINT + #error "Define the config democonfigMQTT_BROKER_ENDPOINT by following the instructions in file demo_config.h." +#endif + +/*-----------------------------------------------------------*/ + +/* Default values for configs. */ +#ifndef democonfigCLIENT_IDENTIFIER + /** - * @brief The task used to demonstrate the light weight MQTT API. + * @brief The MQTT client identifier used in this example. Each client identifier + * must be unique, so edit as required to ensure no two clients connecting to the + * same broker use the same client identifier. + * + * @note Appending __TIME__ to the client id string will reduce the possibility of a + * client id collision in the broker. Note that the appended time is the compilation + * time. This client id can cause collision, if more than one instance of the same + * binary is used at the same time to connect to the broker. + */ + #define democonfigCLIENT_IDENTIFIER "testClient"__TIME__ +#endif + + + +#ifndef democonfigMQTT_BROKER_PORT + +/** + * @brief The port to use for the demo. + */ + #define democonfigMQTT_BROKER_PORT ( 1883 ) +#endif + +/*-----------------------------------------------------------*/ + +/** + * @brief The topic to subscribe and publish to in the example. * - * @note To be implemented. + * The topic name starts with the client identifier to ensure that each demo + * interacts with a unique topic name. + */ +#define mqttexampleTOPIC democonfigCLIENT_IDENTIFIER "/example/topic" + +/** + * @brief The MQTT message published in this example. + */ +#define mqttexampleMESSAGE "Hello Light Weight MQTT World!" + +/** + * @brief Dimensions a file scope buffer currently used to send and receive MQTT data + * from a socket. + */ +#define mqttexampleSHARED_BUFFER_SIZE ( 500U ) + +/** + * @brief Time to wait between each cycle of the demo implemented by prvMQTTDemoTask(). + */ +#define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000U ) ) + +/** + * @brief Keep alive time reported to the broker while establishing an MQTT connection. + * + * It is the responsibility of the Client to ensure that the interval between + * Control Packets being sent does not exceed the this Keep Alive value. In the + * absence of sending any other Control Packets, the Client MUST send a + * PINGREQ Packet. + */ +#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 10U ) + +/** + * @brief Time to wait before sending ping request to keep MQTT connection alive. + * + * A PINGREQ is attempted to be sent at every ( #mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) + * seconds. This is to make sure that a PINGREQ is always sent before the timeout + * expires in broker. + */ +#define mqttexampleKEEP_ALIVE_DELAY ( pdMS_TO_TICKS( ( ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * 1000 ) ) ) + +/** + * @brief Maximum number of times to call FreeRTOS_recv when initiating a + * graceful socket shutdown. + */ +#define mqttexampleMAX_SOCKET_SHUTDOWN_LOOPS ( 3 ) +/*-----------------------------------------------------------*/ + +/** + * @brief The task used to demonstrate the light weight MQTT API. * * @param[in] pvParameters Parameters as passed at the time of task creation. Not * used in this example. */ static void prvMQTTDemoTask( void * pvParameters ); +/** + * @brief Creates a TCP connection to the MQTT broker as specified by + * democonfigMQTT_BROKER_ENDPOINT and democonfigMQTT_BROKER_PORT defined at the + * top of this file. + * + * @return On success the socket connected to the MQTT broker is returned. + * + */ +static Socket_t prvCreateTCPConnectionToBroker( void ); + +/** + * @brief Sends an MQTT Connect packet over the already connected TCP socket. + * + * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker. + * + */ +static void prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket ); + +/** + * @brief Performs a graceful shutdown and close of the socket passed in as its + * parameter. + * + * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which + * an MQTT connection has been established. + */ +static void prvGracefulShutDown( Socket_t xSocket ); + +/** + * @brief Subscribes to the topic as specified in mqttexampleTOPIC at the top of + * this file. + * + * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which + * an MQTT connection has been established. + */ +static void prvMQTTSubscribeToTopic( Socket_t xMQTTSocket ); + +/** + * @brief Publishes a message mqttexampleMESSAGE on mqttexampleTOPIC topic. + * + * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which + * an MQTT connection has been established. + */ +static void prvMQTTPublishToTopic( Socket_t xMQTTSocket ); + +/** + * @brief Unsubscribes from the previously subscribed topic as specified + * in mqttexampleTOPIC. + * + * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which + * an MQTT connection has been established. + */ +static void prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket ); + +/** + * @brief Send MQTT Ping Request to the broker. + * Ping request is used to keep connection to the broker alive. + * + * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which + * an MQTT connection has been established. + */ +static void prvMQTTKeepAlive( Socket_t xMQTTSocket ); + +/** + * @brief Disconnect From the MQTT broker. + * + * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which + * an MQTT connection has been established. + */ +static void prvMQTTDisconnect( Socket_t xMQTTSocket ); + +/** + * @brief Process a response or ack to an MQTT request (PING, SUBSCRIBE + * or UNSUBSCRIBE). This function processes PING_RESP, SUB_ACK, UNSUB_ACK + * + * @param pxIncomingPacket is a pointer to structure containing deserialized + * MQTT response. + * @param usPacketId is the packet identifier from the ack received. + */ +static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, + uint16_t usPacketId ); + +/** + * @brief Process incoming Publish message. + * + * @param pxPublishInfo is a pointer to structure containing deserialized + * Publish message. + */ +static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); + +/** + * @brief Receive and validate MQTT packet from the broker, determine the type + * of the packet and processes the packet based on the type. + * + * @param xMQTTSocket is a TCP socket that is connected to an MQTT broker to which + * an MQTT connection has been established. + */ +static void prvMQTTProcessIncomingPacket( Socket_t xMQTTSocket ); + +/** + * @brief The transport receive wrapper function supplied to the MQTT library for + * receiving type and length of an incoming MQTT packet. + * + * @param[in] pxContext Pointer to network context. + * @param[out] pBuffer Buffer for receiving data. + * @param[in] bytesToRecv Size of pBuffer. + * + * @return Number of bytes received or zero to indicate transportTimeout; + * negative value on error. + */ +static int32_t prvTransportRecv( NetworkContext_t * pxContext, + void * pvBuffer, + size_t xBytesToRecv ); + +/** + * @brief Generate and return monotonically increasing packet identifier. + * + * @return The next PacketId. + * + * @note This function is not thread safe. + */ +static uint16_t prvGetNextPacketIdentifier( void ); + +/*-----------------------------------------------------------*/ + +/* @brief Static buffer used to hold MQTT messages being sent and received. */ +static uint8_t ucSharedBuffer[ mqttexampleSHARED_BUFFER_SIZE ]; + +/** + * @brief Packet Identifier generated when Subscribe request was sent to the broker; + * it is used to match received Subscribe ACK to the transmitted subscribe request. + */ +static uint16_t usSubscribePacketIdentifier; + +/** + * @brief Packet Identifier generated when Unsubscribe request was sent to the broker; + * it is used to match received Unsubscribe response to the transmitted unsubscribe + * request. + */ +static uint16_t usUnsubscribePacketIdentifier; + + +/** @brief Static buffer used to hold MQTT messages being sent and received. */ +const static MQTTFixedBuffer_t xBuffer = +{ + .pBuffer = ucSharedBuffer, + .size = mqttexampleSHARED_BUFFER_SIZE +}; + +/*-----------------------------------------------------------*/ + +/** + * @brief The Network Context implementation. This context is passed to the + * transport interface functions. + * + * This example uses transport interface function only to read the packet type + * and length of the incoming packet from network. + */ +struct NetworkContext +{ + Socket_t xTcpSocket; +}; /*-----------------------------------------------------------*/ /* * @brief Create the task that demonstrates the Light Weight MQTT API Demo. + * This is the entry function of this demo. */ void vStartSimpleMQTTDemo( void ) { + /* This example uses a single application task, which in turn is used to + * connect, subscribe, publish, unsubscribe and disconnect from the MQTT + * broker. */ xTaskCreate( prvMQTTDemoTask, /* Function that implements the task. */ "MQTTLWDemo", /* Text name for the task - only used for debugging. */ - configMINIMAL_STACK_SIZE, /* Size of stack (in words, not bytes) to allocate for the task. */ + democonfigDEMO_STACKSIZE, /* Size of stack (in words, not bytes) to allocate for the task. */ NULL, /* Task parameter - not used in this case. */ tskIDLE_PRIORITY, /* Task priority, must be between 0 and configMAX_PRIORITIES - 1. */ NULL ); /* Used to pass out a handle to the created task - not used in this case. */ } - /*-----------------------------------------------------------*/ static void prvMQTTDemoTask( void * pvParameters ) { - /* Demo stub. */ - vTaskDelay( pdMS_TO_TICKS( 1000 ) ); - LogInfo( ( "In the light weight MQTT demo." ) ); - vTaskDelay( pdMS_TO_TICKS( 1000 ) ); + Socket_t xMQTTSocket; + uint32_t ulPublishCount = 0U; + const uint32_t ulMaxPublishCount = 5UL; + + /* Remove compiler warnings about unused parameters. */ + ( void ) pvParameters; + + for( ; ; ) + { + /****************************** Connect. ******************************/ + + /* Establish a TCP connection with the MQTT broker. This example connects to + * the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and + * democonfigMQTT_BROKER_PORT at the top of this file. */ + LogInfo( ( "Create a TCP connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); + xMQTTSocket = prvCreateTCPConnectionToBroker(); + + /* Sends an MQTT Connect packet over the already connected TCP socket + * xMQTTSocket, and waits for connection acknowledgment (CONNACK) packet. */ + LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); + prvCreateMQTTConnectionWithBroker( xMQTTSocket ); + + /**************************** Subscribe. ******************************/ + + /* The client is now connected to the broker. Subscribe to the topic + * as specified in mqttexampleTOPIC at the top of this file by sending a + * subscribe packet then waiting for a subscribe acknowledgment (SUBACK). + * This client will then publish to the same topic it subscribed to, so it + * will expect all the messages it sends to the broker to be sent back to it + * from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish + * messages received from the broker will have QOS0. */ + LogInfo( ( "Attempt to subscribed to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); + prvMQTTSubscribeToTopic( xMQTTSocket ); + + /* Process incoming packet from the broker. After sending the subscribe, the + * client may receive a publish before it receives a subscribe ack. Therefore, + * call generic incoming packet processing function. Since this demo is + * subscribing to the topic to which no one is publishing, probability of + * receiving Publish message before subscribe ack is zero; but application + * must be ready to receive any packet. This demo uses the generic packet + * processing function everywhere to highlight this fact. */ + prvMQTTProcessIncomingPacket( xMQTTSocket ); + + /**************************** Publish and Keep Alive Loop. ******************************/ + /* Publish messages with QOS0, send and process Keep alive messages. */ + for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) + { + LogInfo( ( "Publish to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); + prvMQTTPublishToTopic( xMQTTSocket ); + + /* Process incoming publish echo, since application subscribed to the same + * topic the broker will send publish message back to the application. */ + LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) ); + prvMQTTProcessIncomingPacket( xMQTTSocket ); + + /* Leave Connection Idle for some time */ + LogInfo( ( "Keeping Connection Idle.\r\n\r\n" ) ); + vTaskDelay( mqttexampleKEEP_ALIVE_DELAY ); + + /* Send Ping request to broker and receive ping response */ + LogInfo( ( "Sending Ping Request to the broker.\r\n" ) ); + prvMQTTKeepAlive( xMQTTSocket ); + + /* Process Incoming packet from the broker */ + prvMQTTProcessIncomingPacket( xMQTTSocket ); + } + + /************************ Unsubscribe from the topic. **************************/ + LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); + prvMQTTUnsubscribeFromTopic( xMQTTSocket ); + + /* Process Incoming packet from the broker. */ + prvMQTTProcessIncomingPacket( xMQTTSocket ); + + /**************************** Disconnect. ******************************/ + + /* Send an MQTT Disconnect packet over the already connected TCP socket. + * There is no corresponding response for the disconnect packet. After sending + * disconnect, client must close the network connection. */ + LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); + prvMQTTDisconnect( xMQTTSocket ); + + /* Close the network connection. */ + prvGracefulShutDown( xMQTTSocket ); + + /* Wait for some time between two iterations to ensure that we do not + * bombard the public test mosquitto broker. */ + LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) ); + LogInfo( ( "Demo completed successfully.\r\n" ) ); + LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) ); + vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ); + } +} +/*-----------------------------------------------------------*/ + +static void prvGracefulShutDown( Socket_t xSocket ) +{ + uint8_t ucDummy[ 20 ]; + const TickType_t xShortDelay = pdMS_TO_MIN_TICKS( 250 ); + BaseType_t xShutdownLoopCount = 0; + + if( xSocket != ( Socket_t ) 0 ) + { + if( xSocket != FREERTOS_INVALID_SOCKET ) + { + /* Initiate graceful shutdown. */ + FreeRTOS_shutdown( xSocket, FREERTOS_SHUT_RDWR ); + + /* Wait for the socket to disconnect gracefully (indicated by FreeRTOS_recv() + * returning a FREERTOS_EINVAL error) before closing the socket. */ + while( FreeRTOS_recv( xSocket, ucDummy, sizeof( ucDummy ), 0 ) >= 0 ) + { + /* Wait for shutdown to complete. If a receive block time is used then + * this delay will not be necessary as FreeRTOS_recv() will place the RTOS task + * into the Blocked state anyway. */ + vTaskDelay( xShortDelay ); + + /* Limit the number of FreeRTOS_recv loops to avoid infinite loop. */ + if( ++xShutdownLoopCount >= mqttexampleMAX_SOCKET_SHUTDOWN_LOOPS ) + { + break; + } + } + + /* The socket has shut down and is safe to close. */ + FreeRTOS_closesocket( xSocket ); + } + } +} +/*-----------------------------------------------------------*/ + +static int32_t prvTransportRecv( NetworkContext_t * pxContext, + void * pvBuffer, + size_t xBytesToRecv ) +{ + int32_t lResult; + + configASSERT( pxContext != NULL ); + + /* Receive upto xBytesToRecv from network. */ + lResult = ( int32_t ) FreeRTOS_recv( ( Socket_t ) pxContext->xTcpSocket, + pvBuffer, + xBytesToRecv, + 0 ); + + return lResult; +} +/*-----------------------------------------------------------*/ + +static uint16_t prvGetNextPacketIdentifier() +{ + static uint16_t usPacketId = 0; + + usPacketId++; + + /* Since 0 is invalid packet identifier value, + * take care of it when it rolls over */ + if( usPacketId == 0 ) + { + usPacketId = 1; + } + + return usPacketId; +} +/*-----------------------------------------------------------*/ + +static Socket_t prvCreateTCPConnectionToBroker( void ) +{ + Socket_t xMQTTSocket = FREERTOS_INVALID_SOCKET; + uint32_t ulBrokerIPAddress; + BaseType_t xStatus = pdFAIL; + struct freertos_sockaddr xBrokerAddress; + + /* This is the socket used to connect to the MQTT broker. */ + xMQTTSocket = FreeRTOS_socket( FREERTOS_AF_INET, + FREERTOS_SOCK_STREAM, + FREERTOS_IPPROTO_TCP ); + + if( xMQTTSocket != FREERTOS_INVALID_SOCKET ) + { + /* Socket was created. Locate then connect to the MQTT broker. */ + ulBrokerIPAddress = FreeRTOS_gethostbyname( democonfigMQTT_BROKER_ENDPOINT ); + + if( ulBrokerIPAddress != 0 ) + { + xBrokerAddress.sin_port = FreeRTOS_htons( democonfigMQTT_BROKER_PORT ); + xBrokerAddress.sin_addr = ulBrokerIPAddress; + + if( FreeRTOS_connect( xMQTTSocket, &xBrokerAddress, sizeof( xBrokerAddress ) ) == 0 ) + { + /* Connection was successful. */ + xStatus = pdPASS; + } + else + { + LogInfo( ( "Located but could not connect to MQTT broker %s.\r\n\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); + } + } + else + { + LogInfo( ( "Could not locate MQTT broker %s.\r\n\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); + } + } + else + { + LogInfo( ( "Could not create TCP socket.\r\n\r\n" ) ); + } + + /* If the socket was created but the connection was not successful then delete + * the socket again. */ + if( xStatus == pdFAIL ) + { + if( xMQTTSocket != FREERTOS_INVALID_SOCKET ) + { + FreeRTOS_closesocket( xMQTTSocket ); + xMQTTSocket = FREERTOS_INVALID_SOCKET; + } + } + + return xMQTTSocket; +} +/*-----------------------------------------------------------*/ + +static void prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket ) +{ + BaseType_t xStatus; + size_t xRemainingLength; + size_t xPacketSize; + MQTTStatus_t xResult; + MQTTPacketInfo_t xIncomingPacket; + MQTTConnectInfo_t xConnectInfo; + uint16_t usPacketId; + bool xSessionPresent; + NetworkContext_t xNetworkContext; + + /*** + * For readability, error handling in this function is restricted to the use of + * asserts(). + ***/ + + /* Many fields not used in this demo so start with everything at 0. */ + memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); + + /* Start with a clean session i.e. direct the MQTT broker to discard any + * previous session data. Also, establishing a connection with clean session + * will ensure that the broker does not store any data when this client + * gets disconnected. */ + xConnectInfo.cleanSession = true; + + /* The client identifier is used to uniquely identify this MQTT client to + * the MQTT broker. In a production device the identifier can be something + * unique, such as a device serial number. */ + xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; + xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); + + /* Set MQTT keep-alive period. It is the responsibility of the application to ensure + * that the interval between Control Packets being sent does not exceed the Keep Alive value. + * In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet. */ + xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; + + /* Get size requirement for the connect packet. + * Last Will and Testament is not used in this demo. It is passed as NULL. */ + xResult = MQTT_GetConnectPacketSize( &xConnectInfo, + NULL, + &xRemainingLength, + &xPacketSize ); + + /* Make sure the packet size is less than static buffer size. */ + configASSERT( xResult == MQTTSuccess ); + configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE ); + + /* Serialize MQTT connect packet into the provided buffer. */ + xResult = MQTT_SerializeConnect( &xConnectInfo, + NULL, + xRemainingLength, + &xBuffer ); + configASSERT( xResult == MQTTSuccess ); + + xStatus = FreeRTOS_send( xMQTTSocket, + ( void * ) xBuffer.pBuffer, + xPacketSize, + 0 ); + configASSERT( xStatus == ( BaseType_t ) xPacketSize ); + + /* Reset all fields of the incoming packet structure. */ + ( void ) memset( ( void * ) &xIncomingPacket, 0x00, sizeof( MQTTPacketInfo_t ) ); + + /* Wait for connection acknowledgment. We cannot assume received data is the + * connection acknowledgment. Therefore this function reads type and remaining + * length of the received packet, before processing entire packet - although in + * this case to keep the example simple error checks are just performed by + * asserts. + */ + xNetworkContext.xTcpSocket = xMQTTSocket; + xResult = MQTT_GetIncomingPacketTypeAndLength( prvTransportRecv, + &xNetworkContext, + &xIncomingPacket ); + configASSERT( xResult == MQTTSuccess ); + configASSERT( xIncomingPacket.type == MQTT_PACKET_TYPE_CONNACK ); + configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE ); + + /* Now receive the reset of the packet into the statically allocated buffer. */ + xStatus = FreeRTOS_recv( xMQTTSocket, + ( void * ) xBuffer.pBuffer, + xIncomingPacket.remainingLength, + 0 ); + configASSERT( xStatus == ( BaseType_t ) xIncomingPacket.remainingLength ); + + xIncomingPacket.pRemainingData = xBuffer.pBuffer; + xResult = MQTT_DeserializeAck( &xIncomingPacket, + &usPacketId, + &xSessionPresent ); + configASSERT( xResult == MQTTSuccess ); + + if( xResult != MQTTSuccess ) + { + LogInfo( ( "Connection with MQTT broker failed.\r\n" ) ); + } +} +/*-----------------------------------------------------------*/ + +static void prvMQTTSubscribeToTopic( Socket_t xMQTTSocket ) +{ + MQTTStatus_t xResult; + MQTTSubscribeInfo_t xMQTTSubscription[ 1 ]; + size_t xRemainingLength; + size_t xPacketSize; + BaseType_t xStatus; + + /*** + * For readability, error handling in this function is restricted to the use of + * asserts(). + ***/ + + /* Some fields not used by this demo so start with everything at 0. */ + ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); + + /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to + * only one topic and uses QOS0. */ + xMQTTSubscription[ 0 ].qos = MQTTQoS0; + xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; + xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); + + xResult = MQTT_GetSubscribePacketSize( xMQTTSubscription, + sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), + &xRemainingLength, + &xPacketSize ); + + /* Make sure the packet size is less than static buffer size. */ + configASSERT( xResult == MQTTSuccess ); + configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE ); + + /* Get a unique packet id. */ + usSubscribePacketIdentifier = prvGetNextPacketIdentifier(); + /* Make sure the packet id obtained is valid. */ + configASSERT( usSubscribePacketIdentifier != 0 ); + + /* Serialize subscribe into statically allocated ucSharedBuffer. */ + xResult = MQTT_SerializeSubscribe( xMQTTSubscription, + sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), + usSubscribePacketIdentifier, + xRemainingLength, + &xBuffer ); + + configASSERT( xResult == MQTTSuccess ); + + /* Send Subscribe request to the broker. */ + xStatus = FreeRTOS_send( xMQTTSocket, + ( void * ) xBuffer.pBuffer, + xPacketSize, + 0 ); + configASSERT( xStatus == ( BaseType_t ) xPacketSize ); +} +/*-----------------------------------------------------------*/ + +static void prvMQTTPublishToTopic( Socket_t xMQTTSocket ) +{ + MQTTStatus_t xResult; + MQTTPublishInfo_t xMQTTPublishInfo; + size_t xRemainingLength; + size_t xPacketSize; + size_t xHeaderSize; + BaseType_t xStatus; + + + /*** + * For readability, error handling in this function is restricted to the use of + * asserts(). + ***/ + + /* Some fields not used by this demo so start with everything at 0. */ + ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); + + /* This demo uses QOS0 */ + xMQTTPublishInfo.qos = MQTTQoS0; + xMQTTPublishInfo.retain = false; + xMQTTPublishInfo.pTopicName = mqttexampleTOPIC; + xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC ); + xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; + xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); + + /* Find out length of Publish packet size. */ + xResult = MQTT_GetPublishPacketSize( &xMQTTPublishInfo, + &xRemainingLength, + &xPacketSize ); + configASSERT( xResult == MQTTSuccess ); + + /* Make sure the packet size is less than static buffer size. */ + configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE ); + + /* Serialize MQTT Publish packet header. The publish message payload will + * be sent directly in order to avoid copying it into the buffer. + * QOS0 does not make use of packet identifier, therefore value of 0 is used */ + xResult = MQTT_SerializePublishHeader( &xMQTTPublishInfo, + 0, + xRemainingLength, + &xBuffer, + &xHeaderSize ); + configASSERT( xResult == MQTTSuccess ); + + /* Send Publish header to the broker. */ + xStatus = FreeRTOS_send( xMQTTSocket, + ( void * ) xBuffer.pBuffer, + xHeaderSize, + 0 ); + configASSERT( xStatus == ( BaseType_t ) xHeaderSize ); + + /* Send Publish payload to the broker. */ + xStatus = FreeRTOS_send( xMQTTSocket, + ( void * ) xMQTTPublishInfo.pPayload, + xMQTTPublishInfo.payloadLength, + 0 ); + configASSERT( xStatus == ( BaseType_t ) xMQTTPublishInfo.payloadLength ); +} +/*-----------------------------------------------------------*/ + +static void prvMQTTUnsubscribeFromTopic( Socket_t xMQTTSocket ) +{ + MQTTStatus_t xResult; + MQTTSubscribeInfo_t xMQTTSubscription[ 1 ]; + size_t xRemainingLength; + size_t xPacketSize; + BaseType_t xStatus; + + /* Some fields not used by this demo so start with everything at 0. */ + memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); + + /* Unsubscribe to the mqttexampleTOPIC topic filter. The task handle is passed + * as the callback context which is used by the callback to send a task + * notification to this task.*/ + xMQTTSubscription[ 0 ].qos = MQTTQoS0; + xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; + xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); + + xResult = MQTT_GetUnsubscribePacketSize( xMQTTSubscription, + sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), + &xRemainingLength, + &xPacketSize ); + configASSERT( xResult == MQTTSuccess ); + /* Make sure the packet size is less than static buffer size */ + configASSERT( xPacketSize < mqttexampleSHARED_BUFFER_SIZE ); + + /* Get next unique packet identifier */ + usUnsubscribePacketIdentifier = prvGetNextPacketIdentifier(); + /* Make sure the packet id obtained is valid. */ + configASSERT( usUnsubscribePacketIdentifier != 0 ); + + xResult = MQTT_SerializeUnsubscribe( xMQTTSubscription, + sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), + usUnsubscribePacketIdentifier, + xRemainingLength, + &xBuffer ); + configASSERT( xResult == MQTTSuccess ); + + /* Send Unsubscribe request to the broker. */ + xStatus = FreeRTOS_send( xMQTTSocket, ( void * ) xBuffer.pBuffer, xPacketSize, 0 ); + configASSERT( xStatus == ( BaseType_t ) xPacketSize ); +} +/*-----------------------------------------------------------*/ + +static void prvMQTTKeepAlive( Socket_t xMQTTSocket ) +{ + MQTTStatus_t xResult; + BaseType_t xStatus; + size_t xPacketSize; + + /* Calculate PING request size. */ + xResult = MQTT_GetPingreqPacketSize( &xPacketSize ); + configASSERT( xResult == MQTTSuccess ); + configASSERT( xPacketSize <= mqttexampleSHARED_BUFFER_SIZE ); + + xResult = MQTT_SerializePingreq( &xBuffer ); + configASSERT( xResult == MQTTSuccess ); + + /* Send Ping Request to the broker. */ + xStatus = FreeRTOS_send( xMQTTSocket, + ( void * ) xBuffer.pBuffer, + xPacketSize, + 0 ); + configASSERT( xStatus == ( BaseType_t ) xPacketSize ); +} + +/*-----------------------------------------------------------*/ + +static void prvMQTTDisconnect( Socket_t xMQTTSocket ) +{ + MQTTStatus_t xResult; + BaseType_t xStatus; + size_t xPacketSize; + + /* Calculate DISCONNECT packet size. */ + xResult = MQTT_GetDisconnectPacketSize( &xPacketSize ); + configASSERT( xResult == MQTTSuccess ); + configASSERT( xPacketSize <= mqttexampleSHARED_BUFFER_SIZE ); + + xResult = MQTT_SerializeDisconnect( &xBuffer ); + configASSERT( xResult == MQTTSuccess ); + + xStatus = FreeRTOS_send( xMQTTSocket, + ( void * ) xBuffer.pBuffer, + xPacketSize, + 0 ); + configASSERT( xStatus == ( BaseType_t ) xPacketSize ); +} + +/*-----------------------------------------------------------*/ + +static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, + uint16_t usPacketId ) +{ + switch( pxIncomingPacket->type ) + { + case MQTT_PACKET_TYPE_SUBACK: + LogInfo( ( "Subscribed to the topic %s.\r\n", mqttexampleTOPIC ) ); + /* Make sure ACK packet identifier matches with Request packet identifier. */ + configASSERT( usSubscribePacketIdentifier == usPacketId ); + break; + + case MQTT_PACKET_TYPE_UNSUBACK: + LogInfo( ( "Unsubscribed from the topic %s.\r\n", mqttexampleTOPIC ) ); + /* Make sure ACK packet identifier matches with Request packet identifier. */ + configASSERT( usUnsubscribePacketIdentifier == usPacketId ); + break; + + case MQTT_PACKET_TYPE_PINGRESP: + LogInfo( ( "Ping Response successfully received.\r\n" ) ); + break; + + /* Any other packet type is invalid. */ + default: + LogInfo( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).\r\n", + pxIncomingPacket->type ) ); + } +} + +/*-----------------------------------------------------------*/ + +static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) +{ + configASSERT( pxPublishInfo != NULL ); + + /* Process incoming Publish. */ + LogInfo( ( "Incoming QOS : %d\n", pxPublishInfo->qos ) ); + + /* Verify the received publish is for the we have subscribed to. */ + if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) && + ( 0 == strncmp( mqttexampleTOPIC, + pxPublishInfo->pTopicName, + pxPublishInfo->topicNameLength ) ) ) + { + LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n" + "Incoming Publish Message : %.*s\r\n", + pxPublishInfo->topicNameLength, + pxPublishInfo->pTopicName, + pxPublishInfo->payloadLength, + pxPublishInfo->pPayload ) ); + } + else + { + LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n", + pxPublishInfo->topicNameLength, + pxPublishInfo->pTopicName ) ); + } +} + +/*-----------------------------------------------------------*/ + +static void prvMQTTProcessIncomingPacket( Socket_t xMQTTSocket ) +{ + MQTTStatus_t xResult; + MQTTPacketInfo_t xIncomingPacket; + BaseType_t xStatus; + MQTTPublishInfo_t xPublishInfo; + uint16_t usPacketId; + NetworkContext_t xNetworkContext; + + /*** + * For readability, error handling in this function is restricted to the use of + * asserts(). + ***/ + + ( void ) memset( ( void * ) &xIncomingPacket, 0x00, sizeof( MQTTPacketInfo_t ) ); + + /* Determine incoming packet type and remaining length. */ + xNetworkContext.xTcpSocket = xMQTTSocket; + xResult = MQTT_GetIncomingPacketTypeAndLength( prvTransportRecv, + &xNetworkContext, + &xIncomingPacket ); + configASSERT( xResult == MQTTSuccess ); + configASSERT( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE ); + + /* Current implementation expects an incoming Publish and three different + * responses ( SUBACK, PINGRESP and UNSUBACK ). */ + + /* Receive the remaining bytes. In case of PINGRESP, remaining length will be zero. + * Skip reading from network for remaining length zero. */ + if( xIncomingPacket.remainingLength > 0 ) + { + xStatus = FreeRTOS_recv( xMQTTSocket, + ( void * ) xBuffer.pBuffer, + xIncomingPacket.remainingLength, 0 ); + configASSERT( xStatus == ( BaseType_t ) xIncomingPacket.remainingLength ); + xIncomingPacket.pRemainingData = xBuffer.pBuffer; + } + + /* Check if the incoming packet is a publish packet. */ + if( ( xIncomingPacket.type & 0xf0 ) == MQTT_PACKET_TYPE_PUBLISH ) + { + xResult = MQTT_DeserializePublish( &xIncomingPacket, &usPacketId, &xPublishInfo ); + configASSERT( xResult == MQTTSuccess ); + + /* Process incoming Publish message. */ + prvMQTTProcessIncomingPublish( &xPublishInfo ); + } + else + { + /* If the received packet is not a Publish message, then it is an ACK for one + * of the messages we sent out, verify that the ACK packet is a valid MQTT + * packet. Session present is only valid for a CONNACK. CONNACK is not + * expected to be received here. Hence pass NULL for pointer to session + * present. */ + xResult = MQTT_DeserializeAck( &xIncomingPacket, &usPacketId, NULL ); + configASSERT( xResult == MQTTSuccess ); + + /* Process the response. */ + prvMQTTProcessResponse( &xIncomingPacket, usPacketId ); + } } /*-----------------------------------------------------------*/ diff --git a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/demo_config.h b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/demo_config.h index cd706e8f8..bfd4d9be8 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/demo_config.h +++ b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/demo_config.h @@ -43,7 +43,7 @@ /* Logging configuration for the Demo. */ #ifndef LIBRARY_LOG_NAME - #define LIBRARY_LOG_NAME "DEMO" + #define LIBRARY_LOG_NAME "MQTTLWDemo" #endif #ifndef LIBRARY_LOG_LEVEL @@ -53,4 +53,67 @@ /************ End of logging configuration ****************/ +/** + * @brief The MQTT client identifier used in this example. Each client identifier + * must be unique so edit as required to ensure no two clients connecting to the + * same broker use the same client identifier. + * + * #define democonfigCLIENT_IDENTIFIER "insert here." + */ + + +/** + * @brief MQTT broker end point to connect to. + * + * @note For running this demo an MQTT broker, which can be run locally on + * the same host is recommended. Any MQTT broker, which can be run on a Windows + * host can be used for this demo. However, the instructions below are for + * setting up a local Mosquitto broker on a Windows host. + * 1. Download Mosquitto from https://mosquitto.org/download/ + * 2. Install Mosquitto as a Windows service by running the installer. + * More details about installing as a Windows service can be found at + * https://github.com/eclipse/mosquitto/blob/master/readme-windows.txt and + * https://github.com/eclipse/mosquitto/blob/master/readme.md + * 3. Verify that Mosquitto server is running locally and listening on port + * 1883 by + * a. Opening Power Shell. + * b. Typing in command `netstat -a -p TCP | grep 1883` to check if there + * is an active connection listening on port 1883. + * c. Verify that there is an output as shown below + * `TCP 0.0.0.0:1883 <HOST-NAME>:0 LISTENING` + * d. If there is no output please go through the Mosquitto documentation + * listed above to check if the installation was successful. + * 4.After verifying that a Mosquitto broker is running successfully, update + * the config democonfigMQTT_BROKER_ENDPOINT to the local IP address of the + * Windows host machine. Please note that "localhost" or address "127.0.0.1" + * will not work as this example is running on a Windows Simulator and not on + * Windows host natively. + * + * As an alternative option, a publicly hosted Mosquitto broker can also be + * used as an MQTT broker end point. This can be done by updating the config + * democonfigMQTT_BROKER_ENDPOINT to "test.mosquitto.org". However, this is not + * recommended due the possible downtimes of the broker as indicated by the + * documentation in https://test.mosquitto.org/. + * + * #define democonfigMQTT_BROKER_ENDPOINT "insert here." + */ + + +/** + * @brief The port to use for the demo. + * + * #define democonfigMQTT_BROKER_PORT ( insert here. ) + */ + + +/** + * @brief Set the stack size of the main demo task. + * + * In the Windows port, this stack only holds a structure. The actual + * stack is created by an operating system thread. + */ +#define democonfigDEMO_STACKSIZE configMINIMAL_STACK_SIZE + + + #endif /* DEMO_CONFIG_H */ diff --git a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/mqtt_config.h b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/mqtt_config.h index 1ed08bbb5..afe83b644 100644 --- a/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/mqtt_config.h +++ b/FreeRTOS-Plus/Demo/FreeRTOS-IoT-Libraries-LTS-Beta2/mqtt/mqtt_light_weight/mqtt_config.h @@ -46,7 +46,7 @@ #endif #ifndef LIBRARY_LOG_LEVEL - #define LIBRARY_LOG_LEVEL LOG_INFO + #define LIBRARY_LOG_LEVEL LOG_NONE #endif #include "logging_stack.h" |