summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-04-21 19:48:43 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-04-21 19:48:43 +0000
commit1ebb71ac683e1f26289cf10e2ba4835ed9e9a739 (patch)
tree68189489992fa799d9373d7cea02659034d11567
parentce3dfadfd9d7b01f721b12ff85c565cb5eb4d5bf (diff)
downloadATCD-1ebb71ac683e1f26289cf10e2ba4835ed9e9a739.tar.gz
ChangeLogTag:Wed Apr 21 14:45:28 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
-rw-r--r--TAO/ChangeLog-99c37
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp8
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h4
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp42
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h8
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/README16
-rw-r--r--TAO/orbsvcs/orbsvcs/Event_Service_Constants.h3
-rw-r--r--TAO/orbsvcs/orbsvcs/Event_Utilities.i8
-rw-r--r--TAO/orbsvcs/orbsvcs/Makefile2
-rw-r--r--TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp18
13 files changed, 119 insertions, 41 deletions
diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c
index c436f207415..8896fa9337d 100644
--- a/TAO/ChangeLog-99c
+++ b/TAO/ChangeLog-99c
@@ -1,3 +1,40 @@
+Wed Apr 21 14:45:28 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
+
+ * orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp:
+ Use Recursive_Mutexes for the Basic_Factory, otherwise we risk a
+ dead-lock if the user decides to disconnect or push a new event
+ that it finally reaches the same consumer.
+
+ * orbsvcs/orbsvcs/Event/EC_Dispatching.h:
+ * orbsvcs/orbsvcs/Event/EC_Dispatching.cpp:
+ * orbsvcs/orbsvcs/Event/EC_ProxySupplier.h:
+ * orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp:
+ Added new method push_to_consumer() to the ProxyPushSupplier
+ implementation, this method is invoked by the dispatching module
+ to really push the event. Its job is to verify that the consumer
+ is not disconnected and/or suspended.
+ The Dispatching module does not need to receive the consumer
+ anymore.
+
+ * orbsvcs/orbsvcs/Event_Service_Constants.h:
+ * orbsvcs/orbsvcs/Event_Utilities.i:
+ Added new macro for the ANY_SOURCE source id.
+
+ * orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp:
+ * orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp:
+ Timeout filters should not accept any events in the filter()
+ methods.
+ The Timeout generator passes the event directly to their
+ push_nocopy() methods.
+
+ * orbsvcs/orbsvcs/Makefile:
+ Fixed little problem with the Time -> ImplRepo dependency in
+ TAO_ORBSVCS
+
+ * orbsvcs/tests/Event_Latency/Event_Latency.cpp:
+ Added compile-time support for the new Event Channel, just for
+ experimentation purposes.
+
Wed Apr 21 11:06:57 1999 David L. Levine <levine@cs.wustl.edu>
* performance-tests/Cubit/TAO/MT_Cubit/Globals.{h,cpp},
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
index b709323b563..f3ddfbbf6b9 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
@@ -167,7 +167,7 @@ TAO_EC_Basic_Factory::destroy_consumer_lock (ACE_Lock* x)
ACE_Lock*
TAO_EC_Basic_Factory::create_supplier_lock (void)
{
- return new ACE_Lock_Adapter<ACE_SYNCH_MUTEX> ();
+ return new ACE_Lock_Adapter<ACE_SYNCH_RECURSIVE_MUTEX> ();
}
void
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp
index d380adb9d98..8664fec1a3c 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.cpp
@@ -30,21 +30,19 @@ TAO_EC_Reactive_Dispatching::shutdown (void)
}
void
-TAO_EC_Reactive_Dispatching::push (TAO_EC_ProxyPushSupplier*,
- RtecEventComm::PushConsumer_ptr consumer,
+TAO_EC_Reactive_Dispatching::push (TAO_EC_ProxyPushSupplier* proxy,
const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- consumer->push (event, ACE_TRY_ENV);
+ proxy->push_to_consumer (event, ACE_TRY_ENV);
}
void
TAO_EC_Reactive_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy,
- RtecEventComm::PushConsumer_ptr consumer,
RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- consumer->push (event, ACE_TRY_ENV);
+ proxy->push_to_consumer (event, ACE_TRY_ENV);
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h
index 8b99ed935cb..b4ae4e44869 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching.h
@@ -61,12 +61,10 @@ public:
// their jobs.
virtual void push (TAO_EC_ProxyPushSupplier* proxy,
- RtecEventComm::PushConsumer_ptr consumer,
const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& env) = 0;
virtual void push_nocopy (TAO_EC_ProxyPushSupplier* proxy,
- RtecEventComm::PushConsumer_ptr consumer,
RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& env) = 0;
@@ -95,12 +93,10 @@ public:
virtual void activate (void);
virtual void shutdown (void);
virtual void push (TAO_EC_ProxyPushSupplier* proxy,
- RtecEventComm::PushConsumer_ptr consumer,
const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& env);
virtual void push_nocopy (TAO_EC_ProxyPushSupplier* proxy,
- RtecEventComm::PushConsumer_ptr consumer,
RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& env);
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
index 9d612e50bd1..e3942715db5 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
@@ -201,14 +201,10 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- // Do not take a lock, this is a call back from our child filter, so
- // we are holding the lock already (in the filter() method).
- if (this->is_connected_i ())
- this->event_channel_->dispatching ()->push (this,
- this->consumer_.in (),
- event,
- qos_info,
- ACE_TRY_ENV);
+ this->event_channel_->dispatching ()->push (this,
+ event,
+ qos_info,
+ ACE_TRY_ENV);
this->child_->clear ();
}
@@ -217,18 +213,32 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- // Do not take a lock, this is a call back from our child filter, so
- // we are holding the lock already (in the filter() method).
- if (this->is_connected_i ())
- this->event_channel_->dispatching ()->push_nocopy (this,
- this->consumer_.in (),
- event,
- qos_info,
- ACE_TRY_ENV);
+ this->event_channel_->dispatching ()->push_nocopy (this,
+ event,
+ qos_info,
+ ACE_TRY_ENV);
this->child_->clear ();
}
void
+TAO_EC_ProxyPushSupplier::push_to_consumer (const RtecEventComm::EventSet& event,
+ CORBA::Environment& ACE_TRY_ENV)
+{
+ ACE_GUARD_THROW_EX (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+ ACE_CHECK;
+
+ if (this->is_connected_i () == 0)
+ return; // TAO_THROW (RtecEventComm::Disconnected ());????
+
+ if (this->suspended_ != 0)
+ return;
+
+ this->consumer_->push (event, ACE_TRY_ENV);
+}
+
+void
TAO_EC_ProxyPushSupplier::clear (void)
{
ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
index f889cfb8214..a493233cc89 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
@@ -102,6 +102,11 @@ public:
virtual PortableServer::POA_ptr _default_POA (CORBA::Environment& env);
// Override the ServantBase method.
+ void push_to_consumer (const RtecEventComm::EventSet &event,
+ CORBA::Environment &env);
+ // Pushes to the consumer, verifies that it is connected and that it
+ // is not suspended.
+
// = The RtecEventChannelAdmin::ProxyPushSupplier methods...
virtual void connect_push_consumer (
RtecEventComm::PushConsumer_ptr push_consumer,
@@ -159,6 +164,9 @@ private:
CORBA::Boolean suspended_;
// Is this consumer suspended?
+ int state_;
+ // The state, see the enum above for a description.
+
RtecEventChannelAdmin::ConsumerQOS qos_;
// The subscription and QoS information...
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp
index 6bc7b0afa8c..d74c7d3d478 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp
@@ -62,8 +62,7 @@ TAO_EC_Timeout_Filter::filter (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- this->push (event, qos_info, ACE_TRY_ENV);
- return 1;
+ return 0;
}
int
@@ -71,8 +70,7 @@ TAO_EC_Timeout_Filter::filter_nocopy (RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- this->push_nocopy (event, qos_info, ACE_TRY_ENV);
- return 1;
+ return 0;
}
void
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp
index 8cc5e206761..d8461e2b056 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Generator.cpp
@@ -40,9 +40,9 @@ TAO_EC_Timeout_Adapter::handle_timeout (const ACE_Time_Value & /* tv */,
RtecEventComm::EventSet single_event (1, 1, &e, 0);
TAO_EC_QOS_Info qos_info = filter->qos_info ();
- filter->filter (single_event,
- qos_info,
- ACE_TRY_ENV);
+ filter->push_nocopy (single_event,
+ qos_info,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
}
ACE_CATCHANY
diff --git a/TAO/orbsvcs/orbsvcs/Event/README b/TAO/orbsvcs/orbsvcs/Event/README
index c8d2ebc84f2..a59664868d4 100644
--- a/TAO/orbsvcs/orbsvcs/Event/README
+++ b/TAO/orbsvcs/orbsvcs/Event/README
@@ -223,9 +223,23 @@ components:
The SupplierFiltering object pushes the event [since the event is
a set it has to push one event at a time] to the a set of
ProxyPushSuppliers [recall that this are the consumer ambassadors]
-
+ They pass the event through their own set of filters, if the
+ filter accepts the event it callbacks on the ProxyPushSupplier.
+ At that point the ProxyPushSupplier requests that the
+ DispatchingModule pushes the event.
+ The dispatching module finally pushes the event to the consumer.
- Adding a consumer:
+ The client calls for_consumers() and obtain_push_supplier() to
+ obtain a reference to the [global] ConsumerAdmin object and to its
+ own ProxyPushSupplier object.
+ The ProxyPushSupplier object is initially empty, once the user
+ calls connect_push_consumer() on it the set of filters is created
+ using the Filter_Builder strategy and the user supplied QoS
+ parameters.
+ At this point the ProxyPushSupplier becomes "connected" and it
+ uses the Event_Channel implementation to broadcast its
+ subscriptions to all the Suppliers in the set.
- Adding a supplier:
diff --git a/TAO/orbsvcs/orbsvcs/Event_Service_Constants.h b/TAO/orbsvcs/orbsvcs/Event_Service_Constants.h
index dd91d88c36e..fbcab0569b7 100644
--- a/TAO/orbsvcs/orbsvcs/Event_Service_Constants.h
+++ b/TAO/orbsvcs/orbsvcs/Event_Service_Constants.h
@@ -46,6 +46,9 @@ const long ACE_ES_CONJUNCTION_DESIGNATOR = 8;
const long ACE_ES_DISJUNCTION_DESIGNATOR = 9;
const long ACE_ES_EVENT_UNDEFINED = 16;
+// = Predefined event sources.
+const long ACE_ES_EVENT_SOURCE_ANY = 0;
+
// The max number of priorities provided by the target platform.
// TODO: This should be defined in ACE (somehow) and only mapped here
// to some variables (and even that is doubtful).
diff --git a/TAO/orbsvcs/orbsvcs/Event_Utilities.i b/TAO/orbsvcs/orbsvcs/Event_Utilities.i
index 53910d1ac1c..34fb9916eaa 100644
--- a/TAO/orbsvcs/orbsvcs/Event_Utilities.i
+++ b/TAO/orbsvcs/orbsvcs/Event_Utilities.i
@@ -22,7 +22,7 @@ ACE_ConsumerQOS_Factory::insert_type (RtecEventComm::EventType type,
RtecScheduler::handle_t rt_info)
{
RtecEventChannelAdmin::Dependency dependency;
- dependency.event.header.source = 0;
+ dependency.event.header.source = ACE_ES_EVENT_SOURCE_ANY;
dependency.event.header.type = type;
//dependency.event.header.creation_time = 0;
//dependency.event.header.ec_recv_time = 0;
@@ -51,7 +51,7 @@ ACE_ConsumerQOS_Factory::insert_time (RtecEventComm::EventType type,
RtecScheduler::handle_t rt_info)
{
RtecEventChannelAdmin::Dependency dependency;
- dependency.event.header.source = 0;
+ dependency.event.header.source = ACE_ES_EVENT_SOURCE_ANY;
dependency.event.header.type = type;
dependency.event.header.creation_time = interval;
//dependency.event.header.ec_recv_time = 0;
@@ -73,7 +73,7 @@ ACE_ConsumerQOS_Factory::insert_act (RtecEventComm::EventData act)
return this->insert (dependency);
}
-ACE_INLINE const RtecEventChannelAdmin::ConsumerQOS&
+ACE_INLINE const RtecEventChannelAdmin::ConsumerQOS&
ACE_ConsumerQOS_Factory::get_ConsumerQOS (void)
{
return qos_;
@@ -87,7 +87,7 @@ ACE_ConsumerQOS_Factory::operator const RtecEventChannelAdmin::ConsumerQOS& (voi
// ************************************************************
-ACE_INLINE const RtecEventChannelAdmin::SupplierQOS&
+ACE_INLINE const RtecEventChannelAdmin::SupplierQOS&
ACE_SupplierQOS_Factory::get_SupplierQOS (void)
{
return qos_;
diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile
index 4b1b4f36922..cdb29e463b9 100644
--- a/TAO/orbsvcs/orbsvcs/Makefile
+++ b/TAO/orbsvcs/orbsvcs/Makefile
@@ -90,7 +90,7 @@ ifneq (,$(findstring Time,$(TAO_ORBSVCS)))
#### TAO's Time Service requires its ImplRepo Service.
ifeq (,$(findstring ImplRepo,$(TAO_ORBSVCS)))
#### Add ImplRepo
- TAO_ORBSVCS += "ImplRepo "
+ TAO_ORBSVCS += ImplRepo
endif # ! ImplRepo
endif # Time
diff --git a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
index 218c5d1debc..e84518490c7 100644
--- a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
+++ b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
@@ -13,6 +13,8 @@
#include "orbsvcs/Scheduler_Factory.h"
#include "orbsvcs/Time_Utilities.h"
#include "orbsvcs/RtecEventChannelAdminC.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+#include "orbsvcs/Event/EC_Basic_Factory.h"
#include "Event_Latency.h"
#include "tao/Timeprobe.h"
@@ -623,8 +625,8 @@ Latency_Supplier::push (const RtecEventComm::EventSet &events,
}
else
{
- ACE_ERROR ((LM_ERROR, "(%t) %s received unexpected events: ",
- entry_point ()));
+ ACE_ERROR ((LM_ERROR, "(%t) %s received unexpected events: %d\n",
+ entry_point (), events[i].header.type));
// ::dump_sequence (events);
return;
}
@@ -887,6 +889,7 @@ main (int argc, char *argv [])
// the cost of doing it later.
ACE_TIMEPROBE_RESET;
+#if 1
CosNaming::Name channel_name (1);
channel_name.length (1);
channel_name[0].id = CORBA::string_dup ("EventService");
@@ -899,6 +902,17 @@ main (int argc, char *argv [])
RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in (),
TAO_TRY_ENV);
TAO_CHECK_ENV;
+#else
+ TAO_EC_Basic_Factory ec_factory (root_poa.in ());
+
+ TAO_EC_Event_Channel ec_impl (&ec_factory);
+ ec_impl.activate (TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+
+ RtecEventChannelAdmin::EventChannel_var ec =
+ ec_impl._this (TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+#endif /* 0 */
// Create supplier(s).
Latency_Supplier **supplier;