Diffstat (limited to 'ACE/TAO/performance-tests/RTCorba/Multiple_Endpoints/Orb_Per_Priority/client.cpp')
-rw-r--r--  ACE/TAO/performance-tests/RTCorba/Multiple_Endpoints/Orb_Per_Priority/client.cpp  | 338
1 files changed, 338 insertions, 0 deletions
diff --git a/ACE/TAO/performance-tests/RTCorba/Multiple_Endpoints/Orb_Per_Priority/client.cpp b/ACE/TAO/performance-tests/RTCorba/Multiple_Endpoints/Orb_Per_Priority/client.cpp
new file mode 100644
index 00000000000..949a1f5123a
--- /dev/null
+++ b/ACE/TAO/performance-tests/RTCorba/Multiple_Endpoints/Orb_Per_Priority/client.cpp
@@ -0,0 +1,338 @@
+// $Id$
+
+#include "testC.h"
+#include "tao/RTCORBA/RTCORBA.h"
+#include "tao/RTCORBA/Priority_Mapping_Manager.h"
+#include "tao/Strategies/advanced_resource.h"
+#include "ace/Get_Opt.h"
+#include "ace/Task.h"
+#include "ace/Stats.h"
+#include "ace/Throughput_Stats.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Sched_Params.h"
+#include "ace/Barrier.h"
+#include "ace/OS_NS_errno.h"
+#include "ace/OS_NS_unistd.h"
+
+ACE_RCSID(Orb_Per_Priority, client, "$Id$")
+
+class Client : public ACE_Task_Base
+{
+ // = TITLE
+ // Run the client thread
+ //
+ // = DESCRIPTION
+ // Use the ACE_Task_Base class to run the client threads.
+ //
+public:
+ Client (void);
+ // ctor
+
+ void set (int niterations,
+ int id,
+ CORBA::ORB_ptr,
+ ACE_Barrier *before,
+ ACE_Barrier *after);
+ // Set the test attributes.
+
+ void accumulate_into (ACE_Throughput_Stats &throughput) const;
+ // Accumulate the throughput statistics into <throughput>
+
+ void dump_stats (const ACE_TCHAR* msg, ACE_UINT32 gsf);
+  // Dump the statistics collected by this thread; <gsf> is the
+  // global scale factor of the high-resolution timer.
+
+ // = The ACE_Task_Base methods....
+ virtual int svc (void);
+
+private:
+ CORBA::ORB_ptr orb_;
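+  // The ORB used to resolve this thread's object reference.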
+
+ Test_var server_;
+ // The server.
+
+ int niterations_;
+ // The number of iterations on each client thread.
+
+ int id_;
+ // The application ID for this thread...
+
+ ACE_Throughput_Stats throughput_;
+ // Keep throughput statistics on a per-thread basis
+
+ ACE_Barrier *before_connection_;
+ ACE_Barrier *after_connection_;
+};
+
+// ****************************************************************
+
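+// Test configuration: IOR base name, one CORBA priority per client
+// thread (-t), iterations per thread, and optional period between
+// calls in milliseconds.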
+const ACE_TCHAR *ior_base = ACE_TEXT("file://test.ior");
+int nthreads = 0;
+int niterations = 1000;
+int period = -1;
+const int MAX_THREADS = 128;
+Client client[MAX_THREADS];
+int priorities[MAX_THREADS];
+
+int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("i:t:n:p:"));
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'i':
+ ior_base = get_opts.opt_arg ();
+ break;
+ case 't':
+ if (nthreads < MAX_THREADS)
+ {
+ priorities[nthreads] = ACE_OS::atoi (get_opts.opt_arg ());
+ nthreads++;
+ }
+ break;
+ case 'n':
+ niterations = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case 'p':
+ period = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+                           "-i <ior base> "
+                           "-t <thread CORBA priority (one per thread)> "
+                           "-n <iterations per thread> "
+                           "-p <period between calls in msec> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+  // Indicates successful parsing of the command line.
+ return 0;
+}
+
+int
+ACE_TMAIN(int argc, ACE_TCHAR *argv[])
+{
+ int policy = ACE_SCHED_FIFO;
+ int flags = THR_SCHED_FIFO|THR_NEW_LWP|THR_JOINABLE|THR_BOUND;
+ int priority =
+ ACE_Sched_Params::priority_max (policy);
+
+ // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
+ if (ACE_OS::sched_params (ACE_Sched_Params (policy,
+ priority,
+ ACE_SCOPE_PROCESS)) != 0)
+ {
+ if (ACE_OS::last_error () == EPERM)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "client (%P|%t): user is not superuser, "
+ "test runs in time-shared class\n"));
+ policy = ACE_SCHED_OTHER;
+ flags = THR_NEW_LWP|THR_JOINABLE;
+ }
+ else
+ ACE_ERROR ((LM_ERROR,
+ "client (%P|%t): sched_params failed\n"));
+ }
+
+ try
+ {
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv);
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ // Obtain Priority Mapping used by the ORB.
+ CORBA::Object_var object =
+ orb->resolve_initial_references ("PriorityMappingManager");
+
+ RTCORBA::PriorityMappingManager_var mapping_manager =
+ RTCORBA::PriorityMappingManager::_narrow (object.in ());
+
+ if (CORBA::is_nil (mapping_manager.in ()))
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Priority Mapping Manager is nil\n"),
+ 1);
+ }
+
+ RTCORBA::PriorityMapping *pm =
+ mapping_manager->mapping ();
+
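+      // Barriers used to keep the client threads in step before and
+      // after they establish their connections.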
+ ACE_Barrier before_connection (nthreads);
+ ACE_Barrier after_connection (nthreads);
+
+ for (int i = 0; i != nthreads; ++i)
+ {
+ client[i].set (niterations,
+ i,
+ orb.in (),
+ &before_connection,
+ &after_connection);
+
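+          // Convert the requested CORBA priority into a native
+          // priority for the thread about to be spawned.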
+ CORBA::Short native_priority = 0;
+ pm->to_native (priorities[i], native_priority);
+
+ if (client[i].activate (flags,
+ 1, 1,
+ native_priority) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot activate client threads\n"),
+ 1);
+ }
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ ACE_Throughput_Stats throughput;
+
+ for (int j = 0; j != nthreads; ++j)
+ {
+ client[j].accumulate_into (throughput);
+
+ ACE_TCHAR buf[64];
+ ACE_OS::sprintf (buf, ACE_TEXT("Thread[index= %d]"), j);
+ client[j].dump_stats (buf, gsf);
+ }
+
+ throughput.dump_results (ACE_TEXT("Aggregated"), gsf);
+ }
+ catch (const CORBA::Exception& ex)
+ {
+ ex._tao_print_exception ("Orb per priority client: exception raised");
+ return 1;
+ }
+
+ return 0;
+}
+
+// ****************************************************************
+
+Client::Client (void)
+{
+}
+
+void
+Client::set (int niterations,
+ int id,
+ CORBA::ORB_ptr orb,
+ ACE_Barrier *before,
+ ACE_Barrier *after)
+{
+ this->niterations_ = niterations;
+ this->id_ = id;
+  this->orb_ = orb;
+ this->before_connection_ = before;
+ this->after_connection_ = after;
+}
+
+int
+Client::svc (void)
+{
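+  // Report the native priority this thread is actually running at.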
+ ACE_hthread_t current;
+ ACE_Thread::self (current);
+ int native_priority;
+ ACE_Thread::getprio (current, native_priority);
+ ACE_DEBUG ((LM_DEBUG,
+ "Thread (%t): index = %d corba_priority = %d"
+ " actual native priority = %d\n",
+ this->id_,
+ priorities[this->id_],
+ native_priority));
+
+ int i = 0;
+ try
+ {
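+      // Build the object reference for this thread's priority:
+      // <ior_base>_<priority>.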
+      char ior[100];
+      ACE_OS::sprintf (ior,
+                       "%s_%d",
+                       ACE_TEXT_ALWAYS_CHAR (ior_base),
+                       priorities[this->id_]);
+
+ CORBA::Object_var object =
+ orb_->string_to_object (ior);
+
+ Test_var server =
+ Test::_narrow (object.in ());
+
+ if (CORBA::is_nil (server.in ()))
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Object reference <%C> is nil.\n",
+ ior),
+ 1);
+ }
+
+ this->before_connection_->wait ();
+
+ // Try to make sure every thread gets its own connection.
+ for (int j = 0; j < 100; ++j)
+ {
+ /* CORBA::PolicyList_var pols;
+ server->_validate_connection (pols.out ());
+ */
+
+ server->test_method (this->id_);
+ }
+
+ this->after_connection_->wait ();
+
+ ACE_hrtime_t throughput_base = ACE_OS::gethrtime ();
+
+ for (i = 0; i < this->niterations_; ++i)
+ {
+ // Record current time.
+ ACE_hrtime_t latency_base = ACE_OS::gethrtime ();
+
+ // Invoke method.
+ server->test_method (this->id_);
+
+ // Grab timestamp again.
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+
+ if (period != -1)
+ {
+ ACE_Time_Value tv (0, period * 1000);
+ ACE_OS::sleep (tv);
+ }
+
+ // Record statistics.
+ this->throughput_.sample (now - throughput_base,
+ now - latency_base);
+ }
+
+ /* char orb_name[50];
+ ACE_OS::sprintf (orb_name, "%d", priorities[this->id_]);
+ server->shutdown (orb_name);
+ */
+ }
+ catch (const CORBA::Exception& ex)
+ {
+      char message[256];
+      ACE_OS::sprintf (message,
+                       "ORB_per_Priority::client: Exception in thread "
+                       "with id = %d, on iteration = %d",
+                       this->id_,
+                       i);
+ ex._tao_print_exception (message);
+ }
+ return 0;
+}
+
+void
+Client::accumulate_into (ACE_Throughput_Stats &throughput) const
+{
+ throughput.accumulate (this->throughput_);
+}
+
+void
+Client::dump_stats (const ACE_TCHAR* msg, ACE_UINT32 gsf)
+{
+ this->throughput_.dump_results (msg, gsf);
+}