diff options
author | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-12-07 18:10:50 +0000 |
---|---|---|
committer | nobody <nobody@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-12-07 18:10:50 +0000 |
commit | 358bfa49cfe63a6f3032cd566afe8045e430be55 (patch) | |
tree | 9d68773251af989328c473c17ec5cc6beeafbf27 | |
parent | f22d093304909515f3c74d2e6b46797c8b46c5a7 (diff) | |
download | ATCD-358bfa49cfe63a6f3032cd566afe8045e430be55.tar.gz |
This commit was manufactured by cvs2svn to create branch
'corba-env-clean'.
24 files changed, 2628 insertions, 0 deletions
diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.cpp new file mode 100644 index 00000000000..147a3c4152c --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.cpp @@ -0,0 +1,122 @@ +/** + * @file Consumer.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Consumer.h" +#include "ECFS_Configuration.h" +#include "orbsvcs/Event_Service_Constants.h" + +ACE_RCSID(EC_Federated_Scalability, Consumer, "$Id") + +ECFS_Consumer:: + ECFS_Consumer (CORBA::Long experiment_id, + CORBA::ULong iterations) + : experiment_id_ (experiment_id) + , samples_ (iterations) +{ +} + +void +ECFS_Consumer::connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (ACE_TRY_ENV); + ACE_CHECK; + + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (!CORBA::is_nil (this->proxy_supplier_.in ())) + return; + + this->proxy_supplier_ = + consumer_admin->obtain_push_supplier (ACE_TRY_ENV); + ACE_CHECK; + } + + RtecEventComm::PushConsumer_var consumer = + this->_this (ACE_TRY_ENV); + ACE_CHECK; + + RtecEventChannelAdmin::ConsumerQOS consumer_qos; + consumer_qos.is_gateway = 0; + consumer_qos.dependencies.length (2); + RtecEventComm::EventHeader& h0 = + consumer_qos.dependencies[0].event.header; + h0.type = ACE_ES_DISJUNCTION_DESIGNATOR; + h0.source = 1; + + RtecEventComm::EventHeader& h1 = + consumer_qos.dependencies[1].event.header; + h1.type = ECFS_RESPONSE_EVENT_TYPE; + h1.source = this->experiment_id_; + + this->proxy_supplier_->connect_push_consumer (consumer.in (), + consumer_qos, + ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECFS_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ProxyPushSupplier_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_supplier_.in ())) + return; + proxy = this->proxy_supplier_._retn (); + } + + ACE_TRY + { + proxy->disconnect_push_supplier (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY {} ACE_ENDTRY; + + PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = poa->servant_to_id (this, + ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; +} + +const Control::Samples & +ECFS_Consumer::samples (void) const +{ + return this->samples_; +} + +void +ECFS_Consumer::push (const RtecEventComm::EventSet &events, + CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // ACE_DEBUG ((LM_DEBUG, "Consumer::push (%P|%t)\n")); + ACE_hrtime_t now = ACE_OS::gethrtime (); + + ACE_hrtime_t creation; + ORBSVCS_Time::TimeT_to_hrtime (creation, + events[0].header.creation_time); + + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + CORBA::ULong l = this->samples_.length (); + this->samples_.length (l + 1); + this->samples_[l] = now - creation; +} + +void +ECFS_Consumer::disconnect_push_consumer (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + this->proxy_supplier_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); +} diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.h new file mode 100644 index 00000000000..cd1d7428edc --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Consumer.h @@ -0,0 +1,69 @@ +/** + * @file Consumer.h + * + * $Id$ + * + */ + +#ifndef ECFS_CONSUMER_H +#define ECFS_CONSUMER_H + +#include "ControlC.h" +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECFS_Consumer + * + * @brief Implement a simple consumer to keep track of the latency + * + */ +class ECFS_Consumer + : public virtual POA_RtecEventComm::PushConsumer + , public virtual PortableServer::RefCountServantBase + +{ +public: + /// Constructor + ECFS_Consumer (CORBA::Long experiment_id, + CORBA::ULong iterations); + + /// Connect to the event channel + void connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV); + + /// Disconnect from the event channel + void disconnect (CORBA::Environment &ACE_TRY_ENV); + + /// Get the samples + const Control::Samples &samples (void) const; + + //@{ + /** @name The RtecEventComm::PushConsumer methods + */ + virtual void push (const RtecEventComm::EventSet& events, + CORBA::Environment &_env) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// Synchronize access to the internal data + TAO_SYNCH_MUTEX mutex_; + + /// The experiment id + CORBA::Long experiment_id_; + + /// The samples recorded so far + Control::Samples samples_; + + /// The proxy this object is connected to + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier_; +}; + +#endif /* ECFS_CONSUMER_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.cpp new file mode 100644 index 00000000000..41ac5aa08cf --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.cpp @@ -0,0 +1,165 @@ +/** + * @file Coordinator.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Coordinator.h" + +#include "ace/High_Res_Timer.h" +#include "ace/Sample_History.h" +#include "ace/Basic_Stats.h" + +ACE_RCSID(EC_Federated_Scalability, Coordinator, "$Id$") + +ECFS_Coordinator::ECFS_Coordinator (int peers_expected, + int consumer_count, + int iterations, + int do_dump_history, + CORBA::ORB_ptr orb) + : peers_expected_ (peers_expected) + , consumer_count_ (consumer_count) + , iterations_ (iterations) + , do_dump_history_ (do_dump_history) + , orb_ (CORBA::ORB::_duplicate (orb)) + , peers_count_ (0) + , peers_ (0) +{ + ACE_NEW (peers_, Control::Peer_var[this->peers_expected_]); +} + +ECFS_Coordinator::~ECFS_Coordinator (void) +{ + delete[] this->peers_; +} + +void +ECFS_Coordinator::join (Control::Peer_ptr peer, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (this->peers_count_ == this->peers_expected_) + return; + + this->peers_[this->peers_count_++] = + Control::Peer::_duplicate (peer); + + if (this->peers_count_ < this->peers_expected_) + return; + } + + ACE_DEBUG ((LM_DEBUG, + "Coordinator (%P|%t) Building the federation\n")); + /// Build the EC federation + size_t i; + for (i = 0; i != this->peers_count_; ++i) + { + RtecEventChannelAdmin::EventChannel_var channel = + this->peers_[i]->channel (ACE_TRY_ENV); + ACE_CHECK; + + for (size_t j = 0; j != this->peers_count_; ++j) + { + if (i != j) + { + this->peers_[j]->connect (channel.in (), ACE_TRY_ENV); + ACE_CHECK; + } + } + } + + Control::Loopback_var *loopbacks; + ACE_NEW (loopbacks, Control::Loopback_var[this->peers_count_]); + + /// Run the tests + for (i = 0; i != 1; ++i) + { + CORBA::Long experiment_id = 128 + i; + + size_t lcount = 0; + + size_t j; + for (j = 0; j != this->peers_count_; ++j) + { + if (j != i) + { + loopbacks[lcount++] = + this->peers_[j]->setup_loopback (experiment_id, + ACE_TRY_ENV); + ACE_CHECK; + } + } + + for (int c = 5; c != 105; c += 5) + { + ACE_DEBUG ((LM_DEBUG, + "Coordinator (%P|%t) " + "Starting (%T) test for %d consumer\n", + c)); + CORBA::Long gsf; + Control::Samples_var samples = + this->peers_[i]->run_experiment (c, + experiment_id, + this->iterations_, + gsf, + ACE_TRY_ENV); + ACE_CHECK; + + ACE_Sample_History history (samples->length ()); + + + char filename[1024]; + ACE_OS::sprintf (filename, + "ec_federated_scalability.%d.log", + c); + FILE *output_file = ACE_OS::fopen (filename, "w"); + if (output_file == 0) + { + ACE_ERROR ((LM_ERROR, + "Cannot open output file %s", + filename)); + } + else + { + for (CORBA::ULong k = 0; k != samples->length (); ++k) + { + history.sample (samples[k]); + ACE_OS::fprintf (output_file, + "HISTO: %d " ACE_UINT64_FORMAT_SPECIFIER "\n", + k, samples[k] / gsf); + } + ACE_OS::fclose (output_file); + } + + ACE_Basic_Stats stats; + history.collect_basic_stats (stats); + stats.dump_results ("Total", gsf); + + // if (this->do_dump_history_) + // { + // history.dump_samples ("HISTORY", gsf); + // } + } + + for (j = 0; j != lcount; ++j) + { + loopbacks[j]->destroy (ACE_TRY_ENV); + ACE_CHECK; + } + + + } + + for (i = 0; i != this->peers_count_; ++i) + { + this->peers_[i]->shutdown (ACE_TRY_ENV); + ACE_CHECK; + } + + this->orb_->shutdown (0, ACE_TRY_ENV); + ACE_CHECK; +} diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.h new file mode 100644 index 00000000000..a59e60a69d8 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Coordinator.h @@ -0,0 +1,73 @@ +/** + * @file Coordinator.h + * + * $Id$ + * + */ + +#ifndef ECFS_COORDINATOR_H +#define ECFS_COORDINATOR_H + +#include "ControlS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECFS_Coordinator + * + * @brief Implement the Control::Coordinator interface + * + */ +class ECFS_Coordinator + : public virtual POA_Control::Coordinator + , public virtual PortableServer::RefCountServantBase +{ +public: + /// Constructor + ECFS_Coordinator (int peers_expected, + int consumer_count, + int iterations, + int do_dump_history, + CORBA::ORB_ptr orb); + + /// Destructor + virtual ~ECFS_Coordinator (void); + + //@{ + /** @name The Control::Coordinator methods + */ + virtual void join (Control::Peer_ptr peer, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// Synchronize access to the internal data + TAO_SYNCH_MUTEX mutex_; + + /// Number of peers expected + size_t peers_expected_; + + /// Number of consumers to run on each test + size_t consumer_count_; + + /// Number of iterations on each test + int iterations_; + + /// This flag is set to 1 to dump the complete test history + int do_dump_history_; + + /// Keep a reference to the ORB, used in shutdown + CORBA::ORB_var orb_; + + /// Current number of peers + size_t peers_count_; + + /// Peer collection + Control::Peer_var *peers_; + +}; + +#endif /* ECFS_COORDINATOR_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.cpp new file mode 100644 index 00000000000..b314b80feeb --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.cpp @@ -0,0 +1,72 @@ +/** + * @file Loopback.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Loopback.h" +#include "ECFS_Configuration.h" + +ACE_RCSID(EC_Federated_Scalability, Loopback, "$Id") + +ECFS_Loopback::ECFS_Loopback (void) +{ +} + +void +ECFS_Loopback::init (CORBA::Long experiment_id, + RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV) +{ + this->supplier_ = + Servant_var<ECFS_Loopback_Supplier> ( + new ECFS_Loopback_Supplier (experiment_id) + ); + this->supplier_->connect (ec, ACE_TRY_ENV); + ACE_CHECK; + + this->consumer_ = + Servant_var<ECFS_Loopback_Consumer> ( + new ECFS_Loopback_Consumer (experiment_id, + this->supplier_.in ()) + ); + this->consumer_->connect (ec, ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECFS_Loopback::destroy (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_TRY + { + this->consumer_->disconnect (ACE_TRY_ENV); + ACE_TRY_CHECK; + + this->supplier_->disconnect (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY {} ACE_ENDTRY; + + PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = poa->servant_to_id (this, + ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class Servant_var<ECFS_Loopback_Consumer>; +template class Servant_var<ECFS_Loopback_Supplier>; + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate Servant_var<ECFS_Loopback_Consumer> +#pragma instantiate Servant_var<ECFS_Loopback_Supplier> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.h new file mode 100644 index 00000000000..32ff0f92c8b --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback.h @@ -0,0 +1,54 @@ +/** + * @file Loopback.h + * + * $Id$ + * + */ + +#ifndef ECFS_LOOPBACK_H +#define ECFS_LOOPBACK_H + +#include "ControlS.h" +#include "Servant_var.h" +#include "Loopback_Consumer.h" +#include "Loopback_Supplier.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECFS_Loopback + * + * @brief Implement the Control::Loopback interface + * + */ +class ECFS_Loopback + : public virtual POA_Control::Loopback + , public virtual PortableServer::RefCountServantBase +{ +public: + /// Constructor + ECFS_Loopback (void); + + /// Initialize the loopback + void init (CORBA::Long experiment_id, + RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV); + + //@{ + /** @name The Control::Loopback methods + */ + virtual void destroy (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// Keep a reference to the loopback consumer + Servant_var<ECFS_Loopback_Consumer> consumer_; + + /// Keep a reference to the loopback supplier + Servant_var<ECFS_Loopback_Supplier> supplier_; +}; + +#endif /* ECFS_LOOPBACK_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.cpp new file mode 100644 index 00000000000..1d6451510d7 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.cpp @@ -0,0 +1,108 @@ +/** + * @file Loopback_Consumer.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Loopback_Consumer.h" +#include "ECFS_Configuration.h" +#include "orbsvcs/Event_Service_Constants.h" + +ACE_RCSID(EC_Federated_Scalability, Loopback_Consumer, "$Id$") + +ECFS_Loopback_Consumer:: + ECFS_Loopback_Consumer (CORBA::Long experiment_id, + ECFS_Loopback_Supplier *supplier) + : experiment_id_ (experiment_id) + , supplier_ (supplier) +{ + this->supplier_->_add_ref (); +} + +void +ECFS_Loopback_Consumer::connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (ACE_TRY_ENV); + ACE_CHECK; + + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (!CORBA::is_nil (this->proxy_supplier_.in ())) + return; + + this->proxy_supplier_ = + consumer_admin->obtain_push_supplier (ACE_TRY_ENV); + ACE_CHECK; + } + + RtecEventComm::PushConsumer_var consumer = + this->_this (ACE_TRY_ENV); + ACE_CHECK; + + RtecEventChannelAdmin::ConsumerQOS consumer_qos; + consumer_qos.is_gateway = 0; + consumer_qos.dependencies.length (2); + RtecEventComm::EventHeader& h0 = + consumer_qos.dependencies[0].event.header; + h0.type = ACE_ES_DISJUNCTION_DESIGNATOR; + h0.source = 1; + + RtecEventComm::EventHeader& h1 = + consumer_qos.dependencies[1].event.header; + h1.type = ECFS_START_EVENT_TYPE; + h1.source = this->experiment_id_; + + this->proxy_supplier_->connect_push_consumer (consumer.in (), + consumer_qos, + ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECFS_Loopback_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ProxyPushSupplier_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_supplier_.in ())) + return; + proxy = this->proxy_supplier_._retn (); + } + + ACE_TRY + { + proxy->disconnect_push_supplier (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY {} ACE_ENDTRY; + + PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = poa->servant_to_id (this, + ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECFS_Loopback_Consumer::push (const RtecEventComm::EventSet &events, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // ACE_DEBUG ((LM_DEBUG, "Loopback_Consumer::push (%P|%t)\n")); + this->supplier_->push (events, ACE_TRY_ENV); +} + +void +ECFS_Loopback_Consumer::disconnect_push_consumer (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + this->proxy_supplier_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); +} diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.h new file mode 100644 index 00000000000..50a8ec67e24 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Consumer.h @@ -0,0 +1,67 @@ +/** + * @file Loopback_Consumer.h + * + * $Id$ + * + */ + +#ifndef ECFS_LOOPBACK_CONSUMER_H +#define ECFS_LOOPBACK_CONSUMER_H + +#include "Loopback_Supplier.h" +#include "Servant_var.h" +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECFS_Loopback_Consumer + * + * @brief Implement a simple consumer to keep track of the latency + * + */ +class ECFS_Loopback_Consumer + : public virtual POA_RtecEventComm::PushConsumer + , public virtual PortableServer::RefCountServantBase + +{ +public: + /// Constructor + ECFS_Loopback_Consumer (CORBA::Long experiment_id, + ECFS_Loopback_Supplier *supplier); + + /// Connect to the event channel + void connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV); + + /// Disconnect from the event channel + void disconnect (CORBA::Environment &ACE_TRY_ENV); + + //@{ + /** @name The RtecEventComm::PushConsumer methods + */ + virtual void push (const RtecEventComm::EventSet& events, + CORBA::Environment &_env) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// Synchronize access to the internal data + TAO_SYNCH_MUTEX mutex_; + + /// The experiment id + CORBA::Long experiment_id_; + + /// The supplier used to close the loopback + Servant_var<ECFS_Loopback_Supplier> supplier_; + + /// The proxy this object is connected to + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier_; +}; + +#endif /* ECFS_LOOPBACK_CONSUMER_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.cpp new file mode 100644 index 00000000000..5259385fa1f --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.cpp @@ -0,0 +1,114 @@ +/** + * @file Loopback_Supplier.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Loopback_Supplier.h" +#include "ECFS_Configuration.h" + +ACE_RCSID(EC_Federated_Scalability, Loopback_Supplier, "$Id$") + +ECFS_Loopback_Supplier::ECFS_Loopback_Supplier (CORBA::Long experiment_id) + : experiment_id_ (experiment_id) +{ +} + +void +ECFS_Loopback_Supplier::connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + ec->for_suppliers (ACE_TRY_ENV); + ACE_CHECK; + + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (!CORBA::is_nil (this->proxy_consumer_.in ())) + return; + + this->proxy_consumer_ = + supplier_admin->obtain_push_consumer (ACE_TRY_ENV); + ACE_CHECK; + } + + RtecEventComm::PushSupplier_var supplier = + this->_this (ACE_TRY_ENV); + ACE_CHECK; + + RtecEventChannelAdmin::SupplierQOS supplier_qos; + supplier_qos.is_gateway = 0; + supplier_qos.publications.length (1); + RtecEventComm::EventHeader& sh0 = + supplier_qos.publications[0].event.header; + sh0.type = ECFS_RESPONSE_EVENT_TYPE; + sh0.source = this->experiment_id_; + + this->proxy_consumer_->connect_push_supplier (supplier.in (), + supplier_qos, + ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECFS_Loopback_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ProxyPushConsumer_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_consumer_.in ())) + return; + proxy = this->proxy_consumer_._retn (); + } + + ACE_TRY + { + proxy->disconnect_push_consumer (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY {} ACE_ENDTRY; + + PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = poa->servant_to_id (this, + ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECFS_Loopback_Supplier::push (const RtecEventComm::EventSet &source, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + RtecEventChannelAdmin::ProxyPushConsumer_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_consumer_.in ())) + return; + proxy = this->proxy_consumer_; + } + + // ACE_DEBUG ((LM_DEBUG, "Loopback_Supplier::push (%P|%t)\n")); + RtecEventComm::EventSet events (source); + for (CORBA::ULong i = 0; i != events.length (); ++i) + { + events[i].header.ttl = 1; + events[i].header.type = ECFS_RESPONSE_EVENT_TYPE; + events[i].header.source = this->experiment_id_; + } + + proxy->push (events, ACE_TRY_ENV); +} + +void +ECFS_Loopback_Supplier::disconnect_push_supplier (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + this->proxy_consumer_ = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); +} diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.h new file mode 100644 index 00000000000..eb40fe10eda --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Loopback_Supplier.h @@ -0,0 +1,67 @@ +/** + * @file Loopback_Supplier.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#ifndef ECFS_LOOPBACK_SUPPLIER_H +#define ECFS_LOOPBACK_SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECFS_Loopback_Supplier + * + * @brief Implement a simple supplier to keep track of the latency + * + */ +class ECFS_Loopback_Supplier + : public virtual POA_RtecEventComm::PushSupplier + , public virtual PortableServer::RefCountServantBase +{ +public: + /// Constructor + /** + * The experiment ID is used to configure the supplier ID on the + * publication. + */ + ECFS_Loopback_Supplier (CORBA::Long experiment_id); + + /// Connect to the event channel + void connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV); + + /// Disconnect from the event channel + void disconnect (CORBA::Environment &ACE_TRY_ENV); + + void push (const RtecEventComm::EventSet &events, + CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + + //@{ + /** @name The RtecEventComm::PushSupplier methods + */ + virtual void disconnect_push_supplier (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// The experiment id + /// Synchronize access to the internal data + TAO_SYNCH_MUTEX mutex_; + + /// The experiment id + CORBA::Long experiment_id_; + + /// The proxy this object is connected to + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer_; +}; + +#endif /* ECFS_LOOPBACK_SUPPLIER_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.cpp new file mode 100644 index 00000000000..0344c4c2c00 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.cpp @@ -0,0 +1,201 @@ +/** + * @file Peer.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Peer.h" +#include "Servant_var.h" +#include "Consumer.h" +#include "Supplier.h" +#include "Loopback.h" +#include "ECFS_Configuration.h" + +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Gateway.h" + +#include "ace/High_Res_Timer.h" + +ACE_RCSID(EC_Federated_Scalability, Peer, "$Id$") + +ECFS_Peer::ECFS_Peer (CORBA::ORB_ptr orb) + : orb_ (CORBA::ORB::_duplicate (orb)) +{ +} + +ECFS_Peer::~ECFS_Peer (void) +{ +} + +void +ECFS_Peer::init (PortableServer::POA_ptr root_poa, + CORBA::Environment &ACE_TRY_ENV) +{ + TAO_EC_Event_Channel_Attributes attr (root_poa, root_poa); + Servant_var<TAO_EC_Event_Channel> ec_impl ( + new TAO_EC_Event_Channel (attr) + ); + + ec_impl->activate (ACE_TRY_ENV); + ACE_CHECK; + + this->event_channel_ = + ec_impl->_this (ACE_TRY_ENV); + ACE_CHECK; +} + +RtecEventChannelAdmin::EventChannel_ptr +ECFS_Peer::channel (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + return RtecEventChannelAdmin::EventChannel::_duplicate (this->event_channel_.in ()); +} + +void +ECFS_Peer::connect (RtecEventChannelAdmin::EventChannel_ptr remote_ec, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // ACE_DEBUG ((LM_DEBUG, "(%P|%t) Connecting....\n")); + Servant_var<TAO_EC_Gateway_IIOP> gateway (new TAO_EC_Gateway_IIOP); + gateway->init (remote_ec, + this->event_channel_.in (), + ACE_TRY_ENV); + ACE_CHECK; + + RtecEventChannelAdmin::Observer_var observer = + gateway->_this (ACE_TRY_ENV); + ACE_CHECK; + + RtecEventChannelAdmin::Observer_Handle h = + this->event_channel_->append_observer (observer.in (), + ACE_TRY_ENV); + ACE_CHECK; + + gateway->observer_handle (h); +} + +Control::Loopback_ptr +ECFS_Peer::setup_loopback (CORBA::Long experiment_id, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + Servant_var<ECFS_Loopback> loopback (new ECFS_Loopback); + + loopback->init (experiment_id, + this->event_channel_.in (), + ACE_TRY_ENV); + ACE_CHECK_RETURN (Control::Loopback::_nil ()); + + return loopback->_this (ACE_TRY_ENV); +} + +Control::Samples * +ECFS_Peer::run_experiment (CORBA::Long consumer_count, + CORBA::Long experiment_id, + CORBA::Long iterations, + CORBA::Long_out gsf, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + Servant_var<ECFS_Consumer> *consumer; + ACE_NEW_THROW_EX (consumer, + Servant_var<ECFS_Consumer>[consumer_count], + CORBA::NO_MEMORY ()); + ACE_CHECK_RETURN (0); + int i; + for (i = 0; i != consumer_count; ++i) + { + consumer[i] = + Servant_var<ECFS_Consumer> (new ECFS_Consumer (experiment_id, + iterations)); + consumer[i]->connect (this->event_channel_.in (), ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + } + + Servant_var<ECFS_Supplier> supplier ( + new ECFS_Supplier (experiment_id) + ); + + supplier->connect (this->event_channel_.in (), ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + // ACE_DEBUG ((LM_DEBUG, "Connected consumer & supplier\n")); + + RtecEventComm::EventSet event (1); + event.length (1); + event[0].header.type = ECFS_START_EVENT_TYPE; + event[0].header.source = experiment_id; + event[0].header.ttl = 1; + + for (i = 0; i != iterations; ++i) + { + ACE_hrtime_t creation = ACE_OS::gethrtime (); + ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, + creation); + // push one event... + supplier->push (event, ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + } + + supplier->disconnect (ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + Control::Samples_var samples (new Control::Samples (iterations)); + samples->length (iterations); + for (int j = 0; j != iterations; ++j) + samples[j] = 0; + + for (i = 0; i != consumer_count; ++i) + { + for (int j = 0; j != iterations; ++j) + { + CORBA::ULongLong sample = + consumer[i]->samples ()[j]; + if (samples[j] < sample) + samples[j] = sample; + } + consumer[i]->disconnect (ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + } + + ACE_DEBUG ((LM_DEBUG, "Calibrating high res timer ....")); + ACE_High_Res_Timer::calibrate (); + gsf = ACE_High_Res_Timer::global_scale_factor (); + ACE_DEBUG ((LM_DEBUG, "Done (%d)\n", gsf)); + + return samples._retn (); +} + +void +ECFS_Peer::shutdown (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + this->event_channel_->destroy (ACE_TRY_ENV); + ACE_CHECK; + + this->orb_->shutdown (0, ACE_TRY_ENV); + ACE_CHECK; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class Servant_var<TAO_EC_Event_Channel>; +template class Servant_var<TAO_EC_Gateway_IIOP>; +template class Servant_var<ECFS_Consumer>; +template class Servant_var<ECFS_Supplier>; +template class Servant_var<ECFS_Loopback>; + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate Servant_var<TAO_EC_Event_Channel> +#pragma instantiate Servant_var<TAO_EC_Gateway_IIOP> +#pragma instantiate Servant_var<ECFS_Consumer> +#pragma instantiate Servant_var<ECFS_Supplier> +#pragma instantiate Servant_var<ECFS_Loopback> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.h new file mode 100644 index 00000000000..7efdc6bd39d --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Peer.h @@ -0,0 +1,75 @@ +/** + * @file Peer.h + * + * $Id$ + * + */ + +#ifndef ECFS_PEER_H +#define ECFS_PEER_H + +#include "ControlS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECFS_Peer + * + * @brief Implement the Control::Peer interface + * + */ +class ECFS_Peer + : public virtual POA_Control::Peer + , public virtual PortableServer::RefCountServantBase +{ +public: + /// Constructor + ECFS_Peer (CORBA::ORB_ptr orb); + + /// Destructor + virtual ~ECFS_Peer (void); + + /// Initialize the peer + void init (PortableServer::POA_ptr poa, + CORBA::Environment &ACE_TRY_ENV); + + //@{ + /** @name The Control::Peer methods + */ + virtual RtecEventChannelAdmin::EventChannel_ptr + channel (CORBA::Environment&) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void connect (RtecEventChannelAdmin::EventChannel_ptr remote_ec, + CORBA::Environment&) + ACE_THROW_SPEC ((CORBA::SystemException)); + + Control::Loopback_ptr setup_loopback (CORBA::Long experiment_id, + CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual Control::Samples* run_experiment (CORBA::Long consumer_count, + CORBA::Long experiment_id, + CORBA::Long iterations, + CORBA::Long_out gsf, + CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void shutdown (CORBA::Environment&) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// Synchronize access to the internal data + TAO_SYNCH_MUTEX mutex_; + + /// Keep a reference to the ORB, used in shutdown + CORBA::ORB_var orb_; + + /// Event Channel references + RtecEventChannelAdmin::EventChannel_var event_channel_; +}; + +#endif /* ECFS_PEER_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Servant_var.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Servant_var.cpp new file mode 100644 index 00000000000..d5ca3e9d36a --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Servant_var.cpp @@ -0,0 +1,33 @@ +/** + * @file Servant_var.cpp + * + * $Id$ + * + * @author Jody Hagins <jody@atdesk.com> + * @author Carlos O'Ryan <coryan@uci.edu> + */ +#ifndef CS_SERVANT_VAR_CPP +#define CS_SERVANT_VAR_CPP + +#include "Servant_var.h" + +#if !defined(__ACE_INLINE__) +#include "Servant_var.inl" +#endif /* __ACE_INLINE__ */ + +template<typename SERVANT> +Servant_var<SERVANT>::~Servant_var () +{ + if (this->ptr_ == 0) + return; + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY { + this->ptr_->_remove_ref (ACE_TRY_ENV); + ACE_TRY_CHECK; + } ACE_CATCHANY { + // @@ This event should be logged... + } ACE_ENDTRY; +} + +#endif /* CS_SERVANT_VAR_CPP */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.cpp new file mode 100644 index 00000000000..08cc54f957e --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.cpp @@ -0,0 +1,105 @@ +/** + * @file Supplier.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Supplier.h" +#include "ECFS_Configuration.h" + +ACE_RCSID(EC_Federated_Scalability, Supplier, "$Id$") + +ECFS_Supplier::ECFS_Supplier (CORBA::Long experiment_id) + : experiment_id_ (experiment_id) +{ +} + +void +ECFS_Supplier::connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + ec->for_suppliers (ACE_TRY_ENV); + ACE_CHECK; + + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (!CORBA::is_nil (this->proxy_consumer_.in ())) + return; + + this->proxy_consumer_ = + supplier_admin->obtain_push_consumer (ACE_TRY_ENV); + ACE_CHECK; + } + + RtecEventComm::PushSupplier_var supplier = + this->_this (ACE_TRY_ENV); + ACE_CHECK; + + RtecEventChannelAdmin::SupplierQOS supplier_qos; + supplier_qos.is_gateway = 0; + supplier_qos.publications.length (1); + RtecEventComm::EventHeader& sh0 = + supplier_qos.publications[0].event.header; + sh0.type = ECFS_START_EVENT_TYPE; + sh0.source = this->experiment_id_; + + this->proxy_consumer_->connect_push_supplier (supplier.in (), + supplier_qos, + ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECFS_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ProxyPushConsumer_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_consumer_.in ())) + return; + proxy = this->proxy_consumer_._retn (); + } + + ACE_TRY + { + proxy->disconnect_push_consumer (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY {} ACE_ENDTRY; + + PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = poa->servant_to_id (this, + ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECFS_Supplier::push (const RtecEventComm::EventSet &events, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + RtecEventChannelAdmin::ProxyPushConsumer_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_consumer_.in ())) + return; + proxy = this->proxy_consumer_; + } + + proxy->push (events, ACE_TRY_ENV); +} + +void +ECFS_Supplier::disconnect_push_supplier (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + this->proxy_consumer_ = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); +} diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.h b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.h new file mode 100644 index 00000000000..171b4ec715f --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/Supplier.h @@ -0,0 +1,67 @@ +/** + * @file Supplier.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#ifndef ECFS_SUPPLIER_H +#define ECFS_SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECFS_Supplier + * + * @brief Implement a simple supplier to keep track of the latency + * + */ +class ECFS_Supplier + : public virtual POA_RtecEventComm::PushSupplier + , public virtual PortableServer::RefCountServantBase +{ +public: + /// Constructor + /** + * The experiment ID is used to configure the supplier ID on the + * publication. + */ + ECFS_Supplier (CORBA::Long experiment_id); + + /// Connect to the event channel + void connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV); + + /// Disconnect from the event channel + void disconnect (CORBA::Environment &ACE_TRY_ENV); + + void push (const RtecEventComm::EventSet &events, + CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + + //@{ + /** @name The RtecEventComm::PushSupplier methods + */ + virtual void disconnect_push_supplier (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// The experiment id + /// Synchronize access to the internal data + TAO_SYNCH_MUTEX mutex_; + + /// The experiment id + CORBA::Long experiment_id_; + + /// The proxy this object is connected to + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer_; +}; + +#endif /* ECFS_SUPPLIER_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/client.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/client.cpp new file mode 100644 index 00000000000..870ca870754 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/client.cpp @@ -0,0 +1,185 @@ +/** + * @file client.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Peer.h" +#include "Servant_var.h" + +#include "orbsvcs/Event/EC_Default_Factory.h" + +#include "ace/Get_Opt.h" +#include "ace/Sched_Params.h" + +ACE_RCSID(EC_Federated_Scalability, client, "$Id$") + +const char *ior = "file://server.ior"; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "k:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'k': + ior = get_opts.optarg; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-k <IOR> " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int main (int argc, char *argv[]) +{ + TAO_EC_Default_Factory::init_svcs (); + + int priority = + (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) + + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; + priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, + priority); + // Enable FIFO scheduling, e.g., RT scheduling class on Solaris. + + if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO, + priority, + ACE_SCOPE_PROCESS)) != 0) + { + if (ACE_OS::last_error () == EPERM) + { + ACE_DEBUG ((LM_DEBUG, + "client (%P|%t): user is not superuser, " + "test runs in time-shared class\n")); + } + else + ACE_ERROR ((LM_ERROR, + "client (%P|%t): sched_params failed\n")); + } + + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + // Get the event channel object reference + CORBA::Object_var coordinator_object = + orb->string_to_object (ior, ACE_TRY_ENV); + ACE_TRY_CHECK; + + Control::Coordinator_var coordinator = + Control::Coordinator::_narrow (coordinator_object.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (CORBA::is_nil (coordinator.in ())) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) Invalid or nil coordinator\n")); + return 1; + } + + CORBA::Object_var manager_object = + orb->resolve_initial_references ("ORBPolicyManager", + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::PolicyManager_var policy_manager = + CORBA::PolicyManager::_narrow (manager_object.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Any sync_scope; + sync_scope <<= Messaging::SYNC_WITH_TARGET; + + CORBA::PolicyList policy_list (1); + policy_list.length (1); + policy_list[0] = + orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, + sync_scope, + ACE_TRY_ENV); + ACE_TRY_CHECK; + policy_manager->set_policy_overrides (policy_list, + CORBA::SET_OVERRIDE, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA", ACE_TRY_ENV); + ACE_TRY_CHECK; + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (CORBA::is_nil (root_poa.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Panic: nil RootPOA\n"), + 1); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + Servant_var<ECFS_Peer> peer_impl (new ECFS_Peer (orb.in ())); + + peer_impl->init (root_poa.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + Control::Peer_var peer (peer_impl->_this (ACE_TRY_ENV)); + ACE_TRY_CHECK; + + coordinator->join (peer.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + orb->run (ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) server - event loop finished\n")); + + orb->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception caught:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class Servant_var<ECFS_Peer>; + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate Servant_var<ECFS_Peer> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/server.cpp b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/server.cpp new file mode 100644 index 00000000000..5f8dd6cacc7 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Federated_Scalability/server.cpp @@ -0,0 +1,208 @@ +/** + * @file server.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + + +#include "Coordinator.h" +#include "Servant_var.h" + +#include "ace/Get_Opt.h" +#include "ace/Auto_Ptr.h" +#include "ace/Sched_Params.h" + +ACE_RCSID(EC_Federated_Scalability, server, "$Id$") + +const char *ior_output_file = "server.ior"; +int peer_count = 3; +int iterations = 10000; +int consumer_count = 10; +int do_dump_history = 0; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "o:p:c:i:h"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.optarg; + break; + + case 'p': + peer_count = ACE_OS::atoi (get_opts.optarg); + break; + + case 'c': + consumer_count = ACE_OS::atoi (get_opts.optarg); + break; + + case 'i': + iterations = ACE_OS::atoi (get_opts.optarg); + break; + + case 'h': + do_dump_history = 1; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile> " + "-p <peer_count> " + "-i <iterations> " + "-c <consumer_count> " + "-h (dump test history) " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int main (int argc, char *argv[]) +{ + int priority = + (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) + + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; + priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, + priority); + // Enable FIFO scheduling, e.g., RT scheduling class on Solaris. + + if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO, + priority, + ACE_SCOPE_PROCESS)) != 0) + { + if (ACE_OS::last_error () == EPERM) + { + ACE_DEBUG ((LM_DEBUG, + "server (%P|%t): user is not superuser, " + "test runs in time-shared class\n")); + } + else + ACE_ERROR ((LM_ERROR, + "server (%P|%t): sched_params failed\n")); + } + + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + CORBA::Object_var manager_object = + orb->resolve_initial_references ("ORBPolicyManager", + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::PolicyManager_var policy_manager = + CORBA::PolicyManager::_narrow (manager_object.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Any sync_scope; + sync_scope <<= Messaging::SYNC_WITH_TARGET; + + CORBA::PolicyList policy_list (1); + policy_list.length (1); + policy_list[0] = + orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, + sync_scope, + ACE_TRY_ENV); + ACE_TRY_CHECK; + policy_manager->set_policy_overrides (policy_list, + CORBA::SET_OVERRIDE, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA", ACE_TRY_ENV); + ACE_TRY_CHECK; + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (CORBA::is_nil (root_poa.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Panic: nil RootPOA\n"), + 1); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + Servant_var<ECFS_Coordinator> coordinator_impl ( + new ECFS_Coordinator (peer_count, + consumer_count, + iterations, + do_dump_history, + orb.in ()) + ); + + Control::Coordinator_var coordinator = + coordinator_impl->_this (ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::String_var ior = + orb->object_to_string (coordinator.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Output the ior to the <ior_output_file> + FILE *output_file = ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + orb->run (ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) server - event loop finished\n")); + + root_poa->destroy (1, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + + orb->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception caught:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class Servant_var<ECFS_Coordinator>; + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate Servant_var<ECFS_Coordinator> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Client_Task.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/Client_Task.cpp new file mode 100644 index 00000000000..a4ded501b6b --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Client_Task.cpp @@ -0,0 +1,33 @@ +/** + * @file Client_Task.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Client_Task.h" + +ACE_RCSID(EC_Scalability, Client_Task, "$Id$") + +ECS_Client_Task::ECS_Client_Task (CORBA::ORB_ptr orb) + : orb_ (CORBA::ORB::_duplicate (orb)) +{ +} + +int +ECS_Client_Task::svc (void) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + this->orb_->run (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + return -1; + } + ACE_ENDTRY; + return 0; +} diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.cpp new file mode 100644 index 00000000000..6134b826bf8 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.cpp @@ -0,0 +1,114 @@ +/** + * @file Consumer.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Consumer.h" +#include "orbsvcs/Event_Service_Constants.h" + +ACE_RCSID(EC_Scalability, Consumer, "$Id$") + +ECS_Consumer::ECS_Consumer (int iterations) + : sample_history_ (iterations) +{ +} + +void +ECS_Consumer::connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (ACE_TRY_ENV); + ACE_CHECK; + + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (!CORBA::is_nil (this->proxy_supplier_.in ())) + return; + + this->proxy_supplier_ = + consumer_admin->obtain_push_supplier (ACE_TRY_ENV); + ACE_CHECK; + } + + RtecEventComm::PushConsumer_var consumer = + this->_this (ACE_TRY_ENV); + ACE_CHECK; + + RtecEventChannelAdmin::ConsumerQOS consumer_qos; + consumer_qos.dependencies.length (2); + RtecEventComm::EventHeader& h0 = + consumer_qos.dependencies[0].event.header; + h0.type = ACE_ES_DISJUNCTION_DESIGNATOR; + h0.source = 1; + + RtecEventComm::EventHeader& h1 = + consumer_qos.dependencies[1].event.header; + h1.type = ACE_ES_EVENT_UNDEFINED; // first free event type + h1.source = ACE_ES_EVENT_SOURCE_ANY; + + this->proxy_supplier_->connect_push_consumer (consumer.in (), + consumer_qos, + ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECS_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ProxyPushSupplier_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_supplier_.in ())) + return; + proxy = this->proxy_supplier_._retn (); + } + + ACE_TRY + { + proxy->disconnect_push_supplier (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY {} ACE_ENDTRY; + + PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = poa->servant_to_id (this, + ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; +} + +ACE_Sample_History & +ECS_Consumer::sample_history (void) +{ + return this->sample_history_; +} + +void +ECS_Consumer::push (const RtecEventComm::EventSet &events, + CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_hrtime_t now = ACE_OS::gethrtime (); + + ACE_hrtime_t creation; + ORBSVCS_Time::TimeT_to_hrtime (creation, + events[0].header.creation_time); + + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + this->sample_history_.sample (now - creation); +} + +void +ECS_Consumer::disconnect_push_consumer (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + this->proxy_supplier_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); +} diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.h b/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.h new file mode 100644 index 00000000000..4bd7f38262e --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Consumer.h @@ -0,0 +1,65 @@ +/** + * @file Consumer.h + * + * $Id$ + * + */ + +#ifndef ECS_CONSUMER_H +#define ECS_CONSUMER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "ace/Sample_History.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECS_Consumer + * + * @brief Implement a simple consumer to keep track of the latency + * + */ +class ECS_Consumer + : public virtual POA_RtecEventComm::PushConsumer + , public virtual PortableServer::RefCountServantBase + +{ +public: + /// Constructor + ECS_Consumer (int iterations); + + /// Connect to the event channel + void connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV); + + /// Disconnect from the event channel + void disconnect (CORBA::Environment &ACE_TRY_ENV); + + /// Access the history of samples + ACE_Sample_History &sample_history (void); + + //@{ + /** @name The RtecEventComm::PushConsumer methods + */ + virtual void push (const RtecEventComm::EventSet& events, + CORBA::Environment &_env) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// Synchronize access to the internal data + TAO_SYNCH_MUTEX mutex_; + + /// The proxy this object is connected to + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier_; + + /// The history of latency samples + ACE_Sample_History sample_history_; +}; + +#endif /* ECS_CONSUMER_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.cpp new file mode 100644 index 00000000000..61ff20dc369 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.cpp @@ -0,0 +1,103 @@ +/** + * @file Supplier.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Supplier.h" +#include "orbsvcs/Event_Service_Constants.h" + +ACE_RCSID(EC_Scalability, Supplier, "$Id$") + +ECS_Supplier::ECS_Supplier (void) +{ +} + +void +ECS_Supplier::connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + ec->for_suppliers (ACE_TRY_ENV); + ACE_CHECK; + + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (!CORBA::is_nil (this->proxy_consumer_.in ())) + return; + + this->proxy_consumer_ = + supplier_admin->obtain_push_consumer (ACE_TRY_ENV); + ACE_CHECK; + } + + RtecEventComm::PushSupplier_var supplier = + this->_this (ACE_TRY_ENV); + ACE_CHECK; + + RtecEventChannelAdmin::SupplierQOS supplier_qos; + supplier_qos.publications.length (1); + RtecEventComm::EventHeader& sh0 = + supplier_qos.publications[0].event.header; + sh0.type = ACE_ES_EVENT_UNDEFINED; // first free event type + sh0.source = 1; // first free event source + + this->proxy_consumer_->connect_push_supplier (supplier.in (), + supplier_qos, + ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECS_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ProxyPushConsumer_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_consumer_.in ())) + return; + proxy = this->proxy_consumer_._retn (); + } + + ACE_TRY + { + proxy->disconnect_push_consumer (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY {} ACE_ENDTRY; + + PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = poa->servant_to_id (this, + ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; +} + +void +ECS_Supplier::push (const RtecEventComm::EventSet &events, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + RtecEventChannelAdmin::ProxyPushConsumer_var proxy; + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + if (CORBA::is_nil (this->proxy_consumer_.in ())) + return; + proxy = this->proxy_consumer_; + } + proxy->push (events, ACE_TRY_ENV); + +} + +void +ECS_Supplier::disconnect_push_supplier (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_); + this->proxy_consumer_ = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); +} diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.h b/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.h new file mode 100644 index 00000000000..6e0fd8b780a --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Scalability/Supplier.h @@ -0,0 +1,60 @@ +/** + * @file Supplier.h + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + * + */ + +#ifndef ECS_SUPPLIER_H +#define ECS_SUPPLIER_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class ECS_Supplier + * + * @brief Implement a simple supplier to keep track of the latency + * + */ +class ECS_Supplier + : public virtual POA_RtecEventComm::PushSupplier + , public virtual PortableServer::RefCountServantBase +{ +public: + /// Constructor + ECS_Supplier (void); + + /// Connect to the event channel + void connect (RtecEventChannelAdmin::EventChannel_ptr ec, + CORBA::Environment &ACE_TRY_ENV); + + /// Disconnect from the event channel + void disconnect (CORBA::Environment &ACE_TRY_ENV); + + void push (const RtecEventComm::EventSet &events, + CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + + //@{ + /** @name The RtecEventComm::PushSupplier methods + */ + virtual void disconnect_push_supplier (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)); + //@} + +private: + /// Synchronize access to the internal data + TAO_SYNCH_MUTEX mutex_; + + /// The proxy this object is connected to + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer_; +}; + +#endif /* ECS_SUPPLIER_H */ diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/client.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/client.cpp new file mode 100644 index 00000000000..c95f50fc5d0 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Scalability/client.cpp @@ -0,0 +1,286 @@ +/** + * @file client.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "Consumer.h" +#include "Supplier.h" +#include "Client_Task.h" + +#include "orbsvcs/Event_Service_Constants.h" +#include "tao/Messaging.h" +#include "tao/PortableServer/PortableServer.h" +#include "tao/Strategies/advanced_resource.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" +#include "ace/Sample_History.h" +#include "ace/Basic_Stats.h" +#include "ace/Stats.h" +#include "ace/Sched_Params.h" +#include "ace/Task.h" + +ACE_RCSID(EC_Scalability, client, "$Id$") + +const char *ior = "file://test.ior"; +int consumer_count = 10; +int iterations = 10000; +int do_dump_history = 0; + +/// Parse the arguments. +static int parse_args (int argc, char *argv[]); + +int +main (int argc, char *argv []) +{ + int priority = + (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) + + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; + // Enable FIFO scheduling, e.g., RT scheduling class on Solaris. + + if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO, + priority, + ACE_SCOPE_PROCESS)) != 0) + { + if (ACE_OS::last_error () == EPERM) + { + ACE_DEBUG ((LM_DEBUG, + "server (%P|%t): user is not superuser, " + "test runs in time-shared class\n")); + } + else + ACE_ERROR ((LM_ERROR, + "server (%P|%t): sched_params failed\n")); + } + + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Object_var manager_object = + orb->resolve_initial_references ("ORBPolicyManager", + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::PolicyManager_var policy_manager = + CORBA::PolicyManager::_narrow (manager_object.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Any sync_scope; + sync_scope <<= Messaging::SYNC_WITH_TARGET; + + CORBA::PolicyList policy_list (1); + policy_list.length (1); + policy_list[0] = + orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, + sync_scope, + ACE_TRY_ENV); + ACE_TRY_CHECK; + policy_manager->set_policy_overrides (policy_list, + CORBA::SET_OVERRIDE, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA", ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (CORBA::is_nil (poa_object.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to initialize the POA.\n"), + 1); + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + // Get the event channel object reference + CORBA::Object_var ec_object = + orb->string_to_object (ior, ACE_TRY_ENV); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var ec = + RtecEventChannelAdmin::EventChannel::_narrow (ec_object.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + if (CORBA::is_nil (ec.in ())) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) Invalid or nil event channel\n")); + return 1; + } + + ECS_Consumer **consumer_impl; + ACE_NEW_RETURN (consumer_impl, + ECS_Consumer*[consumer_count], + 1); + int i; + for (i = 0; i != consumer_count; ++i) + { + ACE_NEW_RETURN (consumer_impl[i], + ECS_Consumer (iterations), + 1); + consumer_impl[i]->connect (ec.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + ECS_Supplier *supplier_impl; + ACE_NEW_RETURN (supplier_impl, + ECS_Supplier, + 1); + PortableServer::ServantBase_var supplier_owner (supplier_impl); + + supplier_impl->connect (ec.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "Connected consumer & supplier\n")); + + ECS_Client_Task task (orb.in ()); + task.activate (); + + RtecEventComm::EventSet event (1); + event.length (1); + event[0].header.type = ACE_ES_EVENT_UNDEFINED; + event[0].header.source = 1; + event[0].header.ttl = 1; + // event[0].data.payload.length(1024); + + ACE_hrtime_t start = ACE_OS::gethrtime (); + for (i = 0; i != iterations; ++i) + { + ACE_hrtime_t creation = ACE_OS::gethrtime (); + ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, + creation); + // push one event... + supplier_impl->push (event, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_hrtime_t end = ACE_OS::gethrtime (); + + for (i = 0; i != consumer_count; ++i) + { + consumer_impl[i]->disconnect (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + supplier_impl->disconnect (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Calibrate the high resolution timer *before* starting the + // test. + ACE_DEBUG ((LM_DEBUG, "Calibrating high res timer ....")); + ACE_High_Res_Timer::calibrate (); + + ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + ACE_DEBUG ((LM_DEBUG, "Done (%d)\n", gsf)); + + ACE_Sample_History history (iterations); + for (int j = 0; j != iterations; ++j) + { + ACE_UINT64 value = 0; + for (int i = 0; i != consumer_count; ++i) + { + ACE_Sample_History &consumer_history = + consumer_impl[i]->sample_history (); + + ACE_UINT64 consumer_sample = + consumer_history.get_sample (j); + if (consumer_sample > value) + value = consumer_sample; + } + history.sample (value); + } + + if (do_dump_history) + { + history.dump_samples ("HISTORY", gsf); + } + + ACE_Basic_Stats stats; + history.collect_basic_stats (stats); + stats.dump_results ("Total", gsf); + + ACE_Throughput_Stats::dump_throughput ("Total", gsf, + end - start, + stats.samples_count ()); + + + orb->shutdown (0, ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_Thread_Manager::instance ()->wait (); + + orb->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::Exception, ex) + { + ACE_PRINT_EXCEPTION (ex, argv[0]); + } + ACE_ENDTRY; + return 0; +} + +// **************************************************************** + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "hc:i:k:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'h': + do_dump_history = 1; + break; + + case 'c': + consumer_count = ACE_OS::atoi (get_opts.optarg); + break; + + case 'i': + iterations = ACE_OS::atoi (get_opts.optarg); + break; + + case 'k': + ior = get_opts.optarg; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-h (dump full sample history) " + "-c <consumer_count> " + "-i <iterations> " + "-k <IOR> " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/performance-tests/EC_Scalability/server.cpp b/TAO/orbsvcs/performance-tests/EC_Scalability/server.cpp new file mode 100644 index 00000000000..9145637d711 --- /dev/null +++ b/TAO/orbsvcs/performance-tests/EC_Scalability/server.cpp @@ -0,0 +1,182 @@ +/** + * @file server.cpp + * + * $Id$ + * + * @author Carlos O'Ryan <coryan@uci.edu> + */ + +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/EC_Event_Channel.h" + +#include "ace/Get_Opt.h" +#include "ace/Auto_Ptr.h" +#include "ace/Sched_Params.h" + +ACE_RCSID(EC_Scalability, server, "$Id$") + +const char *ior_output_file = "test.ior"; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "o:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.optarg; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile>" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int main (int argc, char *argv[]) +{ + TAO_EC_Default_Factory::init_svcs (); + + int priority = + (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) + + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; + priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO, + priority); + // Enable FIFO scheduling, e.g., RT scheduling class on Solaris. + + if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO, + priority, + ACE_SCOPE_PROCESS)) != 0) + { + if (ACE_OS::last_error () == EPERM) + { + ACE_DEBUG ((LM_DEBUG, + "server (%P|%t): user is not superuser, " + "test runs in time-shared class\n")); + } + else + ACE_ERROR ((LM_ERROR, + "server (%P|%t): sched_params failed\n")); + } + + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Object_var manager_object = + orb->resolve_initial_references ("ORBPolicyManager", + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::PolicyManager_var policy_manager = + CORBA::PolicyManager::_narrow (manager_object.in (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Any sync_scope; + sync_scope <<= Messaging::SYNC_WITH_TARGET; + + CORBA::PolicyList policy_list (1); + policy_list.length (1); + policy_list[0] = + orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, + sync_scope, + ACE_TRY_ENV); + ACE_TRY_CHECK; + policy_manager->set_policy_overrides (policy_list, + CORBA::SET_OVERRIDE, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA", ACE_TRY_ENV); + ACE_TRY_CHECK; + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (CORBA::is_nil (root_poa.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Panic: nil RootPOA\n"), + 1); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + TAO_EC_Event_Channel_Attributes attr (root_poa.in (), + root_poa.in ()); + TAO_EC_Event_Channel* ec_impl; + ACE_NEW_RETURN (ec_impl, + TAO_EC_Event_Channel (attr), + 1); + PortableServer::ServantBase_var ec_owner (ec_impl); + + ec_impl->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var ec = + ec_impl->_this (ACE_TRY_ENV); + ACE_TRY_CHECK; + + CORBA::String_var ior = + orb->object_to_string (ec.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Output the ior to the <ior_output_file> + FILE *output_file = ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + orb->run (ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) server - event loop finished\n")); + + root_poa->destroy (1, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + + orb->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception caught:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |