From 1ebb71ac683e1f26289cf10e2ba4835ed9e9a739 Mon Sep 17 00:00:00 2001 From: coryan Date: Wed, 21 Apr 1999 19:48:43 +0000 Subject: ChangeLogTag:Wed Apr 21 14:45:28 1999 Carlos O'Ryan --- TAO/ChangeLog-99c | 37 +++++++++++++++++++ TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp | 2 +- TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp | 8 ++--- TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h | 4 --- TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp | 42 +++++++++++++--------- TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h | 8 +++++ TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp | 6 ++-- TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp | 6 ++-- TAO/orbsvcs/orbsvcs/Event/README | 16 ++++++++- TAO/orbsvcs/orbsvcs/Event_Service_Constants.h | 3 ++ TAO/orbsvcs/orbsvcs/Event_Utilities.i | 8 ++--- TAO/orbsvcs/orbsvcs/Makefile | 2 +- TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp | 18 ++++++++-- 13 files changed, 119 insertions(+), 41 deletions(-) diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c index c436f207415..8896fa9337d 100644 --- a/TAO/ChangeLog-99c +++ b/TAO/ChangeLog-99c @@ -1,3 +1,40 @@ +Wed Apr 21 14:45:28 1999 Carlos O'Ryan + + * orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp: + Use Recursive_Mutexes for the Basic_Factory, otherwise we risk a + dead-lock if the user decides to disconnect or push a new event + that it finally reaches the same consumer. + + * orbsvcs/orbsvcs/Event/EC_Dispatching.h: + * orbsvcs/orbsvcs/Event/EC_Dispatching.cpp: + * orbsvcs/orbsvcs/Event/EC_ProxySupplier.h: + * orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp: + Added new method push_to_consumer() to the ProxyPushSupplier + implementation, this method is invoked by the dispatching module + to really push the event. Its job is to verify that the consumer + is not disconnected and/or suspended. + The Dispatching module does not need to receive the consumer + anymore. + + * orbsvcs/orbsvcs/Event_Service_Constants.h: + * orbsvcs/orbsvcs/Event_Utilities.i: + Added new macro for the ANY_SOURCE source id. + + * orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp: + * orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp: + Timeout filters should not accept any events in the filter() + methods. + The Timeout generator passes the event directly to their + push_nocopy() methods. + + * orbsvcs/orbsvcs/Makefile: + Fixed little problem with the Time -> ImplRepo dependency in + TAO_ORBSVCS + + * orbsvcs/tests/Event_Latency/Event_Latency.cpp: + Added compile-time support for the new Event Channel, just for + experimentation purposes. + Wed Apr 21 11:06:57 1999 David L. Levine * performance-tests/Cubit/TAO/MT_Cubit/Globals.{h,cpp}, diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp index b709323b563..f3ddfbbf6b9 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp @@ -167,7 +167,7 @@ TAO_EC_Basic_Factory::destroy_consumer_lock (ACE_Lock* x) ACE_Lock* TAO_EC_Basic_Factory::create_supplier_lock (void) { - return new ACE_Lock_Adapter (); + return new ACE_Lock_Adapter (); } void diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp index d380adb9d98..8664fec1a3c 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp @@ -30,21 +30,19 @@ TAO_EC_Reactive_Dispatching::shutdown (void) } void -TAO_EC_Reactive_Dispatching::push (TAO_EC_ProxyPushSupplier*, - RtecEventComm::PushConsumer_ptr consumer, +TAO_EC_Reactive_Dispatching::push (TAO_EC_ProxyPushSupplier* proxy, const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - consumer->push (event, ACE_TRY_ENV); + proxy->push_to_consumer (event, ACE_TRY_ENV); } void TAO_EC_Reactive_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy, - RtecEventComm::PushConsumer_ptr consumer, RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - consumer->push (event, ACE_TRY_ENV); + proxy->push_to_consumer (event, ACE_TRY_ENV); } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h index 8b99ed935cb..b4ae4e44869 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h @@ -61,12 +61,10 @@ public: // their jobs. virtual void push (TAO_EC_ProxyPushSupplier* proxy, - RtecEventComm::PushConsumer_ptr consumer, const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& env) = 0; virtual void push_nocopy (TAO_EC_ProxyPushSupplier* proxy, - RtecEventComm::PushConsumer_ptr consumer, RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& env) = 0; @@ -95,12 +93,10 @@ public: virtual void activate (void); virtual void shutdown (void); virtual void push (TAO_EC_ProxyPushSupplier* proxy, - RtecEventComm::PushConsumer_ptr consumer, const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& env); virtual void push_nocopy (TAO_EC_ProxyPushSupplier* proxy, - RtecEventComm::PushConsumer_ptr consumer, RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& env); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp index 9d612e50bd1..e3942715db5 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp @@ -201,14 +201,10 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - // Do not take a lock, this is a call back from our child filter, so - // we are holding the lock already (in the filter() method). - if (this->is_connected_i ()) - this->event_channel_->dispatching ()->push (this, - this->consumer_.in (), - event, - qos_info, - ACE_TRY_ENV); + this->event_channel_->dispatching ()->push (this, + event, + qos_info, + ACE_TRY_ENV); this->child_->clear (); } @@ -217,17 +213,31 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - // Do not take a lock, this is a call back from our child filter, so - // we are holding the lock already (in the filter() method). - if (this->is_connected_i ()) - this->event_channel_->dispatching ()->push_nocopy (this, - this->consumer_.in (), - event, - qos_info, - ACE_TRY_ENV); + this->event_channel_->dispatching ()->push_nocopy (this, + event, + qos_info, + ACE_TRY_ENV); this->child_->clear (); } +void +TAO_EC_ProxyPushSupplier::push_to_consumer (const RtecEventComm::EventSet& event, + CORBA::Environment& ACE_TRY_ENV) +{ + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + + if (this->is_connected_i () == 0) + return; // TAO_THROW (RtecEventComm::Disconnected ());???? + + if (this->suspended_ != 0) + return; + + this->consumer_->push (event, ACE_TRY_ENV); +} + void TAO_EC_ProxyPushSupplier::clear (void) { diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h index f889cfb8214..a493233cc89 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h @@ -102,6 +102,11 @@ public: virtual PortableServer::POA_ptr _default_POA (CORBA::Environment& env); // Override the ServantBase method. + void push_to_consumer (const RtecEventComm::EventSet &event, + CORBA::Environment &env); + // Pushes to the consumer, verifies that it is connected and that it + // is not suspended. + // = The RtecEventChannelAdmin::ProxyPushSupplier methods... virtual void connect_push_consumer ( RtecEventComm::PushConsumer_ptr push_consumer, @@ -159,6 +164,9 @@ private: CORBA::Boolean suspended_; // Is this consumer suspended? + int state_; + // The state, see the enum above for a description. + RtecEventChannelAdmin::ConsumerQOS qos_; // The subscription and QoS information... diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp index 6bc7b0afa8c..d74c7d3d478 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp @@ -62,8 +62,7 @@ TAO_EC_Timeout_Filter::filter (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - this->push (event, qos_info, ACE_TRY_ENV); - return 1; + return 0; } int @@ -71,8 +70,7 @@ TAO_EC_Timeout_Filter::filter_nocopy (RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - this->push_nocopy (event, qos_info, ACE_TRY_ENV); - return 1; + return 0; } void diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp index 8cc5e206761..d8461e2b056 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp @@ -40,9 +40,9 @@ TAO_EC_Timeout_Adapter::handle_timeout (const ACE_Time_Value & /* tv */, RtecEventComm::EventSet single_event (1, 1, &e, 0); TAO_EC_QOS_Info qos_info = filter->qos_info (); - filter->filter (single_event, - qos_info, - ACE_TRY_ENV); + filter->push_nocopy (single_event, + qos_info, + ACE_TRY_ENV); ACE_TRY_CHECK; } ACE_CATCHANY diff --git a/TAO/orbsvcs/orbsvcs/Event/README b/TAO/orbsvcs/orbsvcs/Event/README index c8d2ebc84f2..a59664868d4 100644 --- a/TAO/orbsvcs/orbsvcs/Event/README +++ b/TAO/orbsvcs/orbsvcs/Event/README @@ -223,9 +223,23 @@ components: The SupplierFiltering object pushes the event [since the event is a set it has to push one event at a time] to the a set of ProxyPushSuppliers [recall that this are the consumer ambassadors] - + They pass the event through their own set of filters, if the + filter accepts the event it callbacks on the ProxyPushSupplier. + At that point the ProxyPushSupplier requests that the + DispatchingModule pushes the event. + The dispatching module finally pushes the event to the consumer. - Adding a consumer: + The client calls for_consumers() and obtain_push_supplier() to + obtain a reference to the [global] ConsumerAdmin object and to its + own ProxyPushSupplier object. + The ProxyPushSupplier object is initially empty, once the user + calls connect_push_consumer() on it the set of filters is created + using the Filter_Builder strategy and the user supplied QoS + parameters. + At this point the ProxyPushSupplier becomes "connected" and it + uses the Event_Channel implementation to broadcast its + subscriptions to all the Suppliers in the set. - Adding a supplier: diff --git a/TAO/orbsvcs/orbsvcs/Event_Service_Constants.h b/TAO/orbsvcs/orbsvcs/Event_Service_Constants.h index dd91d88c36e..fbcab0569b7 100644 --- a/TAO/orbsvcs/orbsvcs/Event_Service_Constants.h +++ b/TAO/orbsvcs/orbsvcs/Event_Service_Constants.h @@ -46,6 +46,9 @@ const long ACE_ES_CONJUNCTION_DESIGNATOR = 8; const long ACE_ES_DISJUNCTION_DESIGNATOR = 9; const long ACE_ES_EVENT_UNDEFINED = 16; +// = Predefined event sources. +const long ACE_ES_EVENT_SOURCE_ANY = 0; + // The max number of priorities provided by the target platform. // TODO: This should be defined in ACE (somehow) and only mapped here // to some variables (and even that is doubtful). diff --git a/TAO/orbsvcs/orbsvcs/Event_Utilities.i b/TAO/orbsvcs/orbsvcs/Event_Utilities.i index 53910d1ac1c..34fb9916eaa 100644 --- a/TAO/orbsvcs/orbsvcs/Event_Utilities.i +++ b/TAO/orbsvcs/orbsvcs/Event_Utilities.i @@ -22,7 +22,7 @@ ACE_ConsumerQOS_Factory::insert_type (RtecEventComm::EventType type, RtecScheduler::handle_t rt_info) { RtecEventChannelAdmin::Dependency dependency; - dependency.event.header.source = 0; + dependency.event.header.source = ACE_ES_EVENT_SOURCE_ANY; dependency.event.header.type = type; //dependency.event.header.creation_time = 0; //dependency.event.header.ec_recv_time = 0; @@ -51,7 +51,7 @@ ACE_ConsumerQOS_Factory::insert_time (RtecEventComm::EventType type, RtecScheduler::handle_t rt_info) { RtecEventChannelAdmin::Dependency dependency; - dependency.event.header.source = 0; + dependency.event.header.source = ACE_ES_EVENT_SOURCE_ANY; dependency.event.header.type = type; dependency.event.header.creation_time = interval; //dependency.event.header.ec_recv_time = 0; @@ -73,7 +73,7 @@ ACE_ConsumerQOS_Factory::insert_act (RtecEventComm::EventData act) return this->insert (dependency); } -ACE_INLINE const RtecEventChannelAdmin::ConsumerQOS& +ACE_INLINE const RtecEventChannelAdmin::ConsumerQOS& ACE_ConsumerQOS_Factory::get_ConsumerQOS (void) { return qos_; @@ -87,7 +87,7 @@ ACE_ConsumerQOS_Factory::operator const RtecEventChannelAdmin::ConsumerQOS& (voi // ************************************************************ -ACE_INLINE const RtecEventChannelAdmin::SupplierQOS& +ACE_INLINE const RtecEventChannelAdmin::SupplierQOS& ACE_SupplierQOS_Factory::get_SupplierQOS (void) { return qos_; diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile index 4b1b4f36922..cdb29e463b9 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile +++ b/TAO/orbsvcs/orbsvcs/Makefile @@ -90,7 +90,7 @@ ifneq (,$(findstring Time,$(TAO_ORBSVCS))) #### TAO's Time Service requires its ImplRepo Service. ifeq (,$(findstring ImplRepo,$(TAO_ORBSVCS))) #### Add ImplRepo - TAO_ORBSVCS += "ImplRepo " + TAO_ORBSVCS += ImplRepo endif # ! ImplRepo endif # Time diff --git a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp index 218c5d1debc..e84518490c7 100644 --- a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp +++ b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp @@ -13,6 +13,8 @@ #include "orbsvcs/Scheduler_Factory.h" #include "orbsvcs/Time_Utilities.h" #include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Basic_Factory.h" #include "Event_Latency.h" #include "tao/Timeprobe.h" @@ -623,8 +625,8 @@ Latency_Supplier::push (const RtecEventComm::EventSet &events, } else { - ACE_ERROR ((LM_ERROR, "(%t) %s received unexpected events: ", - entry_point ())); + ACE_ERROR ((LM_ERROR, "(%t) %s received unexpected events: %d\n", + entry_point (), events[i].header.type)); // ::dump_sequence (events); return; } @@ -887,6 +889,7 @@ main (int argc, char *argv []) // the cost of doing it later. ACE_TIMEPROBE_RESET; +#if 1 CosNaming::Name channel_name (1); channel_name.length (1); channel_name[0].id = CORBA::string_dup ("EventService"); @@ -899,6 +902,17 @@ main (int argc, char *argv []) RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in (), TAO_TRY_ENV); TAO_CHECK_ENV; +#else + TAO_EC_Basic_Factory ec_factory (root_poa.in ()); + + TAO_EC_Event_Channel ec_impl (&ec_factory); + ec_impl.activate (TAO_TRY_ENV); + TAO_CHECK_ENV; + + RtecEventChannelAdmin::EventChannel_var ec = + ec_impl._this (TAO_TRY_ENV); + TAO_CHECK_ENV; +#endif /* 0 */ // Create supplier(s). Latency_Supplier **supplier; -- cgit v1.2.1