summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-12-07 18:10:50 +0000
committernobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-12-07 18:10:50 +0000
commit358bfa49cfe63a6f3032cd566afe8045e430be55 (patch)
tree9d68773251af989328c473c17ec5cc6beeafbf27
parentf22d093304909515f3c74d2e6b46797c8b46c5a7 (diff)
downloadATCD-358bfa49cfe63a6f3032cd566afe8045e430be55.tar.gz
This commit was manufactured by cvs2svn to create branch
'corba-env-clean'.
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.cpp122
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.h69
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.cpp165
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.h73
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.cpp72
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.h54
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.cpp108
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.h67
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.cpp114
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.h67
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.cpp201
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.h75
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Servant_var.cpp33
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.cpp105
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.h67
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/client.cpp185
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Federated_Scalability/server.cpp208
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Scalability/Client_Task.cpp33
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.cpp114
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.h65
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.cpp103
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.h60
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Scalability/client.cpp286
-rw-r--r--TAO/orbsvcs/performance-tests/EC_Scalability/server.cpp182
24 files changed, 2628 insertions, 0 deletions
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.cpp
new file mode 100644
index 00000000000..147a3c4152c
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.cpp
@@ -0,0 +1,122 @@
+/**
+ * @file Consumer.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Consumer.h"
+#include "ECFS_Configuration.h"
+#include "orbsvcs/Event_Service_Constants.h"
+
+ACE_RCSID(EC_Federated_Scalability, Consumer, "$Id")
+
+ECFS_Consumer::
+ ECFS_Consumer (CORBA::Long experiment_id,
+ CORBA::ULong iterations)
+ : experiment_id_ (experiment_id)
+ , samples_ (iterations)
+{
+}
+
+void
+ECFS_Consumer::connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (!CORBA::is_nil (this->proxy_supplier_.in ()))
+ return;
+
+ this->proxy_supplier_ =
+ consumer_admin->obtain_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ RtecEventComm::PushConsumer_var consumer =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::ConsumerQOS consumer_qos;
+ consumer_qos.is_gateway = 0;
+ consumer_qos.dependencies.length (2);
+ RtecEventComm::EventHeader& h0 =
+ consumer_qos.dependencies[0].event.header;
+ h0.type = ACE_ES_DISJUNCTION_DESIGNATOR;
+ h0.source = 1;
+
+ RtecEventComm::EventHeader& h1 =
+ consumer_qos.dependencies[1].event.header;
+ h1.type = ECFS_RESPONSE_EVENT_TYPE;
+ h1.source = this->experiment_id_;
+
+ this->proxy_supplier_->connect_push_consumer (consumer.in (),
+ consumer_qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECFS_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_supplier_.in ()))
+ return;
+ proxy = this->proxy_supplier_._retn ();
+ }
+
+ ACE_TRY
+ {
+ proxy->disconnect_push_supplier (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY {} ACE_ENDTRY;
+
+ PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id = poa->servant_to_id (this,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+const Control::Samples &
+ECFS_Consumer::samples (void) const
+{
+ return this->samples_;
+}
+
+void
+ECFS_Consumer::push (const RtecEventComm::EventSet &events,
+ CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ // ACE_DEBUG ((LM_DEBUG, "Consumer::push (%P|%t)\n"));
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+
+ ACE_hrtime_t creation;
+ ORBSVCS_Time::TimeT_to_hrtime (creation,
+ events[0].header.creation_time);
+
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ CORBA::ULong l = this->samples_.length ();
+ this->samples_.length (l + 1);
+ this->samples_[l] = now - creation;
+}
+
+void
+ECFS_Consumer::disconnect_push_consumer (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->proxy_supplier_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+}
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.h
new file mode 100644
index 00000000000..cd1d7428edc
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.h
@@ -0,0 +1,69 @@
+/**
+ * @file Consumer.h
+ *
+ * $Id$
+ *
+ */
+
+#ifndef ECFS_CONSUMER_H
+#define ECFS_CONSUMER_H
+
+#include "ControlC.h"
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECFS_Consumer
+ *
+ * @brief Implement a simple consumer to keep track of the latency
+ *
+ */
+class ECFS_Consumer
+ : public virtual POA_RtecEventComm::PushConsumer
+ , public virtual PortableServer::RefCountServantBase
+
+{
+public:
+ /// Constructor
+ ECFS_Consumer (CORBA::Long experiment_id,
+ CORBA::ULong iterations);
+
+ /// Connect to the event channel
+ void connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV);
+
+ /// Disconnect from the event channel
+ void disconnect (CORBA::Environment &ACE_TRY_ENV);
+
+ /// Get the samples
+ const Control::Samples &samples (void) const;
+
+ //@{
+ /** @name The RtecEventComm::PushConsumer methods
+ */
+ virtual void push (const RtecEventComm::EventSet& events,
+ CORBA::Environment &_env)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ virtual void disconnect_push_consumer (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// Synchronize access to the internal data
+ TAO_SYNCH_MUTEX mutex_;
+
+ /// The experiment id
+ CORBA::Long experiment_id_;
+
+ /// The samples recorded so far
+ Control::Samples samples_;
+
+ /// The proxy this object is connected to
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier_;
+};
+
+#endif /* ECFS_CONSUMER_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.cpp
new file mode 100644
index 00000000000..41ac5aa08cf
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.cpp
@@ -0,0 +1,165 @@
+/**
+ * @file Coordinator.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Coordinator.h"
+
+#include "ace/High_Res_Timer.h"
+#include "ace/Sample_History.h"
+#include "ace/Basic_Stats.h"
+
+ACE_RCSID(EC_Federated_Scalability, Coordinator, "$Id$")
+
+ECFS_Coordinator::ECFS_Coordinator (int peers_expected,
+ int consumer_count,
+ int iterations,
+ int do_dump_history,
+ CORBA::ORB_ptr orb)
+ : peers_expected_ (peers_expected)
+ , consumer_count_ (consumer_count)
+ , iterations_ (iterations)
+ , do_dump_history_ (do_dump_history)
+ , orb_ (CORBA::ORB::_duplicate (orb))
+ , peers_count_ (0)
+ , peers_ (0)
+{
+ ACE_NEW (peers_, Control::Peer_var[this->peers_expected_]);
+}
+
+ECFS_Coordinator::~ECFS_Coordinator (void)
+{
+ delete[] this->peers_;
+}
+
+void
+ECFS_Coordinator::join (Control::Peer_ptr peer,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (this->peers_count_ == this->peers_expected_)
+ return;
+
+ this->peers_[this->peers_count_++] =
+ Control::Peer::_duplicate (peer);
+
+ if (this->peers_count_ < this->peers_expected_)
+ return;
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Coordinator (%P|%t) Building the federation\n"));
+ /// Build the EC federation
+ size_t i;
+ for (i = 0; i != this->peers_count_; ++i)
+ {
+ RtecEventChannelAdmin::EventChannel_var channel =
+ this->peers_[i]->channel (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ for (size_t j = 0; j != this->peers_count_; ++j)
+ {
+ if (i != j)
+ {
+ this->peers_[j]->connect (channel.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ }
+ }
+
+ Control::Loopback_var *loopbacks;
+ ACE_NEW (loopbacks, Control::Loopback_var[this->peers_count_]);
+
+ /// Run the tests
+ for (i = 0; i != 1; ++i)
+ {
+ CORBA::Long experiment_id = 128 + i;
+
+ size_t lcount = 0;
+
+ size_t j;
+ for (j = 0; j != this->peers_count_; ++j)
+ {
+ if (j != i)
+ {
+ loopbacks[lcount++] =
+ this->peers_[j]->setup_loopback (experiment_id,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ }
+
+ for (int c = 5; c != 105; c += 5)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Coordinator (%P|%t) "
+ "Starting (%T) test for %d consumer\n",
+ c));
+ CORBA::Long gsf;
+ Control::Samples_var samples =
+ this->peers_[i]->run_experiment (c,
+ experiment_id,
+ this->iterations_,
+ gsf,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+
+ ACE_Sample_History history (samples->length ());
+
+
+ char filename[1024];
+ ACE_OS::sprintf (filename,
+ "ec_federated_scalability.%d.log",
+ c);
+ FILE *output_file = ACE_OS::fopen (filename, "w");
+ if (output_file == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Cannot open output file %s",
+ filename));
+ }
+ else
+ {
+ for (CORBA::ULong k = 0; k != samples->length (); ++k)
+ {
+ history.sample (samples[k]);
+ ACE_OS::fprintf (output_file,
+ "HISTO: %d " ACE_UINT64_FORMAT_SPECIFIER "\n",
+ k, samples[k] / gsf);
+ }
+ ACE_OS::fclose (output_file);
+ }
+
+ ACE_Basic_Stats stats;
+ history.collect_basic_stats (stats);
+ stats.dump_results ("Total", gsf);
+
+ // if (this->do_dump_history_)
+ // {
+ // history.dump_samples ("HISTORY", gsf);
+ // }
+ }
+
+ for (j = 0; j != lcount; ++j)
+ {
+ loopbacks[j]->destroy (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+
+ }
+
+ for (i = 0; i != this->peers_count_; ++i)
+ {
+ this->peers_[i]->shutdown (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ this->orb_->shutdown (0, ACE_TRY_ENV);
+ ACE_CHECK;
+}
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.h
new file mode 100644
index 00000000000..a59e60a69d8
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.h
@@ -0,0 +1,73 @@
+/**
+ * @file Coordinator.h
+ *
+ * $Id$
+ *
+ */
+
+#ifndef ECFS_COORDINATOR_H
+#define ECFS_COORDINATOR_H
+
+#include "ControlS.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECFS_Coordinator
+ *
+ * @brief Implement the Control::Coordinator interface
+ *
+ */
+class ECFS_Coordinator
+ : public virtual POA_Control::Coordinator
+ , public virtual PortableServer::RefCountServantBase
+{
+public:
+ /// Constructor
+ ECFS_Coordinator (int peers_expected,
+ int consumer_count,
+ int iterations,
+ int do_dump_history,
+ CORBA::ORB_ptr orb);
+
+ /// Destructor
+ virtual ~ECFS_Coordinator (void);
+
+ //@{
+ /** @name The Control::Coordinator methods
+ */
+ virtual void join (Control::Peer_ptr peer,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// Synchronize access to the internal data
+ TAO_SYNCH_MUTEX mutex_;
+
+ /// Number of peers expected
+ size_t peers_expected_;
+
+ /// Number of consumers to run on each test
+ size_t consumer_count_;
+
+ /// Number of iterations on each test
+ int iterations_;
+
+ /// This flag is set to 1 to dump the complete test history
+ int do_dump_history_;
+
+ /// Keep a reference to the ORB, used in shutdown
+ CORBA::ORB_var orb_;
+
+ /// Current number of peers
+ size_t peers_count_;
+
+ /// Peer collection
+ Control::Peer_var *peers_;
+
+};
+
+#endif /* ECFS_COORDINATOR_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.cpp
new file mode 100644
index 00000000000..b314b80feeb
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.cpp
@@ -0,0 +1,72 @@
+/**
+ * @file Loopback.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Loopback.h"
+#include "ECFS_Configuration.h"
+
+ACE_RCSID(EC_Federated_Scalability, Loopback, "$Id")
+
+ECFS_Loopback::ECFS_Loopback (void)
+{
+}
+
+void
+ECFS_Loopback::init (CORBA::Long experiment_id,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ this->supplier_ =
+ Servant_var<ECFS_Loopback_Supplier> (
+ new ECFS_Loopback_Supplier (experiment_id)
+ );
+ this->supplier_->connect (ec, ACE_TRY_ENV);
+ ACE_CHECK;
+
+ this->consumer_ =
+ Servant_var<ECFS_Loopback_Consumer> (
+ new ECFS_Loopback_Consumer (experiment_id,
+ this->supplier_.in ())
+ );
+ this->consumer_->connect (ec, ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECFS_Loopback::destroy (CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_TRY
+ {
+ this->consumer_->disconnect (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ this->supplier_->disconnect (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY {} ACE_ENDTRY;
+
+ PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id = poa->servant_to_id (this,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class Servant_var<ECFS_Loopback_Consumer>;
+template class Servant_var<ECFS_Loopback_Supplier>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate Servant_var<ECFS_Loopback_Consumer>
+#pragma instantiate Servant_var<ECFS_Loopback_Supplier>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.h
new file mode 100644
index 00000000000..32ff0f92c8b
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.h
@@ -0,0 +1,54 @@
+/**
+ * @file Loopback.h
+ *
+ * $Id$
+ *
+ */
+
+#ifndef ECFS_LOOPBACK_H
+#define ECFS_LOOPBACK_H
+
+#include "ControlS.h"
+#include "Servant_var.h"
+#include "Loopback_Consumer.h"
+#include "Loopback_Supplier.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECFS_Loopback
+ *
+ * @brief Implement the Control::Loopback interface
+ *
+ */
+class ECFS_Loopback
+ : public virtual POA_Control::Loopback
+ , public virtual PortableServer::RefCountServantBase
+{
+public:
+ /// Constructor
+ ECFS_Loopback (void);
+
+ /// Initialize the loopback
+ void init (CORBA::Long experiment_id,
+ RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV);
+
+ //@{
+ /** @name The Control::Loopback methods
+ */
+ virtual void destroy (CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// Keep a reference to the loopback consumer
+ Servant_var<ECFS_Loopback_Consumer> consumer_;
+
+ /// Keep a reference to the loopback supplier
+ Servant_var<ECFS_Loopback_Supplier> supplier_;
+};
+
+#endif /* ECFS_LOOPBACK_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.cpp
new file mode 100644
index 00000000000..1d6451510d7
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.cpp
@@ -0,0 +1,108 @@
+/**
+ * @file Loopback_Consumer.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Loopback_Consumer.h"
+#include "ECFS_Configuration.h"
+#include "orbsvcs/Event_Service_Constants.h"
+
+ACE_RCSID(EC_Federated_Scalability, Loopback_Consumer, "$Id$")
+
+ECFS_Loopback_Consumer::
+ ECFS_Loopback_Consumer (CORBA::Long experiment_id,
+ ECFS_Loopback_Supplier *supplier)
+ : experiment_id_ (experiment_id)
+ , supplier_ (supplier)
+{
+ this->supplier_->_add_ref ();
+}
+
+void
+ECFS_Loopback_Consumer::connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (!CORBA::is_nil (this->proxy_supplier_.in ()))
+ return;
+
+ this->proxy_supplier_ =
+ consumer_admin->obtain_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ RtecEventComm::PushConsumer_var consumer =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::ConsumerQOS consumer_qos;
+ consumer_qos.is_gateway = 0;
+ consumer_qos.dependencies.length (2);
+ RtecEventComm::EventHeader& h0 =
+ consumer_qos.dependencies[0].event.header;
+ h0.type = ACE_ES_DISJUNCTION_DESIGNATOR;
+ h0.source = 1;
+
+ RtecEventComm::EventHeader& h1 =
+ consumer_qos.dependencies[1].event.header;
+ h1.type = ECFS_START_EVENT_TYPE;
+ h1.source = this->experiment_id_;
+
+ this->proxy_supplier_->connect_push_consumer (consumer.in (),
+ consumer_qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECFS_Loopback_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_supplier_.in ()))
+ return;
+ proxy = this->proxy_supplier_._retn ();
+ }
+
+ ACE_TRY
+ {
+ proxy->disconnect_push_supplier (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY {} ACE_ENDTRY;
+
+ PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id = poa->servant_to_id (this,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECFS_Loopback_Consumer::push (const RtecEventComm::EventSet &events,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ // ACE_DEBUG ((LM_DEBUG, "Loopback_Consumer::push (%P|%t)\n"));
+ this->supplier_->push (events, ACE_TRY_ENV);
+}
+
+void
+ECFS_Loopback_Consumer::disconnect_push_consumer (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->proxy_supplier_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+}
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.h
new file mode 100644
index 00000000000..50a8ec67e24
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.h
@@ -0,0 +1,67 @@
+/**
+ * @file Loopback_Consumer.h
+ *
+ * $Id$
+ *
+ */
+
+#ifndef ECFS_LOOPBACK_CONSUMER_H
+#define ECFS_LOOPBACK_CONSUMER_H
+
+#include "Loopback_Supplier.h"
+#include "Servant_var.h"
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECFS_Loopback_Consumer
+ *
+ * @brief Implement a simple consumer to keep track of the latency
+ *
+ */
+class ECFS_Loopback_Consumer
+ : public virtual POA_RtecEventComm::PushConsumer
+ , public virtual PortableServer::RefCountServantBase
+
+{
+public:
+ /// Constructor
+ ECFS_Loopback_Consumer (CORBA::Long experiment_id,
+ ECFS_Loopback_Supplier *supplier);
+
+ /// Connect to the event channel
+ void connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV);
+
+ /// Disconnect from the event channel
+ void disconnect (CORBA::Environment &ACE_TRY_ENV);
+
+ //@{
+ /** @name The RtecEventComm::PushConsumer methods
+ */
+ virtual void push (const RtecEventComm::EventSet& events,
+ CORBA::Environment &_env)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ virtual void disconnect_push_consumer (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// Synchronize access to the internal data
+ TAO_SYNCH_MUTEX mutex_;
+
+ /// The experiment id
+ CORBA::Long experiment_id_;
+
+ /// The supplier used to close the loopback
+ Servant_var<ECFS_Loopback_Supplier> supplier_;
+
+ /// The proxy this object is connected to
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier_;
+};
+
+#endif /* ECFS_LOOPBACK_CONSUMER_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.cpp
new file mode 100644
index 00000000000..5259385fa1f
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.cpp
@@ -0,0 +1,114 @@
+/**
+ * @file Loopback_Supplier.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Loopback_Supplier.h"
+#include "ECFS_Configuration.h"
+
+ACE_RCSID(EC_Federated_Scalability, Loopback_Supplier, "$Id$")
+
+ECFS_Loopback_Supplier::ECFS_Loopback_Supplier (CORBA::Long experiment_id)
+ : experiment_id_ (experiment_id)
+{
+}
+
+void
+ECFS_Loopback_Supplier::connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ ec->for_suppliers (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (!CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+
+ this->proxy_consumer_ =
+ supplier_admin->obtain_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ RtecEventComm::PushSupplier_var supplier =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::SupplierQOS supplier_qos;
+ supplier_qos.is_gateway = 0;
+ supplier_qos.publications.length (1);
+ RtecEventComm::EventHeader& sh0 =
+ supplier_qos.publications[0].event.header;
+ sh0.type = ECFS_RESPONSE_EVENT_TYPE;
+ sh0.source = this->experiment_id_;
+
+ this->proxy_consumer_->connect_push_supplier (supplier.in (),
+ supplier_qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECFS_Loopback_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+ proxy = this->proxy_consumer_._retn ();
+ }
+
+ ACE_TRY
+ {
+ proxy->disconnect_push_consumer (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY {} ACE_ENDTRY;
+
+ PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id = poa->servant_to_id (this,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECFS_Loopback_Supplier::push (const RtecEventComm::EventSet &source,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+ proxy = this->proxy_consumer_;
+ }
+
+ // ACE_DEBUG ((LM_DEBUG, "Loopback_Supplier::push (%P|%t)\n"));
+ RtecEventComm::EventSet events (source);
+ for (CORBA::ULong i = 0; i != events.length (); ++i)
+ {
+ events[i].header.ttl = 1;
+ events[i].header.type = ECFS_RESPONSE_EVENT_TYPE;
+ events[i].header.source = this->experiment_id_;
+ }
+
+ proxy->push (events, ACE_TRY_ENV);
+}
+
+void
+ECFS_Loopback_Supplier::disconnect_push_supplier (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->proxy_consumer_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+}
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.h
new file mode 100644
index 00000000000..eb40fe10eda
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.h
@@ -0,0 +1,67 @@
+/**
+ * @file Loopback_Supplier.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#ifndef ECFS_LOOPBACK_SUPPLIER_H
+#define ECFS_LOOPBACK_SUPPLIER_H
+
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECFS_Loopback_Supplier
+ *
+ * @brief Implement a simple supplier to keep track of the latency
+ *
+ */
+class ECFS_Loopback_Supplier
+ : public virtual POA_RtecEventComm::PushSupplier
+ , public virtual PortableServer::RefCountServantBase
+{
+public:
+ /// Constructor
+ /**
+ * The experiment ID is used to configure the supplier ID on the
+ * publication.
+ */
+ ECFS_Loopback_Supplier (CORBA::Long experiment_id);
+
+ /// Connect to the event channel
+ void connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV);
+
+ /// Disconnect from the event channel
+ void disconnect (CORBA::Environment &ACE_TRY_ENV);
+
+ void push (const RtecEventComm::EventSet &events,
+ CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ //@{
+ /** @name The RtecEventComm::PushSupplier methods
+ */
+ virtual void disconnect_push_supplier (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// The experiment id
+ /// Synchronize access to the internal data
+ TAO_SYNCH_MUTEX mutex_;
+
+ /// The experiment id
+ CORBA::Long experiment_id_;
+
+ /// The proxy this object is connected to
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer_;
+};
+
+#endif /* ECFS_LOOPBACK_SUPPLIER_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.cpp
new file mode 100644
index 00000000000..0344c4c2c00
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.cpp
@@ -0,0 +1,201 @@
+/**
+ * @file Peer.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Peer.h"
+#include "Servant_var.h"
+#include "Consumer.h"
+#include "Supplier.h"
+#include "Loopback.h"
+#include "ECFS_Configuration.h"
+
+#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/Event/EC_Default_Factory.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+#include "orbsvcs/Event/EC_Gateway.h"
+
+#include "ace/High_Res_Timer.h"
+
+ACE_RCSID(EC_Federated_Scalability, Peer, "$Id$")
+
+ECFS_Peer::ECFS_Peer (CORBA::ORB_ptr orb)
+ : orb_ (CORBA::ORB::_duplicate (orb))
+{
+}
+
+ECFS_Peer::~ECFS_Peer (void)
+{
+}
+
+void
+ECFS_Peer::init (PortableServer::POA_ptr root_poa,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ TAO_EC_Event_Channel_Attributes attr (root_poa, root_poa);
+ Servant_var<TAO_EC_Event_Channel> ec_impl (
+ new TAO_EC_Event_Channel (attr)
+ );
+
+ ec_impl->activate (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ this->event_channel_ =
+ ec_impl->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+RtecEventChannelAdmin::EventChannel_ptr
+ECFS_Peer::channel (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ return RtecEventChannelAdmin::EventChannel::_duplicate (this->event_channel_.in ());
+}
+
+void
+ECFS_Peer::connect (RtecEventChannelAdmin::EventChannel_ptr remote_ec,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ // ACE_DEBUG ((LM_DEBUG, "(%P|%t) Connecting....\n"));
+ Servant_var<TAO_EC_Gateway_IIOP> gateway (new TAO_EC_Gateway_IIOP);
+ gateway->init (remote_ec,
+ this->event_channel_.in (),
+ ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::Observer_var observer =
+ gateway->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::Observer_Handle h =
+ this->event_channel_->append_observer (observer.in (),
+ ACE_TRY_ENV);
+ ACE_CHECK;
+
+ gateway->observer_handle (h);
+}
+
+Control::Loopback_ptr
+ECFS_Peer::setup_loopback (CORBA::Long experiment_id,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ Servant_var<ECFS_Loopback> loopback (new ECFS_Loopback);
+
+ loopback->init (experiment_id,
+ this->event_channel_.in (),
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (Control::Loopback::_nil ());
+
+ return loopback->_this (ACE_TRY_ENV);
+}
+
+Control::Samples *
+ECFS_Peer::run_experiment (CORBA::Long consumer_count,
+ CORBA::Long experiment_id,
+ CORBA::Long iterations,
+ CORBA::Long_out gsf,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ Servant_var<ECFS_Consumer> *consumer;
+ ACE_NEW_THROW_EX (consumer,
+ Servant_var<ECFS_Consumer>[consumer_count],
+ CORBA::NO_MEMORY ());
+ ACE_CHECK_RETURN (0);
+ int i;
+ for (i = 0; i != consumer_count; ++i)
+ {
+ consumer[i] =
+ Servant_var<ECFS_Consumer> (new ECFS_Consumer (experiment_id,
+ iterations));
+ consumer[i]->connect (this->event_channel_.in (), ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+ }
+
+ Servant_var<ECFS_Supplier> supplier (
+ new ECFS_Supplier (experiment_id)
+ );
+
+ supplier->connect (this->event_channel_.in (), ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+
+ // ACE_DEBUG ((LM_DEBUG, "Connected consumer & supplier\n"));
+
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+ event[0].header.type = ECFS_START_EVENT_TYPE;
+ event[0].header.source = experiment_id;
+ event[0].header.ttl = 1;
+
+ for (i = 0; i != iterations; ++i)
+ {
+ ACE_hrtime_t creation = ACE_OS::gethrtime ();
+ ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
+ creation);
+ // push one event...
+ supplier->push (event, ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+ }
+
+ supplier->disconnect (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+
+ Control::Samples_var samples (new Control::Samples (iterations));
+ samples->length (iterations);
+ for (int j = 0; j != iterations; ++j)
+ samples[j] = 0;
+
+ for (i = 0; i != consumer_count; ++i)
+ {
+ for (int j = 0; j != iterations; ++j)
+ {
+ CORBA::ULongLong sample =
+ consumer[i]->samples ()[j];
+ if (samples[j] < sample)
+ samples[j] = sample;
+ }
+ consumer[i]->disconnect (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Calibrating high res timer ...."));
+ ACE_High_Res_Timer::calibrate ();
+ gsf = ACE_High_Res_Timer::global_scale_factor ();
+ ACE_DEBUG ((LM_DEBUG, "Done (%d)\n", gsf));
+
+ return samples._retn ();
+}
+
+void
+ECFS_Peer::shutdown (CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ this->event_channel_->destroy (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ this->orb_->shutdown (0, ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class Servant_var<TAO_EC_Event_Channel>;
+template class Servant_var<TAO_EC_Gateway_IIOP>;
+template class Servant_var<ECFS_Consumer>;
+template class Servant_var<ECFS_Supplier>;
+template class Servant_var<ECFS_Loopback>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate Servant_var<TAO_EC_Event_Channel>
+#pragma instantiate Servant_var<TAO_EC_Gateway_IIOP>
+#pragma instantiate Servant_var<ECFS_Consumer>
+#pragma instantiate Servant_var<ECFS_Supplier>
+#pragma instantiate Servant_var<ECFS_Loopback>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.h
new file mode 100644
index 00000000000..7efdc6bd39d
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.h
@@ -0,0 +1,75 @@
+/**
+ * @file Peer.h
+ *
+ * $Id$
+ *
+ */
+
+#ifndef ECFS_PEER_H
+#define ECFS_PEER_H
+
+#include "ControlS.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECFS_Peer
+ *
+ * @brief Implement the Control::Peer interface
+ *
+ */
+class ECFS_Peer
+ : public virtual POA_Control::Peer
+ , public virtual PortableServer::RefCountServantBase
+{
+public:
+ /// Constructor
+ ECFS_Peer (CORBA::ORB_ptr orb);
+
+ /// Destructor
+ virtual ~ECFS_Peer (void);
+
+ /// Initialize the peer
+ void init (PortableServer::POA_ptr poa,
+ CORBA::Environment &ACE_TRY_ENV);
+
+ //@{
+ /** @name The Control::Peer methods
+ */
+ virtual RtecEventChannelAdmin::EventChannel_ptr
+ channel (CORBA::Environment&)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void connect (RtecEventChannelAdmin::EventChannel_ptr remote_ec,
+ CORBA::Environment&)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ Control::Loopback_ptr setup_loopback (CORBA::Long experiment_id,
+ CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual Control::Samples* run_experiment (CORBA::Long consumer_count,
+ CORBA::Long experiment_id,
+ CORBA::Long iterations,
+ CORBA::Long_out gsf,
+ CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void shutdown (CORBA::Environment&)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// Synchronize access to the internal data
+ TAO_SYNCH_MUTEX mutex_;
+
+ /// Keep a reference to the ORB, used in shutdown
+ CORBA::ORB_var orb_;
+
+ /// Event Channel references
+ RtecEventChannelAdmin::EventChannel_var event_channel_;
+};
+
+#endif /* ECFS_PEER_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Servant_var.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Servant_var.cpp
new file mode 100644
index 00000000000..d5ca3e9d36a
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Servant_var.cpp
@@ -0,0 +1,33 @@
+/**
+ * @file Servant_var.cpp
+ *
+ * $Id$
+ *
+ * @author Jody Hagins <jody@atdesk.com>
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+#ifndef CS_SERVANT_VAR_CPP
+#define CS_SERVANT_VAR_CPP
+
+#include "Servant_var.h"
+
+#if !defined(__ACE_INLINE__)
+#include "Servant_var.inl"
+#endif /* __ACE_INLINE__ */
+
+template<typename SERVANT>
+Servant_var<SERVANT>::~Servant_var ()
+{
+ if (this->ptr_ == 0)
+ return;
+
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY {
+ this->ptr_->_remove_ref (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ } ACE_CATCHANY {
+ // @@ This event should be logged...
+ } ACE_ENDTRY;
+}
+
+#endif /* CS_SERVANT_VAR_CPP */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.cpp
new file mode 100644
index 00000000000..08cc54f957e
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.cpp
@@ -0,0 +1,105 @@
+/**
+ * @file Supplier.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Supplier.h"
+#include "ECFS_Configuration.h"
+
+ACE_RCSID(EC_Federated_Scalability, Supplier, "$Id$")
+
+ECFS_Supplier::ECFS_Supplier (CORBA::Long experiment_id)
+ : experiment_id_ (experiment_id)
+{
+}
+
+void
+ECFS_Supplier::connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ ec->for_suppliers (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (!CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+
+ this->proxy_consumer_ =
+ supplier_admin->obtain_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ RtecEventComm::PushSupplier_var supplier =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::SupplierQOS supplier_qos;
+ supplier_qos.is_gateway = 0;
+ supplier_qos.publications.length (1);
+ RtecEventComm::EventHeader& sh0 =
+ supplier_qos.publications[0].event.header;
+ sh0.type = ECFS_START_EVENT_TYPE;
+ sh0.source = this->experiment_id_;
+
+ this->proxy_consumer_->connect_push_supplier (supplier.in (),
+ supplier_qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECFS_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+ proxy = this->proxy_consumer_._retn ();
+ }
+
+ ACE_TRY
+ {
+ proxy->disconnect_push_consumer (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY {} ACE_ENDTRY;
+
+ PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id = poa->servant_to_id (this,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECFS_Supplier::push (const RtecEventComm::EventSet &events,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+ proxy = this->proxy_consumer_;
+ }
+
+ proxy->push (events, ACE_TRY_ENV);
+}
+
+void
+ECFS_Supplier::disconnect_push_supplier (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->proxy_consumer_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+}
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.h
new file mode 100644
index 00000000000..171b4ec715f
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.h
@@ -0,0 +1,67 @@
+/**
+ * @file Supplier.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#ifndef ECFS_SUPPLIER_H
+#define ECFS_SUPPLIER_H
+
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECFS_Supplier
+ *
+ * @brief Implement a simple supplier to keep track of the latency
+ *
+ */
+class ECFS_Supplier
+ : public virtual POA_RtecEventComm::PushSupplier
+ , public virtual PortableServer::RefCountServantBase
+{
+public:
+ /// Constructor
+ /**
+ * The experiment ID is used to configure the supplier ID on the
+ * publication.
+ */
+ ECFS_Supplier (CORBA::Long experiment_id);
+
+ /// Connect to the event channel
+ void connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV);
+
+ /// Disconnect from the event channel
+ void disconnect (CORBA::Environment &ACE_TRY_ENV);
+
+ void push (const RtecEventComm::EventSet &events,
+ CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ //@{
+ /** @name The RtecEventComm::PushSupplier methods
+ */
+ virtual void disconnect_push_supplier (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// The experiment id
+ /// Synchronize access to the internal data
+ TAO_SYNCH_MUTEX mutex_;
+
+ /// The experiment id
+ CORBA::Long experiment_id_;
+
+ /// The proxy this object is connected to
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer_;
+};
+
+#endif /* ECFS_SUPPLIER_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/client.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/client.cpp
new file mode 100644
index 00000000000..870ca870754
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/client.cpp
@@ -0,0 +1,185 @@
+/**
+ * @file client.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Peer.h"
+#include "Servant_var.h"
+
+#include "orbsvcs/Event/EC_Default_Factory.h"
+
+#include "ace/Get_Opt.h"
+#include "ace/Sched_Params.h"
+
+ACE_RCSID(EC_Federated_Scalability, client, "$Id$")
+
+const char *ior = "file://server.ior";
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "k:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'k':
+ ior = get_opts.optarg;
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-k <IOR> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int main (int argc, char *argv[])
+{
+ TAO_EC_Default_Factory::init_svcs ();
+
+ int priority =
+ (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
+ + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
+ priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO,
+ priority);
+ // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
+
+ if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
+ priority,
+ ACE_SCOPE_PROCESS)) != 0)
+ {
+ if (ACE_OS::last_error () == EPERM)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "client (%P|%t): user is not superuser, "
+ "test runs in time-shared class\n"));
+ }
+ else
+ ACE_ERROR ((LM_ERROR,
+ "client (%P|%t): sched_params failed\n"));
+ }
+
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ // Get the event channel object reference
+ CORBA::Object_var coordinator_object =
+ orb->string_to_object (ior, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ Control::Coordinator_var coordinator =
+ Control::Coordinator::_narrow (coordinator_object.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (coordinator.in ()))
+ {
+ ACE_ERROR ((LM_ERROR,
+ "(%P|%t) Invalid or nil coordinator\n"));
+ return 1;
+ }
+
+ CORBA::Object_var manager_object =
+ orb->resolve_initial_references ("ORBPolicyManager",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::PolicyManager_var policy_manager =
+ CORBA::PolicyManager::_narrow (manager_object.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Any sync_scope;
+ sync_scope <<= Messaging::SYNC_WITH_TARGET;
+
+ CORBA::PolicyList policy_list (1);
+ policy_list.length (1);
+ policy_list[0] =
+ orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
+ sync_scope,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ policy_manager->set_policy_overrides (policy_list,
+ CORBA::SET_OVERRIDE,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references("RootPOA", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (root_poa.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Panic: nil RootPOA\n"),
+ 1);
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ Servant_var<ECFS_Peer> peer_impl (new ECFS_Peer (orb.in ()));
+
+ peer_impl->init (root_poa.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ Control::Peer_var peer (peer_impl->_this (ACE_TRY_ENV));
+ ACE_TRY_CHECK;
+
+ coordinator->join (peer.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ orb->run (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) server - event loop finished\n"));
+
+ orb->destroy (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception caught:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class Servant_var<ECFS_Peer>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate Servant_var<ECFS_Peer>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/server.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/server.cpp
new file mode 100644
index 00000000000..5f8dd6cacc7
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/server.cpp
@@ -0,0 +1,208 @@
+/**
+ * @file server.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+
+#include "Coordinator.h"
+#include "Servant_var.h"
+
+#include "ace/Get_Opt.h"
+#include "ace/Auto_Ptr.h"
+#include "ace/Sched_Params.h"
+
+ACE_RCSID(EC_Federated_Scalability, server, "$Id$")
+
+const char *ior_output_file = "server.ior";
+int peer_count = 3;
+int iterations = 10000;
+int consumer_count = 10;
+int do_dump_history = 0;
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "o:p:c:i:h");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ ior_output_file = get_opts.optarg;
+ break;
+
+ case 'p':
+ peer_count = ACE_OS::atoi (get_opts.optarg);
+ break;
+
+ case 'c':
+ consumer_count = ACE_OS::atoi (get_opts.optarg);
+ break;
+
+ case 'i':
+ iterations = ACE_OS::atoi (get_opts.optarg);
+ break;
+
+ case 'h':
+ do_dump_history = 1;
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-o <iorfile> "
+ "-p <peer_count> "
+ "-i <iterations> "
+ "-c <consumer_count> "
+ "-h (dump test history) "
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int main (int argc, char *argv[])
+{
+ int priority =
+ (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
+ + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
+ priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO,
+ priority);
+ // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
+
+ if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
+ priority,
+ ACE_SCOPE_PROCESS)) != 0)
+ {
+ if (ACE_OS::last_error () == EPERM)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "server (%P|%t): user is not superuser, "
+ "test runs in time-shared class\n"));
+ }
+ else
+ ACE_ERROR ((LM_ERROR,
+ "server (%P|%t): sched_params failed\n"));
+ }
+
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ CORBA::Object_var manager_object =
+ orb->resolve_initial_references ("ORBPolicyManager",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::PolicyManager_var policy_manager =
+ CORBA::PolicyManager::_narrow (manager_object.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Any sync_scope;
+ sync_scope <<= Messaging::SYNC_WITH_TARGET;
+
+ CORBA::PolicyList policy_list (1);
+ policy_list.length (1);
+ policy_list[0] =
+ orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
+ sync_scope,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ policy_manager->set_policy_overrides (policy_list,
+ CORBA::SET_OVERRIDE,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references("RootPOA", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (root_poa.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Panic: nil RootPOA\n"),
+ 1);
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ Servant_var<ECFS_Coordinator> coordinator_impl (
+ new ECFS_Coordinator (peer_count,
+ consumer_count,
+ iterations,
+ do_dump_history,
+ orb.in ())
+ );
+
+ Control::Coordinator_var coordinator =
+ coordinator_impl->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::String_var ior =
+ orb->object_to_string (coordinator.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Output the ior to the <ior_output_file>
+ FILE *output_file = ACE_OS::fopen (ior_output_file, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_output_file),
+ 1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ orb->run (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) server - event loop finished\n"));
+
+ root_poa->destroy (1, 1, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ orb->destroy (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception caught:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class Servant_var<ECFS_Coordinator>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate Servant_var<ECFS_Coordinator>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Client_Task.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/Client_Task.cpp
new file mode 100644
index 00000000000..a4ded501b6b
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Client_Task.cpp
@@ -0,0 +1,33 @@
+/**
+ * @file Client_Task.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Client_Task.h"
+
+ACE_RCSID(EC_Scalability, Client_Task, "$Id$")
+
+ECS_Client_Task::ECS_Client_Task (CORBA::ORB_ptr orb)
+ : orb_ (CORBA::ORB::_duplicate (orb))
+{
+}
+
+int
+ECS_Client_Task::svc (void)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ this->orb_->run (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ return -1;
+ }
+ ACE_ENDTRY;
+ return 0;
+}
diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.cpp
new file mode 100644
index 00000000000..6134b826bf8
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.cpp
@@ -0,0 +1,114 @@
+/**
+ * @file Consumer.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Consumer.h"
+#include "orbsvcs/Event_Service_Constants.h"
+
+ACE_RCSID(EC_Scalability, Consumer, "$Id$")
+
+ECS_Consumer::ECS_Consumer (int iterations)
+ : sample_history_ (iterations)
+{
+}
+
+void
+ECS_Consumer::connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ ec->for_consumers (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (!CORBA::is_nil (this->proxy_supplier_.in ()))
+ return;
+
+ this->proxy_supplier_ =
+ consumer_admin->obtain_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ RtecEventComm::PushConsumer_var consumer =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::ConsumerQOS consumer_qos;
+ consumer_qos.dependencies.length (2);
+ RtecEventComm::EventHeader& h0 =
+ consumer_qos.dependencies[0].event.header;
+ h0.type = ACE_ES_DISJUNCTION_DESIGNATOR;
+ h0.source = 1;
+
+ RtecEventComm::EventHeader& h1 =
+ consumer_qos.dependencies[1].event.header;
+ h1.type = ACE_ES_EVENT_UNDEFINED; // first free event type
+ h1.source = ACE_ES_EVENT_SOURCE_ANY;
+
+ this->proxy_supplier_->connect_push_consumer (consumer.in (),
+ consumer_qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECS_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_supplier_.in ()))
+ return;
+ proxy = this->proxy_supplier_._retn ();
+ }
+
+ ACE_TRY
+ {
+ proxy->disconnect_push_supplier (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY {} ACE_ENDTRY;
+
+ PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id = poa->servant_to_id (this,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+ACE_Sample_History &
+ECS_Consumer::sample_history (void)
+{
+ return this->sample_history_;
+}
+
+void
+ECS_Consumer::push (const RtecEventComm::EventSet &events,
+ CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_hrtime_t now = ACE_OS::gethrtime ();
+
+ ACE_hrtime_t creation;
+ ORBSVCS_Time::TimeT_to_hrtime (creation,
+ events[0].header.creation_time);
+
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->sample_history_.sample (now - creation);
+}
+
+void
+ECS_Consumer::disconnect_push_consumer (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->proxy_supplier_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+}
diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.h b/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.h
new file mode 100644
index 00000000000..4bd7f38262e
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.h
@@ -0,0 +1,65 @@
+/**
+ * @file Consumer.h
+ *
+ * $Id$
+ *
+ */
+
+#ifndef ECS_CONSUMER_H
+#define ECS_CONSUMER_H
+
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+#include "ace/Sample_History.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECS_Consumer
+ *
+ * @brief Implement a simple consumer to keep track of the latency
+ *
+ */
+class ECS_Consumer
+ : public virtual POA_RtecEventComm::PushConsumer
+ , public virtual PortableServer::RefCountServantBase
+
+{
+public:
+ /// Constructor
+ ECS_Consumer (int iterations);
+
+ /// Connect to the event channel
+ void connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV);
+
+ /// Disconnect from the event channel
+ void disconnect (CORBA::Environment &ACE_TRY_ENV);
+
+ /// Access the history of samples
+ ACE_Sample_History &sample_history (void);
+
+ //@{
+ /** @name The RtecEventComm::PushConsumer methods
+ */
+ virtual void push (const RtecEventComm::EventSet& events,
+ CORBA::Environment &_env)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ virtual void disconnect_push_consumer (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// Synchronize access to the internal data
+ TAO_SYNCH_MUTEX mutex_;
+
+ /// The proxy this object is connected to
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier_;
+
+ /// The history of latency samples
+ ACE_Sample_History sample_history_;
+};
+
+#endif /* ECS_CONSUMER_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.cpp
new file mode 100644
index 00000000000..61ff20dc369
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.cpp
@@ -0,0 +1,103 @@
+/**
+ * @file Supplier.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Supplier.h"
+#include "orbsvcs/Event_Service_Constants.h"
+
+ACE_RCSID(EC_Scalability, Supplier, "$Id$")
+
+ECS_Supplier::ECS_Supplier (void)
+{
+}
+
+void
+ECS_Supplier::connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ ec->for_suppliers (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (!CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+
+ this->proxy_consumer_ =
+ supplier_admin->obtain_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ RtecEventComm::PushSupplier_var supplier =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RtecEventChannelAdmin::SupplierQOS supplier_qos;
+ supplier_qos.publications.length (1);
+ RtecEventComm::EventHeader& sh0 =
+ supplier_qos.publications[0].event.header;
+ sh0.type = ACE_ES_EVENT_UNDEFINED; // first free event type
+ sh0.source = 1; // first free event source
+
+ this->proxy_consumer_->connect_push_supplier (supplier.in (),
+ supplier_qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECS_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+ proxy = this->proxy_consumer_._retn ();
+ }
+
+ ACE_TRY
+ {
+ proxy->disconnect_push_consumer (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY {} ACE_ENDTRY;
+
+ PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id = poa->servant_to_id (this,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+ECS_Supplier::push (const RtecEventComm::EventSet &events,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (CORBA::is_nil (this->proxy_consumer_.in ()))
+ return;
+ proxy = this->proxy_consumer_;
+ }
+ proxy->push (events, ACE_TRY_ENV);
+
+}
+
+void
+ECS_Supplier::disconnect_push_supplier (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->proxy_consumer_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+}
diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.h b/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.h
new file mode 100644
index 00000000000..6e0fd8b780a
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.h
@@ -0,0 +1,60 @@
+/**
+ * @file Supplier.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ *
+ */
+
+#ifndef ECS_SUPPLIER_H
+#define ECS_SUPPLIER_H
+
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminC.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/**
+ * @class ECS_Supplier
+ *
+ * @brief Implement a simple supplier to keep track of the latency
+ *
+ */
+class ECS_Supplier
+ : public virtual POA_RtecEventComm::PushSupplier
+ , public virtual PortableServer::RefCountServantBase
+{
+public:
+ /// Constructor
+ ECS_Supplier (void);
+
+ /// Connect to the event channel
+ void connect (RtecEventChannelAdmin::EventChannel_ptr ec,
+ CORBA::Environment &ACE_TRY_ENV);
+
+ /// Disconnect from the event channel
+ void disconnect (CORBA::Environment &ACE_TRY_ENV);
+
+ void push (const RtecEventComm::EventSet &events,
+ CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ //@{
+ /** @name The RtecEventComm::PushSupplier methods
+ */
+ virtual void disconnect_push_supplier (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ //@}
+
+private:
+ /// Synchronize access to the internal data
+ TAO_SYNCH_MUTEX mutex_;
+
+ /// The proxy this object is connected to
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer_;
+};
+
+#endif /* ECS_SUPPLIER_H */
diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/client.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/client.cpp
new file mode 100644
index 00000000000..c95f50fc5d0
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Scalability/client.cpp
@@ -0,0 +1,286 @@
+/**
+ * @file client.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "Consumer.h"
+#include "Supplier.h"
+#include "Client_Task.h"
+
+#include "orbsvcs/Event_Service_Constants.h"
+#include "tao/Messaging.h"
+#include "tao/PortableServer/PortableServer.h"
+#include "tao/Strategies/advanced_resource.h"
+#include "ace/Get_Opt.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Sample_History.h"
+#include "ace/Basic_Stats.h"
+#include "ace/Stats.h"
+#include "ace/Sched_Params.h"
+#include "ace/Task.h"
+
+ACE_RCSID(EC_Scalability, client, "$Id$")
+
+const char *ior = "file://test.ior";
+int consumer_count = 10;
+int iterations = 10000;
+int do_dump_history = 0;
+
+/// Parse the arguments.
+static int parse_args (int argc, char *argv[]);
+
+int
+main (int argc, char *argv [])
+{
+ int priority =
+ (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
+ + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
+ // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
+
+ if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
+ priority,
+ ACE_SCOPE_PROCESS)) != 0)
+ {
+ if (ACE_OS::last_error () == EPERM)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "server (%P|%t): user is not superuser, "
+ "test runs in time-shared class\n"));
+ }
+ else
+ ACE_ERROR ((LM_ERROR,
+ "server (%P|%t): sched_params failed\n"));
+ }
+
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var manager_object =
+ orb->resolve_initial_references ("ORBPolicyManager",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::PolicyManager_var policy_manager =
+ CORBA::PolicyManager::_narrow (manager_object.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Any sync_scope;
+ sync_scope <<= Messaging::SYNC_WITH_TARGET;
+
+ CORBA::PolicyList policy_list (1);
+ policy_list.length (1);
+ policy_list[0] =
+ orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
+ sync_scope,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ policy_manager->set_policy_overrides (policy_list,
+ CORBA::SET_OVERRIDE,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references("RootPOA", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (poa_object.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to initialize the POA.\n"),
+ 1);
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ // Get the event channel object reference
+ CORBA::Object_var ec_object =
+ orb->string_to_object (ior, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ RtecEventChannelAdmin::EventChannel_var ec =
+ RtecEventChannelAdmin::EventChannel::_narrow (ec_object.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ if (CORBA::is_nil (ec.in ()))
+ {
+ ACE_ERROR ((LM_ERROR,
+ "(%P|%t) Invalid or nil event channel\n"));
+ return 1;
+ }
+
+ ECS_Consumer **consumer_impl;
+ ACE_NEW_RETURN (consumer_impl,
+ ECS_Consumer*[consumer_count],
+ 1);
+ int i;
+ for (i = 0; i != consumer_count; ++i)
+ {
+ ACE_NEW_RETURN (consumer_impl[i],
+ ECS_Consumer (iterations),
+ 1);
+ consumer_impl[i]->connect (ec.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ ECS_Supplier *supplier_impl;
+ ACE_NEW_RETURN (supplier_impl,
+ ECS_Supplier,
+ 1);
+ PortableServer::ServantBase_var supplier_owner (supplier_impl);
+
+ supplier_impl->connect (ec.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "Connected consumer & supplier\n"));
+
+ ECS_Client_Task task (orb.in ());
+ task.activate ();
+
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+ event[0].header.type = ACE_ES_EVENT_UNDEFINED;
+ event[0].header.source = 1;
+ event[0].header.ttl = 1;
+ // event[0].data.payload.length(1024);
+
+ ACE_hrtime_t start = ACE_OS::gethrtime ();
+ for (i = 0; i != iterations; ++i)
+ {
+ ACE_hrtime_t creation = ACE_OS::gethrtime ();
+ ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
+ creation);
+ // push one event...
+ supplier_impl->push (event, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_hrtime_t end = ACE_OS::gethrtime ();
+
+ for (i = 0; i != consumer_count; ++i)
+ {
+ consumer_impl[i]->disconnect (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ supplier_impl->disconnect (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Calibrate the high resolution timer *before* starting the
+ // test.
+ ACE_DEBUG ((LM_DEBUG, "Calibrating high res timer ...."));
+ ACE_High_Res_Timer::calibrate ();
+
+ ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+ ACE_DEBUG ((LM_DEBUG, "Done (%d)\n", gsf));
+
+ ACE_Sample_History history (iterations);
+ for (int j = 0; j != iterations; ++j)
+ {
+ ACE_UINT64 value = 0;
+ for (int i = 0; i != consumer_count; ++i)
+ {
+ ACE_Sample_History &consumer_history =
+ consumer_impl[i]->sample_history ();
+
+ ACE_UINT64 consumer_sample =
+ consumer_history.get_sample (j);
+ if (consumer_sample > value)
+ value = consumer_sample;
+ }
+ history.sample (value);
+ }
+
+ if (do_dump_history)
+ {
+ history.dump_samples ("HISTORY", gsf);
+ }
+
+ ACE_Basic_Stats stats;
+ history.collect_basic_stats (stats);
+ stats.dump_results ("Total", gsf);
+
+ ACE_Throughput_Stats::dump_throughput ("Total", gsf,
+ end - start,
+ stats.samples_count ());
+
+
+ orb->shutdown (0, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ orb->destroy (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::Exception, ex)
+ {
+ ACE_PRINT_EXCEPTION (ex, argv[0]);
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+// ****************************************************************
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "hc:i:k:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'h':
+ do_dump_history = 1;
+ break;
+
+ case 'c':
+ consumer_count = ACE_OS::atoi (get_opts.optarg);
+ break;
+
+ case 'i':
+ iterations = ACE_OS::atoi (get_opts.optarg);
+ break;
+
+ case 'k':
+ ior = get_opts.optarg;
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-h (dump full sample history) "
+ "-c <consumer_count> "
+ "-i <iterations> "
+ "-k <IOR> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/server.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/server.cpp
new file mode 100644
index 00000000000..9145637d711
--- /dev/null
+++ b/TAO/orbsvcs/performance-tests/EC_Scalability/server.cpp
@@ -0,0 +1,182 @@
+/**
+ * @file server.cpp
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan <coryan@uci.edu>
+ */
+
+#include "orbsvcs/Event/EC_Default_Factory.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+
+#include "ace/Get_Opt.h"
+#include "ace/Auto_Ptr.h"
+#include "ace/Sched_Params.h"
+
+ACE_RCSID(EC_Scalability, server, "$Id$")
+
+const char *ior_output_file = "test.ior";
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "o:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ ior_output_file = get_opts.optarg;
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-o <iorfile>"
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int main (int argc, char *argv[])
+{
+ TAO_EC_Default_Factory::init_svcs ();
+
+ int priority =
+ (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
+ + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
+ priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO,
+ priority);
+ // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
+
+ if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
+ priority,
+ ACE_SCOPE_PROCESS)) != 0)
+ {
+ if (ACE_OS::last_error () == EPERM)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "server (%P|%t): user is not superuser, "
+ "test runs in time-shared class\n"));
+ }
+ else
+ ACE_ERROR ((LM_ERROR,
+ "server (%P|%t): sched_params failed\n"));
+ }
+
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var manager_object =
+ orb->resolve_initial_references ("ORBPolicyManager",
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::PolicyManager_var policy_manager =
+ CORBA::PolicyManager::_narrow (manager_object.in (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Any sync_scope;
+ sync_scope <<= Messaging::SYNC_WITH_TARGET;
+
+ CORBA::PolicyList policy_list (1);
+ policy_list.length (1);
+ policy_list[0] =
+ orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
+ sync_scope,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ policy_manager->set_policy_overrides (policy_list,
+ CORBA::SET_OVERRIDE,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references("RootPOA", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (root_poa.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Panic: nil RootPOA\n"),
+ 1);
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ TAO_EC_Event_Channel_Attributes attr (root_poa.in (),
+ root_poa.in ());
+ TAO_EC_Event_Channel* ec_impl;
+ ACE_NEW_RETURN (ec_impl,
+ TAO_EC_Event_Channel (attr),
+ 1);
+ PortableServer::ServantBase_var ec_owner (ec_impl);
+
+ ec_impl->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ RtecEventChannelAdmin::EventChannel_var ec =
+ ec_impl->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ CORBA::String_var ior =
+ orb->object_to_string (ec.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Output the ior to the <ior_output_file>
+ FILE *output_file = ACE_OS::fopen (ior_output_file, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_output_file),
+ 1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ orb->run (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) server - event loop finished\n"));
+
+ root_poa->destroy (1, 1, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ orb->destroy (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception caught:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */