summaryrefslogtreecommitdiff
path: root/performance-tests
diff options
context:
space:
mode:
Diffstat (limited to 'performance-tests')
-rw-r--r--performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp225
-rw-r--r--performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h41
2 files changed, 198 insertions, 68 deletions
diff --git a/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp b/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp
index e6affadb8ef..cae929e8722 100644
--- a/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp
+++ b/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.cpp
@@ -9,7 +9,7 @@
// The number of messages that is being processed
-static size_t number_of_messages = 1000;
+static size_t number_of_messages = 100;
// The number of upcall threads
static size_t number_of_workers = 2;
@@ -20,8 +20,8 @@ static size_t message_size = 100;
// Number of threads that are ready to go
static size_t ready_threads = 0;
-// Number of IO threads
-static size_t io_threads = 1;
+// Number of input and output threads
+static size_t io_threads = 2; // 1 for output and 1 for input
// High resolution test timer
static ACE_High_Res_Timer test_timer;
@@ -29,6 +29,9 @@ static ACE_High_Res_Timer test_timer;
// Debugging condition
static DEBUGGING_RANGE debug = DEBUG_NONE;
+// Data block used by the message blocks
+ACE_Data_Block *data_block = 0;
+
/*******************************************************************/
// Constructor for Synchronisers
Synchronisers::Synchronisers (void)
@@ -75,7 +78,7 @@ Synchronisers::start_synchronization (void)
if (debug)
{
ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Ready to go.. \n"));
+ "(%P|%t) Ready to wait () on event.. \n"));
}
// Wait blisfully till we are woken up
@@ -162,7 +165,7 @@ Worker_Task::svc (void)
// Get the flag in the message blok
ACE_Message_Block::Message_Flags flag =
- mb->flags ();
+ mb->self_flags ();
// The stop flag
int stop_flag = 0;
@@ -172,9 +175,12 @@ Worker_Task::svc (void)
Synchronisers::MB_STOP_FLAG))
{
if (debug)
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) Got stop message after [%d] messages \n",
- this->messages_processed_));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) saw flag after [%d] messages\n",
+ this->messages_processed_));
+ }
+
stop_flag = 1;
}
// Release the message block
@@ -186,7 +192,7 @@ Worker_Task::svc (void)
if (debug)
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) dequeued its %d message\n",
+ "(%P|%t) dequeued my %d message\n",
this->messages_processed_));
}
@@ -202,9 +208,34 @@ Worker_Task::svc (void)
ACE::is_prime (n, 2, n / 2);
}
+ // Make a message block for writing onto output queue
+ ACE_Message_Block *message_block = 0;
+ ACE_NEW_RETURN (message_block,
+ ACE_Message_Block (data_block),
+ -1);
+
+ // Put this message block into the next queue or the output
+ // queue
+ result = this->put_next (message_block);
+
+ if (result == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Input::svc (%t) -> %p\n",
+ "putq error"),
+ -1);
+ }
+
// If the stop_flag is set just break and wait..
if (stop_flag)
- break;
+ {
+ if (debug)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Got stop message after [%d] messages \n",
+ this->messages_processed_));
+
+ break;
+ }
}
(void) this->synch_.end_synchronization ();
@@ -219,15 +250,15 @@ Worker_Task::processed (void)
/*******************************************************************/
-IO_Task::IO_Task (Message_Queue *mq,
- Synchronisers &synch)
+Input_Task::Input_Task (Message_Queue *mq,
+ Synchronisers &synch)
: ACE_Task<ACE_MT_SYNCH> (0, mq),
synch_ (synch)
{
}
int
-IO_Task::svc (void)
+Input_Task::svc (void)
{
// Synchronise threads
(void) this->synch_.start_synchronization ();
@@ -237,28 +268,28 @@ IO_Task::svc (void)
i < (number_of_messages - number_of_workers);
++i)
{
- if (debug)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%t) IO thread -> overall message %d\n",
- i));
- }
-
// Make a message block
ACE_Message_Block *message_block = 0;
ACE_NEW_RETURN (message_block,
- ACE_Message_Block (),
- -1);
+ ACE_Message_Block (data_block),
+ -1);
int result = this->putq (message_block);
if (result == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
- "IO::svc (%t) -> %p\n",
+ "Input::svc (%t) -> %p\n",
"putq error"),
-1);
}
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Input thread -> Sent [%d] messages\n",
+ i));
+ }
}
if (debug)
@@ -267,6 +298,7 @@ IO_Task::svc (void)
"(%t) Sending close messages \n"));
}
+
// Stop messages
for (size_t i = 0;
i < number_of_workers;
@@ -275,18 +307,18 @@ IO_Task::svc (void)
// Make a message block
ACE_Message_Block *message_block = 0;
ACE_NEW_RETURN (message_block,
- ACE_Message_Block (),
- -1);
+ ACE_Message_Block (data_block),
+ -1);
- // Set the stop flag
- message_block->set_flags (Synchronisers::MB_STOP_FLAG);
+ // Set the stop flag in the message block and not in the datablock
+ message_block->set_self_flags (Synchronisers::MB_STOP_FLAG);
int result = this->putq (message_block);
if (result == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
- "IO::svc (%t) -> %p\n",
+ "Input::svc (%t) -> %p\n",
"putq error"),
-1);
}
@@ -297,13 +329,71 @@ IO_Task::svc (void)
return 0;
}
+/*******************************************************************/
+
+Output_Task::Output_Task (Message_Queue *mq,
+ Synchronisers &synch)
+ : ACE_Task<ACE_MT_SYNCH> (0, mq),
+ synch_ (synch)
+{
+}
+
+int
+Output_Task::svc (void)
+{
+ // Synchronise threads
+ (void) this->synch_.start_synchronization ();
+
+
+ for (size_t i = 0;
+ i < number_of_messages;
+ ++i)
+ {
+ // Get the message block from queue
+ ACE_Message_Block *mb = 0;
+ int result = this->getq (mb);
+
+ // delete the message block
+ mb->release ();
+
+ if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Output thread -> received [%d] message\n",
+ i));
+ }
+
+ if (result == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Input::svc (%t) -> %p\n",
+ "putq error"),
+ -1);
+ }
+ }
+
+ (void) this->synch_.end_synchronization ();
+ return 0;
+}
+
+int
+Output_Task::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ /* if (debug)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Sticking message into "
+ " output queue \n"));
+ }*/
+ return this->putq (mb);
+}
/*******************************************************************/
static int
parse_args (int argc, ACE_TCHAR *argv[])
{
- ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:i:"));
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:s:t:d:"));
int c;
while ((c = get_opt ()) != -1)
@@ -322,9 +412,6 @@ parse_args (int argc, ACE_TCHAR *argv[])
case 's':
message_size = ACE_OS::atoi (get_opt.optarg);
break;
- case 'i':
- io_threads = ACE_OS::atoi (get_opt.optarg);
- break;
default:
ACE_ERROR_RETURN ((LM_ERROR,
"usage: %s\n"
@@ -356,7 +443,24 @@ main (int argc, ACE_TCHAR *argv[])
ACE_High_Res_Timer::calibrate ();
// Create the message queue
- Message_Queue message_queue;
+ Message_Queue input_message_queue;
+ Message_Queue output_message_queue;
+
+ // Create the datablocks. IF we use the default Message Blocks Ctor,
+ // it is going to do an extra allocation for the data block
+ ACE_NEW_RETURN (data_block,
+ ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> >,
+ -1);
+
+ // Increment the reference count so that we can share the
+ // datablock. This is donw twice the number of messages for the
+ // input and output queues.
+ size_t i = 0;
+
+ for (i = 0; i < 2*number_of_messages; ++i)
+ {
+ data_block->duplicate ();
+ }
// Create the Synchronisers
Synchronisers synch;
@@ -367,21 +471,28 @@ main (int argc, ACE_TCHAR *argv[])
Worker_Task *[number_of_workers],
-1);
+ // Input Task
+ Input_Task input_task (&input_message_queue,
+ synch);
+ // Output Task
+ Output_Task output_task (&output_message_queue,
+ synch);
int priority =
ACE_Sched_Params::priority_max (ACE_SCHED_FIFO);
long flags = THR_SCHED_FIFO | THR_SCOPE_PROCESS;
- size_t i = 0;
// Create and activate the worker threads
for (i = 0; i < number_of_workers; ++i)
{
ACE_NEW_RETURN (workers[i],
- Worker_Task (&message_queue, synch),
+ Worker_Task (&input_message_queue, synch),
-1);
+ workers[i]->next (&output_task);
+
// Activate the workers.
result = workers[i]->activate (flags,
1,
@@ -403,29 +514,28 @@ main (int argc, ACE_TCHAR *argv[])
}
}
- // Workers.
- IO_Task **io_task = 0;
- ACE_NEW_RETURN (io_task,
- IO_Task *[io_threads],
- -1);
- // Create and activate the worker threads
- for (i = 0; i < io_threads; ++i)
- {
- ACE_NEW_RETURN (io_task[i],
- IO_Task (&message_queue, synch),
- -1);
+ // Activate the input and output threads
+ result = input_task.activate (flags,
+ 1,
+ 1,
+ priority);
- // Activate the workers.
- result = io_task[i]->activate (flags,
- 1,
- 1,
- priority);
+ if (result != 0)
+ return result;
+
+
+
+ // Activate the workers.
+ result = output_task.activate (flags,
+ 1,
+ 1,
+ priority);
+
+ if (result != 0)
+ return result;
- if (result != 0)
- return result;
- }
// Wait for all threads to terminate.
@@ -456,13 +566,6 @@ main (int argc, ACE_TCHAR *argv[])
}
delete[] workers;
- for (i = 0; i < io_threads; ++i)
- {
-
- delete io_task[i];
- }
- delete[] io_task;
-
return result;
}
diff --git a/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h b/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h
index f7232b677a5..c914e4d96c0 100644
--- a/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h
+++ b/performance-tests/Server_Concurrency/Queue_Based_Workers/RT_CORBA_Workers.h
@@ -33,7 +33,7 @@ enum DEBUGGING_RANGE
typedef ACE_Message_Queue<ACE_MT_SYNCH> Message_Queue;
-
+/**************************************************************/
/**
* @class Synchronisers
*
@@ -76,20 +76,20 @@ private:
};
-
+/**************************************************************/
/**
- * @class IO_Task
+ * @class Input_Task
*
- * @brief Class that does the IO work ie. puts the events into the
+ * @brief Class that does the Input work ie. puts the events into the
* message queue
*/
-class IO_Task : public ACE_Task<ACE_MT_SYNCH>
+class Input_Task : public ACE_Task<ACE_MT_SYNCH>
{
public:
/// Ctor
- IO_Task (Message_Queue *mq,
- Synchronisers &synch);
+ Input_Task (Message_Queue *mq,
+ Synchronisers &synch);
/// The thread runs inside this method..
int svc (void);
@@ -100,8 +100,35 @@ private:
Synchronisers &synch_;
};
+/**************************************************************/
+/**
+ * @class Output_Task
+ *
+ * @brief Class that does the Output work ie. getsx the events into the
+ * message queue
+ */
+
+class Output_Task : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ /// Ctor
+ Output_Task (Message_Queue *mq,
+ Synchronisers &synch);
+
+ /// The thread runs inside this method..
+ int svc (void);
+
+ /// Need to overload this method to do anything useful..
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+
+private:
+
+ /// Our referance to Synchronisers
+ Synchronisers &synch_;
+};
+/**************************************************************/
/**
* @class Synchronisers
*