summaryrefslogtreecommitdiff
path: root/FreeRTOS-Labs/Demo/FreeRTOS_Plus_POSIX_with_actor_Windows_Simulator/posix_demo.c
diff options
context:
space:
mode:
Diffstat (limited to 'FreeRTOS-Labs/Demo/FreeRTOS_Plus_POSIX_with_actor_Windows_Simulator/posix_demo.c')
-rw-r--r--FreeRTOS-Labs/Demo/FreeRTOS_Plus_POSIX_with_actor_Windows_Simulator/posix_demo.c374
1 files changed, 374 insertions, 0 deletions
diff --git a/FreeRTOS-Labs/Demo/FreeRTOS_Plus_POSIX_with_actor_Windows_Simulator/posix_demo.c b/FreeRTOS-Labs/Demo/FreeRTOS_Plus_POSIX_with_actor_Windows_Simulator/posix_demo.c
new file mode 100644
index 000000000..8aba18231
--- /dev/null
+++ b/FreeRTOS-Labs/Demo/FreeRTOS_Plus_POSIX_with_actor_Windows_Simulator/posix_demo.c
@@ -0,0 +1,374 @@
+/*
+ * FreeRTOS POSIX Demo V1.0.0
+ * Copyright (C) 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ * http://aws.amazon.com/freertos
+ * http://www.FreeRTOS.org
+ */
+
+/**
+ * @brief Demo intro: job distribution with actor model.
+ *
+ * This demo simulates job distribution with actor model.
+ * https://en.wikipedia.org/wiki/Actor_model
+ *
+ * In this demo, vStartPOSIXDemo() first creates all mailboxes
+ * which will be used by actors to send and receive messages.
+ * Then it spins up two types of actors -- Dispatcher and Workers.
+ *
+ * Dispatcher -- Distributing sub-tasks to workers.
+ * Distribution is done by putting messages into each worker's inbox,
+ * which is essentially an mqueue. Dispatcher keeps distributing tasks
+ * until all intended tasks are distributed.
+ *
+ * Workers -- Take sub-tasks and perform predefined routine for each type of tasks.
+ *
+ * Upon finishing distributing all tasks, Dispatcher will send a "terminate" message to
+ * each worker. vStartPOSIXDemo() will then join all actor threads and clean up mailboxes.
+ *
+ * @note A few assumptions are made in this demo, which a user might have to alter
+ * if to adopt this model in a new application:
+ *
+ * - The upper limit for MQUEUE_NUMBER_OF_WORKERS is set to 10.
+ * This is not due to physical constraint (e.g. memory), rather to make queue
+ * names end with a single digit number.
+ *
+ * - Message enum is cast to char/uint8_t directly, with the assumption that
+ * the system is not going to have more than 254 messages, which is often true
+ * in practice. Could extend bits used in a message to either have more messages
+ * or include additional arguments for a message. Proper typecasting is needed
+ * in that case.
+ *
+ * - The philosophy is "failure is expected". It is shown in both the way dispatcher
+ * delivers messages (i.e. messages can be dropped by worker(s)), and also the
+ * way workers process messages (i.e. workers do not inform dispatcher success or
+ * failure).
+ *
+ * - Following the philosophy, dispatcher shall never use blocking calls to distribute
+ * tasks. The only exception made here is that dispatcher needs to make sure the
+ * successful delivery of "terminate" messages. So that, main thread could join
+ * all actor threads and finish the demo.
+ */
+
+/* FreeRTOS includes. */
+#include "FreeRTOS_POSIX.h"
+
+/* System headers. */
+#include <stdbool.h>
+#include <string.h>
+#include <stdio.h>
+
+/* Demo includes. */
+#include "posix_demo.h"
+
+/* FreeRTOS+POSIX. */
+#include "FreeRTOS_POSIX/pthread.h"
+#include "FreeRTOS_POSIX/mqueue.h"
+#include "FreeRTOS_POSIX/time.h"
+#include "FreeRTOS_POSIX/fcntl.h"
+#include "FreeRTOS_POSIX/errno.h"
+
+/* Constants. */
+#define LINE_BREAK "\r\n"
+
+/**
+ * @brief Control messages.
+ *
+ * uint8_t is sufficient for this enum, that we are going to cast to char directly.
+ * If ever needed, implement a function to properly typecast.
+ */
+/**@{ */
+typedef enum ControlMessage
+{
+ eMSG_LOWER_INAVLID = 0x00, /**< Guard, let's not use 0x00 for messages. */
+ eWORKER_CTRL_MSG_CONTINUE = 0x01, /**< Dispatcher to worker, distributing another job. */
+ eWORKER_CTRL_MSG_EXIT = 0x02, /**< Dispatcher to worker, all jobs are finished and the worker receiving such can exit. */
+
+ /* define additional messages here */
+
+ eMSG_UPPER_INVALID = 0xFF /**< Guard, additional tasks shall be defined above. */
+} eControlMessage;
+/**@} */
+
+/**
+ * @defgroup Configuration constants for the dispatcher-worker demo.
+ */
+/**@{ */
+#define MQUEUE_NUMBER_OF_WORKERS ( 4 ) /**< The number of worker threads, each thread has one queue which is used as income box. */
+
+#if ( MQUEUE_NUMBER_OF_WORKERS > 10 )
+ #error "Please keep MQUEUE_NUMBER_OF_WORKERS < 10."
+#endif
+
+#define MQUEUE_WORKER_QNAME_BASE "/qNode0" /**< Queue name base. */
+#define MQUEUE_WORKER_QNAME_BASE_LEN ( 6 ) /** Queue name base length. */
+
+#define MQUEUE_TIMEOUT_SECONDS ( 1 ) /**< Relative timeout for mqueue functions. */
+#define MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER ( 1 ) /**< Maximum number of messages in a queue. */
+
+#define MQUEUE_MSG_WORKER_CTRL_MSG_SIZE sizeof( uint8_t ) /**< Control message size. */
+#define DEMO_ERROR ( -1 ) /**< Any non-zero value would work. */
+/**@} */
+
+/**
+ * @brief Structure used by Worker thread.
+ */
+/**@{ */
+typedef struct WorkerThreadResources
+{
+ pthread_t pxID; /**< thread ID. */
+ mqd_t xInboxID; /**< mqueue inbox ID. */
+} WorkerThreadResources_t;
+/**@} */
+
+/**
+ * @brief Structure used by Dispatcher thread.
+ */
+/**@{ */
+typedef struct DispatcherThreadResources
+{
+ pthread_t pxID; /**< thread ID. */
+ mqd_t * pOutboxID; /**< a list of mqueue outbox ID. */
+} DispatcherThreadResources_t;
+/**@} */
+
+/*-----------------------------------------------------------*/
+
+static void * prvWorkerThread( void * pvArgs )
+{
+ WorkerThreadResources_t pArgList = *( WorkerThreadResources_t * ) pvArgs;
+
+ printf( "Worker thread #[%d] - start %s", ( int ) pArgList.pxID, LINE_BREAK );
+
+ struct timespec xReceiveTimeout = { 0 };
+
+ ssize_t xMessageSize = 0;
+ char pcReceiveBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 };
+
+ /* This is a worker thread that reacts based on what is sent to its inbox (mqueue). */
+ while( true )
+ {
+ clock_gettime( CLOCK_REALTIME, &xReceiveTimeout );
+ xReceiveTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS;
+
+ xMessageSize = mq_receive( pArgList.xInboxID,
+ pcReceiveBuffer,
+ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
+ 0 );
+
+ /* Parse messages */
+ if( xMessageSize == MQUEUE_MSG_WORKER_CTRL_MSG_SIZE )
+ {
+ switch( ( int ) pcReceiveBuffer[ 0 ] )
+ {
+ case eWORKER_CTRL_MSG_CONTINUE:
+ /* Task branch, currently only prints message to screen. */
+ /* Could perform tasks here. Could also notify dispatcher upon completion, if desired. */
+ printf( "Worker thread #[%d] -- Received eWORKER_CTRL_MSG_CONTINUE %s", ( int ) pArgList.pxID, LINE_BREAK );
+ break;
+
+ case eWORKER_CTRL_MSG_EXIT:
+ printf( "Worker thread #[%d] -- Finished. Exit now. %s", ( int ) pArgList.pxID, LINE_BREAK );
+
+ return NULL;
+
+ default:
+ /* Received a message that we don't care or not defined. */
+ break;
+ }
+ }
+ else
+ {
+ /* Invalid message. Error handling can be done here, if desired. */
+ }
+ }
+
+ /* You should never hit here. */
+ /* return NULL; */
+}
+
+/*-----------------------------------------------------------*/
+
+static void * prvDispatcherThread( void * pvArgs )
+{
+ DispatcherThreadResources_t pArgList = *( DispatcherThreadResources_t * ) pvArgs;
+
+ printf( "Dispatcher thread - start %s", LINE_BREAK );
+
+ struct timespec xSendTimeout = { 0 };
+
+ ssize_t xMessageSize = 0;
+ char pcSendBuffer[ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE ] = { 0 };
+
+ /* Just for fun, let threads do a total of 100 independent tasks. */
+ int i = 0;
+ const int totalNumOfJobsPerThread = 100;
+
+ /* Distribute 1000 independent tasks to workers, in round-robin fashion. */
+ pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_CONTINUE;
+
+ for( i = 0; i < totalNumOfJobsPerThread; i++ )
+ {
+ clock_gettime( CLOCK_REALTIME, &xSendTimeout );
+ xSendTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS;
+
+ printf( "Dispatcher iteration #[%d] -- Sending msg to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK );
+
+ xMessageSize = mq_timedsend( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ],
+ pcSendBuffer,
+ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
+ 0,
+ &xSendTimeout );
+
+ if( xMessageSize != 0 )
+ {
+ /* This error is acceptable in our setup.
+ * Since inbox for each thread fits only one message.
+ * In reality, balance inbox size, message arrival rate, and message drop rate. */
+ printf( "An acceptable failure -- dispatcher failed to send eWORKER_CTRL_MSG_CONTINUE to outbox ID: %x. errno %d %s",
+ ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], errno, LINE_BREAK );
+ }
+ }
+
+ /* Control thread is now done with distributing jobs. Tell workers they are done. */
+ pcSendBuffer[ 0 ] = ( char ) eWORKER_CTRL_MSG_EXIT;
+
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ printf( "Dispatcher [%d] -- Sending eWORKER_CTRL_MSG_EXIT to worker thread #[%d]. %s", i, ( int ) pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ], LINE_BREAK );
+
+ /* This is a blocking call, to guarantee worker thread exits. */
+ xMessageSize = mq_send( pArgList.pOutboxID[ i % MQUEUE_NUMBER_OF_WORKERS ],
+ pcSendBuffer,
+ MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
+ 0 );
+ }
+
+ return NULL;
+}
+
+/*-----------------------------------------------------------*/
+
+/**
+ * @brief Job distribution with actor model.
+ *
+ * See the top of this file for detailed description.
+ */
+void vStartPOSIXDemo( void *pvParameters )
+{
+ int i = 0;
+ int iStatus = 0;
+
+ /* Remove warnings about unused parameters. */
+ ( void ) pvParameters;
+
+ /* Handles of the threads and related resources. */
+ DispatcherThreadResources_t pxDispatcher = { 0 };
+ WorkerThreadResources_t pxWorkers[ MQUEUE_NUMBER_OF_WORKERS ] = { { 0 } };
+ mqd_t workerMqueues[ MQUEUE_NUMBER_OF_WORKERS ] = { 0 };
+
+ struct mq_attr xQueueAttributesWorker =
+ {
+ .mq_flags = 0,
+ .mq_maxmsg = MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER,
+ .mq_msgsize = MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
+ .mq_curmsgs = 0
+ };
+
+ pxDispatcher.pOutboxID = workerMqueues;
+
+ /* Create message queues for each worker thread. */
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ /* Prepare a unique queue name for each worker. */
+ char qName[] = MQUEUE_WORKER_QNAME_BASE;
+ qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i;
+
+ /* Open a queue with --
+ * O_CREAT -- create a message queue.
+ * O_RDWR -- both receiving and sending messages.
+ */
+ pxWorkers[ i ].xInboxID = mq_open( qName,
+ O_CREAT | O_RDWR,
+ ( mode_t ) 0,
+ &xQueueAttributesWorker );
+
+ if( pxWorkers[ i ].xInboxID == ( mqd_t ) -1 )
+ {
+ printf( "Invalid inbox (mqueue) for worker. %s", LINE_BREAK );
+ iStatus = DEMO_ERROR;
+ break;
+ }
+
+ /* Outboxes of dispatcher thread is the inboxes of all worker threads. */
+ pxDispatcher.pOutboxID[ i ] = pxWorkers[ i ].xInboxID;
+ }
+
+ /* Create and start Worker threads. */
+ if( iStatus == 0 )
+ {
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ ( void ) pthread_create( &( pxWorkers[ i ].pxID ), NULL, prvWorkerThread, &pxWorkers[ i ] );
+ }
+
+ /* Create and start dispatcher thread. */
+ ( void ) pthread_create( &( pxDispatcher.pxID ), NULL, prvDispatcherThread, &pxDispatcher );
+
+ /* Actors will do predefined tasks in threads. Current implementation is that
+ * dispatcher actor notifies worker actors to terminate upon finishing distributing tasks. */
+
+ /* Wait for worker threads to join. */
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ ( void ) pthread_join( pxWorkers[ i ].pxID, NULL );
+ }
+
+ /* Wait for dispatcher thread to join. */
+ ( void ) pthread_join( pxDispatcher.pxID, NULL );
+ }
+
+ /* Close and unlink worker message queues. */
+ for( i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++ )
+ {
+ char qName[] = MQUEUE_WORKER_QNAME_BASE;
+ qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] = qName[ MQUEUE_WORKER_QNAME_BASE_LEN - 1 ] + i;
+
+ if( pxWorkers[ i ].xInboxID != NULL )
+ {
+ ( void ) mq_close( pxWorkers[ i ].xInboxID );
+ ( void ) mq_unlink( qName );
+ }
+ }
+
+ /* Have something on console. */
+ if( iStatus == 0 )
+ {
+ printf( "All threads finished. %s", LINE_BREAK );
+ }
+ else
+ {
+ printf( "Queues did not get initialized properly. Did not run demo. %s", LINE_BREAK );
+ }
+
+ /* This task was created with the native xTaskCreate() API function, so
+ must not run off the end of its implementing thread. */
+ vTaskDelete( NULL );
+}