diff options
Diffstat (limited to 'ACE/performance-tests/Server_Concurrency/Leader_Follower/RT_CORBA_Leader_Follower.cpp')
-rw-r--r-- | ACE/performance-tests/Server_Concurrency/Leader_Follower/RT_CORBA_Leader_Follower.cpp | 401 |
1 files changed, 401 insertions, 0 deletions
diff --git a/ACE/performance-tests/Server_Concurrency/Leader_Follower/RT_CORBA_Leader_Follower.cpp b/ACE/performance-tests/Server_Concurrency/Leader_Follower/RT_CORBA_Leader_Follower.cpp new file mode 100644 index 00000000000..59b6844836a --- /dev/null +++ b/ACE/performance-tests/Server_Concurrency/Leader_Follower/RT_CORBA_Leader_Follower.cpp @@ -0,0 +1,401 @@ +// $Id$ + +#include "RT_CORBA_Leader_Follower.h" + + + +#if defined (ACE_HAS_THREADS) + +// We need the following only if we have threads enabled.. +#include "ace/OS_main.h" +#include "ace/ACE.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/Sched_Params.h" +#include "ace/Profile_Timer.h" + +// Number of messages that are used in this experiment +static size_t number_of_messages = 1000; + +// Number of messages that are used in this experiment +static size_t number_of_messages_left = 0; + +// Number of threads used in this experiment +static size_t number_of_threads = 2; + +// Global variable for the availability of the leader +static size_t leader_available = 0; + +// Number of threads that are ready to go/dispatch +static size_t ready_threads = 0; + +// Work in the upcall +static size_t message_size = 100; + +// Debugging condition +static DEBUGGING_RANGE debug = DEBUG_NONE; + +// Timer for the test +ACE_High_Res_Timer test_timer; + + + + +/*******************************************************************/ +// Constructor for Synchronisers +Synchronisers::Synchronisers (void) + : mutex_ (), + condition_ (mutex_), + event_ () +{ +} + + +int +Synchronisers::start_synchronization (void) +{ + // Hold the lock and increment the global variable to indicate + // number of ready threads + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); + + ready_threads ++; + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Ready to go.. \n")); + } + + if (ready_threads == number_of_threads) + { + + // Reset the ready_threads so that we can wait at the end of + // runs + ready_threads = 0; + + // Start the timer + test_timer.start (); + + // Signal all the threads + this->event_.signal (); + + // return to do our work; + return 0; + } + + // If we are not the last thread, let go off the lock + } + + // Wait blisfully till we are woken up + this->event_.wait (); + + return 0; +} + +int +Synchronisers::end_synchronization (void) +{ + // Hold the lock and increment the global variable to indicate + // number of ready threads + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); + + ready_threads ++; + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Ready to go.. \n")); + } + + if (ready_threads == number_of_threads) + { + + // Reset the ready_threads so that we can wait at the end of + // runs + ready_threads = 0; + + // Start the timer + test_timer.stop (); + + // Signal all the threads + this->event_.signal (); + + // return to do our work; + return 0; + } + + // If we are not the last thread, let go off the lock + } + + // Wait blisfully till we are woken up + this->event_.wait (); + + return 0; +} + + +/*******************************************************************/ + +Leader_Follower_Task::Leader_Follower_Task (Synchronisers &synch) + : messages_consumed_ (0), + synch_ (synch) +{ +} + +int +Leader_Follower_Task::processed (void) +{ + return this->messages_consumed_; +} + +int +Leader_Follower_Task::svc (void) +{ + (void) this->synch_.start_synchronization (); + + for (;;) + { + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->synch_.mutex_, -1); + + // Wait until there is no leader. + while (leader_available) + { + int result = this->synch_.condition_.wait (); + + if (result == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Leader_Follower_Task::svc (%t) -> %p\n", + "wait error"), + -1); + } + } + + // I am the leader. + leader_available = 1; + + // + // We are letting go of the leader follower lock before going + // in the event loop. + // + } + + // + // It is ok to modify these shared variables without a lock + // since we are the only leader. + // + + int exit_loop = 0; + if (number_of_messages_left == 0) + { + exit_loop = 1; + } + else + { + --number_of_messages_left; + + // Local counter. + ++this->messages_consumed_; + + if (debug) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) message for this thread %d\n", + this->messages_consumed_)); + } + } + + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->synch_.mutex_, -1); + + // I am no longer the leader. + leader_available = 0; + + // Wake up a follower. + this->synch_.condition_.signal (); + } + + if (exit_loop) + { + break; + } + else + { + // + // Process message here. + // + for (size_t j = 0; j < message_size; ++j) + { + // Eat a little CPU + /* takes about 40.2 usecs on a 167 MHz Ultra2 */ + u_long n = 11UL; + ACE::is_prime (n, 2, n / 2); + } + + } + } + + (void) this->synch_.end_synchronization (); + + return 0; +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("m:t:d:s:")); + int c; + + while ((c = get_opt ()) != -1) + { + switch (c) + { + case 'm': + number_of_messages = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 't': + number_of_threads = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'd': + debug = static_cast<DEBUGGING_RANGE> (ACE_OS::atoi (get_opt.opt_arg ())); + break; + case 's': + message_size = ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s\n" + "\t[-m number of messages]\n" + "\t[-s message size]\n" + "\t[-w number of threads]\n" + "\t[-b burst size]\n" + "\t[-t timeout between bursts]\n" + "\t[-d debug]\n", + argv[0]), + -1); + } + } + + return 0; +} + +/*******************************************************************/ + +// Entry point + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + int result = parse_args (argc, argv); + + if (result != 0) + { + return result; + } + + Synchronisers synch_forms; + + ACE_High_Res_Timer::calibrate (); + + // Leader Followers. + Leader_Follower_Task **leader_followers = 0; + ACE_NEW_RETURN (leader_followers, + Leader_Follower_Task *[number_of_threads], + -1); + + int priority = + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO); + + long flags = THR_SCOPE_PROCESS; + + // Number of messages left = Number_Of_messages + number_of_messages_left = number_of_messages; + + size_t i = 0; + // Create and activate them. + for (i = 0; i < number_of_threads; ++i) + { + ACE_NEW_RETURN (leader_followers[i], + Leader_Follower_Task (synch_forms), + -1); + + // Activate the leader_followers. + result = leader_followers[i]->activate (flags, + 1, + 1, + priority); + if (result != 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) - Activate failed for RT class " + " - Using default priority for thread [%d]\n", + i)); + + flags = THR_BOUND; + priority = ACE_Sched_Params::priority_min (ACE_SCHED_OTHER, + ACE_SCOPE_THREAD); + + // Activate the leader_followers. + result = leader_followers[i]->activate (flags, + 1, + 1, + priority); + + if (result != 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) - Failed again no hope \n")); + + return 0; + } + + } + } + + // Wait for all threads to terminate. + result = ACE_Thread_Manager::instance ()->wait (); + + ACE_hrtime_t elapsed_time = 0; + + test_timer.elapsed_time (elapsed_time); + + double elapsed_time_per_invocation = + static_cast<double> ( + ACE_UINT64_DBLCAST_ADAPTER (elapsed_time / number_of_messages) + ); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Throughput is [%f] \n", + 1000000000/ elapsed_time_per_invocation)); + + + for (i = 0; i < number_of_threads; ++i) + { + ACE_DEBUG ((LM_DEBUG, + "Message consumed in thread [%d] is [%d] \n", + i, leader_followers[i]->processed ())); + delete leader_followers[i]; + } + + delete[] leader_followers; + + return result; +} + + + + +#else /*if defined (ACE_HAS_THREADS)*/ + +int +main (int , char *[]) +{ + ACE_DEBUG ((LM_DEBUG, + "(%p|%t) Cannot run in SIngle threaded mode \n")); + + return 0; +} +#endif /*ACE_HAS_THREADS*/ |