diff options
Diffstat (limited to 'TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp')
-rw-r--r-- | TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp | 308 |
1 files changed, 205 insertions, 103 deletions
diff --git a/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp b/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp index 5b2c2c067f2..3caff110df8 100644 --- a/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp +++ b/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp @@ -1,39 +1,6 @@ -#include "Task_Client.h" - -#if defined (VXWORKS) -#include "my_time.h" -#endif /* defined (VXWORKS) */ - -int stats(int data[], int n) -{ - int i, j, key, sum, mean; - - if( n < 2 ) - return 0; - - /* sort the samples */ - for(j=1; j<n; j++) { - key = data[j]; - i = j - 1; - while( i >= 0 && data[i] > key ) { - data[i+1] = data[i]; - i--; - } - data[i+1] = key; - } - - sum = 0; - for(i=0; i<n; i++) { - sum += data[i]; - } +// $Id$ - if( (n%2) == 0 ) - mean = (data[n/2-1] + data[n/2]) / 2; - else - mean = data[n/2]; - - return sum/n; -} +#include "Task_Client.h" Task_State::Task_State (int argc, char **argv) : start_count_ (0), @@ -44,15 +11,28 @@ Task_State::Task_State (int argc, char **argv) argc_ (argc), argv_ (argv), thread_per_rate_ (0), - global_jitter_array_ (0) + global_jitter_array_ (0), + use_chorus_ipc_ (0), + grain_ (1) { - ACE_OS::strcpy (server_host_, "localhost"); - ACE_Get_Opt opts (argc, argv, "h:n:t:p:d:r"); int c; int datatype; + // defaults + ACE_OS::strcpy (server_host_, "localhost"); + ior_header_ = ACE_OS::strdup ("cool-tcp"); + ACE_Get_Opt opts (argc, argv, "Hh:n:t:p:d:rIg:"); + while ((c = opts ()) != -1) switch (c) { + case 'g': + grain_ = ACE_OS::atoi (opts.optarg); + if (grain_ < 1) + grain_ = 1; + break; + case 'I': + use_chorus_ipc_ = 1; + break; case 'r': thread_per_rate_ = 1; break; @@ -74,8 +54,8 @@ Task_State::Task_State (int argc, char **argv) break; case CB_SHORT: default: - datatype_ = CB_SHORT; ACE_DEBUG ((LM_DEBUG, "Testing Shorts\n")); + datatype_ = CB_SHORT; break; } continue; @@ -91,46 +71,55 @@ Task_State::Task_State (int argc, char **argv) case 't': thread_count_ = (u_int) ACE_OS::atoi (opts.optarg); continue; - case '?': - default: + case 'H': ACE_DEBUG ((LM_DEBUG, "usage: %s" "[-d datatype Octet=0, Short=1, Long=2, Struct=3]" " [-n num_calls]" " [-h server_hostname]" " [-p server_port_num]" " [-t num_threads]" + " [-I Use Chorus IPC. (For Chorus ClassiX *only*) ]" + " [-g granularity_of_request_timing]" "\n", argv [0])); + continue; } + + if (use_chorus_ipc_ == 1) + { + ior_header_ = ACE_OS::strdup ("cool-chorus"); + ACE_OS::strcpy (server_host_, ""); + } + // thread_count_ + 1 because there is one utilization thread also // wanting to begin at the same time the clients begin.. ACE_NEW (barrier_, ACE_Barrier (thread_count_ + 1)); ACE_NEW (latency_, double [thread_count_]); ACE_NEW (global_jitter_array_, double *[thread_count_]); - ACE_NEW (ave_latency_, int [thread_count_]); } Client::Client (Task_State *ts) : ACE_Task<ACE_MT_SYNCH> (ACE_Thread_Manager::instance ()), ts_ (ts) { - -} - -void -Client::put_ave_latency (int ave_latency, u_int thread_id) { - ts_->lock_.acquire (); - ts_->ave_latency_[thread_id] = ave_latency; - ts_->lock_.release (); } void Client::put_latency (double *jitter, double latency, u_int thread_id) { - ts_->lock_.acquire (); + ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, ts_->lock_)); + ts_->latency_[thread_id] = latency; - ACE_DEBUG ((LM_DEBUG, "(%t) My latency was %f\n", latency)); ts_->global_jitter_array_ [thread_id] = jitter; - ts_->lock_.release (); + +#if defined (ACE_LACKS_FLOATING_POINT) + ACE_DEBUG ((LM_DEBUG, + "(%t) My latency was %u\n", + latency)); +#else + ACE_DEBUG ((LM_DEBUG, + "(%t) My latency was %f\n", + latency)); +#endif /* ! ACE_LACKS_FLOATING_POINT */ } double @@ -146,44 +135,67 @@ Client::get_low_priority_latency (void) for (u_int i = 1; i < ts_->start_count_; i++) l += (double) ts_->latency_[i]; - return l / (double) (ts_->start_count_ - 1); + return ts_->start_count_ > 1? l / (double) (ts_->start_count_ - 1) : 0; } int Client::get_latency (u_int thread_id) { - return ts_->ave_latency_ [thread_id]; + return ts_->latency_ [thread_id]; } double -Client::get_high_priority_jitter () { +Client::get_high_priority_jitter (void) +{ double jitter = 0; double average = get_high_priority_latency (); + + // Compute the standard deviation (i.e. jitter) from the values + // stored in the global_jitter_array_. + + // we first compute the sum of the squares of the differences + // each latency has from the average for (u_int i = 0; i < ts_->loop_count_; i ++) { - double difference = ts_->global_jitter_array_ [0][i] - average; + double difference = + ts_->global_jitter_array_ [0][i] - average; jitter += difference * difference; } - return sqrt (jitter / (double) (ts_->loop_count_ - 1)); + + // Return the square root of the sum of the differences computed + // above, i.e. jitter. + return sqrt (jitter); } double -Client::get_low_priority_jitter () { +Client::get_low_priority_jitter (void) +{ double jitter = 0; double average = get_low_priority_latency (); + + // Compute the standard deviation (i.e. jitter) from the values + // stored in the global_jitter_array_. + + // We first compute the sum of the squares of the differences each + // latency has from the average. for (u_int j = 1; j < ts_->start_count_; j ++) for (u_int i = 0; i < ts_->loop_count_; i ++) { double difference = ts_->global_jitter_array_ [j][i] - average; jitter += difference * difference; } - return (double) (jitter / ((ts_->loop_count_* (ts_->start_count_ - 1)) - 1)); + + // Return the square root of the sum of the differences computed + // above, i.e. jitter. + return sqrt (jitter); } int Client::svc (void) { - ACE_DEBUG ((LM_DEBUG, "(%t) Thread created\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) Thread created\n")); + u_int thread_id; Cubit_ptr cb; char ior [1024]; @@ -203,10 +215,11 @@ Client::svc (void) ACE_DEBUG ((LM_DEBUG, "(%t) Im the high priority client, my id is %d.\n", thread_id)); - ::sprintf (ior, - "cool-tcp://%s:%d", - ts_->server_host_, - ts_->base_port_); + ACE_OS::sprintf (ior, + "%s://%s:%d", + ts_->ior_header_, + ts_->server_host_, + ts_->base_port_); frequency = CB_HIGH_PRIORITY_RATE; } else @@ -214,10 +227,11 @@ Client::svc (void) ACE_DEBUG ((LM_DEBUG, "(%t) Im a low priority client, my id is %d\n", thread_id)); - ::sprintf (ior, - "cool-tcp://%s:%d", - ts_->server_host_, - ts_->base_port_);// + thread_id); + ACE_OS::sprintf (ior, + "%s://%s:%d", + ts_->ior_header_, + ts_->server_host_, + ts_->base_port_);// + thread_id); frequency = CB_LOW_PRIORITY_RATE; } } @@ -225,6 +239,9 @@ Client::svc (void) { switch (thread_id) { +///////////////////////////////////// +// THIS NEEDS TO BE UPDATED AS ABOVE. +///////////////////////////////////// case CB_40HZ_CONSUMER: ACE_DEBUG ((LM_DEBUG, "(%t) Im the high priority client, my id is %d.\n", thread_id)); ::sprintf (ior, "cool-tcp://%s:%d", ts_->server_host_, ts_->base_port_); @@ -274,15 +291,13 @@ Client::svc (void) // - // Initialise client's binding to an + // Initialize client's binding to an // arbitrary cubit server (at some host) // COOL::EOABindingData bindingData (ior); cb = Cubit::_bind(bindingData, env); - - // objref = orb_ptr->string_to_object ((CORBA::String) ior, env); // if (env.exception () != 0) @@ -318,8 +333,6 @@ Client::svc (void) ACE_OS::fflush (stdout); ACE_DEBUG ((LM_DEBUG, "Object Created at: '%ul'", cb)); ACE_DEBUG ((LM_DEBUG, "connected to object '%s'", str)); - // if (cb->cube_short (2, env) == 8) // dummy call. - // ACE_DEBUG ((LM_DEBUG, "(%t) Made successful dummy call")); } ACE_DEBUG ((LM_DEBUG, "(%t) Waiting for other threads to finish binding..\n")); @@ -350,18 +363,32 @@ Client::run_tests (Cubit_ptr cb, double latency = 0; double sleep_time = (1/frequency) * (1000 * 1000); double delta = 0; - // Make the calls in a loop. + int pstartTime = 0; + int pstopTime = 0; + double real_time = 0.0; + +#if defined (USE_QUANTIFY) + quantify_stop_recording_data(); + quantify_clear_data (); +#endif /* USE_QUANTIFY */ + + // Make the calls in a loop. for (i = 0; i < loop_count; i++) { -#if defined (VXWORKS) - TimeStamp(&start[i]); -#else - ACE_Profile_Timer timer; + ACE_High_Res_Timer timer_; ACE_Time_Value tv (0, (long int) (sleep_time - delta)); ACE_OS::sleep (tv); - timer.start (); -#endif /* defined (VXWORKS) */ + + // Elapsed time will be in microseconds. + ACE_Time_Value delta_t; + +#if defined (CHORUS) + pstartTime = pccTime1Get(); +#else /* CHORUS */ + timer_.start (); +#endif /* !CHORUS */ + switch (datatype) { case CB_OCTET: @@ -369,7 +396,16 @@ Client::run_tests (Cubit_ptr cb, // Cube an octet. CORBA::Octet arg_octet = func (i), ret_octet = 0; +#if defined (USE_QUANTIFY) + /* start recording quantify data from here */ + quantify_start_recording_data (); +#endif /* USE_QUANTIFY */ ret_octet = cb->cube_octet (arg_octet, env); + +#if defined (USE_QUANTIFY) + quantify_stop_recording_data(); +#endif /* USE_QUANTIFY */ + if (env.exception () != 0) { CORBA::SystemException* ex; @@ -405,8 +441,17 @@ Client::run_tests (Cubit_ptr cb, CORBA::Short arg_short = func (i), ret_short; +#if defined (USE_QUANTIFY) + // start recording quantify data from here. + quantify_start_recording_data (); +#endif /* USE_QUANTIFY */ + ret_short = cb->cube_short (arg_short, env); +#if defined (USE_QUANTIFY) + quantify_stop_recording_data(); +#endif /* USE_QUANTIFY */ + if (env.exception () != 0) ACE_ERROR_RETURN ((LM_ERROR, "%s:Call failed\n", @@ -430,8 +475,17 @@ Client::run_tests (Cubit_ptr cb, CORBA::Long arg_long = func (i), ret_long; +#if defined (USE_QUANTIFY) + // start recording quantify data from here. + quantify_start_recording_data (); +#endif /* USE_QUANTIFY */ + ret_long = cb->cube_long (arg_long, env); +#if defined (USE_QUANTIFY) + quantify_stop_recording_data(); +#endif /* USE_QUANTIFY */ + if (env.exception () != 0) ACE_ERROR_RETURN ((LM_ERROR,"%s:Call failed\n", env.exception ()), 2); @@ -456,8 +510,17 @@ Client::run_tests (Cubit_ptr cb, arg_struct.s = func (i); arg_struct.o = func (i); +#if defined (USE_QUANTIFY) + // start recording quantify data from here. + quantify_start_recording_data (); +#endif /* USE_QUANTIFY */ + ret_struct = cb->cube_struct (arg_struct, env); +#if defined (USE_QUANTIFY) + quantify_stop_recording_data(); +#endif /* USE_QUANTIFY */ + if (env.exception () != 0) ACE_ERROR_RETURN ((LM_ERROR,"%s:Call failed\n", env.exception ()), 2); @@ -477,45 +540,84 @@ Client::run_tests (Cubit_ptr cb, } } -#if defined (VXWORKS) - TimeStamp(&stop[i]); - elapsed_time[i] = DeltaTime(start[i], stop[i]); - delta = ( (0.4 * fabs (elapsed_time[i])) + (0.6 * delta) ); // pow(10,6) - latency += (double)elapsed_time[i]; -#else - timer.stop(); - ACE_Profile_Timer::ACE_Elapsed_Time et; - timer.elapsed_time (et); - delta = ((0.4 * fabs (et.real_time * (1000 * 1000))) + (0.6 * delta)); // pow(10,6) - latency += et.real_time; - my_jitter_array [i] = et.real_time * 1000; -#endif /* defined (VXWORKS) */ + // use sysBench when CHORUS defined and option specified on command line +#if defined (CHORUS) + if ( (loop_count % ts_->grain_) == 0) + pstopTime = pccTime1Get(); +#else /* CHORUS */ + // if CHORUS is not defined just use plain timer_.stop (). + timer_.stop (); + timer_.elapsed_time (delta_t); +#endif /* !CHORUS */ + + // Calculate time elapsed +#if defined (ACE_LACKS_FLOATING_POINT) +# if defined (CHORUS) + real_time = pstopTime - pstartTime; + my_jitter_array [i/ts_->grain_] = real_time; // in units of microseconds. + // update the latency array, correcting the index using the granularity +# else /* CHORUS */ + // Store the time in usecs. + real_time = delta_t.sec () * ACE_ONE_SECOND_IN_USECS + + delta_t.usec (); + my_jitter_array [i] = real_time; // in units of microseconds. +# endif /* !CHORUS */ + delta = ((40 * fabs (real_time) / 100) + (60 * delta / 100)); // pow(10,6) + latency += real_time; +#else /* ACE_LACKS_FLOATING_POINT */ + delta = ((0.4 * fabs (real_time * (1000 * 1000))) + (0.6 * delta)); // pow(10,6) + latency += real_time; + my_jitter_array [i] = real_time * 1000; +#endif /* !ACE_LACKS_FLOATING_POINT */ } -#if defined (VXWORKS) - int ave_latency = stats(elapsed_time, loop_count); - put_ave_latency(ave_latency, thread_id); -#endif - if (call_count > 0) { -#if !defined (VXWORKS) if (error_count == 0) { +#if defined (ACE_LACKS_FLOATING_POINT) + double calls_per_second = (call_count * ACE_ONE_SECOND_IN_USECS) / latency; +#endif /* ACE_LACKS_FLOATING_POINT */ latency /= call_count; if (latency > 0) { +#if defined (ACE_LACKS_FLOATING_POINT) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) cube average call ACE_OS::time\t= %u usec, \t" + "%u calls/second\n", + latency, + calls_per_second)); + + this->put_latency (my_jitter_array, + latency, + thread_id); +#else ACE_DEBUG ((LM_DEBUG, "cube average call ACE_OS::time\t= %f msec, \t" "%f calls/second\n", latency * 1000, 1 / latency)); - this->put_latency (my_jitter_array, latency * 1000, thread_id); + this->put_latency (my_jitter_array, + latency * 1000, + thread_id); +#endif /* ! ACE_LACKS_FLOATING_POINT */ + } + else + { + // still we have to call this function to store a valid array pointer. + this->put_latency (my_jitter_array, + 0, + thread_id); + ACE_DEBUG ((LM_DEBUG, + "*** Warning: Latency is less than or equal to zero." + " Precision may have been lost.\n")); } } -#endif /* !defined (VXWORKS) */ - ACE_DEBUG ((LM_DEBUG, "%d calls, %d errors\n", call_count, error_count)); + ACE_DEBUG ((LM_DEBUG, + "%d calls, %d errors\n", + call_count, + error_count)); } // cb->please_exit (env); |