summaryrefslogtreecommitdiff
path: root/ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp')
-rw-r--r--ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp589
1 files changed, 589 insertions, 0 deletions
diff --git a/ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp b/ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp
new file mode 100644
index 00000000000..7f90fc62012
--- /dev/null
+++ b/ACE/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp
@@ -0,0 +1,589 @@
+// $Id$
+#include "RT_CORBA_Workers.h"
+
+#if defined (ACE_HAS_THREADS)
+
+#include "ace/OS_main.h"
+#include "ace/ACE.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Sched_Params.h"
+#include "ace/Lock_Adapter_T.h"
+
+// The number of messages that is being processed
+static size_t number_of_messages = 100;
+
+// The number of upcall threads
+static size_t number_of_workers = 2;
+
+// The size of the message
+static size_t message_size = 100;
+
+// Number of threads that are ready to go
+static size_t ready_threads = 0;
+
+// Number of input and output threads
+static size_t io_threads = 2; // 1 for output and 1 for input
+
+// High resolution test timer
+static ACE_High_Res_Timer test_timer;
+
+// Debugging condition
+static DEBUGGING_RANGE debug = DEBUG_NONE;
+
+// Data block used by the message blocks
+ACE_Data_Block *data_block = 0;
+
+/*******************************************************************/
+// Constructor for Synchronisers
+Synchronisers::Synchronisers (void)
+ : mutex_ (),
+ event_ ()
+{
+}
+
+
+int
+Synchronisers::start_synchronization (void)
+{
+ // Hold the lock and increment the global variable to indicate
+ // number of ready threads
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+
+ ready_threads ++;
+
+ if (ready_threads == (number_of_workers + io_threads))
+ {
+ // Reset the ready_threads so that we can wait at the end of
+ // runs
+ ready_threads = 0;
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Ready to signal start \n"));
+ }
+ // Start the timer
+ test_timer.start ();
+
+ // Signal all the threads
+ this->event_.signal ();
+
+ // return to do our work;
+ return 0;
+ }
+
+ // If we are not the last thread, let go off the lock
+ }
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Ready to wait () on event.. \n"));
+ }
+
+ // Wait blisfully till we are woken up
+ this->event_.wait ();
+
+ return 0;
+}
+
+int
+Synchronisers::end_synchronization (void)
+{
+ // Hold the lock and increment the global variable to indicate
+ // number of ready threads
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+
+ ready_threads ++;
+
+ if (ready_threads == (number_of_workers + io_threads))
+ {
+ // Reset the ready_threads so that we can wait at the end of
+ // runs
+ ready_threads = 0;
+
+ // Start the timer
+ test_timer.stop ();
+
+ // Signal all the threads
+ this->event_.signal ();
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Ended peacefully \n"));
+ }
+
+ // return to do our work;
+ return 0;
+ }
+
+
+ // If we are not the last thread, let go off the lock
+ }
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Going to wait .. \n"));
+ }
+
+ // Wait blisfully till we are woken up
+ this->event_.wait ();
+
+ return 0;
+}
+
+/*******************************************************************/
+
+Worker_Task::Worker_Task (Message_Queue *mq,
+ Synchronisers &synch)
+ : ACE_Task<ACE_MT_SYNCH> (0, mq),
+ synch_ (synch),
+ messages_processed_ (0)
+{
+}
+
+int
+Worker_Task::svc (void)
+{
+ // Start synchronization
+ (void) this->synch_.start_synchronization ();
+
+ for (;;)
+ {
+ ACE_Message_Block *mb = 0;
+ int result = this->getq (mb);
+ if (result == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Worker_Task::svc (%t) -> %p\n",
+ "getq error"),
+ -1);
+ }
+
+ // Get the flag in the message blok
+ ACE_Message_Block::Message_Flags flag =
+ mb->self_flags ();
+
+ // The stop flag
+ int stop_flag = 0;
+
+ // Check for the stop flag
+ if (ACE_BIT_ENABLED (flag,
+ Synchronisers::MB_STOP_FLAG))
+ {
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) saw flag after [%d] messages\n",
+ this->messages_processed_));
+ }
+
+ stop_flag = 1;
+ }
+ // Release the message block
+ mb->release ();
+
+ // Counter.
+ ++this->messages_processed_;
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) dequeued my %d message\n",
+ this->messages_processed_));
+ }
+
+ //
+ // Process message here.
+ //
+
+ for (size_t j = 0; j < message_size; ++j)
+ {
+ // Eat a little CPU
+ /* takes about 40.2 usecs on a 167 MHz Ultra2 */
+ u_long n = 11UL;
+ ACE::is_prime (n, 2, n / 2);
+ }
+
+ // Make a message block for writing onto output queue
+ ACE_Message_Block *message_block = 0;
+ ACE_NEW_RETURN (message_block,
+ ACE_Message_Block (data_block),
+ -1);
+
+ // Put this message block into the next queue or the output
+ // queue
+ result = this->put_next (message_block);
+
+ if (result == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Input::svc (%t) -> %p\n",
+ "putq error"),
+ -1);
+ }
+
+ // If the stop_flag is set just break and wait..
+ if (stop_flag)
+ {
+ if (debug)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Got stop message after [%d] messages \n",
+ this->messages_processed_));
+
+ break;
+ }
+ }
+
+ (void) this->synch_.end_synchronization ();
+ return 0;
+}
+
+int
+Worker_Task::processed (void)
+{
+ return this->messages_processed_;
+}
+
+/*******************************************************************/
+
+Input_Task::Input_Task (Message_Queue *mq,
+ Synchronisers &synch)
+ : ACE_Task<ACE_MT_SYNCH> (0, mq),
+ synch_ (synch)
+{
+}
+
+int
+Input_Task::svc (void)
+{
+ // Synchronise threads
+ (void) this->synch_.start_synchronization ();
+
+
+ size_t i = 0;
+ for (i = 0;
+ i < (number_of_messages - number_of_workers);
+ ++i)
+ {
+ // Make a message block
+ ACE_Message_Block *message_block = 0;
+ ACE_NEW_RETURN (message_block,
+ ACE_Message_Block (data_block),
+ -1);
+
+ int result = this->putq (message_block);
+
+ if (result == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Input::svc (%t) -> %p\n",
+ "putq error"),
+ -1);
+ }
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Input thread -> Sent [%d] messages\n",
+ i));
+ }
+ }
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Sending close messages \n"));
+ }
+
+
+ // Stop messages
+ for (i = 0;
+ i < number_of_workers;
+ ++i)
+ {
+ // Make a message block
+ ACE_Message_Block *message_block = 0;
+ ACE_NEW_RETURN (message_block,
+ ACE_Message_Block (data_block),
+ -1);
+
+ // Set the stop flag in the message block and not in the datablock
+ message_block->set_self_flags (Synchronisers::MB_STOP_FLAG);
+
+ int result = this->putq (message_block);
+
+ if (result == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Input::svc (%t) -> %p\n",
+ "putq error"),
+ -1);
+ }
+ }
+
+
+ (void) this->synch_.end_synchronization ();
+ return 0;
+}
+
+/*******************************************************************/
+
+Output_Task::Output_Task (Message_Queue *mq,
+ Synchronisers &synch)
+ : ACE_Task<ACE_MT_SYNCH> (0, mq),
+ synch_ (synch)
+{
+}
+
+int
+Output_Task::svc (void)
+{
+ // Synchronise threads
+ (void) this->synch_.start_synchronization ();
+
+
+ for (size_t i = 0;
+ i < number_of_messages;
+ ++i)
+ {
+ // Get the message block from queue
+ ACE_Message_Block *mb = 0;
+ int result = this->getq (mb);
+
+ // delete the message block
+ mb->release ();
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Output thread -> received [%d] message\n",
+ i));
+ }
+
+ if (result == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Input::svc (%t) -> %p\n",
+ "putq error"),
+ -1);
+ }
+ }
+
+ (void) this->synch_.end_synchronization ();
+ return 0;
+}
+
+int
+Output_Task::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ /* if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Sticking message into "
+ " output queue \n"));
+ }*/
+ return this->putq (mb);
+}
+
+/*******************************************************************/
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:"));
+ int c;
+
+ while ((c = get_opt ()) != -1)
+ {
+ switch (c)
+ {
+ case 'm':
+ number_of_messages = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 't':
+ number_of_workers = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'd':
+ debug = static_cast<DEBUGGING_RANGE> (ACE_OS::atoi (get_opt.opt_arg ()));
+ break;
+ case 's':
+ message_size = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s\n"
+ "\t[-m number of messages]\n"
+ "\t[-s message size]\n"
+ "\t[-w number of workers]\n"
+ "\t[-b burst size]\n"
+ "\t[-t timeout between bursts]\n"
+ "\t[-d debug]\n",
+ argv[0]),
+ -1);
+ }
+ }
+
+ return 0;
+}
+
+
+/*******************************************************************/
+int
+ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ int result = parse_args (argc, argv);
+ if (result != 0)
+ {
+ return result;
+ }
+
+ ACE_High_Res_Timer::calibrate ();
+
+ // Create the message queue
+ Message_Queue input_message_queue;
+ Message_Queue output_message_queue;
+
+ // Create the datablocks. IF we use the default Message Blocks Ctor,
+ // it is going to do an extra allocation for the data block
+ ACE_NEW_RETURN (data_block,
+ ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >,
+ -1);
+
+ // Increment the reference count so that we can share the
+ // datablock. This is donw twice the number of messages for the
+ // input and output queues.
+ size_t i = 0;
+
+ for (i = 0; i < 2*number_of_messages; ++i)
+ {
+ data_block->duplicate ();
+ }
+
+ // Create the Synchronisers
+ Synchronisers synch;
+
+ // Workers.
+ Worker_Task **workers = 0;
+ ACE_NEW_RETURN (workers,
+ Worker_Task *[number_of_workers],
+ -1);
+
+ // Input Task
+ Input_Task input_task (&input_message_queue,
+ synch);
+
+ // Output Task
+ Output_Task output_task (&output_message_queue,
+ synch);
+ int priority =
+ ACE_Sched_Params::priority_max (ACE_SCHED_FIFO);
+
+
+ long flags = THR_SCHED_FIFO | THR_SCOPE_PROCESS;
+
+ // Create and activate the worker threads
+ for (i = 0; i < number_of_workers; ++i)
+ {
+ ACE_NEW_RETURN (workers[i],
+ Worker_Task (&input_message_queue, synch),
+ -1);
+
+ workers[i]->next (&output_task);
+
+ // Activate the workers.
+ result = workers[i]->activate (flags,
+ 1,
+ 1,
+ priority);
+ if (result != 0)
+ {
+ flags = THR_BOUND;
+ priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER,
+ ACE_SCOPE_THREAD);
+ result = workers[i]->activate (flags,
+ 1,
+ 1,
+ priority);
+ if (result != 0)
+ {
+ return result;
+ }
+ }
+ }
+
+
+
+ // Activate the input and output threads
+ result = input_task.activate (flags,
+ 1,
+ 1,
+ priority);
+
+ if (result != 0)
+ return result;
+
+
+
+ // Activate the workers.
+ result = output_task.activate (flags,
+ 1,
+ 1,
+ priority);
+
+ if (result != 0)
+ return result;
+
+
+
+ // Wait for all threads to terminate.
+ result = ACE_Thread_Manager::instance ()->wait ();
+
+
+ ACE_hrtime_t elapsed_time = 0;
+
+ test_timer.elapsed_time (elapsed_time);
+
+# if !defined (ACE_WIN32)
+ double elapsed_time_per_invocation =
+ (double) elapsed_time / number_of_messages;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Throughput is [%f] \n",
+ elapsed_time_per_invocation));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Throughput is [%f] \n",
+ 1000000000/ elapsed_time_per_invocation));
+
+#endif /*ACE_WIN32 */
+ for (i = 0; i < number_of_workers; ++i)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Message process for thread [%d] is [%d] \n",
+ i, workers[i]->processed ()));
+ delete workers[i];
+ }
+ delete[] workers;
+
+ return result;
+}
+
+#else /*ACE_HAS_THREADS*/
+
+int
+main (int, char *[])
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Not supported in single threaded builds \n"));
+
+ return 0;
+}
+
+
+#endif /*ACE_HAS_THREADS*/