summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/Event/Basic/Random.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/Event/Basic/Random.cpp')
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Random.cpp565
1 files changed, 565 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Event/Basic/Random.cpp b/TAO/orbsvcs/tests/Event/Basic/Random.cpp
new file mode 100644
index 00000000000..330677662e7
--- /dev/null
+++ b/TAO/orbsvcs/tests/Event/Basic/Random.cpp
@@ -0,0 +1,565 @@
+// $Id$
+
+#include "Random.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+#include "orbsvcs/Event/EC_Default_Factory.h"
+#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/Time_Utilities.h"
+#include "ace/Arg_Shifter.h"
+
+ACE_RCSID(EC_Tests, Random, "$Id$")
+
+int
+main (int argc, char* argv[])
+{
+ RND_Driver driver;
+ return driver.run (argc, argv);
+}
+
+// ****************************************************************
+
+const int base_type = 20;
+
+void
+deactivate_servant (PortableServer::Servant servant,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ PortableServer::POA_var poa =
+ servant->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var oid =
+ poa->servant_to_id (servant, ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (oid.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+
+RND_Driver::RND_Driver (void)
+ : timer_ (this),
+ nsuppliers_ (4),
+ nconsumers_ (4),
+ max_recursion_ (1)
+{
+ TAO_EC_Default_Factory::init_svcs ();
+}
+
+int
+RND_Driver::run (int argc, char *argv[])
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ ACE_Arg_Shifter arg_shifter (argc, argv);
+
+ while (arg_shifter.is_anything_left ())
+ {
+ char *arg = arg_shifter.get_current ();
+
+ if (ACE_OS::strcasecmp (arg, "-suppliers") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ char* opt = arg_shifter.get_current ();
+ int n = ACE_OS::atoi (opt);
+ if (n >= 1)
+ this->nsuppliers_ = n;
+ arg_shifter.consume_arg ();
+ }
+ }
+ else if (ACE_OS::strcasecmp (arg, "-consumers") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ char* opt = arg_shifter.get_current ();
+ int n = ACE_OS::atoi (opt);
+ if (n >= 1)
+ this->nconsumers_ = n;
+ arg_shifter.consume_arg ();
+ }
+ }
+ else if (ACE_OS::strcasecmp (arg, "-max_recursion") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ char* opt = arg_shifter.get_current ();
+ int n = ACE_OS::atoi (opt);
+ if (n >= 0)
+ this->max_recursion_ = n;
+ arg_shifter.consume_arg ();
+ }
+ }
+ else
+ arg_shifter.ignore_arg ();
+ }
+
+ // ****************************************************************
+
+ CORBA::Object_var object =
+ orb->resolve_initial_references ("RootPOA", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ PortableServer::POA_var poa =
+ PortableServer::POA::_narrow (object.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ PortableServer::POAManager_var poa_manager =
+ poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ TAO_EC_Event_Channel_Attributes attributes (poa.in (),
+ poa.in ());
+ attributes.consumer_reconnect = 1;
+ attributes.supplier_reconnect = 1;
+
+ TAO_EC_Event_Channel ec_impl (attributes);
+ ec_impl.activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ RtecEventChannelAdmin::EventChannel_var event_channel =
+ ec_impl._this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ // Obtain the consumer admin..
+ this->consumer_admin_ =
+ event_channel->for_consumers (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Obtain the supplier admin..
+ this->supplier_admin_ =
+ event_channel->for_suppliers (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ {
+ // Let's say that the execution time for event 2 is 1
+ // milliseconds...
+ ACE_Time_Value tv (0, 50000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+
+ ACE_ConsumerQOS_Factory qos;
+ qos.start_disjunction_group ();
+ // The types int the range [0,ACE_ES_EVENT_UNDEFINED) are
+ // reserved for the EC...
+ qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
+ time,
+ 0);
+
+ this->timer_.connect (this->consumer_admin_.in (),
+ qos.get_ConsumerQOS (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // ****************************************************************
+
+ {
+ ACE_SupplierQOS_Factory qos;
+ qos.insert (0, base_type, 0, 1);
+
+ this->supplier_.connect (this->supplier_admin_.in (),
+ qos.get_SupplierQOS (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // ****************************************************************
+
+ ACE_NEW_RETURN (this->consumers_,
+ RND_Consumer*[this->nconsumers_],
+ 1);
+ for (int i = 0; i != this->nconsumers_; ++i)
+ {
+ ACE_NEW_RETURN (this->consumers_[i],
+ RND_Consumer (this),
+ 1);
+
+ CORBA::Object_var obj =
+ this->consumers_[i]->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // ****************************************************************
+
+ ACE_NEW_RETURN (this->suppliers_,
+ RND_Supplier*[this->nsuppliers_],
+ 1);
+ for (int j = 0; j != this->nsuppliers_; ++j)
+ {
+ ACE_NEW_RETURN (this->suppliers_[j],
+ RND_Supplier,
+ 1);
+ this->suppliers_[j]->activate ();
+
+ CORBA::Object_var obj =
+ this->suppliers_[j]->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // ****************************************************************
+
+ ACE_Time_Value tv (30, 0);
+ orb->run (tv);
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ // ****************************************************************
+
+ {
+ for (int k = 0; k != this->nsuppliers_; ++k)
+ {
+ deactivate_servant (this->suppliers_[k],
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->suppliers_[k]->_remove_ref (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ delete[] this->suppliers_;
+ this->suppliers_ = 0;
+ }
+
+ // ****************************************************************
+
+ // We destroy now to verify that the callbacks work and do not
+ // produce any problems.
+ event_channel->destroy (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ {
+ for (int k = 0; k != this->nconsumers_; ++k)
+ {
+ deactivate_servant (this->consumers_[k],
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->consumers_[k]->_remove_ref (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ delete[] this->consumers_;
+ this->consumers_ = 0;
+ }
+
+ // ****************************************************************
+
+ deactivate_servant (&ec_impl,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ poa->destroy (1, 1, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ orb->destroy (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Random");
+ return 1;
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+void
+RND_Driver::timer (const RtecEventComm::Event &e,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ int r = ACE_OS::rand ();
+ if (r < 0)
+ r = -r;
+
+ int n = r% 20;
+
+ switch (n)
+ {
+ case 0:
+ case 1:
+ {
+ // ACE_DEBUG ((LM_DEBUG, "Pushing an event\n"));
+ if (e.header.source < this->max_recursion_)
+ {
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+ event[0] = e;
+ event[0].header.source ++;
+ this->supplier_.push (event, ACE_TRY_ENV);
+ }
+ }
+ break;
+
+ default:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ // ACE_DEBUG ((LM_DEBUG, "Received event\n"));
+ break;
+
+ case 6:
+ {
+ int n = ACE_OS::rand () % this->nsuppliers_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Connecting supplier %d\n", n));
+
+ ACE_SupplierQOS_Factory qos;
+ qos.insert (0, base_type, 0, 1);
+
+ this->suppliers_[n]->connect (this->supplier_admin_.in (),
+ qos.get_SupplierQOS (),
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ break;
+
+ case 7:
+ {
+ int n = ACE_OS::rand () % this->nconsumers_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Connecting consumer %d\n", n));
+
+ ACE_ConsumerQOS_Factory qos;
+ qos.start_disjunction_group ();
+ qos.insert_type (base_type, 0);
+
+ this->consumers_[n]->connect (this->consumer_admin_.in (),
+ qos.get_ConsumerQOS (),
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ break;
+
+ case 8:
+ {
+ int n = ACE_OS::rand () % this->nsuppliers_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Disconnecting supplier %d\n", n));
+
+ this->suppliers_[n]->disconnect (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ break;
+
+ case 9:
+ {
+ int n = ACE_OS::rand () % this->nconsumers_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Disconnecting consumer %d\n", n));
+
+ this->consumers_[n]->disconnect (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ break;
+ }
+}
+
+void
+RND_Driver::event (const RtecEventComm::Event &e,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ this->timer (e, ACE_TRY_ENV);
+}
+
+// ****************************************************************
+
+void
+RND_Timer::push (const RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_TRY
+ {
+ this->driver_->timer (event[0], ACE_TRY_ENV);
+ }
+ ACE_CATCHANY
+ {
+ }
+ ACE_ENDTRY;
+}
+
+// ****************************************************************
+
+void
+RND_Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr admin,
+ const RtecEventChannelAdmin::ConsumerQOS &qos,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ {
+ this->proxy_ = admin->obtain_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ proxy =
+ RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(this->proxy_.in ());
+ }
+ RtecEventComm::PushConsumer_var me =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+ proxy->connect_push_consumer (me.in (),
+ qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+RND_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ return;
+ this->proxy_->disconnect_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
+ this->proxy_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+}
+
+void
+RND_Consumer::push (const RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ this->driver_->event (event[0], ACE_TRY_ENV);
+}
+
+void
+RND_Consumer::disconnect_push_consumer (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+// ****************************************************************
+
+void
+RND_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr admin,
+ const RtecEventChannelAdmin::SupplierQOS &qos,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ {
+ this->proxy_ = admin->obtain_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ proxy =
+ RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_.in ());
+ }
+ RtecEventComm::PushSupplier_var me =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+ proxy->connect_push_supplier (me.in (),
+ qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+RND_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ return;
+ this->proxy_->disconnect_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
+ this->proxy_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+}
+
+void
+RND_Supplier::push_new_event (CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+ event[0].header.type = base_type;
+ event[0].header.source = 0;
+
+ this->push (event, ACE_TRY_ENV);
+}
+
+void
+RND_Supplier::push (RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ return;
+
+ proxy =
+ RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_.in ());
+ }
+
+ proxy->push (event, ACE_TRY_ENV);
+}
+
+void
+RND_Supplier::disconnect_push_supplier (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+int
+RND_Supplier::svc (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "Thread %t started\n"));
+ int percent = 10;
+ int niterations = 5000;
+ for (int i = 0; i != niterations; ++i)
+ {
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ this->push_new_event (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ ACE_Time_Value tv (0, 10000);
+ ACE_OS::sleep (tv);
+ }
+ ACE_CATCHANY
+ {
+ }
+ ACE_ENDTRY;
+ if (i * 100 / niterations >= percent)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Thread %t %d%%\n", percent));
+ percent += 10;
+ }
+ }
+ ACE_DEBUG ((LM_DEBUG, "Thread %t completed\n"));
+ return 0;
+}