diff options
author | bala <balanatarajan@users.noreply.github.com> | 2001-12-05 02:59:47 +0000 |
---|---|---|
committer | bala <balanatarajan@users.noreply.github.com> | 2001-12-05 02:59:47 +0000 |
commit | 82d7d3508f0fa16a810f2043c4b12fe86300f556 (patch) | |
tree | fb99f91419285446b695f1503f67fe936d12acdb /performance-tests | |
parent | 550d3cec0c57ece1cd6c1c939d9ff482317cddb3 (diff) | |
download | ATCD-82d7d3508f0fa16a810f2043c4b12fe86300f556.tar.gz |
ChangeLogTag: Tue Dec 4 20:57:09 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'performance-tests')
-rw-r--r-- | performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp | 225 | ||||
-rw-r--r-- | performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h | 41 |
2 files changed, 198 insertions, 68 deletions
diff --git a/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp b/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp index e6affadb8ef..cae929e8722 100644 --- a/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp +++ b/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp @@ -9,7 +9,7 @@ // The number of messages that is being processed -static size_t number_of_messages = 1000; +static size_t number_of_messages = 100; // The number of upcall threads static size_t number_of_workers = 2; @@ -20,8 +20,8 @@ static size_t message_size = 100; // Number of threads that are ready to go static size_t ready_threads = 0; -// Number of IO threads -static size_t io_threads = 1; +// Number of input and output threads +static size_t io_threads = 2; // 1 for output and 1 for input // High resolution test timer static ACE_High_Res_Timer test_timer; @@ -29,6 +29,9 @@ static ACE_High_Res_Timer test_timer; // Debugging condition static DEBUGGING_RANGE debug = DEBUG_NONE; +// Data block used by the message blocks +ACE_Data_Block *data_block = 0; + /*******************************************************************/ // Constructor for Synchronisers Synchronisers::Synchronisers (void) @@ -75,7 +78,7 @@ Synchronisers::start_synchronization (void) if (debug) { ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Ready to go.. \n")); + "(%P|%t) Ready to wait () on event.. \n")); } // Wait blisfully till we are woken up @@ -162,7 +165,7 @@ Worker_Task::svc (void) // Get the flag in the message blok ACE_Message_Block::Message_Flags flag = - mb->flags (); + mb->self_flags (); // The stop flag int stop_flag = 0; @@ -172,9 +175,12 @@ Worker_Task::svc (void) Synchronisers::MB_STOP_FLAG)) { if (debug) - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Got stop message after [%d] messages \n", - this->messages_processed_)); + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) saw flag after [%d] messages\n", + this->messages_processed_)); + } + stop_flag = 1; } // Release the message block @@ -186,7 +192,7 @@ Worker_Task::svc (void) if (debug) { ACE_DEBUG ((LM_DEBUG, - "(%t) dequeued its %d message\n", + "(%P|%t) dequeued my %d message\n", this->messages_processed_)); } @@ -202,9 +208,34 @@ Worker_Task::svc (void) ACE::is_prime (n, 2, n / 2); } + // Make a message block for writing onto output queue + ACE_Message_Block *message_block = 0; + ACE_NEW_RETURN (message_block, + ACE_Message_Block (data_block), + -1); + + // Put this message block into the next queue or the output + // queue + result = this->put_next (message_block); + + if (result == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Input::svc (%t) -> %p\n", + "putq error"), + -1); + } + // If the stop_flag is set just break and wait.. if (stop_flag) - break; + { + if (debug) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Got stop message after [%d] messages \n", + this->messages_processed_)); + + break; + } } (void) this->synch_.end_synchronization (); @@ -219,15 +250,15 @@ Worker_Task::processed (void) /*******************************************************************/ -IO_Task::IO_Task (Message_Queue *mq, - Synchronisers &synch) +Input_Task::Input_Task (Message_Queue *mq, + Synchronisers &synch) : ACE_Task<ACE_MT_SYNCH> (0, mq), synch_ (synch) { } int -IO_Task::svc (void) +Input_Task::svc (void) { // Synchronise threads (void) this->synch_.start_synchronization (); @@ -237,28 +268,28 @@ IO_Task::svc (void) i < (number_of_messages - number_of_workers); ++i) { - if (debug) - { - ACE_DEBUG ((LM_DEBUG, - "(%t) IO thread -> overall message %d\n", - i)); - } - // Make a message block ACE_Message_Block *message_block = 0; ACE_NEW_RETURN (message_block, - ACE_Message_Block (), - -1); + ACE_Message_Block (data_block), + -1); int result = this->putq (message_block); if (result == -1) { ACE_ERROR_RETURN ((LM_ERROR, - "IO::svc (%t) -> %p\n", + "Input::svc (%t) -> %p\n", "putq error"), -1); } + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Input thread -> Sent [%d] messages\n", + i)); + } } if (debug) @@ -267,6 +298,7 @@ IO_Task::svc (void) "(%t) Sending close messages \n")); } + // Stop messages for (size_t i = 0; i < number_of_workers; @@ -275,18 +307,18 @@ IO_Task::svc (void) // Make a message block ACE_Message_Block *message_block = 0; ACE_NEW_RETURN (message_block, - ACE_Message_Block (), - -1); + ACE_Message_Block (data_block), + -1); - // Set the stop flag - message_block->set_flags (Synchronisers::MB_STOP_FLAG); + // Set the stop flag in the message block and not in the datablock + message_block->set_self_flags (Synchronisers::MB_STOP_FLAG); int result = this->putq (message_block); if (result == -1) { ACE_ERROR_RETURN ((LM_ERROR, - "IO::svc (%t) -> %p\n", + "Input::svc (%t) -> %p\n", "putq error"), -1); } @@ -297,13 +329,71 @@ IO_Task::svc (void) return 0; } +/*******************************************************************/ + +Output_Task::Output_Task (Message_Queue *mq, + Synchronisers &synch) + : ACE_Task<ACE_MT_SYNCH> (0, mq), + synch_ (synch) +{ +} + +int +Output_Task::svc (void) +{ + // Synchronise threads + (void) this->synch_.start_synchronization (); + + + for (size_t i = 0; + i < number_of_messages; + ++i) + { + // Get the message block from queue + ACE_Message_Block *mb = 0; + int result = this->getq (mb); + + // delete the message block + mb->release (); + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) Output thread -> received [%d] message\n", + i)); + } + + if (result == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Input::svc (%t) -> %p\n", + "putq error"), + -1); + } + } + + (void) this->synch_.end_synchronization (); + return 0; +} + +int +Output_Task::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + /* if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Sticking message into " + " output queue \n")); + }*/ + return this->putq (mb); +} /*******************************************************************/ static int parse_args (int argc, ACE_TCHAR *argv[]) { - ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:i:")); + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:")); int c; while ((c = get_opt ()) != -1) @@ -322,9 +412,6 @@ parse_args (int argc, ACE_TCHAR *argv[]) case 's': message_size = ACE_OS::atoi (get_opt.optarg); break; - case 'i': - io_threads = ACE_OS::atoi (get_opt.optarg); - break; default: ACE_ERROR_RETURN ((LM_ERROR, "usage: %s\n" @@ -356,7 +443,24 @@ main (int argc, ACE_TCHAR *argv[]) ACE_High_Res_Timer::calibrate (); // Create the message queue - Message_Queue message_queue; + Message_Queue input_message_queue; + Message_Queue output_message_queue; + + // Create the datablocks. IF we use the default Message Blocks Ctor, + // it is going to do an extra allocation for the data block + ACE_NEW_RETURN (data_block, + ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >, + -1); + + // Increment the reference count so that we can share the + // datablock. This is donw twice the number of messages for the + // input and output queues. + size_t i = 0; + + for (i = 0; i < 2*number_of_messages; ++i) + { + data_block->duplicate (); + } // Create the Synchronisers Synchronisers synch; @@ -367,21 +471,28 @@ main (int argc, ACE_TCHAR *argv[]) Worker_Task *[number_of_workers], -1); + // Input Task + Input_Task input_task (&input_message_queue, + synch); + // Output Task + Output_Task output_task (&output_message_queue, + synch); int priority = ACE_Sched_Params::priority_max (ACE_SCHED_FIFO); long flags = THR_SCHED_FIFO | THR_SCOPE_PROCESS; - size_t i = 0; // Create and activate the worker threads for (i = 0; i < number_of_workers; ++i) { ACE_NEW_RETURN (workers[i], - Worker_Task (&message_queue, synch), + Worker_Task (&input_message_queue, synch), -1); + workers[i]->next (&output_task); + // Activate the workers. result = workers[i]->activate (flags, 1, @@ -403,29 +514,28 @@ main (int argc, ACE_TCHAR *argv[]) } } - // Workers. - IO_Task **io_task = 0; - ACE_NEW_RETURN (io_task, - IO_Task *[io_threads], - -1); - // Create and activate the worker threads - for (i = 0; i < io_threads; ++i) - { - ACE_NEW_RETURN (io_task[i], - IO_Task (&message_queue, synch), - -1); + // Activate the input and output threads + result = input_task.activate (flags, + 1, + 1, + priority); - // Activate the workers. - result = io_task[i]->activate (flags, - 1, - 1, - priority); + if (result != 0) + return result; + + + + // Activate the workers. + result = output_task.activate (flags, + 1, + 1, + priority); + + if (result != 0) + return result; - if (result != 0) - return result; - } // Wait for all threads to terminate. @@ -456,13 +566,6 @@ main (int argc, ACE_TCHAR *argv[]) } delete[] workers; - for (i = 0; i < io_threads; ++i) - { - - delete io_task[i]; - } - delete[] io_task; - return result; } diff --git a/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h b/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h index f7232b677a5..c914e4d96c0 100644 --- a/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h +++ b/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h @@ -33,7 +33,7 @@ enum DEBUGGING_RANGE typedef ACE_Message_Queue<ACE_MT_SYNCH> Message_Queue; - +/**************************************************************/ /** * @class Synchronisers * @@ -76,20 +76,20 @@ private: }; - +/**************************************************************/ /** - * @class IO_Task + * @class Input_Task * - * @brief Class that does the IO work ie. puts the events into the + * @brief Class that does the Input work ie. puts the events into the * message queue */ -class IO_Task : public ACE_Task<ACE_MT_SYNCH> +class Input_Task : public ACE_Task<ACE_MT_SYNCH> { public: /// Ctor - IO_Task (Message_Queue *mq, - Synchronisers &synch); + Input_Task (Message_Queue *mq, + Synchronisers &synch); /// The thread runs inside this method.. int svc (void); @@ -100,8 +100,35 @@ private: Synchronisers &synch_; }; +/**************************************************************/ +/** + * @class Output_Task + * + * @brief Class that does the Output work ie. getsx the events into the + * message queue + */ + +class Output_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + /// Ctor + Output_Task (Message_Queue *mq, + Synchronisers &synch); + + /// The thread runs inside this method.. + int svc (void); + + /// Need to overload this method to do anything useful.. + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + +private: + + /// Our referance to Synchronisers + Synchronisers &synch_; +}; +/**************************************************************/ /** * @class Synchronisers * |