diff options
-rw-r--r-- | TAO/ChangeLog-99c | 32 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp | 35 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/Basic/Reconnect.cpp | 70 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/Basic/Reconnect.h | 15 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/lib/Consumer.cpp | 5 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/lib/Driver.cpp | 86 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/lib/Driver.h | 11 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/lib/Driver.i | 1 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event/lib/Supplier.cpp | 32 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Makefile | 3 |
11 files changed, 211 insertions, 81 deletions
diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c index a174df82793..c130bfac0d1 100644 --- a/TAO/ChangeLog-99c +++ b/TAO/ChangeLog-99c @@ -1,3 +1,35 @@ +Mon May 10 21:54:30 1999 Carlos O'Ryan <coryan@cs.wustl.edu> + + * orbsvcs/tests/Makefile: + Added the Event directory + + * orbsvcs/tests/Event/Basic/Reconnect.h: + * orbsvcs/tests/Event/Basic/Reconnect.cpp: + Added new option to control the number of disconnections, fixed + some minor problems. Removed execution of the regular test since + it will take too long to run thousands of consumers and + suppliers. + + * orbsvcs/tests/Event/lib/Driver.h: + * orbsvcs/tests/Event/lib/Driver.i: + * orbsvcs/tests/Event/lib/Driver.cpp: + Fixed and cleaned up memory managment for tasks. + Fixed computation of the Supplier QOS + Fixed argument parsing + Fixed throughput computation + + * orbsvcs/tests/Event/lib/Supplier.cpp: + Better error handling if the QoS is invalid + + * orbsvcs/tests/Event/lib/Consumer.cpp: + Fixed throughput computation + + * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp: + Implemented reconnection of suppliers + + * orbsvcs/orbsvcs/Event/EC_Event_Channel.i: + Fixed typo + Mon May 10 19:44:16 1999 Jeff Parsons <parsons@cs.wustl.edu> * Domain.pidl: diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i index 19c1be241ed..979da8826eb 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i @@ -152,5 +152,5 @@ TAO_EC_Event_Channel::consumer_reconnect (void) const ACE_INLINE int TAO_EC_Event_Channel::supplier_reconnect (void) const { - return this->consumer_reconnect_; + return this->supplier_reconnect_; } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp index a428a612d5d..c5e37b2a796 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp @@ -11,6 +11,8 @@ ACE_RCSID(Event, EC_ProxyConsumer, "$Id$") +typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock; + TAO_EC_ProxyPushConsumer:: TAO_EC_ProxyPushConsumer (TAO_EC_Event_Channel* ec) : event_channel_ (ec), @@ -98,8 +100,37 @@ TAO_EC_ProxyPushConsumer::connect_push_supplier ( ACE_CHECK; if (this->is_connected_i ()) - ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ()); - + { + if (this->event_channel_->supplier_reconnect () == 0) + ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ()); + + // Re-connections are allowed, go ahead and disconnect the + // consumer... + this->supplier_ = + RtecEventComm::PushSupplier::_nil (); + + this->filter_->unbind (this); + this->event_channel_->supplier_filter_builder ()->destroy (this->filter_); + this->filter_ = 0; + + // @@ Please read the comments in EC_ProxySuppliers about + // possible race conditions in this area... + TAO_EC_Unlock reverse_lock (*this->lock_); + + { + ACE_GUARD_THROW_EX ( + TAO_EC_Unlock, ace_mon, reverse_lock, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + + this->event_channel_->disconnected (this, ACE_TRY_ENV); + ACE_CHECK; + } + + // What if a second thread connected us after this? + if (this->is_connected_i ()) + return; + } this->supplier_ = RtecEventComm::PushSupplier::_duplicate (push_supplier); this->qos_ = qos; diff --git a/TAO/orbsvcs/tests/Event/Basic/Reconnect.cpp b/TAO/orbsvcs/tests/Event/Basic/Reconnect.cpp index 795cdebc017..14808260286 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Reconnect.cpp +++ b/TAO/orbsvcs/tests/Event/Basic/Reconnect.cpp @@ -19,7 +19,8 @@ main (int argc, char *argv []) EC_Reconnect::EC_Reconnect (void) : allow_consumer_reconnect_ (0), - allow_supplier_reconnect_ (0) + allow_supplier_reconnect_ (0), + disconnections_ (1000) { } @@ -29,7 +30,7 @@ EC_Reconnect::parse_args (int& argc, char* argv[]) if (this->EC_Driver::parse_args (argc, argv) != 0) return -1; - ACE_Get_Opt get_opt (argc, argv, "sc"); + ACE_Get_Opt get_opt (argc, argv, "scd:"); int opt; while ((opt = get_opt ()) != EOF) @@ -42,6 +43,9 @@ EC_Reconnect::parse_args (int& argc, char* argv[]) case 's': this->allow_supplier_reconnect_ = 1; break; + case 'd': + this->disconnections_ = ACE_OS::atoi (get_opt.optarg); + break; case '?': default: @@ -54,11 +58,25 @@ EC_Reconnect::parse_args (int& argc, char* argv[]) } void +EC_Reconnect::print_args (void) const +{ + this->EC_Driver::print_args (); + + ACE_DEBUG ((LM_DEBUG, "EC_Reconnect: \n" + " consumer_reconnect = %d\n" + " supplier_reconnect = %d\n" + " disconnect_count = %d\n", + this->allow_consumer_reconnect_, + this->allow_supplier_reconnect_, + this->disconnections_)); +} + +void EC_Reconnect::print_usage (void) { this->EC_Driver::print_usage (); - ACE_DEBUG ((LM_DEBUG, "EC_Reconnect usage: [-s] [-c]\n")); + ACE_DEBUG ((LM_DEBUG, "EC_Reconnect usage: [-s] [-c] [-d disc]\n")); } void @@ -69,23 +87,22 @@ EC_Reconnect::modify_attributes (TAO_EC_Event_Channel_Attributes& attr) } void -EC_Reconnect::consumer_push (void* consumer_cookie, - const RtecEventComm::EventSet& event, - CORBA::Environment& ACE_TRY_ENV) -{ - this->EC_Driver::consumer_push (consumer_cookie, - event, - ACE_TRY_ENV); -} - -void EC_Reconnect::execute_test (CORBA::Environment& ACE_TRY_ENV) { this->execute_consumer_test (ACE_TRY_ENV); this->execute_supplier_test (ACE_TRY_ENV); - // ?? this->EC_Driver::execute_test (ACE_TRY_ENV); + this->consumer_reconnect_.dump_results ("Reconnect", "consumer"); + + this->supplier_reconnect_.dump_results ("Reconnect", "supplier"); + + // this->EC_Driver::execute_test (ACE_TRY_ENV); +} + +void +EC_Reconnect::dump_results (void) +{ } void @@ -97,7 +114,7 @@ EC_Reconnect::execute_consumer_test (CORBA::Environment& ACE_TRY_ENV) if (this->allow_consumer_reconnect_) { - for (int i = 0; i < this->burst_count_; ++i) + for (int i = 0; i < this->disconnections_; ++i) { ACE_hrtime_t start = ACE_OS::gethrtime (); this->consumers_[0]->connect (qos, @@ -129,13 +146,20 @@ EC_Reconnect::execute_consumer_test (CORBA::Environment& ACE_TRY_ENV) "Expected AlreadyConnected exception"); } ACE_ENDTRY; + ACE_TRY_ENV.clear (); + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + this->event_channel_->for_consumers (ACE_TRY_ENV); + ACE_CHECK; - for (int i = 0; i < this->burst_count_; ++i) + for (int i = 0; i < this->disconnections_; ++i) { ACE_hrtime_t start = ACE_OS::gethrtime (); this->consumers_[0]->disconnect (ACE_TRY_ENV); ACE_CHECK; - this->consumers_[0]->connect (qos, shutdown_event_type, + this->consumers_[0]->connect (consumer_admin.in (), + qos, + shutdown_event_type, ACE_TRY_ENV); ACE_CHECK; ACE_hrtime_t stop = ACE_OS::gethrtime (); @@ -153,7 +177,7 @@ EC_Reconnect::execute_supplier_test (CORBA::Environment& ACE_TRY_ENV) if (this->allow_supplier_reconnect_) { - for (int i = 0; i < this->burst_count_; ++i) + for (int i = 0; i < this->disconnections_; ++i) { ACE_hrtime_t start = ACE_OS::gethrtime (); this->suppliers_[0]->connect (qos, shutdown_event_type, @@ -183,13 +207,19 @@ EC_Reconnect::execute_supplier_test (CORBA::Environment& ACE_TRY_ENV) "Expected AlreadyConnected exception"); } ACE_ENDTRY; + ACE_TRY_ENV.clear (); + + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + this->event_channel_->for_suppliers (ACE_TRY_ENV); + ACE_CHECK; - for (int i = 0; i < this->burst_count_; ++i) + for (int i = 0; i < this->disconnections_; ++i) { ACE_hrtime_t start = ACE_OS::gethrtime (); this->suppliers_[0]->disconnect (ACE_TRY_ENV); ACE_CHECK; - this->suppliers_[0]->connect (qos, + this->suppliers_[0]->connect (supplier_admin.in (), + qos, shutdown_event_type, ACE_TRY_ENV); ACE_CHECK; diff --git a/TAO/orbsvcs/tests/Event/Basic/Reconnect.h b/TAO/orbsvcs/tests/Event/Basic/Reconnect.h index 4f4ae15ba2d..9e98eee77df 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Reconnect.h +++ b/TAO/orbsvcs/tests/Event/Basic/Reconnect.h @@ -36,8 +36,7 @@ class EC_Reconnect : public EC_Driver // disabled (the default) // + The EC does allow reconnections if the feature is enabled // and: - // - The subscriptions are changed (eventually) - // - The are no memory leaks + // - There are no memory leaks // - Compares the time required for a reconnection vs a complete // connect/disconnect cycle, specially as the number of // suppliers and consumers increases. @@ -48,6 +47,7 @@ public: // = The EC_Driver methods virtual int parse_args (int& argc, char* argv[]); + virtual void print_args (void) const; virtual void print_usage (void); // add some command line args to enable/disable reconnections @@ -57,20 +57,21 @@ public: void execute_test (CORBA::Environment& ACE_TRY_ENV); // Don't run the suppliers, just test connect and disconnect calls. + void dump_results (void); + // Don't dump the EC_Driver results, they are meaningless. + void execute_consumer_test (CORBA::Environment& ACE_TRY_ENV); void execute_supplier_test (CORBA::Environment& ACE_TRY_ENV); // Separate the suppliers and consumers. - virtual void consumer_push (void* consumer_cookie, - const RtecEventComm::EventSet& event, - CORBA::Environment& ACE_TRY_ENV); - // Verify that the subscription changes - private: int allow_consumer_reconnect_; int allow_supplier_reconnect_; // What aspect of reconnection are we going to test? + int disconnections_; + // The number of disconnections + EC_Driver::Latency_Stats consumer_reconnect_; EC_Driver::Latency_Stats supplier_reconnect_; }; diff --git a/TAO/orbsvcs/tests/Event/lib/Consumer.cpp b/TAO/orbsvcs/tests/Event/lib/Consumer.cpp index 20140722f28..4f5cde9e7de 100644 --- a/TAO/orbsvcs/tests/Event/lib/Consumer.cpp +++ b/TAO/orbsvcs/tests/Event/lib/Consumer.cpp @@ -35,6 +35,9 @@ EC_Consumer::connect ( int shutdown_event_type, CORBA::Environment &ACE_TRY_ENV) { + if (CORBA::is_nil (this->supplier_proxy_.in ())) + return; // @@ Throw? + this->shutdown_event_type_ = shutdown_event_type; RtecEventComm::PushConsumer_var objref = this->_this (ACE_TRY_ENV); @@ -98,6 +101,8 @@ EC_Consumer::push (const RtecEventComm::EventSet& events, } ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + if (this->push_count_ == 0) + this->throughput_.start (); // We start the timer as soon as we receive the first event... this->throughput_.sample (); diff --git a/TAO/orbsvcs/tests/Event/lib/Driver.cpp b/TAO/orbsvcs/tests/Event/lib/Driver.cpp index b50a5c7f2ab..815d2527742 100644 --- a/TAO/orbsvcs/tests/Event/lib/Driver.cpp +++ b/TAO/orbsvcs/tests/Event/lib/Driver.cpp @@ -26,6 +26,7 @@ EC_Driver::EC_Driver (void) consumers_ (0), n_suppliers_ (1), suppliers_ (0), + tasks_ (0), burst_count_ (100), burst_size_ (100), payload_size_ (0), @@ -532,10 +533,8 @@ EC_Driver::build_consumer_qos ( qos_factory.start_disjunction_group (); qos_factory.insert_type (shutdown_event_type, rt_info); - for (int i = 0; i != this->consumer_type_count_; ++i) - { - qos_factory.insert_type (type_start + i, rt_info); - } + for (int j = 0; j != this->consumer_type_count_; ++j) + qos_factory.insert_type (type_start + j, rt_info); qos = qos_factory.get_ConsumerQOS (); } @@ -583,21 +582,25 @@ EC_Driver::build_supplier_qos ( RtecScheduler::handle_t rt_info = 0; ACE_SupplierQOS_Factory qos_factory; - for (int i = 0; i != this->supplier_type_count_; ++i) - { - qos_factory.insert (supplier_id, - type_start + i, - rt_info, 1); - } + for (int j = 0; j != this->supplier_type_count_; ++j) + qos_factory.insert (supplier_id, + type_start + j, + rt_info, 1); + qos_factory.insert (supplier_id, shutdown_event_type, rt_info, 1); + + qos = qos_factory.get_SupplierQOS (); } void EC_Driver::execute_test (CORBA::Environment &ACE_TRY_ENV) { - this->activate_suppliers (ACE_TRY_ENV); + if (this->allocate_tasks () == -1) + return; + + this->activate_tasks (ACE_TRY_ENV); ACE_CHECK; if (this->verbose ()) @@ -613,6 +616,19 @@ EC_Driver::execute_test (CORBA::Environment &ACE_TRY_ENV) ACE_DEBUG ((LM_DEBUG, "EC_Driver (%P|%t) suppliers finished\n")); } +int +EC_Driver::allocate_tasks (void) +{ + ACE_NEW_RETURN (this->tasks_, + ACE_Task_Base*[this->n_suppliers_], + -1); + + for (int i = 0; i < this->n_suppliers_; ++i) + this->tasks_[i] = + this->allocate_task (i); + return 0; +} + ACE_Task_Base* EC_Driver::allocate_task (int i) { @@ -628,6 +644,27 @@ EC_Driver::allocate_task (int i) } void +EC_Driver::activate_tasks (CORBA::Environment &) +{ + int priority = + (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) + + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; + + for (int i = 0; i < this->n_suppliers_; ++i) + { + this->tasks_[i] = this->allocate_task (i); + + if (this->tasks_[i]->activate (this->thr_create_flags_, + 1, 0, priority) == -1) + { + ACE_ERROR ((LM_ERROR, + "Cannot activate thread for supplier %d\n", + i)); + } + } +} + +void EC_Driver::disconnect_suppliers (CORBA::Environment &ACE_TRY_ENV) { for (int i = 0; i < this->n_suppliers_; ++i) @@ -861,7 +898,7 @@ EC_Driver::parse_args (int &argc, char *argv []) else { - arg_shifter.consume_arg (); + arg_shifter.ignore_arg (); } } @@ -898,30 +935,6 @@ EC_Driver::modify_attributes (TAO_EC_Event_Channel_Attributes&) } void -EC_Driver::activate_suppliers (CORBA::Environment &) -{ - int priority = - (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) - + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; - - ACE_NEW (this->tasks_, - ACE_Task_Base*[this->n_suppliers_]); - - for (int i = 0; i < this->n_suppliers_; ++i) - { - this->tasks_[i] = this->allocate_task (i); - - if (this->tasks_[i]->activate (this->thr_create_flags_, - 1, 0, priority) == -1) - { - ACE_ERROR ((LM_ERROR, - "Cannot activate thread for supplier %d\n", - i)); - } - } -} - -void EC_Driver::consumer_push (void*, const RtecEventComm::EventSet&, CORBA::Environment&) @@ -1024,7 +1037,6 @@ EC_Driver::Throughput_Stats::dump_results (const char *test_name, if (this->done_ == 0) { - this->stop (); ACE_DEBUG ((LM_DEBUG, "%s/%s: incomplete data," " potentially skewed results\n", diff --git a/TAO/orbsvcs/tests/Event/lib/Driver.h b/TAO/orbsvcs/tests/Event/lib/Driver.h index 5f6d5afb66b..2a5a2bd24f0 100644 --- a/TAO/orbsvcs/tests/Event/lib/Driver.h +++ b/TAO/orbsvcs/tests/Event/lib/Driver.h @@ -131,10 +131,6 @@ public: virtual void modify_attributes (TAO_EC_Event_Channel_Attributes& attr); // Allow modifications of the default EC attributes - virtual void activate_suppliers (CORBA::Environment& env); - // Activate all the suppliers, by default runs each supplier on its - // own thread. - virtual void consumer_push (void* consumer_cookie, const RtecEventComm::EventSet& event, CORBA::Environment& ACE_TRY_ENV); @@ -153,7 +149,7 @@ public: // Obtain the EC from the Naming service virtual CosNaming::NamingContext_var - get_naming_context (CORBA::Environment &ACE_TRY_ENV); + get_naming_context (CORBA::Environment &ACE_TRY_ENV); #endif #if !defined(EC_DISABLE_OLD_EC) @@ -172,9 +168,14 @@ public: virtual EC_Supplier* allocate_supplier (int i); // Allocate one consumer or supplier + virtual int allocate_tasks (void); virtual ACE_Task_Base* allocate_task (int i); // Allocate one task for supplier number <i> + virtual void activate_tasks (CORBA::Environment& env); + // Activate all the tasks, by default runs each supplier on its + // own thread. + class Latency_Stats { // = TITLE diff --git a/TAO/orbsvcs/tests/Event/lib/Driver.i b/TAO/orbsvcs/tests/Event/lib/Driver.i index 3c189faa883..b93a23fdbd2 100644 --- a/TAO/orbsvcs/tests/Event/lib/Driver.i +++ b/TAO/orbsvcs/tests/Event/lib/Driver.i @@ -62,4 +62,5 @@ ACE_INLINE void EC_Driver::Throughput_Stats::sample (void) { this->n_++; + this->stop_ = ACE_OS::gethrtime (); } diff --git a/TAO/orbsvcs/tests/Event/lib/Supplier.cpp b/TAO/orbsvcs/tests/Event/lib/Supplier.cpp index 928f00114d2..58a58a2cd9e 100644 --- a/TAO/orbsvcs/tests/Event/lib/Supplier.cpp +++ b/TAO/orbsvcs/tests/Event/lib/Supplier.cpp @@ -56,6 +56,9 @@ EC_Supplier::send_event (const RtecEventComm::EventSet& event, { ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + if (this->push_count_ == 0) + this->throughput_.start (); + // We start the timer as soon as we receive the first event... this->throughput_.sample (); @@ -75,16 +78,26 @@ void EC_Supplier::event_type (int event_number, RtecEventComm::Event &event) { - int i = event_number % this->qos_.publications.length (); - int type = this->qos_.publications[i].event.header.type; - if (type == this->shutdown_event_type_) - i = 0; + CORBA::ULong l = this->qos_.publications.length (); + + if (l == 0) + { + event.header.source = 0; + event.header.type = this->shutdown_event_type_; + } + else + { + int i = event_number % l; + int type = this->qos_.publications[i].event.header.type; + if (type == this->shutdown_event_type_) + i = 0; - RtecEventComm::EventHeader& header = - this->qos_.publications[i].event.header; + RtecEventComm::EventHeader& header = + this->qos_.publications[i].event.header; - event.header.source = header.source; - event.header.type = header.type; + event.header.source = header.source; + event.header.type = header.type; + } } void @@ -106,6 +119,9 @@ EC_Supplier::connect (const RtecEventChannelAdmin::SupplierQOS& qos, int shutdown_event_type, CORBA::Environment &ACE_TRY_ENV) { + if (CORBA::is_nil (this->consumer_proxy_.in ())) + return; // @@ Throw? + this->qos_ = qos; this->shutdown_event_type_ = shutdown_event_type; diff --git a/TAO/orbsvcs/tests/Makefile b/TAO/orbsvcs/tests/Makefile index c4247d21f1b..cb6c4e9d3a3 100644 --- a/TAO/orbsvcs/tests/Makefile +++ b/TAO/orbsvcs/tests/Makefile @@ -23,7 +23,8 @@ DIRS = Simple_Naming \ CosEC_Multiple \ ImplRepo \ Sched_Conf \ - Time + Time \ + Event #---------------------------------------------------------------------------- # Include macros and targets |