summaryrefslogtreecommitdiff
path: root/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp')
-rw-r--r--TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp308
1 files changed, 205 insertions, 103 deletions
diff --git a/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp b/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp
index 5b2c2c067f2..3caff110df8 100644
--- a/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp
+++ b/TAO/tests/Cubit/COOL/MT_Cubit/Task_Client.cpp
@@ -1,39 +1,6 @@
-#include "Task_Client.h"
-
-#if defined (VXWORKS)
-#include "my_time.h"
-#endif /* defined (VXWORKS) */
-
-int stats(int data[], int n)
-{
- int i, j, key, sum, mean;
-
- if( n < 2 )
- return 0;
-
- /* sort the samples */
- for(j=1; j<n; j++) {
- key = data[j];
- i = j - 1;
- while( i >= 0 && data[i] > key ) {
- data[i+1] = data[i];
- i--;
- }
- data[i+1] = key;
- }
-
- sum = 0;
- for(i=0; i<n; i++) {
- sum += data[i];
- }
+// $Id$
- if( (n%2) == 0 )
- mean = (data[n/2-1] + data[n/2]) / 2;
- else
- mean = data[n/2];
-
- return sum/n;
-}
+#include "Task_Client.h"
Task_State::Task_State (int argc, char **argv)
: start_count_ (0),
@@ -44,15 +11,28 @@ Task_State::Task_State (int argc, char **argv)
argc_ (argc),
argv_ (argv),
thread_per_rate_ (0),
- global_jitter_array_ (0)
+ global_jitter_array_ (0),
+ use_chorus_ipc_ (0),
+ grain_ (1)
{
- ACE_OS::strcpy (server_host_, "localhost");
- ACE_Get_Opt opts (argc, argv, "h:n:t:p:d:r");
int c;
int datatype;
+ // defaults
+ ACE_OS::strcpy (server_host_, "localhost");
+ ior_header_ = ACE_OS::strdup ("cool-tcp");
+ ACE_Get_Opt opts (argc, argv, "Hh:n:t:p:d:rIg:");
+
while ((c = opts ()) != -1)
switch (c) {
+ case 'g':
+ grain_ = ACE_OS::atoi (opts.optarg);
+ if (grain_ < 1)
+ grain_ = 1;
+ break;
+ case 'I':
+ use_chorus_ipc_ = 1;
+ break;
case 'r':
thread_per_rate_ = 1;
break;
@@ -74,8 +54,8 @@ Task_State::Task_State (int argc, char **argv)
break;
case CB_SHORT:
default:
- datatype_ = CB_SHORT;
ACE_DEBUG ((LM_DEBUG, "Testing Shorts\n"));
+ datatype_ = CB_SHORT;
break;
}
continue;
@@ -91,46 +71,55 @@ Task_State::Task_State (int argc, char **argv)
case 't':
thread_count_ = (u_int) ACE_OS::atoi (opts.optarg);
continue;
- case '?':
- default:
+ case 'H':
ACE_DEBUG ((LM_DEBUG, "usage: %s"
"[-d datatype Octet=0, Short=1, Long=2, Struct=3]"
" [-n num_calls]"
" [-h server_hostname]"
" [-p server_port_num]"
" [-t num_threads]"
+ " [-I Use Chorus IPC. (For Chorus ClassiX *only*) ]"
+ " [-g granularity_of_request_timing]"
"\n", argv [0]));
+ continue;
}
+
+ if (use_chorus_ipc_ == 1)
+ {
+ ior_header_ = ACE_OS::strdup ("cool-chorus");
+ ACE_OS::strcpy (server_host_, "");
+ }
+
// thread_count_ + 1 because there is one utilization thread also
// wanting to begin at the same time the clients begin..
ACE_NEW (barrier_, ACE_Barrier (thread_count_ + 1));
ACE_NEW (latency_, double [thread_count_]);
ACE_NEW (global_jitter_array_, double *[thread_count_]);
- ACE_NEW (ave_latency_, int [thread_count_]);
}
Client::Client (Task_State *ts)
: ACE_Task<ACE_MT_SYNCH> (ACE_Thread_Manager::instance ()),
ts_ (ts)
{
-
-}
-
-void
-Client::put_ave_latency (int ave_latency, u_int thread_id) {
- ts_->lock_.acquire ();
- ts_->ave_latency_[thread_id] = ave_latency;
- ts_->lock_.release ();
}
void
Client::put_latency (double *jitter, double latency, u_int thread_id)
{
- ts_->lock_.acquire ();
+ ACE_MT (ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, ts_->lock_));
+
ts_->latency_[thread_id] = latency;
- ACE_DEBUG ((LM_DEBUG, "(%t) My latency was %f\n", latency));
ts_->global_jitter_array_ [thread_id] = jitter;
- ts_->lock_.release ();
+
+#if defined (ACE_LACKS_FLOATING_POINT)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) My latency was %u\n",
+ latency));
+#else
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) My latency was %f\n",
+ latency));
+#endif /* ! ACE_LACKS_FLOATING_POINT */
}
double
@@ -146,44 +135,67 @@ Client::get_low_priority_latency (void)
for (u_int i = 1; i < ts_->start_count_; i++)
l += (double) ts_->latency_[i];
- return l / (double) (ts_->start_count_ - 1);
+ return ts_->start_count_ > 1? l / (double) (ts_->start_count_ - 1) : 0;
}
int
Client::get_latency (u_int thread_id)
{
- return ts_->ave_latency_ [thread_id];
+ return ts_->latency_ [thread_id];
}
double
-Client::get_high_priority_jitter () {
+Client::get_high_priority_jitter (void)
+{
double jitter = 0;
double average = get_high_priority_latency ();
+
+ // Compute the standard deviation (i.e. jitter) from the values
+ // stored in the global_jitter_array_.
+
+ // we first compute the sum of the squares of the differences
+ // each latency has from the average
for (u_int i = 0; i < ts_->loop_count_; i ++)
{
- double difference = ts_->global_jitter_array_ [0][i] - average;
+ double difference =
+ ts_->global_jitter_array_ [0][i] - average;
jitter += difference * difference;
}
- return sqrt (jitter / (double) (ts_->loop_count_ - 1));
+
+ // Return the square root of the sum of the differences computed
+ // above, i.e. jitter.
+ return sqrt (jitter);
}
double
-Client::get_low_priority_jitter () {
+Client::get_low_priority_jitter (void)
+{
double jitter = 0;
double average = get_low_priority_latency ();
+
+ // Compute the standard deviation (i.e. jitter) from the values
+ // stored in the global_jitter_array_.
+
+ // We first compute the sum of the squares of the differences each
+ // latency has from the average.
for (u_int j = 1; j < ts_->start_count_; j ++)
for (u_int i = 0; i < ts_->loop_count_; i ++)
{
double difference = ts_->global_jitter_array_ [j][i] - average;
jitter += difference * difference;
}
- return (double) (jitter / ((ts_->loop_count_* (ts_->start_count_ - 1)) - 1));
+
+ // Return the square root of the sum of the differences computed
+ // above, i.e. jitter.
+ return sqrt (jitter);
}
int
Client::svc (void)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) Thread created\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) Thread created\n"));
+
u_int thread_id;
Cubit_ptr cb;
char ior [1024];
@@ -203,10 +215,11 @@ Client::svc (void)
ACE_DEBUG ((LM_DEBUG,
"(%t) Im the high priority client, my id is %d.\n",
thread_id));
- ::sprintf (ior,
- "cool-tcp://%s:%d",
- ts_->server_host_,
- ts_->base_port_);
+ ACE_OS::sprintf (ior,
+ "%s://%s:%d",
+ ts_->ior_header_,
+ ts_->server_host_,
+ ts_->base_port_);
frequency = CB_HIGH_PRIORITY_RATE;
}
else
@@ -214,10 +227,11 @@ Client::svc (void)
ACE_DEBUG ((LM_DEBUG,
"(%t) Im a low priority client, my id is %d\n",
thread_id));
- ::sprintf (ior,
- "cool-tcp://%s:%d",
- ts_->server_host_,
- ts_->base_port_);// + thread_id);
+ ACE_OS::sprintf (ior,
+ "%s://%s:%d",
+ ts_->ior_header_,
+ ts_->server_host_,
+ ts_->base_port_);// + thread_id);
frequency = CB_LOW_PRIORITY_RATE;
}
}
@@ -225,6 +239,9 @@ Client::svc (void)
{
switch (thread_id)
{
+/////////////////////////////////////
+// THIS NEEDS TO BE UPDATED AS ABOVE.
+/////////////////////////////////////
case CB_40HZ_CONSUMER:
ACE_DEBUG ((LM_DEBUG, "(%t) Im the high priority client, my id is %d.\n", thread_id));
::sprintf (ior, "cool-tcp://%s:%d", ts_->server_host_, ts_->base_port_);
@@ -274,15 +291,13 @@ Client::svc (void)
//
- // Initialise client's binding to an
+ // Initialize client's binding to an
// arbitrary cubit server (at some host)
//
COOL::EOABindingData bindingData (ior);
cb = Cubit::_bind(bindingData, env);
-
-
// objref = orb_ptr->string_to_object ((CORBA::String) ior, env);
// if (env.exception () != 0)
@@ -318,8 +333,6 @@ Client::svc (void)
ACE_OS::fflush (stdout);
ACE_DEBUG ((LM_DEBUG, "Object Created at: '%ul'", cb));
ACE_DEBUG ((LM_DEBUG, "connected to object '%s'", str));
- // if (cb->cube_short (2, env) == 8) // dummy call.
- // ACE_DEBUG ((LM_DEBUG, "(%t) Made successful dummy call"));
}
ACE_DEBUG ((LM_DEBUG, "(%t) Waiting for other threads to finish binding..\n"));
@@ -350,18 +363,32 @@ Client::run_tests (Cubit_ptr cb,
double latency = 0;
double sleep_time = (1/frequency) * (1000 * 1000);
double delta = 0;
- // Make the calls in a loop.
+ int pstartTime = 0;
+ int pstopTime = 0;
+ double real_time = 0.0;
+
+#if defined (USE_QUANTIFY)
+ quantify_stop_recording_data();
+ quantify_clear_data ();
+#endif /* USE_QUANTIFY */
+
+ // Make the calls in a loop.
for (i = 0; i < loop_count; i++)
{
-#if defined (VXWORKS)
- TimeStamp(&start[i]);
-#else
- ACE_Profile_Timer timer;
+ ACE_High_Res_Timer timer_;
ACE_Time_Value tv (0, (long int) (sleep_time - delta));
ACE_OS::sleep (tv);
- timer.start ();
-#endif /* defined (VXWORKS) */
+
+ // Elapsed time will be in microseconds.
+ ACE_Time_Value delta_t;
+
+#if defined (CHORUS)
+ pstartTime = pccTime1Get();
+#else /* CHORUS */
+ timer_.start ();
+#endif /* !CHORUS */
+
switch (datatype)
{
case CB_OCTET:
@@ -369,7 +396,16 @@ Client::run_tests (Cubit_ptr cb,
// Cube an octet.
CORBA::Octet arg_octet = func (i), ret_octet = 0;
+#if defined (USE_QUANTIFY)
+ /* start recording quantify data from here */
+ quantify_start_recording_data ();
+#endif /* USE_QUANTIFY */
ret_octet = cb->cube_octet (arg_octet, env);
+
+#if defined (USE_QUANTIFY)
+ quantify_stop_recording_data();
+#endif /* USE_QUANTIFY */
+
if (env.exception () != 0)
{
CORBA::SystemException* ex;
@@ -405,8 +441,17 @@ Client::run_tests (Cubit_ptr cb,
CORBA::Short arg_short = func (i), ret_short;
+#if defined (USE_QUANTIFY)
+ // start recording quantify data from here.
+ quantify_start_recording_data ();
+#endif /* USE_QUANTIFY */
+
ret_short = cb->cube_short (arg_short, env);
+#if defined (USE_QUANTIFY)
+ quantify_stop_recording_data();
+#endif /* USE_QUANTIFY */
+
if (env.exception () != 0)
ACE_ERROR_RETURN ((LM_ERROR,
"%s:Call failed\n",
@@ -430,8 +475,17 @@ Client::run_tests (Cubit_ptr cb,
CORBA::Long arg_long = func (i), ret_long;
+#if defined (USE_QUANTIFY)
+ // start recording quantify data from here.
+ quantify_start_recording_data ();
+#endif /* USE_QUANTIFY */
+
ret_long = cb->cube_long (arg_long, env);
+#if defined (USE_QUANTIFY)
+ quantify_stop_recording_data();
+#endif /* USE_QUANTIFY */
+
if (env.exception () != 0)
ACE_ERROR_RETURN ((LM_ERROR,"%s:Call failed\n", env.exception ()), 2);
@@ -456,8 +510,17 @@ Client::run_tests (Cubit_ptr cb,
arg_struct.s = func (i);
arg_struct.o = func (i);
+#if defined (USE_QUANTIFY)
+ // start recording quantify data from here.
+ quantify_start_recording_data ();
+#endif /* USE_QUANTIFY */
+
ret_struct = cb->cube_struct (arg_struct, env);
+#if defined (USE_QUANTIFY)
+ quantify_stop_recording_data();
+#endif /* USE_QUANTIFY */
+
if (env.exception () != 0)
ACE_ERROR_RETURN ((LM_ERROR,"%s:Call failed\n", env.exception ()), 2);
@@ -477,45 +540,84 @@ Client::run_tests (Cubit_ptr cb,
}
}
-#if defined (VXWORKS)
- TimeStamp(&stop[i]);
- elapsed_time[i] = DeltaTime(start[i], stop[i]);
- delta = ( (0.4 * fabs (elapsed_time[i])) + (0.6 * delta) ); // pow(10,6)
- latency += (double)elapsed_time[i];
-#else
- timer.stop();
- ACE_Profile_Timer::ACE_Elapsed_Time et;
- timer.elapsed_time (et);
- delta = ((0.4 * fabs (et.real_time * (1000 * 1000))) + (0.6 * delta)); // pow(10,6)
- latency += et.real_time;
- my_jitter_array [i] = et.real_time * 1000;
-#endif /* defined (VXWORKS) */
+ // use sysBench when CHORUS defined and option specified on command line
+#if defined (CHORUS)
+ if ( (loop_count % ts_->grain_) == 0)
+ pstopTime = pccTime1Get();
+#else /* CHORUS */
+ // if CHORUS is not defined just use plain timer_.stop ().
+ timer_.stop ();
+ timer_.elapsed_time (delta_t);
+#endif /* !CHORUS */
+
+ // Calculate time elapsed
+#if defined (ACE_LACKS_FLOATING_POINT)
+# if defined (CHORUS)
+ real_time = pstopTime - pstartTime;
+ my_jitter_array [i/ts_->grain_] = real_time; // in units of microseconds.
+ // update the latency array, correcting the index using the granularity
+# else /* CHORUS */
+ // Store the time in usecs.
+ real_time = delta_t.sec () * ACE_ONE_SECOND_IN_USECS +
+ delta_t.usec ();
+ my_jitter_array [i] = real_time; // in units of microseconds.
+# endif /* !CHORUS */
+ delta = ((40 * fabs (real_time) / 100) + (60 * delta / 100)); // pow(10,6)
+ latency += real_time;
+#else /* ACE_LACKS_FLOATING_POINT */
+ delta = ((0.4 * fabs (real_time * (1000 * 1000))) + (0.6 * delta)); // pow(10,6)
+ latency += real_time;
+ my_jitter_array [i] = real_time * 1000;
+#endif /* !ACE_LACKS_FLOATING_POINT */
}
-#if defined (VXWORKS)
- int ave_latency = stats(elapsed_time, loop_count);
- put_ave_latency(ave_latency, thread_id);
-#endif
-
if (call_count > 0)
{
-#if !defined (VXWORKS)
if (error_count == 0)
{
+#if defined (ACE_LACKS_FLOATING_POINT)
+ double calls_per_second = (call_count * ACE_ONE_SECOND_IN_USECS) / latency;
+#endif /* ACE_LACKS_FLOATING_POINT */
latency /= call_count;
if (latency > 0)
{
+#if defined (ACE_LACKS_FLOATING_POINT)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) cube average call ACE_OS::time\t= %u usec, \t"
+ "%u calls/second\n",
+ latency,
+ calls_per_second));
+
+ this->put_latency (my_jitter_array,
+ latency,
+ thread_id);
+#else
ACE_DEBUG ((LM_DEBUG, "cube average call ACE_OS::time\t= %f msec, \t"
"%f calls/second\n",
latency * 1000,
1 / latency));
- this->put_latency (my_jitter_array, latency * 1000, thread_id);
+ this->put_latency (my_jitter_array,
+ latency * 1000,
+ thread_id);
+#endif /* ! ACE_LACKS_FLOATING_POINT */
+ }
+ else
+ {
+ // still we have to call this function to store a valid array pointer.
+ this->put_latency (my_jitter_array,
+ 0,
+ thread_id);
+ ACE_DEBUG ((LM_DEBUG,
+ "*** Warning: Latency is less than or equal to zero."
+ " Precision may have been lost.\n"));
}
}
-#endif /* !defined (VXWORKS) */
- ACE_DEBUG ((LM_DEBUG, "%d calls, %d errors\n", call_count, error_count));
+ ACE_DEBUG ((LM_DEBUG,
+ "%d calls, %d errors\n",
+ call_count,
+ error_count));
}
// cb->please_exit (env);