diff options
Diffstat (limited to 'ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp')
-rw-r--r-- | ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp | 589 |
1 files changed, 589 insertions, 0 deletions
diff --git a/ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp b/ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp new file mode 100644 index 00000000000..7f90fc62012 --- /dev/null +++ b/ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp @@ -0,0 +1,589 @@ +// $Id$ +#include "RT_CORBA_Workers.h" + +#if defined (ACE_HAS_THREADS) + +#include "ace/OS_main.h" +#include "ace/ACE.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/Sched_Params.h" +#include "ace/Lock_Adapter_T.h" + +// The number of messages that is being processed +static size_t number_of_messages = 100; + +// The number of upcall threads +static size_t number_of_workers = 2; + +// The size of the message +static size_t message_size = 100; + +// Number of threads that are ready to go +static size_t ready_threads = 0; + +// Number of input and output threads +static size_t io_threads = 2; // 1 for output and 1 for input + +// High resolution test timer +static ACE_High_Res_Timer test_timer; + +// Debugging condition +static DEBUGGING_RANGE debug = DEBUG_NONE; + +// Data block used by the message blocks +ACE_Data_Block *data_block = 0; + +/*******************************************************************/ +// Constructor for Synchronisers +Synchronisers::Synchronisers (void) + : mutex_ (), + event_ () +{ +} + + +int +Synchronisers::start_synchronization (void) +{ + // Hold the lock and increment the global variable to indicate + // number of ready threads + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); + + ready_threads ++; + + if (ready_threads == (number_of_workers + io_threads)) + { + // Reset the ready_threads so that we can wait at the end of + // runs + ready_threads = 0; + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Ready to signal start \n")); + } + // Start the timer + test_timer.start (); + + // Signal all the threads + this->event_.signal (); + + // return to do our work; + return 0; + } + + // If we are not the last thread, let go off the lock + } + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Ready to wait () on event.. \n")); + } + + // Wait blisfully till we are woken up + this->event_.wait (); + + return 0; +} + +int +Synchronisers::end_synchronization (void) +{ + // Hold the lock and increment the global variable to indicate + // number of ready threads + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); + + ready_threads ++; + + if (ready_threads == (number_of_workers + io_threads)) + { + // Reset the ready_threads so that we can wait at the end of + // runs + ready_threads = 0; + + // Start the timer + test_timer.stop (); + + // Signal all the threads + this->event_.signal (); + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Ended peacefully \n")); + } + + // return to do our work; + return 0; + } + + + // If we are not the last thread, let go off the lock + } + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Going to wait .. \n")); + } + + // Wait blisfully till we are woken up + this->event_.wait (); + + return 0; +} + +/*******************************************************************/ + +Worker_Task::Worker_Task (Message_Queue *mq, + Synchronisers &synch) + : ACE_Task<ACE_MT_SYNCH> (0, mq), + synch_ (synch), + messages_processed_ (0) +{ +} + +int +Worker_Task::svc (void) +{ + // Start synchronization + (void) this->synch_.start_synchronization (); + + for (;;) + { + ACE_Message_Block *mb = 0; + int result = this->getq (mb); + if (result == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Worker_Task::svc (%t) -> %p\n", + "getq error"), + -1); + } + + // Get the flag in the message blok + ACE_Message_Block::Message_Flags flag = + mb->self_flags (); + + // The stop flag + int stop_flag = 0; + + // Check for the stop flag + if (ACE_BIT_ENABLED (flag, + Synchronisers::MB_STOP_FLAG)) + { + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) saw flag after [%d] messages\n", + this->messages_processed_)); + } + + stop_flag = 1; + } + // Release the message block + mb->release (); + + // Counter. + ++this->messages_processed_; + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) dequeued my %d message\n", + this->messages_processed_)); + } + + // + // Process message here. + // + + for (size_t j = 0; j < message_size; ++j) + { + // Eat a little CPU + /* takes about 40.2 usecs on a 167 MHz Ultra2 */ + u_long n = 11UL; + ACE::is_prime (n, 2, n / 2); + } + + // Make a message block for writing onto output queue + ACE_Message_Block *message_block = 0; + ACE_NEW_RETURN (message_block, + ACE_Message_Block (data_block), + -1); + + // Put this message block into the next queue or the output + // queue + result = this->put_next (message_block); + + if (result == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Input::svc (%t) -> %p\n", + "putq error"), + -1); + } + + // If the stop_flag is set just break and wait.. + if (stop_flag) + { + if (debug) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Got stop message after [%d] messages \n", + this->messages_processed_)); + + break; + } + } + + (void) this->synch_.end_synchronization (); + return 0; +} + +int +Worker_Task::processed (void) +{ + return this->messages_processed_; +} + +/*******************************************************************/ + +Input_Task::Input_Task (Message_Queue *mq, + Synchronisers &synch) + : ACE_Task<ACE_MT_SYNCH> (0, mq), + synch_ (synch) +{ +} + +int +Input_Task::svc (void) +{ + // Synchronise threads + (void) this->synch_.start_synchronization (); + + + size_t i = 0; + for (i = 0; + i < (number_of_messages - number_of_workers); + ++i) + { + // Make a message block + ACE_Message_Block *message_block = 0; + ACE_NEW_RETURN (message_block, + ACE_Message_Block (data_block), + -1); + + int result = this->putq (message_block); + + if (result == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Input::svc (%t) -> %p\n", + "putq error"), + -1); + } + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Input thread -> Sent [%d] messages\n", + i)); + } + } + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Sending close messages \n")); + } + + + // Stop messages + for (i = 0; + i < number_of_workers; + ++i) + { + // Make a message block + ACE_Message_Block *message_block = 0; + ACE_NEW_RETURN (message_block, + ACE_Message_Block (data_block), + -1); + + // Set the stop flag in the message block and not in the datablock + message_block->set_self_flags (Synchronisers::MB_STOP_FLAG); + + int result = this->putq (message_block); + + if (result == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Input::svc (%t) -> %p\n", + "putq error"), + -1); + } + } + + + (void) this->synch_.end_synchronization (); + return 0; +} + +/*******************************************************************/ + +Output_Task::Output_Task (Message_Queue *mq, + Synchronisers &synch) + : ACE_Task<ACE_MT_SYNCH> (0, mq), + synch_ (synch) +{ +} + +int +Output_Task::svc (void) +{ + // Synchronise threads + (void) this->synch_.start_synchronization (); + + + for (size_t i = 0; + i < number_of_messages; + ++i) + { + // Get the message block from queue + ACE_Message_Block *mb = 0; + int result = this->getq (mb); + + // delete the message block + mb->release (); + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Output thread -> received [%d] message\n", + i)); + } + + if (result == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Input::svc (%t) -> %p\n", + "putq error"), + -1); + } + } + + (void) this->synch_.end_synchronization (); + return 0; +} + +int +Output_Task::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + /* if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Sticking message into " + " output queue \n")); + }*/ + return this->putq (mb); +} + +/*******************************************************************/ + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:")); + int c; + + while ((c = get_opt ()) != -1) + { + switch (c) + { + case 'm': + number_of_messages = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 't': + number_of_workers = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'd': + debug = static_cast<DEBUGGING_RANGE> (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 's': + message_size = ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s\n" + "\t[-m number of messages]\n" + "\t[-s message size]\n" + "\t[-w number of workers]\n" + "\t[-b burst size]\n" + "\t[-t timeout between bursts]\n" + "\t[-d debug]\n", + argv[0]), + -1); + } + } + + return 0; +} + + +/*******************************************************************/ +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + int result = parse_args (argc, argv); + if (result != 0) + { + return result; + } + + ACE_High_Res_Timer::calibrate (); + + // Create the message queue + Message_Queue input_message_queue; + Message_Queue output_message_queue; + + // Create the datablocks. IF we use the default Message Blocks Ctor, + // it is going to do an extra allocation for the data block + ACE_NEW_RETURN (data_block, + ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >, + -1); + + // Increment the reference count so that we can share the + // datablock. This is donw twice the number of messages for the + // input and output queues. + size_t i = 0; + + for (i = 0; i < 2*number_of_messages; ++i) + { + data_block->duplicate (); + } + + // Create the Synchronisers + Synchronisers synch; + + // Workers. + Worker_Task **workers = 0; + ACE_NEW_RETURN (workers, + Worker_Task *[number_of_workers], + -1); + + // Input Task + Input_Task input_task (&input_message_queue, + synch); + + // Output Task + Output_Task output_task (&output_message_queue, + synch); + int priority = + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO); + + + long flags = THR_SCHED_FIFO | THR_SCOPE_PROCESS; + + // Create and activate the worker threads + for (i = 0; i < number_of_workers; ++i) + { + ACE_NEW_RETURN (workers[i], + Worker_Task (&input_message_queue, synch), + -1); + + workers[i]->next (&output_task); + + // Activate the workers. + result = workers[i]->activate (flags, + 1, + 1, + priority); + if (result != 0) + { + flags = THR_BOUND; + priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER, + ACE_SCOPE_THREAD); + result = workers[i]->activate (flags, + 1, + 1, + priority); + if (result != 0) + { + return result; + } + } + } + + + + // Activate the input and output threads + result = input_task.activate (flags, + 1, + 1, + priority); + + if (result != 0) + return result; + + + + // Activate the workers. + result = output_task.activate (flags, + 1, + 1, + priority); + + if (result != 0) + return result; + + + + // Wait for all threads to terminate. + result = ACE_Thread_Manager::instance ()->wait (); + + + ACE_hrtime_t elapsed_time = 0; + + test_timer.elapsed_time (elapsed_time); + +# if !defined (ACE_WIN32) + double elapsed_time_per_invocation = + (double) elapsed_time / number_of_messages; + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Throughput is [%f] \n", + elapsed_time_per_invocation)); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Throughput is [%f] \n", + 1000000000/ elapsed_time_per_invocation)); + +#endif /*ACE_WIN32 */ + for (i = 0; i < number_of_workers; ++i) + { + ACE_DEBUG ((LM_DEBUG, + "Message process for thread [%d] is [%d] \n", + i, workers[i]->processed ())); + delete workers[i]; + } + delete[] workers; + + return result; +} + +#else /*ACE_HAS_THREADS*/ + +int +main (int, char *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Not supported in single threaded builds \n")); + + return 0; +} + + +#endif /*ACE_HAS_THREADS*/ |