summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/Event/Basic/Observer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/Event/Basic/Observer.cpp')
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Observer.cpp415
1 files changed, 415 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Event/Basic/Observer.cpp b/TAO/orbsvcs/tests/Event/Basic/Observer.cpp
new file mode 100644
index 00000000000..d58d828573f
--- /dev/null
+++ b/TAO/orbsvcs/tests/Event/Basic/Observer.cpp
@@ -0,0 +1,415 @@
+// $Id$
+
+#include "Observer.h"
+#include "Consumer.h"
+#include "Supplier.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+#include "orbsvcs/Event/EC_Default_Factory.h"
+#include "ace/Arg_Shifter.h"
+#include "ace/High_Res_Timer.h"
+
+ACE_RCSID (EC_Tests_Basic,
+ Observer,
+ "$Id$")
+
+int
+main (int argc, char *argv [])
+{
+ TAO_EC_Default_Factory::init_svcs ();
+ EC_Master master;
+ return master.run (argc, argv);
+}
+
+// ****************************************************************
+
+EC_Master::EC_Master (void)
+ : seed_ (0),
+ n_channels_ (4),
+ channels_ (0)
+{
+}
+
+EC_Master::~EC_Master (void)
+{
+ if (this->channels_ != 0)
+ {
+ for (int i = 0; i < this->n_channels_; ++i)
+ delete this->channels_[i];
+ delete[] this->channels_;
+ }
+}
+
+int
+EC_Master::run (int argc, char* argv[])
+{
+ ACE_TRY_NEW_ENV
+ {
+ // Calibrate the high resolution timer *before* starting the
+ // test.
+ ACE_High_Res_Timer::calibrate ();
+
+ this->seed_ = ACE_OS::time (0);
+
+ this->initialize_orb_and_poa (argc, argv ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (this->parse_args (argc, argv))
+ return 1;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "The seed value is %d\n", this->seed_));
+
+ ACE_NEW_RETURN (this->channels_,
+ EC_Observer*[this->n_channels_],
+ 1);
+
+ {
+ for (int i = 0; i != this->n_channels_; ++i)
+ {
+ ACE_OS::rand_r (this->seed_);
+ ACE_NEW_RETURN (this->channels_[i],
+ EC_Observer (this,
+ this->seed_,
+ this->orb_.in (),
+ this->root_poa_.in (),
+ i),
+ 1);
+ }
+ }
+
+ {
+ char** targv;
+ ACE_NEW_RETURN (targv, char*[argc], 1);
+
+ for (int i = 0; i != this->n_channels_; ++i)
+ {
+ int targc = argc;
+ for (int j = 0; j < targc; ++j)
+ targv[j] = argv[j];
+ this->channels_[i]->run_init (targc, targv ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ delete[] targv;
+ }
+
+ {
+ for (int i = 0; i != this->n_channels_; ++i)
+ {
+ this->channels_[i]->execute_test (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ }
+
+ if (ACE_Thread_Manager::instance ()->wait () == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "EC_Master (%P|%t) thread manager wait failed\n"));
+ return 1;
+ }
+
+ {
+ for (int i = 0; i != this->n_channels_; ++i)
+ {
+ this->channels_[i]->dump_results ();
+ }
+ }
+
+ {
+ for (int i = 0; i != this->n_channels_; ++i)
+ {
+ this->channels_[i]->run_cleanup (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ }
+
+ {
+ for (int i = 0; i != this->n_channels_; ++i)
+ {
+ this->channels_[i]->disconnect_clients (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ this->channels_[i]->shutdown_clients (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ this->channels_[i]->destroy_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ this->channels_[i]->deactivate_ec (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ this->channels_[i]->cleanup_tasks ();
+ this->channels_[i]->cleanup_suppliers ();
+ this->channels_[i]->cleanup_consumers ();
+ this->channels_[i]->cleanup_ec ();
+ }
+ }
+
+ this->root_poa_->destroy (1,
+ 1
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ this->orb_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "EC_Driver::run");
+ }
+ ACE_CATCHALL
+ {
+ ACE_ERROR ((LM_ERROR, "EC_Driver (%P|%t) non-corba exception raised\n"));
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+void
+EC_Master::initialize_orb_and_poa (int &argc, char* argv[]
+ ACE_ENV_ARG_DECL)
+{
+ this->orb_ =
+ CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ CORBA::Object_var poa_object =
+ this->orb_->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ if (CORBA::is_nil (poa_object.in ()))
+ {
+ ACE_ERROR ((LM_ERROR,
+ "EC_Driver (%P|%t) Unable to initialize the POA.\n"));
+ return;
+ }
+
+ this->root_poa_ =
+ PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ PortableServer::POAManager_var poa_manager =
+ this->root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+int
+EC_Master::parse_args (int &argc, char *argv [])
+{
+ ACE_Arg_Shifter arg_shifter (argc, argv);
+
+ while (arg_shifter.is_anything_left ())
+ {
+ const char *arg = arg_shifter.get_current ();
+
+ if (ACE_OS::strcmp (arg, "-channels") == 0)
+ {
+ arg_shifter.consume_arg ();
+ this->n_channels_ = ACE_OS::atoi (arg_shifter.get_current ());
+ }
+ else if (ACE_OS::strcmp (arg, "-seed") == 0)
+ {
+ arg_shifter.consume_arg ();
+ this->seed_ = ACE_OS::atoi (arg_shifter.get_current ());
+ }
+
+ arg_shifter.ignore_arg ();
+ }
+ return 0;
+}
+
+int
+EC_Master::channel_count (void) const
+{
+ return this->n_channels_;
+}
+
+EC_Observer*
+EC_Master::channel (int i) const
+{
+ return this->channels_[i];
+}
+
+// ****************************************************************
+
+EC_Observer::EC_Observer (EC_Master *master,
+ ACE_RANDR_TYPE seed,
+ CORBA::ORB_ptr orb,
+ PortableServer::POA_ptr root_poa,
+ int id)
+ : master_ (master),
+ seed_ (seed),
+ id_ (id),
+ gwys_ (0)
+{
+ this->orb_ = CORBA::ORB::_duplicate (orb);
+ this->root_poa_ = PortableServer::POA::_duplicate (root_poa);
+}
+
+EC_Observer::~EC_Observer (void)
+{
+ if (this->gwys_ != 0)
+ delete[] this->gwys_;
+}
+
+void
+EC_Observer::initialize_orb_and_poa (int&, char*[]
+ ACE_ENV_ARG_DECL_NOT_USED)
+{
+}
+
+int
+EC_Observer::parse_args (int& argc, char* argv[])
+{
+ return this->EC_Driver::parse_args (argc, argv);
+}
+
+void
+EC_Observer::print_args (void) const
+{
+ this->EC_Driver::print_args ();
+}
+
+void
+EC_Observer::print_usage (void)
+{
+ this->EC_Driver::print_usage ();
+}
+
+void
+EC_Observer::execute_test (ACE_ENV_SINGLE_ARG_DECL)
+{
+ int peer_count = this->master_->channel_count ();
+ ACE_NEW (this->gwys_, TAO_EC_Gateway_IIOP[peer_count]);
+
+ for (int i = 0; i != peer_count; ++i)
+ {
+ if (i == this->id_)
+ continue;
+
+ RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
+ this->master_->channel (i)->event_channel_.in ();
+
+ this->gwys_[i].init (rmt_ec,
+ this->event_channel_.in ()
+ ACE_ENV_ARG_PARAMETER);
+
+ RtecEventChannelAdmin::Observer_var obs =
+ this->gwys_[i]._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::Observer_Handle h =
+ rmt_ec->append_observer (obs.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->gwys_[i].observer_handle (h);
+
+ ACE_CHECK;
+ }
+
+ if (this->allocate_tasks () == -1)
+ return;
+
+ this->activate_tasks (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ if (this->verbose ())
+ ACE_DEBUG ((LM_DEBUG, "EC_Observer[%d] (%P|%t) suppliers are active\n",
+ this->id_));
+}
+
+void
+EC_Observer::run_cleanup (ACE_ENV_SINGLE_ARG_DECL)
+{
+ for (int j = 0; j != this->master_->channel_count (); ++j)
+ {
+ if (j == this->id_)
+ continue;
+
+ RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
+ this->master_->channel (j)->event_channel_.in ();
+ rmt_ec->remove_observer (this->gwys_[j].observer_handle ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->gwys_[j].shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+}
+
+void
+EC_Observer::dump_results (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "===== Results for %d =====\n", this->id_));
+
+ ACE_Throughput_Stats throughput;
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ for (int j = 0; j < this->n_consumers_; ++j)
+ {
+ this->consumers_[j]->accumulate (throughput);
+ }
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+
+ ACE_Throughput_Stats suppliers;
+ for (int i = 0; i < this->n_suppliers_; ++i)
+ {
+ this->suppliers_[i]->accumulate (suppliers);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
+ throughput.dump_results ("EC_Consumer/totals", gsf);
+
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+ suppliers.dump_results ("EC_Supplier/totals", gsf);
+}
+
+void
+EC_Observer::connect_consumer (
+ RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin,
+ int i
+ ACE_ENV_ARG_DECL)
+{
+ if (i == 0)
+ {
+ this->EC_Driver::connect_consumer (consumer_admin, i
+ ACE_ENV_ARG_PARAMETER);
+ return;
+ }
+ unsigned int x = ACE_OS::rand_r (this->seed_);
+ if (x < RAND_MAX / 8)
+ this->EC_Driver::connect_consumer (consumer_admin, i
+ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+EC_Observer::consumer_push (void*,
+ const RtecEventComm::EventSet&
+ ACE_ENV_ARG_DECL)
+{
+ unsigned int x = ACE_OS::rand_r (this->seed_);
+ if (x < (RAND_MAX / 64))
+ {
+ if (this->verbose ())
+ ACE_DEBUG ((LM_DEBUG,
+ "EC_Observer[%d] (%P|%t) reconnecting\n", this->id_));
+
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ this->event_channel_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ for (int i = 1; i < this->n_consumers_; ++i)
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (this->consumers_[i]->connected ())
+ {
+ this->consumers_[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+ else
+ {
+ this->EC_Driver::connect_consumer (consumer_admin.in (),
+ i ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+ }
+ }
+}