summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TAO/ChangeLog-99c67
-rw-r--r--TAO/orbsvcs/Event_Service/Event_Service.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp23
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp11
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h10
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp53
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h87
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i26
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h188
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i31
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp11
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h8
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h4
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i12
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Factory.h7
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp16
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Filter.h21
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Filter.i12
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp23
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp41
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h10
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i13
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp109
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h86
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.i1
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.cpp66
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.h259
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.i43
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp (renamed from TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp)112
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.h135
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.i19
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp10
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp9
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/Makefile2
-rw-r--r--TAO/tao/Any.h16
-rw-r--r--TAO/tao/MProfile.i38
-rw-r--r--TAO/tao/orbconf.h6
41 files changed, 1107 insertions, 498 deletions
diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c
index 09792d16d63..f3af17ab83f 100644
--- a/TAO/ChangeLog-99c
+++ b/TAO/ChangeLog-99c
@@ -1,3 +1,70 @@
+Wed Mar 3 18:28:51 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
+
+ * orbsvcs/orbsvcs/Makefile:
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h:
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i:
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp:
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h:
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i:
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp:
+ * orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.h:
+ * orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.i:
+ * orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.cpp:
+ * orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.h:
+ * orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.i:
+ * orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp:
+ Factored out the collection of ProxyPushSuppliers from the
+ ConsumerAdmin class: it is used in other places like the
+ SupplierFiltering strategies.
+ I also added extensive documentation about the several
+ variations on this particular strategy.
+
+ * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h:
+ * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.i:
+ * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp:
+ Added a new SupplierFiltering strategy, simply keep the list of
+ consumers for each supplier.
+
+ * orbsvcs/orbsvcs/Event/EC_Filter.h:
+ * orbsvcs/orbsvcs/Event/EC_Filter.i:
+ * orbsvcs/orbsvcs/Event/EC_Filter.cpp:
+ * orbsvcs/orbsvcs/Event/EC_ProxySupplier.h:
+ * orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp:
+ * orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h:
+ * orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp:
+ * orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h:
+ * orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp:
+ * orbsvcs/orbsvcs/Event/EC_Type_Filter.h:
+ * orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp:
+ Changed the mechanism to match suppliers and consumers: the
+ filter objects have a new method that can be used to detect if a
+ event is potentially interesting to the consumer.
+ The red-black tree is not needed anymore.
+
+ * orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h:
+ * orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i:
+ * orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp:
+ Moved the instantiation of the EventHeader red-black tree to
+ this file, because now this is the only place we use it.
+
+ * orbsvcs/orbsvcs/Event/EC_Event_Channel.h:
+ * orbsvcs/orbsvcs/Event/EC_Event_Channel.i:
+ * orbsvcs/orbsvcs/Event/EC_Factory.h:
+ * orbsvcs/orbsvcs/Event/EC_Basic_Factory.h:
+ * orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp:
+ * orbsvcs/orbsvcs/Event/EC_Null_Factory.h:
+ * orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp:
+ Added new factory methods to create and detroy
+ EC_ProxyPushSupplier_Set objects.
+
+ * tao/orbconf.h:
+ Added description of the TAO_DOTTED_DECIMAL_ADDRESSES macro.
+
+ * tao/MProfile.i:
+ * tao/Any.h:
+ * orbsvcs/Event_Service/Event_Service.cpp:
+ Minor cosmetic fixes.
+
Wed Mar 3 03:05:06 1999 Jeff Parsons <parsons@cs.wustl.edu>
* TAO/performance-tests/Cubit/TAO/DII_Cubit/client.cpp:
diff --git a/TAO/orbsvcs/Event_Service/Event_Service.cpp b/TAO/orbsvcs/Event_Service/Event_Service.cpp
index 3e6614c5d7f..e7b7d420feb 100644
--- a/TAO/orbsvcs/Event_Service/Event_Service.cpp
+++ b/TAO/orbsvcs/Event_Service/Event_Service.cpp
@@ -65,7 +65,7 @@ parse_args (int argc, char *argv [])
case 'r':
reactive = 1;
break;
-
+
case '?':
default:
ACE_DEBUG ((LM_DEBUG,
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
index 999a00f21ba..872bf6c6b75 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
@@ -3,12 +3,13 @@
#include "EC_Basic_Factory.h"
#include "EC_Dispatching.h"
#include "EC_Basic_Filter_Builder.h"
-#include "EC_ConsumerAdmin_T.h"
+#include "EC_ConsumerAdmin.h"
#include "EC_SupplierAdmin.h"
#include "EC_ProxyConsumer.h"
#include "EC_ProxySupplier.h"
#include "EC_SupplierFiltering.h"
#include "EC_ObserverStrategy.h"
+#include "EC_ProxyPushSupplier_Set_T.h"
#include "Timer_Module.h"
#if ! defined (__ACE_INLINE__)
@@ -50,7 +51,7 @@ TAO_EC_Basic_Factory::destroy_filter_builder (TAO_EC_Filter_Builder *x)
TAO_EC_ConsumerAdmin*
TAO_EC_Basic_Factory::create_consumer_admin (TAO_EC_Event_Channel *ec)
{
- return new TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH> (ec);
+ return new TAO_EC_ConsumerAdmin (ec);
}
void
@@ -127,6 +128,18 @@ TAO_EC_Basic_Factory::destroy_observer_strategy (TAO_EC_ObserverStrategy *x)
delete x;
}
+TAO_EC_ProxyPushSupplier_Set*
+TAO_EC_Basic_Factory::create_proxy_push_supplier_set (TAO_EC_Event_Channel *)
+{
+ return new TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH> ();
+}
+
+void
+TAO_EC_Basic_Factory::destroy_proxy_push_supplier_set (TAO_EC_ProxyPushSupplier_Set *x)
+{
+ delete x;
+}
+
PortableServer::POA_ptr
TAO_EC_Basic_Factory::consumer_poa (CORBA::Environment&)
{
@@ -189,16 +202,14 @@ TAO_EC_Basic_Factory::destroy_supplier_admin_lock (ACE_Lock* x)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH>;
-template class TAO_EC_ConsumerAdmin_T<ACE_MT_SYNCH>;
+template class TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH>;
template class ACE_Node<ACE_Command_Base*>;
template class ACE_Unbounded_Queue<ACE_Command_Base*>;
template class ACE_Unbounded_Queue_Iterator<ACE_Command_Base*>;
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH>
-#pragma instantiate TAO_EC_ConsumerAdmin_T<ACE_MT_SYNCH>
+#pragma instantiate TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH>
#pragma instantiate ACE_Node<ACE_Command_Base*>
#pragma instantiate ACE_Unbounded_Queue<ACE_Command_Base*>
#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_Command_Base*>
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h
index 3abda1f55d3..d7c37923c03 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h
@@ -94,6 +94,11 @@ public:
create_observer_strategy (TAO_EC_Event_Channel*);
virtual void
destroy_observer_strategy (TAO_EC_ObserverStrategy*);
+ virtual TAO_EC_ProxyPushSupplier_Set*
+ create_proxy_push_supplier_set (TAO_EC_Event_Channel*);
+ virtual void
+ destroy_proxy_push_supplier_set (TAO_EC_ProxyPushSupplier_Set*);
+
virtual PortableServer::POA_ptr
consumer_poa (CORBA::Environment& env);
virtual PortableServer::POA_ptr
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp
index fa96875053d..8bef3f88a71 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp
@@ -3,7 +3,7 @@
#include "EC_Conjunction_Filter.h"
#if ! defined (__ACE_INLINE__)
-#include "EC_Filter.i"
+#include "EC_Conjunction_Filter.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(Event, EC_Conjunction_Filter, "$Id$")
@@ -166,14 +166,17 @@ TAO_EC_Conjunction_Filter::max_event_size (void) const
return n;
}
-void
-TAO_EC_Conjunction_Filter::event_ids (TAO_EC_Filter::Headers& headers)
+int
+TAO_EC_Conjunction_Filter::can_match (
+ const RtecEventComm::EventHeader& header) const
{
ChildrenIterator end = this->end ();
for (ChildrenIterator i = this->begin ();
i != end;
++i)
{
- (*i)->event_ids (headers);
+ if ((*i)->can_match (header) != 0)
+ return 1;
}
+ return 0;
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h
index 6c3e3c2f26a..3e4e4b45293 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h
@@ -52,7 +52,7 @@ public:
size_t n);
// Constructor. It assumes ownership of both the array and the
// children.
-
+
virtual ~TAO_EC_Conjunction_Filter (void);
// Destructor
@@ -72,7 +72,7 @@ public:
CORBA::Environment& env);
virtual void clear (void);
virtual CORBA::ULong max_event_size (void) const;
- virtual void event_ids (TAO_EC_Filter::Headers& headers);
+ virtual int can_match (const RtecEventComm::EventHeader& header) const;
typedef TAO_EC_Filter* value_type;
typedef TAO_EC_Filter* const const_value_type;
@@ -83,14 +83,14 @@ public:
typedef unsigned int Word;
-private:
+private:
int all_received (void) const;
// Determine if all the children have received their events.
-
+
ACE_UNIMPLEMENTED_FUNC (TAO_EC_Conjunction_Filter
(const TAO_EC_Conjunction_Filter&))
ACE_UNIMPLEMENTED_FUNC (TAO_EC_Conjunction_Filter& operator=
- (const TAO_EC_Conjunction_Filter&))
+ (const TAO_EC_Conjunction_Filter&))
private:
TAO_EC_Filter** children_;
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
index 7bacdf93ff5..da73b3c012f 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
@@ -12,15 +12,22 @@
ACE_RCSID(Event, EC_ConsumerAdmin, "$Id$")
-TAO_EC_ConsumerAdmin::TAO_EC_ConsumerAdmin (TAO_EC_Event_Channel *ec)
- : busy_lock_ (this),
- event_channel_ (ec),
- busy_hwm_ (1)
+TAO_EC_ConsumerAdmin::TAO_EC_ConsumerAdmin (TAO_EC_Event_Channel *ec,
+ TAO_EC_ProxyPushSupplier_Set* ss)
+ : event_channel_ (ec),
+ supplier_set_ (ss)
{
+ if (this->supplier_set_ == 0)
+ {
+ this->supplier_set_ =
+ this->event_channel_->create_proxy_push_supplier_set ();
+ }
}
TAO_EC_ConsumerAdmin::~TAO_EC_ConsumerAdmin (void)
{
+ this->event_channel_->destroy_proxy_push_supplier_set (this->supplier_set_);
+ this->supplier_set_ = 0;
}
void
@@ -35,7 +42,7 @@ TAO_EC_ConsumerAdmin::connected (TAO_EC_ProxyPushConsumer *consumer,
CORBA::Environment &ACE_TRY_ENV)
{
ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock,
- ace_mon, this->busy_lock_,
+ ace_mon, this->busy_lock (),
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
SupplierSetIterator end = this->end ();
@@ -53,7 +60,7 @@ TAO_EC_ConsumerAdmin::disconnected (TAO_EC_ProxyPushConsumer *consumer,
CORBA::Environment &ACE_TRY_ENV)
{
ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock,
- ace_mon, this->busy_lock_,
+ ace_mon, this->busy_lock (),
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
SupplierSetIterator end = this->end ();
@@ -67,20 +74,17 @@ TAO_EC_ConsumerAdmin::disconnected (TAO_EC_ProxyPushConsumer *consumer,
}
void
-TAO_EC_ConsumerAdmin::connected_i (TAO_EC_ProxyPushSupplier *supplier,
- CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_ConsumerAdmin::connected (TAO_EC_ProxyPushSupplier *supplier,
+ CORBA::Environment &ACE_TRY_ENV)
{
- if (this->all_suppliers_.insert (supplier) != 0)
- ACE_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO));
+ this->supplier_set_->connected (supplier, ACE_TRY_ENV);
}
void
-TAO_EC_ConsumerAdmin::disconnected_i (TAO_EC_ProxyPushSupplier *supplier,
- CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_ConsumerAdmin::disconnected (TAO_EC_ProxyPushSupplier *supplier,
+ CORBA::Environment &ACE_TRY_ENV)
{
- if (this->all_suppliers_.remove (supplier) != 0)
- ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR ());
- supplier->_decr_refcnt ();
+ this->supplier_set_->disconnected (supplier, ACE_TRY_ENV);
}
RtecEventChannelAdmin::ProxyPushSupplier_ptr
@@ -102,27 +106,8 @@ TAO_EC_ConsumerAdmin::_default_POA (CORBA::Environment&)
return PortableServer::POA::_duplicate (this->default_POA_.in ());
}
-void
-TAO_EC_ConsumerAdmin::execute_delayed_operations (void)
-{
-}
-
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Node<TAO_EC_ProxyPushSupplier*>;
-template class ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*>;
-template class ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*>;
-template class TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin>;
-template class TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>;
-template class TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>;
-
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Node<TAO_EC_ProxyPushSupplier*>
-#pragma instantiate ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*>
-#pragma instantiate ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*>
-#pragma instantiate TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin>
-#pragma instantiate TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>
-#pragma instantiate TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>
-
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
index 14ddaa8a074..4b04d79581c 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
@@ -23,7 +23,6 @@
// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz
// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz
//
-//
// ============================================================================
#ifndef TAO_EC_CONSUMERADMIN_H
@@ -36,14 +35,11 @@
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "orbsvcs/RtecEventChannelAdminS.h"
-#include "orbsvcs/Event/EC_Busy_Lock.h"
-#include "orbsvcs/Event/EC_Filter.h"
+#include "EC_ProxyPushSupplier_Set.h"
class TAO_EC_Event_Channel;
class TAO_EC_ProxyPushSupplier;
class TAO_EC_ProxyPushConsumer;
-template<class Target,class Object> class TAO_EC_Connected_Command;
-template<class Target,class Object> class TAO_EC_Disconnected_Command;
class TAO_EC_ConsumerAdmin : public POA_RtecEventChannelAdmin::ConsumerAdmin
{
@@ -55,7 +51,9 @@ class TAO_EC_ConsumerAdmin : public POA_RtecEventChannelAdmin::ConsumerAdmin
// ProxyPushSupplier objects.
//
// = MEMORY MANAGMENT
- // It does not assume ownership of the TAO_EC_Event_Channel object
+ // It does not assume ownership of the TAO_EC_Event_Channel
+ // object; but it *does* assume ownership of the
+ // TAO_EC_ProxyPushSupplier_Set object.
//
// = LOCKING
// No provisions for locking, access must be serialized
@@ -64,30 +62,31 @@ class TAO_EC_ConsumerAdmin : public POA_RtecEventChannelAdmin::ConsumerAdmin
// = TODO
//
public:
- TAO_EC_ConsumerAdmin (TAO_EC_Event_Channel* event_channel);
- // constructor...
+ TAO_EC_ConsumerAdmin (TAO_EC_Event_Channel* event_channel,
+ TAO_EC_ProxyPushSupplier_Set* supplier_set = 0);
+ // constructor. If <supplier_set> is nil then it builds one using
+ // the <event_channel> argument.
+ // In any case it assumes ownership.
virtual ~TAO_EC_ConsumerAdmin (void);
// destructor...
- typedef ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*> SupplierSet;
- typedef ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*> SupplierSetIterator;
-
- virtual int busy (void) = 0;
- virtual int idle (void) = 0;
- // Before using the iterators the clients should invoke this
- // methods, that ensures that no changes to the underlying data
- // structure will occur.
-
- void busy_hwm (CORBA::ULong hwm);
- CORBA::ULong busy_hwm (void) const;
- // This attribute is used to control the maximum number of threads
- // that can be running on the
+ typedef TAO_EC_ProxyPushSupplier_Set::SupplierSet SupplierSet;
+ typedef TAO_EC_ProxyPushSupplier_Set::SupplierSetIterator SupplierSetIterator;
SupplierSetIterator begin (void);
SupplierSetIterator end (void);
// Iterators over the set of ProxyPushSuppliers
+ typedef TAO_EC_ProxyPushSupplier_Set::Busy_Lock Busy_Lock;
+ Busy_Lock& busy_lock (void);
+
+ void busy_hwm (CORBA::ULong hwm);
+ CORBA::ULong busy_hwm (void) const;
+ void max_write_delay (CORBA::ULong hwm);
+ CORBA::ULong max_write_delay (void) const;
+ // Delegate on the EC_ProxyPushSupplier....
+
void set_default_POA (PortableServer::POA_ptr poa);
// Set this servant's default POA
@@ -102,9 +101,9 @@ public:
// disconnected from it.
virtual void connected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment&) = 0;
+ CORBA::Environment&);
virtual void disconnected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment&) = 0;
+ CORBA::Environment&);
// Used to inform the EC that a Supplier has connected or
// disconnected from it.
@@ -112,51 +111,15 @@ public:
virtual RtecEventChannelAdmin::ProxyPushSupplier_ptr
obtain_push_supplier (CORBA::Environment &);
- typedef TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin> Busy_Lock;
- Busy_Lock& busy_lock (void);
- // This object is an adapter to the busy/idle protocol.
-
-protected:
- virtual void connected_i (TAO_EC_ProxyPushSupplier* supplier,
- CORBA::Environment &env);
- // The implementation of connected(), without locking.
- // It does not increase the reference count on the supplier
-
- virtual void disconnected_i (TAO_EC_ProxyPushSupplier* supplier,
- CORBA::Environment &env);
- // The implementation of disconnected(), without locking.
- // It decreases the reference count on the supplier if the operation
- // is successful.
-
- typedef TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier> Connected_Command;
- typedef TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier> Disconnected_Command;
-
- friend class TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>;
- friend class TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>;
- // This two classes call the connected_i() and disconnected_i()
- // methods, that's ok because they do while this class is holding
- // its lock.
-
- virtual void execute_delayed_operations (void);
- // Dervied classes that implement delayed disconnects and connects
- // must override this method.
-
- SupplierSet all_suppliers_;
- // The set of all the ProxyPushSupplier objects bound to this
- // ConsumerAdmin.
-
- TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin> busy_lock_;
- // The busy lock object
-
private:
TAO_EC_Event_Channel *event_channel_;
// The Event Channel we belong to
+ TAO_EC_ProxyPushSupplier_Set* supplier_set_;
+ // The implementation for the supplier set container.
+
PortableServer::POA_var default_POA_;
// Store the default POA.
-
- CORBA::ULong busy_hwm_;
- // How many threads can simultaneously iterate over the set.
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
index da4752320ec..9d316a56539 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
@@ -3,29 +3,41 @@
ACE_INLINE TAO_EC_ConsumerAdmin::SupplierSetIterator
TAO_EC_ConsumerAdmin::begin (void)
{
- return this->all_suppliers_.begin ();
+ return this->supplier_set_->begin ();
}
ACE_INLINE TAO_EC_ConsumerAdmin::SupplierSetIterator
TAO_EC_ConsumerAdmin::end (void)
{
- return this->all_suppliers_.end ();
+ return this->supplier_set_->end ();
+}
+
+ACE_INLINE TAO_EC_ConsumerAdmin::Busy_Lock&
+TAO_EC_ConsumerAdmin::busy_lock (void)
+{
+ return this->supplier_set_->busy_lock ();
}
ACE_INLINE void
TAO_EC_ConsumerAdmin::busy_hwm (CORBA::ULong hwm)
{
- this->busy_hwm_ = hwm;
+ this->supplier_set_->busy_hwm (hwm);
}
ACE_INLINE CORBA::ULong
TAO_EC_ConsumerAdmin::busy_hwm (void) const
{
- return this->busy_hwm_;
+ return this->supplier_set_->busy_hwm ();
}
-ACE_INLINE TAO_EC_ConsumerAdmin::Busy_Lock&
-TAO_EC_ConsumerAdmin::busy_lock (void)
+ACE_INLINE void
+TAO_EC_ConsumerAdmin::max_write_delay (CORBA::ULong hwm)
+{
+ this->supplier_set_->max_write_delay (hwm);
+}
+
+ACE_INLINE CORBA::ULong
+TAO_EC_ConsumerAdmin::max_write_delay (void) const
{
- return this->busy_lock_;
+ return this->supplier_set_->max_write_delay ();
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h
deleted file mode 100644
index a3312e2efc6..00000000000
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h
+++ /dev/null
@@ -1,188 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-//
-// ============================================================================
-//
-// = LIBRARY
-// ORBSVCS Real-time Event Channel
-//
-// = FILENAME
-// EC_ConsumerAdmin_T
-//
-// = AUTHOR
-// Carlos O'Ryan (coryan@cs.wustl.edu)
-//
-// = DESCRIPTION
-// Implement concrete versions of the EC_ConsumerAdmin class. This
-// concrete versions provide specific locking policies and
-// strategies to handle delayed vs. immediate connections and
-// disconnections.
-//
-// = CREDITS
-// Based on previous work by Tim Harrison (harrison@cs.wustl.edu)
-// and other members of the DOC group.
-// More details can be found in:
-// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz
-// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz
-//
-//
-// ============================================================================
-
-#ifndef TAO_EC_CONSUMERADMIN_T_H
-#define TAO_EC_CONSUMERADMIN_T_H
-
-#include "EC_ConsumerAdmin.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-template<ACE_SYNCH_DECL>
-class TAO_EC_ConsumerAdmin_T : public TAO_EC_ConsumerAdmin
-{
- // = TITLE
- // ConsumerAdmin_T
- //
- // = DESCRIPTION
- // Implements the locking policies for the TAO_EC_ConsumerAdmin
- // class, we use a parametric class to handle the co-variations
- // between mutexes and condition variables.
- //
- // = MEMORY MANAGMENT
- //
- // = LOCKING
- // The kind of locking is specified as the template argument.
- // Clients still should follow the locking protocol: call busy()
- // to use the iterator and idle() once finished, the
- // Busy_Lock_Adapter can help there.
- //
- // = TODO
- //
-public:
- TAO_EC_ConsumerAdmin_T (TAO_EC_Event_Channel* event_channel);
- // constructor...
-
- // = The TAO_EC_ConsumerAdmin methods
- virtual int busy (void);
- virtual int idle (void);
-
-protected:
- virtual int busy_i (void);
- // Implements the busy() method, but without locking.
-
- virtual int idle_i (void);
- // Implements the busy() method, but without locking.
-
-protected:
- int busy_count (void) const;
- // Return the current value of busy_count [derived classes are not
- // allowed to modify this]
-
- ACE_SYNCH_MUTEX_T lock_;
- // The lock
-
- ACE_SYNCH_CONDITION_T busy_cond_;
- // A condition variable to wait while the object is too busy.
-
-private:
- int busy_count_;
- // The number of threads iterating on us
-
- int reached_hwm_;
- // The set was too busy and reached its HWM, now everybody has to
- // wait until we reach the LWM (0)
-};
-
-// ****************************************************************
-
-template<ACE_SYNCH_DECL>
-class TAO_EC_ConsumerAdmin_Immediate : public TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>
-{
- // = TITLE
- // ConsumerAdmin_Immediate
- //
- // = DESCRIPTION
- // A concrete version of the EC_ConsumerAdmin class; using the
- // locking strategy in EC_ConsumerAdmin_T and immediate execution
- // for the connected() and disconnected() operations.
- //
- // = MEMORY MANAGMENT
- //
- // = LOCKING
- // The kind of locking is specified as the template argument.
- // Clients still should follow the locking protocol: call busy()
- // to use the iterator and idle() once finished, the
- // Busy_Lock_Adapter can help there.
- //
- // = TODO
- //
-public:
- TAO_EC_ConsumerAdmin_Immediate (TAO_EC_Event_Channel* event_channel);
- // constructor...
-
- // = The TAO_EC_ConsumerAdmin methods
- virtual void connected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment&);
- virtual void disconnected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment&);
-};
-
-// ****************************************************************
-
-template<ACE_SYNCH_DECL>
-class TAO_EC_ConsumerAdmin_Delayed : public TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>
-{
- // = TITLE
- // ConsumerAdmin_Delayed
- //
- // = DESCRIPTION
- // A concrete version of the EC_ConsumerAdmin class; using the
- // locking strategy in EC_ConsumerAdmin_T and storing the
- // execution of connected() and disconnected() operations as
- // command objects, that are executed once the set is idle.
- //
- // = MEMORY MANAGMENT
- //
- // = LOCKING
- // The kind of locking is specified as the template argument.
- // Clients still should follow the locking protocol: call busy()
- // to use the iterator and idle() once finished, the
- // Busy_Lock_Adapter can help there.
- //
- // = TODO
- //
-public:
- TAO_EC_ConsumerAdmin_Delayed (TAO_EC_Event_Channel* event_channel);
- // constructor...
-
- // = The TAO_EC_ConsumerAdmin methods
- virtual void connected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment&);
- virtual void disconnected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment&);
-
-protected:
- virtual void execute_delayed_operations (void);
- // documented in TAO_EC_ConsumerAdmin
-
-private:
- ACE_Unbounded_Queue<ACE_Command_Base*> command_queue_;
- // The commands that carry the delayed operations are enqueued
- // here.
-};
-
-// ****************************************************************
-
-#if defined (__ACE_INLINE__)
-#include "EC_ConsumerAdmin_T.i"
-#endif /* __ACE_INLINE__ */
-
-#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
-#include "EC_ConsumerAdmin_T.cpp"
-#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
-
-#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
-#pragma implementation ("EC_ConsumerAdmin_T.cpp")
-#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
-
-#endif /* TAO_EC_CONSUMERADMIN_T_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i
deleted file mode 100644
index 4e78d7ad7e5..00000000000
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i
+++ /dev/null
@@ -1,31 +0,0 @@
-// $Id$
-
-template<ACE_SYNCH_DECL> ACE_INLINE
-TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::
- TAO_EC_ConsumerAdmin_T (TAO_EC_Event_Channel *event_channel)
- : TAO_EC_ConsumerAdmin (event_channel),
- busy_cond_ (lock_),
- busy_count_ (0),
- reached_hwm_ (0)
-{
-}
-
-template<ACE_SYNCH_DECL> ACE_INLINE int
-TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy_count (void) const
-{
- return this->busy_count_;
-}
-
-template<ACE_SYNCH_DECL> ACE_INLINE
-TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::
- TAO_EC_ConsumerAdmin_Immediate (TAO_EC_Event_Channel *event_channel)
- : TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE> (event_channel)
-{
-}
-
-template<ACE_SYNCH_DECL> ACE_INLINE
-TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::
- TAO_EC_ConsumerAdmin_Delayed (TAO_EC_Event_Channel *event_channel)
- : TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE> (event_channel)
-{
-}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp
index 9531f527e43..6e4f2ad1703 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp
@@ -3,7 +3,7 @@
#include "EC_Disjunction_Filter.h"
#if ! defined (__ACE_INLINE__)
-#include "EC_Filter.i"
+#include "EC_Disjunction_Filter.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(Event, EC_Disjunction_Filter, "$Id$")
@@ -120,14 +120,17 @@ TAO_EC_Disjunction_Filter::max_event_size (void) const
return n;
}
-void
-TAO_EC_Disjunction_Filter::event_ids(TAO_EC_Filter::Headers& headers)
+int
+TAO_EC_Disjunction_Filter::can_match (
+ const RtecEventComm::EventHeader& header) const
{
ChildrenIterator end = this->end ();
for (ChildrenIterator i = this->begin ();
i != end;
++i)
{
- (*i)->event_ids (headers);
+ if ((*i)->can_match (header) != 0)
+ return 1;
}
+ return 0;
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h
index adf973f29e3..a9777c5d896 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h
@@ -52,7 +52,7 @@ public:
size_t n);
// Constructor. It assumes ownership of both the array and the
// children.
-
+
virtual ~TAO_EC_Disjunction_Filter (void);
// Destructor
@@ -72,7 +72,7 @@ public:
CORBA::Environment& env);
virtual void clear (void);
virtual CORBA::ULong max_event_size (void) const;
- virtual void event_ids (TAO_EC_Filter::Headers& headers);
+ virtual int can_match (const RtecEventComm::EventHeader& header) const;
typedef TAO_EC_Filter* value_type;
typedef TAO_EC_Filter* const const_value_type;
@@ -81,11 +81,11 @@ public:
ChildrenIterator end (void) const;
// STL-like iterators...
-private:
+private:
ACE_UNIMPLEMENTED_FUNC (TAO_EC_Disjunction_Filter
(const TAO_EC_Disjunction_Filter&))
ACE_UNIMPLEMENTED_FUNC (TAO_EC_Disjunction_Filter& operator=
- (const TAO_EC_Disjunction_Filter&))
+ (const TAO_EC_Disjunction_Filter&))
private:
TAO_EC_Filter** children_;
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h
index 1822319c95c..37444d20528 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h
@@ -89,6 +89,10 @@ public:
void destroy_proxy_push_consumer (TAO_EC_ProxyPushConsumer*);
// Create and destroy a ProxyPushConsumer
+ TAO_EC_ProxyPushSupplier_Set* create_proxy_push_supplier_set (void);
+ void destroy_proxy_push_supplier_set (TAO_EC_ProxyPushSupplier_Set*);
+ // Create and destroy a ProxyPushSupplier_Set
+
PortableServer::POA_ptr supplier_poa (CORBA::Environment&);
PortableServer::POA_ptr consumer_poa (CORBA::Environment&);
// Access the supplier and consumer POAs from the factory.
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i
index 5bcb4a7d3ce..2192a5382bd 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i
@@ -48,6 +48,18 @@ TAO_EC_Event_Channel::create_proxy_push_consumer (void)
return this->factory_->create_proxy_push_consumer (this);
}
+ACE_INLINE TAO_EC_ProxyPushSupplier_Set*
+TAO_EC_Event_Channel::create_proxy_push_supplier_set (void)
+{
+ return this->factory_->create_proxy_push_supplier_set (this);
+}
+
+ACE_INLINE void
+TAO_EC_Event_Channel::destroy_proxy_push_supplier_set (TAO_EC_ProxyPushSupplier_Set* x)
+{
+ this->factory_->destroy_proxy_push_supplier_set (x);
+}
+
ACE_INLINE void
TAO_EC_Event_Channel::destroy_proxy_push_consumer (TAO_EC_ProxyPushConsumer* consumer)
{
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_Factory.h
index c150763d669..b8d77bc6fcd 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Factory.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Factory.h
@@ -46,6 +46,7 @@ class TAO_EC_ProxyPushConsumer;
class TAO_EC_ProxyPushSupplier;
class TAO_EC_Timer_Module;
class TAO_EC_ObserverStrategy;
+class TAO_EC_ProxyPushSupplier_Set;
class TAO_EC_Factory
{
@@ -113,6 +114,12 @@ public:
destroy_observer_strategy (TAO_EC_ObserverStrategy*) = 0;
// Create and destroy the observer strategy.
+ virtual TAO_EC_ProxyPushSupplier_Set*
+ create_proxy_push_supplier_set (TAO_EC_Event_Channel*) = 0;
+ virtual void
+ destroy_proxy_push_supplier_set (TAO_EC_ProxyPushSupplier_Set*) = 0;
+ // Create and destroy a ProxyPushSupplier_Set
+
virtual PortableServer::POA_ptr
consumer_poa (CORBA::Environment& env) = 0;
virtual PortableServer::POA_ptr
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp
index 3eb19082e32..f27a7d6e9bf 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp
@@ -69,28 +69,20 @@ TAO_EC_Null_Filter::clear (void)
CORBA::ULong
TAO_EC_Null_Filter::max_event_size (void) const
{
+ // @@ Is there a better way to express this?
return 0;
}
-void
-TAO_EC_Null_Filter::event_ids(TAO_EC_Filter::Headers& headers)
+int
+TAO_EC_Null_Filter::can_match (const RtecEventComm::EventHeader&) const
{
- // @@ TODO maybe we should add the AnyType/AnySource header?
- // right now we do nothing...
+ return 1;
}
// ****************************************************************
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_RB_Tree<RtecEventComm::EventHeader,int,TAO_EC_Filter::Header_Compare,ACE_Null_Mutex>;
-template class ACE_RB_Tree_Iterator<RtecEventComm::EventHeader,int,TAO_EC_Filter::Header_Compare,ACE_Null_Mutex>;
-template class ACE_RB_Tree_Node<RtecEventComm::EventHeader,int>;
-
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_RB_Tree<RtecEventComm::EventHeader,int,TAO_EC_Filter::Header_Compare,ACE_Null_Mutex>
-#pragma instantiate ACE_RB_Tree_Iterator<RtecEventComm::EventHeader,int,TAO_EC_Filter::Header_Compare,ACE_Null_Mutex>
-#pragma instantiate ACE_RB_Tree_Node<RtecEventComm::EventHeader,int>
-
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.h
index 388f563de2f..9188e5f94e5 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.h
@@ -38,14 +38,12 @@
#ifndef TAO_EC_FILTER_H
#define TAO_EC_FILTER_H
-#include "ace/RB_Tree.h"
+#include "orbsvcs/RtecEventCommC.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#include "orbsvcs/RtecEventCommC.h"
-
class TAO_EC_QOS_Info;
class TAO_EC_Filter
@@ -106,17 +104,10 @@ public:
virtual CORBA::ULong max_event_size (void) const = 0;
// Returns the maximum size of the events pushed by this filter.
- struct Header_Compare {
- int operator () (const RtecEventComm::EventHeader& lhs,
- const RtecEventComm::EventHeader& rhs) const;
- };
-
- typedef ACE_RB_Tree<RtecEventComm::EventHeader,int,Header_Compare,ACE_Null_Mutex> Headers;
- typedef ACE_RB_Tree_Iterator<RtecEventComm::EventHeader,int,Header_Compare,ACE_Null_Mutex> HeadersIterator;
-
- virtual void event_ids (Headers& headers) = 0;
- // Compute the disjunction of all the event types that could be of
- // interest for this filter (and its children).
+ virtual int can_match (const RtecEventComm::EventHeader& header) const = 0;
+ // Returns 0 if an event with that header could never be accepted.
+ // This can used by the suppliers to filter out consumers that
+ // couldn't possibly be interested in their events.
private:
TAO_EC_Filter* parent_;
@@ -161,7 +152,7 @@ public:
CORBA::Environment& env);
virtual void clear (void);
virtual CORBA::ULong max_event_size (void) const;
- virtual void event_ids (TAO_EC_Filter::Headers& headers);
+ virtual int can_match (const RtecEventComm::EventHeader& header) const;
};
// ****************************************************************
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.i b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.i
index 7f937618246..4b95832a627 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.i
@@ -31,18 +31,6 @@ TAO_EC_Filter::matches (const RtecEventComm::EventHeader& rhs,
// ****************************************************************
-ACE_INLINE int
-TAO_EC_Filter::Header_Compare::
- operator () (const RtecEventComm::EventHeader& lhs,
- const RtecEventComm::EventHeader& rhs) const
-{
- if (lhs.source == rhs.source)
- return lhs.type < rhs.type;
- return lhs.source < rhs.source;
-}
-
-// ****************************************************************
-
ACE_INLINE
TAO_EC_Null_Filter::TAO_EC_Null_Filter (void)
{
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp
index 3f3ba58be52..38deffa196f 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp
@@ -3,12 +3,13 @@
#include "EC_Null_Factory.h"
#include "EC_Dispatching.h"
#include "EC_Filter_Builder.h"
-#include "EC_ConsumerAdmin_T.h"
+#include "EC_ConsumerAdmin.h"
#include "EC_SupplierAdmin.h"
#include "EC_ProxyConsumer.h"
#include "EC_ProxySupplier.h"
#include "EC_SupplierFiltering.h"
#include "EC_ObserverStrategy.h"
+#include "EC_ProxyPushSupplier_Set_T.h"
#include "Timer_Module.h"
#if ! defined (__ACE_INLINE__)
@@ -50,7 +51,7 @@ TAO_EC_Null_Factory::destroy_filter_builder (TAO_EC_Filter_Builder *x)
TAO_EC_ConsumerAdmin*
TAO_EC_Null_Factory::create_consumer_admin (TAO_EC_Event_Channel *ec)
{
- return new TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH> (ec);
+ return new TAO_EC_ConsumerAdmin (ec);
}
void
@@ -125,6 +126,18 @@ TAO_EC_Null_Factory::destroy_observer_strategy (TAO_EC_ObserverStrategy *x)
delete x;
}
+TAO_EC_ProxyPushSupplier_Set*
+TAO_EC_Null_Factory::create_proxy_push_supplier_set (TAO_EC_Event_Channel *)
+{
+ return new TAO_EC_ProxyPushSupplier_Set_Immediate<ACE_Null_Mutex> ();
+}
+
+void
+TAO_EC_Null_Factory::destroy_proxy_push_supplier_set (TAO_EC_ProxyPushSupplier_Set *x)
+{
+ delete x;
+}
+
PortableServer::POA_ptr
TAO_EC_Null_Factory::consumer_poa (CORBA::Environment&)
{
@@ -187,12 +200,10 @@ TAO_EC_Null_Factory::destroy_supplier_admin_lock (ACE_Lock* x)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH>;
-template class TAO_EC_ConsumerAdmin_T<ACE_NULL_SYNCH>;
+template class TAO_EC_ProxyPushSupplier_Set_Immediate<ACE_Null_Mutex>;
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH>
-#pragma instantiate TAO_EC_ConsumerAdmin_T<ACE_NULL_SYNCH>
+#pragma instantiate TAO_EC_ProxyPushSupplier_Set_Immediate<ACE_Null_Mutex>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h
index 0fe5ec994f8..68b2ee663a4 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h
@@ -93,6 +93,11 @@ public:
create_observer_strategy (TAO_EC_Event_Channel*);
virtual void
destroy_observer_strategy (TAO_EC_ObserverStrategy*);
+ virtual TAO_EC_ProxyPushSupplier_Set*
+ create_proxy_push_supplier_set (TAO_EC_Event_Channel*) = 0;
+ virtual void
+ destroy_proxy_push_supplier_set (TAO_EC_ProxyPushSupplier_Set*) = 0;
+
virtual PortableServer::POA_ptr
consumer_poa (CORBA::Environment& env);
virtual PortableServer::POA_ptr
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
index 5aadeff799c..3f13a6ab2a8 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
@@ -1,7 +1,5 @@
// $Id$
-#include "ace/Synch.h"
-
#include "EC_ObserverStrategy.h"
#include "EC_Event_Channel.h"
#include "EC_ProxySupplier.h"
@@ -9,6 +7,7 @@
#include "EC_ConsumerAdmin.h"
#include "EC_SupplierAdmin.h"
#include "orbsvcs/Event_Service_Constants.h"
+#include "ace/Synch.h"
#if ! defined (__ACE_INLINE__)
#include "EC_ObserverStrategy.i"
@@ -156,12 +155,15 @@ TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*,
void
TAO_EC_Basic_ObserverStrategy::fill_qos (
RtecEventChannelAdmin::ConsumerQOS &qos,
- CORBA::Environment &)
+ CORBA::Environment &ACE_TRY_ENV)
{
- TAO_EC_Filter::Headers headers;
+ Headers headers;
{
- // @@ TODO locking in the consumer admin?
+ ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock,
+ ace_mon, this->event_channel_->consumer_admin ()->busy_lock (),
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
TAO_EC_ConsumerAdmin::SupplierSetIterator end =
this->event_channel_->consumer_admin ()->end ();
for (TAO_EC_ConsumerAdmin::SupplierSetIterator i =
@@ -170,13 +172,25 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
++i)
{
TAO_EC_ProxyPushSupplier* supplier = *i;
- if (supplier->subscriptions ().is_gateway)
+
+ const RtecEventChannelAdmin::ConsumerQOS& sub =
+ supplier->subscriptions ();
+ if (sub.is_gateway)
continue;
- supplier->event_ids (headers);
+ for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j)
+ {
+ const RtecEventComm::Event& event =
+ sub.dependencies[j].event;
+ RtecEventComm::EventType type = event.header.type;
+
+ if (0 <= type && type <= ACE_ES_EVENT_UNDEFINED)
+ continue;
+ headers.insert (event.header, 1);
+ }
}
}
CORBA::ULong count = 1;
- TAO_EC_Filter::HeadersIterator i (headers);
+ HeadersIterator i (headers);
for (i.first (); !i.is_done (); i.next ())
{
count++;
@@ -202,7 +216,7 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
RtecEventChannelAdmin::SupplierQOS &qos,
CORBA::Environment &)
{
- TAO_EC_Filter::Headers headers;
+ Headers headers;
{
// @@ TODO locking in the consumer admin?
@@ -231,7 +245,7 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
}
}
CORBA::ULong count = 0;
- TAO_EC_Filter::HeadersIterator i (headers);
+ HeadersIterator i (headers);
for (i.first (); !i.is_done (); i.next ())
{
count++;
@@ -251,6 +265,10 @@ template class ACE_Map_Iterator<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Ba
template class ACE_Map_Iterator_Base<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Basic_ObserverStrategy::Observer_Entry,ACE_Null_Mutex>;
template class ACE_Map_Entry<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Basic_ObserverStrategy::Observer_Entry>;
+template class ACE_RB_Tree<RtecEventComm::EventHeader,int,TAO_EC_Basic_ObserverStrategy::Header_Compare,ACE_Null_Mutex>;
+template class ACE_RB_Tree_Iterator<RtecEventComm::EventHeader,int,TAO_EC_Basic_ObserverStrategy::Header_Compare,ACE_Null_Mutex>;
+template class ACE_RB_Tree_Node<RtecEventComm::EventHeader,int>;
+
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Map_Manager<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Basic_ObserverStrategy::Observer_Entry,ACE_Null_Mutex>
@@ -258,5 +276,8 @@ template class ACE_Map_Entry<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Basic
#pragma instantiate ACE_Map_Iterator_Base<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Basic_ObserverStrategy::Observer_Entry,ACE_Null_Mutex>
#pragma instantiate ACE_Map_Entry<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Basic_ObserverStrategy::Observer_Entry>
+#pragma instantiate ACE_RB_Tree<RtecEventComm::EventHeader,int,TAO_EC_Basic_ObserverStrategy::Header_Compare,ACE_Null_Mutex>
+#pragma instantiate ACE_RB_Tree_Iterator<RtecEventComm::EventHeader,int,TAO_EC_Basic_ObserverStrategy::Header_Compare,ACE_Null_Mutex>
+#pragma instantiate ACE_RB_Tree_Node<RtecEventComm::EventHeader,int>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h
index 56640e06b47..3422f36264c 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h
@@ -35,6 +35,7 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "ace/RB_Tree.h"
#include "orbsvcs/RtecEventChannelAdminC.h"
class ACE_Lock;
@@ -201,6 +202,12 @@ public:
// The observer
};
+ struct Header_Compare
+ {
+ int operator () (const RtecEventComm::EventHeader& lhs,
+ const RtecEventComm::EventHeader& rhs) const;
+ };
+
private:
void fill_qos (RtecEventChannelAdmin::ConsumerQOS &qos,
CORBA::Environment &env);
@@ -224,6 +231,9 @@ private:
Observer_Map observers_;
// Keep the set of Observers
+
+ typedef ACE_RB_Tree<RtecEventComm::EventHeader,int,Header_Compare,ACE_Null_Mutex> Headers;
+ typedef ACE_RB_Tree_Iterator<RtecEventComm::EventHeader,int,Header_Compare,ACE_Null_Mutex> HeadersIterator;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i
index 6a677aa122a..1a58864e03d 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i
@@ -33,3 +33,16 @@ TAO_EC_Basic_ObserverStrategy::Observer_Entry::
observer (o)
{
}
+
+// ****************************************************************
+
+ACE_INLINE int
+TAO_EC_Basic_ObserverStrategy::Header_Compare::
+ operator () (const RtecEventComm::EventHeader& lhs,
+ const RtecEventComm::EventHeader& rhs) const
+{
+ if (lhs.source == rhs.source)
+ return lhs.type < rhs.type;
+ return lhs.source < rhs.source;
+}
+
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp
new file mode 100644
index 00000000000..69fa337e346
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp
@@ -0,0 +1,109 @@
+// $Id$
+
+#include "EC_Per_Supplier_Filter.h"
+#include "EC_Event_Channel.h"
+#include "EC_ProxyPushSupplier_Set.h"
+#include "EC_ProxySupplier.h"
+#include "EC_ProxyConsumer.h"
+#include "EC_QOS_Info.h"
+#include "orbsvcs/Event_Service_Constants.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "EC_Per_Supplier_Filter.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Event, EC_Per_Supplier_Filter, "$Id$")
+
+TAO_EC_Per_Supplier_Filter::
+ TAO_EC_Per_Supplier_Filter (TAO_EC_Event_Channel* ec)
+ : event_channel_ (ec)
+{
+ this->supplier_set_ =
+ this->event_channel_->create_proxy_push_supplier_set ();
+}
+
+TAO_EC_Per_Supplier_Filter::~TAO_EC_Per_Supplier_Filter (void)
+{
+ this->event_channel_->destroy_proxy_push_supplier_set (this->supplier_set_);
+ this->supplier_set_ = 0;
+}
+
+void
+TAO_EC_Per_Supplier_Filter::bind (TAO_EC_ProxyPushConsumer* consumer)
+{
+ this->consumer_ = consumer;
+}
+
+void
+TAO_EC_Per_Supplier_Filter::unbind (TAO_EC_ProxyPushConsumer* consumer)
+{
+ if (this->consumer_ != 0 && this->consumer_ != consumer)
+ return;
+
+ this->consumer_ = 0;
+ delete this;
+}
+
+void
+TAO_EC_Per_Supplier_Filter::connected (TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ if (this->consumer_ == 0)
+ return;
+
+ const RtecEventChannelAdmin::SupplierQOS& pub =
+ this->consumer_->publications ();
+
+ for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
+ {
+ const RtecEventComm::Event& event =
+ pub.publications[j].event;
+ RtecEventComm::EventType type = event.header.type;
+
+ if (0 <= type && type <= ACE_ES_EVENT_UNDEFINED)
+ continue;
+ if (supplier->can_match (event.header))
+ {
+ this->supplier_set_->connected (supplier, ACE_TRY_ENV);
+ return;
+ }
+ }
+}
+
+void
+TAO_EC_Per_Supplier_Filter::disconnected (TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ this->supplier_set_->disconnected (supplier, ACE_TRY_ENV);
+}
+
+void
+TAO_EC_Per_Supplier_Filter::push (const RtecEventComm::EventSet& event,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ for (CORBA::ULong j = 0; j < event.length (); ++j)
+ {
+ const RtecEventComm::Event& e = event[j];
+ RtecEventComm::Event* buffer =
+ ACE_const_cast(RtecEventComm::Event*, &e);
+ RtecEventComm::EventSet single_event (1, 1, buffer, 0);
+
+ ACE_GUARD_THROW (TAO_EC_ProxyPushSupplier_Set::Busy_Lock,
+ ace_mon, this->supplier_set_->busy_lock (),
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
+ TAO_EC_ProxyPushSupplier_Set::SupplierSetIterator end =
+ this->supplier_set_->end ();
+
+ for (TAO_EC_ProxyPushSupplier_Set::SupplierSetIterator i =
+ this->supplier_set_->begin ();
+ i != end;
+ ++i)
+ {
+ TAO_EC_QOS_Info qos_info;
+
+ (*i)->filter (single_event, qos_info, ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ }
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h
new file mode 100644
index 00000000000..837db0ab77c
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h
@@ -0,0 +1,86 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Real-time Event Channel
+//
+// = FILENAME
+// EC_Per_Supplier_Filter
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// = DESCRIPTION
+// Another implementation for the TAO_EC_SupplierFiltering strategy,
+// this one keeps a list of consumer (proxies) for each supplier.
+//
+// = CREDITS
+// Based on previous work by Tim Harrison (harrison@cs.wustl.edu)
+// and other members of the DOC group.
+// More details can be found in:
+// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz
+// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz
+//
+//
+// ============================================================================
+
+#ifndef TAO_EC_PER_SUPPLIER_FILTER_H
+#define TAO_EC_PER_SUPPLIER_FILTER_H
+
+#include "EC_SupplierFiltering.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class TAO_EC_ProxyPushSupplier_Set;
+
+class TAO_EC_Per_Supplier_Filter : public TAO_EC_SupplierFiltering
+{
+ // = TITLE
+ // Filter the events on each supplier.
+ //
+ // = DESCRIPTION
+ // This is a filtering strategy for the suppliers. In this
+ // particular case we keep a collection of the consumers that
+ // could potentially be interested in any event generated by a
+ // particular supplier.
+ // This minimizes the amount of consumers touched by the EC when
+ // dispatching an event.
+ //
+public:
+ TAO_EC_Per_Supplier_Filter (TAO_EC_Event_Channel* ec);
+ // Constructor
+
+ virtual ~TAO_EC_Per_Supplier_Filter (void);
+ // Destructor
+
+ // = The TAO_EC_SupplierFiltering methods.
+ virtual void bind (TAO_EC_ProxyPushConsumer* consumer);
+ virtual void unbind (TAO_EC_ProxyPushConsumer* consumer);
+ virtual void connected (TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &env);
+ virtual void disconnected (TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &env);
+ virtual void push (const RtecEventComm::EventSet& event,
+ CORBA::Environment &);
+
+private:
+ TAO_EC_Event_Channel *event_channel_;
+ // The event channel, used to locate the set of consumers.
+
+ TAO_EC_ProxyPushConsumer* consumer_;
+ // The proxy for the supplier we are bound to.
+
+ TAO_EC_ProxyPushSupplier_Set* supplier_set_;
+ // Keep the list of proxies for the consumers that may be interested
+ // in our events.
+};
+
+#if defined (__ACE_INLINE__)
+#include "EC_Per_Supplier_Filter.i"
+#endif /* __ACE_INLINE__ */
+
+#endif /* TAO_EC_PER_SUPPLIER_FILTER_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.i b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.i
new file mode 100644
index 00000000000..cfa1da318d3
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.i
@@ -0,0 +1 @@
+// $Id$
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.cpp
new file mode 100644
index 00000000000..12aab4a6272
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.cpp
@@ -0,0 +1,66 @@
+// $Id$
+
+#include "EC_ProxyPushSupplier_Set.h"
+#include "EC_ProxySupplier.h"
+#include "EC_Command.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "EC_ProxyPushSupplier_Set.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Event, EC_ProxyPushSupplier_Set, "$Id$")
+
+TAO_EC_ProxyPushSupplier_Set::TAO_EC_ProxyPushSupplier_Set (void)
+ : busy_lock_ (this),
+ busy_hwm_ (1),
+ max_write_delay_ (1)
+{
+}
+
+TAO_EC_ProxyPushSupplier_Set::~TAO_EC_ProxyPushSupplier_Set (void)
+{
+}
+
+void
+TAO_EC_ProxyPushSupplier_Set::connected_i (
+ TAO_EC_ProxyPushSupplier *supplier,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ if (this->all_suppliers_.insert (supplier) != 0)
+ ACE_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO));
+}
+
+void
+TAO_EC_ProxyPushSupplier_Set::disconnected_i (
+ TAO_EC_ProxyPushSupplier *supplier,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ if (this->all_suppliers_.remove (supplier) != 0)
+ ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR ());
+ supplier->_decr_refcnt ();
+}
+
+void
+TAO_EC_ProxyPushSupplier_Set::execute_delayed_operations (void)
+{
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Node<TAO_EC_ProxyPushSupplier*>;
+template class ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*>;
+template class ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*>;
+template class TAO_EC_Busy_Lock_Adapter<TAO_EC_ProxyPushSupplier_Set>;
+template class TAO_EC_Connected_Command<TAO_EC_ProxyPushSupplier_Set,TAO_EC_ProxyPushSupplier>;
+template class TAO_EC_Disconnected_Command<TAO_EC_ProxyPushSupplier_Set,TAO_EC_ProxyPushSupplier>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate ACE_Node<TAO_EC_ProxyPushSupplier*>
+#pragma instantiate ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*>
+#pragma instantiate ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*>
+#pragma instantiate TAO_EC_Busy_Lock_Adapter<TAO_EC_ProxyPushSupplier_Set>
+#pragma instantiate TAO_EC_Connected_Command<TAO_EC_ProxyPushSupplier_Set,TAO_EC_ProxyPushSupplier>
+#pragma instantiate TAO_EC_Disconnected_Command<TAO_EC_ProxyPushSupplier_Set,TAO_EC_ProxyPushSupplier>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.h
new file mode 100644
index 00000000000..3973b6f92f6
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.h
@@ -0,0 +1,259 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Real-time Event Channel
+//
+// = FILENAME
+// EC_ProxyPushSupplier_Set
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// = DESCRIPTION
+// Defines the interface for a collection of ProxyPushSuppliers that
+// can handle the concurrency requirements of the Event Channel.
+//
+// = CREDITS
+// Based on previous work by Tim Harrison (harrison@cs.wustl.edu)
+// and other members of the DOC group.
+// More details can be found in:
+// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz
+// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz
+//
+// ============================================================================
+
+#ifndef TAO_EC_PROXYPUSHSUPPLIER_SET_H
+#define TAO_EC_PROXYPUSHSUPPLIER_SET_H
+
+#include "ace/Containers.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "tao/corba.h"
+#include "EC_Busy_Lock.h"
+
+class TAO_EC_ProxyPushSupplier;
+template<class Target,class Object> class TAO_EC_Connected_Command;
+template<class Target,class Object> class TAO_EC_Disconnected_Command;
+
+class TAO_EC_ProxyPushSupplier_Set
+{
+ // = TITLE
+ // ProxyPushSupplier_Set
+ //
+ // = DESCRIPTION
+ // Many components in the Event Channel need to keep a collection
+ // of ProxyPushSuppliers; this collections must be able to cope
+ // with several concurrency issues:
+ // + Some threads may need to iterate over the collection and
+ // invoke a method on each element. Locking the collection
+ // while this is done is not feasible in all cases: under some
+ // configurations the same thread that is iterating over the
+ // collection may need to make changes.
+ // + A recursive lock does not solve the concurrency problems
+ // correctly because in the common case we don't want to stop
+ // other threads from doing the same iteration, and changes to
+ // the collection could be very uncommon wrt dispatching over
+ // the set of ProxyPushSuppliers.
+ // + Using a reader-write lock with upgrades does not solve the
+ // problem either: upgrading the lock can fail and still
+ // invalidates the iterators that the thread has.
+ // + Copying the collection to iterate over it is a good solution
+ // for non-real-time systems, but again the copying could be
+ // non-deterministic, and is expensive since most iterations
+ // will not result in changes.
+ //
+ // This class encapsulates a protocol to solve this concurrency
+ // issues (and other minor ones). The first solution is to delay
+ // changes by putting them on an "operation queue", the operations
+ // are stored as command objects in this queue and executed once
+ // the system is quiescent (i.e. no threads are iterating over the
+ // collection).
+ // This solves the major problems, but there are other issues to
+ // be addressed:
+ // + How do we ensure that the operations are eventually executed?
+ // + How do we simplify the execution of the locking protocol for
+ // clients of this class?
+ // + How do we minimize overhead for single threaded execution?
+ // + How do we minimize the overhead for the cases where the
+ // threads dispatching events don't post changes to the
+ // collection?
+ //
+ // = VARIANTS
+ //
+ // We identify several sources of variation:
+ //
+ // + Immediate changes without delay (only locking). This is only
+ // useful in configurations where a separate thread dispatches
+ // the events, and thus, can only be used with real locks.
+ // The busy()/idle() methods use a read acquire/release, the
+ // connected()/disconnected() methods use a write
+ // acquire/release. We can either use RW or regular mutexes.
+ // IMPLEMENTATION: a derived class with strategized locking
+ // (ACE_Lock? or templates?).
+ //
+ // + Copying data for iteration:
+ // Can use a regular mutex (as in the previous case), or even a
+ // null lock.
+ // @@ Must stablish if there is a simple way to encapsulate this
+ // in the Set class, as opposed to the clients of the class.
+ // @@ The implementation is essentially the same as above, but
+ // the clients must make the copy.
+ // IMPLEMENTATION: As above.
+ //
+ // + Delayed operations:
+ // Only makes sense if the thread dispatching is the same thread
+ // where the upcalls run.
+ // Can require regular locks or null locks (for ST
+ // implementations); notice that this will require templates
+ // to parametrize the mutexes and condition variables.
+ // There are two main variations:
+ //
+ // - An upcall can result in new dispatches: in this case we
+ // have to keep track of a the list of current threads using
+ // a Set, to avoid dead-locks.
+ // IMPLEMENTATION: the design is not complete, probably
+ // similar to the next one.
+ //
+ // - Otherwise we just need to control the concurrency using the
+ // algorithm described below.
+ // IMPLEMENTATION: a derived parametric class (the arguments
+ // are the types of locks).
+ //
+ // = DELAYED OPERATIONS AND CONCURRENCY
+ //
+ // The algorithm proposed so far is:
+ // - If a thread is using the set then it increases the busy
+ // count, this is done by calling the busy() method. Once the
+ // thread has stopped using the collection the idle() method is
+ // invoked and the busy count is decreased.
+ // A helper class (Busy_Lock) is used to hide this protocol
+ // behind the familiar GUARD idiom.
+ // - If the busy count reaches the busy_hwm then the thread must
+ // wait until the count reaches 0 again.
+ // This can be used to control the maximum concurrency in the
+ // EC, matching it (for example) with the number of
+ // processors. Setting the concurrency to a high value (say one
+ // million) allows for an arbitrary number of threads to execute
+ // concurrently.
+ // - If a modification is posted to the collection we need to
+ // execute it at some point.
+ // Just using the busy_hwm would not work, the HWM may not be
+ // reached ever, so another form of control is needed.
+ // Instead we use another counter, that keeps track of how many
+ // threads have used the set since the modification was
+ // posted. If this number of threads reaches max_write_delay then
+ // we don't allow any more threads to go in, eventually the
+ // thread count reaches 0 and we can proceed with the operations.
+ //
+ // - There is one aspect of concurrency that can be problematic: if
+ // thread pushes events as part of an upcall then the same thread
+ // could be counted twice, we need to keep track of the threads
+ // that are dispatching events and not increase (or decrease) the
+ // reference count when a thread iterates twice over the same
+ // set.
+ //
+ // = MEMORY MANAGMENT
+ // It assumes ownership of the ProxyPushSuppliers added to the
+ // collection: simply by increasing their reference count.
+ //
+ // = LOCKING
+ // Locking is provided by derived classes.
+ //
+ // = TODO
+ //
+public:
+ TAO_EC_ProxyPushSupplier_Set (void);
+ // Constructor.
+
+ virtual ~TAO_EC_ProxyPushSupplier_Set (void);
+ // Destructor.
+
+ typedef ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*> SupplierSet;
+ typedef ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*> SupplierSetIterator;
+ // The actual implementation of the collection, notice that only
+ // iteration is exposed, methods to insert and remove objects are
+ // hidden behind proper interfaces.
+
+ SupplierSetIterator begin (void);
+ SupplierSetIterator end (void);
+ // Iterators over the set of ProxyPushSuppliers
+ // Remember that all clients of this class must call busy() before
+ // starting an iteration and call idle() once the iteration is
+ // finished. Otherwise the iterators may be invalidated by other
+ // threads.
+ // A helper object (the busy_lock) is provided to simplify this task
+ // and make it exception safe.
+
+ typedef TAO_EC_Busy_Lock_Adapter<TAO_EC_ProxyPushSupplier_Set> Busy_Lock;
+ Busy_Lock& busy_lock (void);
+ // This object is an adapter to the busy/idle protocol.
+
+ void busy_hwm (CORBA::ULong hwm);
+ CORBA::ULong busy_hwm (void) const;
+ void max_write_delay (CORBA::ULong hwm);
+ CORBA::ULong max_write_delay (void) const;
+ // This two attributes control the maximum number of concurrent
+ // readers allowed in the set (busy_hwm) and the maximum number of
+ // threads that can proceed after there is a modification posted.
+
+ virtual int busy (void) = 0;
+ virtual int idle (void) = 0;
+ // The implementation of this methods is provided by derived
+ // classes, that provide appropiate locking.
+
+ virtual void connected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&) = 0;
+ virtual void disconnected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&) = 0;
+ // Used to inform the EC that a Supplier has connected or
+ // disconnected from it.
+
+protected:
+ virtual void connected_i (TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &env);
+ // The implementation of connected(), without locking.
+ // It does not increase the reference count on the supplier
+
+ virtual void disconnected_i (TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &env);
+ // The implementation of disconnected(), without locking.
+ // It decreases the reference count on the supplier if the operation
+ // is successful.
+
+ typedef TAO_EC_Connected_Command<TAO_EC_ProxyPushSupplier_Set,TAO_EC_ProxyPushSupplier> Connected_Command;
+ typedef TAO_EC_Connected_Command<TAO_EC_ProxyPushSupplier_Set,TAO_EC_ProxyPushSupplier> Disconnected_Command;
+
+ friend class TAO_EC_Connected_Command<TAO_EC_ProxyPushSupplier_Set,TAO_EC_ProxyPushSupplier>;
+ friend class TAO_EC_Disconnected_Command<TAO_EC_ProxyPushSupplier_Set,TAO_EC_ProxyPushSupplier>;
+ // This two classes call the connected_i() and disconnected_i()
+ // methods, that's ok because they do while this class is holding
+ // its lock.
+
+ virtual void execute_delayed_operations (void);
+ // Derived classes that implement delayed disconnects and connects
+ // must override this method.
+
+protected:
+ SupplierSet all_suppliers_;
+ // The collection of ProxyPushSupplier objects.
+
+ TAO_EC_Busy_Lock_Adapter<TAO_EC_ProxyPushSupplier_Set> busy_lock_;
+ // The busy lock object
+
+private:
+ CORBA::ULong busy_hwm_;
+ CORBA::ULong max_write_delay_;
+ // Control variables for the concurrency policies.
+};
+
+#if defined (__ACE_INLINE__)
+#include "EC_ProxyPushSupplier_Set.i"
+#endif /* __ACE_INLINE__ */
+
+#endif /* TAO_EC_PROXYPUSHSUPPLIER_SET_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.i b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.i
new file mode 100644
index 00000000000..dda5992d8a4
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set.i
@@ -0,0 +1,43 @@
+// $Id$
+
+ACE_INLINE TAO_EC_ProxyPushSupplier_Set::SupplierSetIterator
+TAO_EC_ProxyPushSupplier_Set::begin (void)
+{
+ return this->all_suppliers_.begin ();
+}
+
+ACE_INLINE TAO_EC_ProxyPushSupplier_Set::SupplierSetIterator
+TAO_EC_ProxyPushSupplier_Set::end (void)
+{
+ return this->all_suppliers_.end ();
+}
+
+ACE_INLINE void
+TAO_EC_ProxyPushSupplier_Set::busy_hwm (CORBA::ULong hwm)
+{
+ this->busy_hwm_ = hwm;
+}
+
+ACE_INLINE CORBA::ULong
+TAO_EC_ProxyPushSupplier_Set::busy_hwm (void) const
+{
+ return this->busy_hwm_;
+}
+
+ACE_INLINE void
+TAO_EC_ProxyPushSupplier_Set::max_write_delay (CORBA::ULong mwd)
+{
+ this->max_write_delay_ = mwd;
+}
+
+ACE_INLINE CORBA::ULong
+TAO_EC_ProxyPushSupplier_Set::max_write_delay (void) const
+{
+ return this->max_write_delay_;
+}
+
+ACE_INLINE TAO_EC_ProxyPushSupplier_Set::Busy_Lock&
+TAO_EC_ProxyPushSupplier_Set::busy_lock (void)
+{
+ return this->busy_lock_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp
index 491e1ba64be..e2a6029e376 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp
@@ -1,79 +1,47 @@
// $Id$
-#ifndef TAO_EC_CONSUMERADMIN_T_CPP
-#define TAO_EC_CONSUMERADMIN_T_CPP
+#ifndef TAO_EC_PROXYPUSHSUPPLIER_SET_T_CPP
+#define TAO_EC_PROXYPUSHSUPPLIER_SET_T_CPP
-#include "EC_ConsumerAdmin_T.h"
+#include "EC_ProxyPushSupplier_Set_T.h"
#include "EC_Command.h"
#if ! defined (__ACE_INLINE__)
-#include "EC_ConsumerAdmin_T.i"
+#include "EC_ProxyPushSupplier_Set_T.i"
#endif /* __ACE_INLINE__ */
-ACE_RCSID(Event, EC_ConsumerAdmin_T, "$Id$")
+ACE_RCSID(Event, EC_ProxyPushSupplier_Set_T, "$Id$")
-template<ACE_SYNCH_DECL>int
-TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy (void)
-{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- return this->busy_i ();
-}
-
-template<ACE_SYNCH_DECL> int
-TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::idle (void)
+template<class ACE_LOCK>int
+TAO_EC_ProxyPushSupplier_Set_Immediate<ACE_LOCK>::busy (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- return this->idle_i ();
-}
-
-template<ACE_SYNCH_DECL> int
-TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy_i (void)
-{
- if (this->busy_count_ >= this->busy_hwm ())
- {
- this->reached_hwm_ = 1;
- while (this->reached_hwm_ != 0)
- this->busy_cond_.wait ();
- }
- this->busy_count_++;
- return 0;
+ return this->lock_.acquire_read ();
}
-template<ACE_SYNCH_DECL> int
-TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::idle_i (void)
+template<class ACE_LOCK> int
+TAO_EC_ProxyPushSupplier_Set_Immediate<ACE_LOCK>::idle (void)
{
- this->busy_count_--;
- if (this->busy_count_ == 0)
- {
- this->reached_hwm_ = 0;
- this->execute_delayed_operations ();
- this->busy_cond_.broadcast ();
- }
- return 0;
+ return this->lock_.release ();
}
-// ****************************************************************
-
-template<ACE_SYNCH_DECL> void
-TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::connected (
+template<class ACE_LOCK> void
+TAO_EC_ProxyPushSupplier_Set_Immediate<ACE_LOCK>::connected (
TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment& ACE_TRY_ENV)
{
- ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_,
+ ACE_GUARD_THROW (ACE_LOCK, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
supplier->_incr_refcnt ();
this->connected_i (supplier, ACE_TRY_ENV);
}
-template<ACE_SYNCH_DECL> void
-TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::disconnected (
+template<class ACE_LOCK> void
+TAO_EC_ProxyPushSupplier_Set_Immediate<ACE_LOCK>::disconnected (
TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment& ACE_TRY_ENV)
{
- ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_,
+ ACE_GUARD_THROW (ACE_LOCK, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
this->disconnected_i (supplier, ACE_TRY_ENV);
@@ -81,15 +49,43 @@ TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::disconnected (
// ****************************************************************
+template<ACE_SYNCH_DECL>int
+TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::busy (void)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ while (this->busy_count_ >= this->busy_hwm ()
+ || this->write_delay_ >= this->max_write_delay ())
+ this->busy_cond_.wait ();
+ this->busy_count_++;
+
+ return 0;
+}
+
+template<ACE_SYNCH_DECL> int
+TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::idle (void)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ this->busy_count_--;
+ if (this->busy_count_ == 0)
+ {
+ this->write_delay_ = 0;
+ this->execute_delayed_operations ();
+ this->busy_cond_.broadcast ();
+ }
+ return 0;
+}
+
template<ACE_SYNCH_DECL> void
-TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::connected (
+TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::connected (
TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment& ACE_TRY_ENV)
{
ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- if (this->busy_count () == 0)
+ if (this->busy_count_ == 0)
{
// We can add the object immediately
this->connected_i (supplier, ACE_TRY_ENV);
@@ -99,7 +95,7 @@ TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::connected (
supplier->_incr_refcnt ();
ACE_Command_Base* command;
ACE_NEW (command,
- TAO_EC_ConsumerAdmin::Connected_Command (this,
+ TAO_EC_ProxyPushSupplier_Set::Connected_Command (this,
supplier));
ACE_DEBUG ((LM_DEBUG,
@@ -107,18 +103,19 @@ TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::connected (
command));
this->command_queue_.enqueue_tail (command);
+ this->write_delay_++;
}
}
template<ACE_SYNCH_DECL> void
-TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::disconnected (
+TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::disconnected (
TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment& ACE_TRY_ENV)
{
ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- if (this->busy_count () == 0)
+ if (this->busy_count_ == 0)
{
// We can remove the object immediately
this->disconnected_i (supplier, ACE_TRY_ENV);
@@ -127,18 +124,19 @@ TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::disconnected (
{
ACE_Command_Base* command;
ACE_NEW (command,
- TAO_EC_ConsumerAdmin::Disconnected_Command (this,
+ TAO_EC_ProxyPushSupplier_Set::Disconnected_Command (this,
supplier));
ACE_DEBUG ((LM_DEBUG,
"EC (%P|%t) Delayed disconnection command = %x\n",
command));
this->command_queue_.enqueue_tail (command);
+ this->write_delay_++;
}
}
template<ACE_SYNCH_DECL> void
-TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::execute_delayed_operations (void)
+TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::execute_delayed_operations (void)
{
// LOCKING: the lock is taken by the idle() function
while (!this->command_queue_.is_empty ())
@@ -156,4 +154,4 @@ TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::execute_delayed_operations (void)
}
}
-#endif /* TAO_EC_CONSUMERADMIN_T_CPP */
+#endif /* TAO_EC_PROXYPUSHSUPPLIER_SET_T_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.h
new file mode 100644
index 00000000000..0cb78bf6bb3
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.h
@@ -0,0 +1,135 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Real-time Event Channel
+//
+// = FILENAME
+// EC_ProxyPushSupplier_Set_T
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// = DESCRIPTION
+// Parametric classes related to EC_ProxyPushSupplier_Set
+//
+// = CREDITS
+// Based on previous work by Tim Harrison (harrison@cs.wustl.edu)
+// and other members of the DOC group.
+// More details can be found in:
+// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz
+// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz
+//
+// ============================================================================
+
+#ifndef TAO_EC_PROXYPUSHSUPPLIER_SET_T_H
+#define TAO_EC_PROXYPUSHSUPPLIER_SET_T_H
+
+#include "EC_ProxyPushSupplier_Set.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<class ACE_LOCK>
+class TAO_EC_ProxyPushSupplier_Set_Immediate : public TAO_EC_ProxyPushSupplier_Set
+{
+ // = TITLE
+ // ProxyPushSupplier_Set_Immediate
+ //
+ // = DESCRIPTION
+ // A concrete implementation of EC_ProxyPushSupplier_Set that
+ // propagate changes to the set immediately.
+ //
+ // = LOCKING
+ // Uses parametric types to provide locking.
+ //
+ // = TODO
+ //
+public:
+ TAO_EC_ProxyPushSupplier_Set_Immediate (void);
+ // Constructor.
+
+ virtual int busy (void);
+ virtual int idle (void);
+ // The implementation of this methods is provided by derived
+ // classes, that provide appropiate locking.
+
+ virtual void connected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&);
+ virtual void disconnected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&);
+ // Used to inform the EC that a Supplier has connected or
+ // disconnected from it.
+
+private:
+ ACE_LOCK lock_;
+ // The lock.
+};
+
+// ****************************************************************
+
+template<ACE_SYNCH_DECL>
+class TAO_EC_ProxyPushSupplier_Set_Delayed : public TAO_EC_ProxyPushSupplier_Set
+{
+ // = TITLE
+ // ProxyPushSupplier_Set_Delayed
+ //
+ // = DESCRIPTION
+ // A concrete implementation of EC_ProxyPushSupplier_Set that
+ // delays changes to the set until no threads are using the set.
+ //
+ // = LOCKING
+ // Uses parametric types to provide locking.
+ //
+ // = TODO
+ //
+public:
+ TAO_EC_ProxyPushSupplier_Set_Delayed (void);
+ // Constructor.
+
+ // = Read the descriptions in EC_ProxyPushSupplier_Set
+ virtual int busy (void);
+ virtual int idle (void);
+ virtual void connected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&);
+ virtual void disconnected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&);
+
+protected:
+ virtual void execute_delayed_operations (void);
+
+private:
+ ACE_SYNCH_MUTEX_T lock_;
+ // The lock.
+
+ ACE_SYNCH_CONDITION_T busy_cond_;
+ // A condition variable to wait while the object is too busy.
+
+ int busy_count_;
+ // Keep track of the number of threads using the set
+
+ int write_delay_;
+ // Keep track of the number of threads that have used the set since
+ // the last change was posted.
+
+ ACE_Unbounded_Queue<ACE_Command_Base*> command_queue_;
+ // The commands that carry the delayed operations are enqueued
+ // here.
+};
+
+#if defined (__ACE_INLINE__)
+#include "EC_ProxyPushSupplier_Set_T.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "EC_ProxyPushSupplier_Set_T.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("EC_ProxyPushSupplier_Set_T.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* TAO_EC_PROXYPUSHSUPPLIER_SET_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.i b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.i
new file mode 100644
index 00000000000..f65b57d2cc7
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.i
@@ -0,0 +1,19 @@
+// $Id$
+
+template<class ACE_LOCK> ACE_INLINE
+TAO_EC_ProxyPushSupplier_Set_Immediate<ACE_LOCK>::
+ TAO_EC_ProxyPushSupplier_Set_Immediate (void)
+{
+}
+
+// ****************************************************************
+
+template<ACE_SYNCH_DECL> ACE_INLINE
+TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::
+ TAO_EC_ProxyPushSupplier_Set_Delayed (void)
+ : busy_cond_ (lock_),
+ busy_count_ (0),
+ write_delay_ (0)
+{
+}
+
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
index 4b9a7367dc2..8818aae7f88 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
@@ -235,11 +235,11 @@ TAO_EC_ProxyPushSupplier::max_event_size (void) const
return this->child_->max_event_size ();
}
-void
-TAO_EC_ProxyPushSupplier::event_ids (TAO_EC_Filter::Headers& headerset)
+int
+TAO_EC_ProxyPushSupplier::can_match (
+ const RtecEventComm::EventHeader &header) const
{
- ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
- // @@ TODO cache this result....
- this->child_->event_ids (headerset);
+ return this->child_->can_match (header);
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
index 8ce01cc3585..75418dbf683 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
@@ -138,7 +138,7 @@ public:
CORBA::Environment& env);
virtual void clear (void);
virtual CORBA::ULong max_event_size (void) const;
- virtual void event_ids (TAO_EC_Filter::Headers& headerset);
+ virtual int can_match (const RtecEventComm::EventHeader &header) const;
private:
CORBA::Boolean is_connected_i (void) const;
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp
index 0eef922ab1c..93b624eb363 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp
@@ -3,7 +3,7 @@
#include "EC_Type_Filter.h"
#if ! defined (__ACE_INLINE__)
-#include "EC_Filter.i"
+#include "EC_Type_Filter.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(Event, EC_Type_Filter, "$Id$")
@@ -76,8 +76,9 @@ TAO_EC_Type_Filter::max_event_size (void) const
return 1;
}
-void
-TAO_EC_Type_Filter::event_ids(TAO_EC_Filter::Headers& headers)
+int
+TAO_EC_Type_Filter::can_match (
+ const RtecEventComm::EventHeader& header) const
{
- headers.insert (this->header_, 1);
+ return TAO_EC_Filter::matches (this->header_, header);
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h
index 334150aea78..49614078aae 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h
@@ -49,7 +49,7 @@ class TAO_EC_Type_Filter : public TAO_EC_Filter
public:
TAO_EC_Type_Filter (const RtecEventComm::EventHeader& header);
// Constructor.
-
+
// = The TAO_EC_Filter methods, please check the documentation in
// TAO_EC_Filter.
virtual int filter (const RtecEventComm::EventSet& event,
@@ -66,13 +66,13 @@ public:
CORBA::Environment& env);
virtual void clear (void);
virtual CORBA::ULong max_event_size (void) const;
- virtual void event_ids (TAO_EC_Filter::Headers& headers);
+ virtual int can_match (const RtecEventComm::EventHeader& header) const;
private:
ACE_UNIMPLEMENTED_FUNC (TAO_EC_Type_Filter
(const TAO_EC_Type_Filter&))
ACE_UNIMPLEMENTED_FUNC (TAO_EC_Type_Filter& operator=
- (const TAO_EC_Type_Filter&))
+ (const TAO_EC_Type_Filter&))
private:
RtecEventComm::EventHeader header_;
diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile
index 9bb0777e3f8..5ad662089fb 100644
--- a/TAO/orbsvcs/orbsvcs/Makefile
+++ b/TAO/orbsvcs/orbsvcs/Makefile
@@ -166,6 +166,8 @@ ifneq (,$(findstring Event2,$(TAO_ORBSVCS)))
Event/EC_Basic_Filter_Builder \
Event/EC_Basic_Factory \
Event/EC_ObserverStrategy \
+ Event/EC_ProxyPushSupplier_Set \
+ Event/EC_Per_Supplier_Filter \
endif # Event2
diff --git a/TAO/tao/Any.h b/TAO/tao/Any.h
index e4e6318527b..9867cb507c3 100644
--- a/TAO/tao/Any.h
+++ b/TAO/tao/Any.h
@@ -75,7 +75,7 @@ public:
// = TAO extension
CORBA_Any (CORBA::TypeCode_ptr type,
const ACE_Message_Block* mb);
- // Constructor. Used by DynAny to compose/decompose
+ // Constructor. Used by DynAny to compose/decompose
// complex types using a CDR.
CORBA_Any (const CORBA_Any &a);
@@ -170,11 +170,11 @@ public:
// = Special types.
// These are needed for insertion and extraction of booleans,
- // octets, chars, and bounded strings. CORBA spec requires
+ // octets, chars, and bounded strings. CORBA spec requires
// that they be here, we just typedef to the already-defined
// ACE_OutputCDR types.
- typedef ACE_OutputCDR::from_boolean from_boolean;
+ typedef ACE_OutputCDR::from_boolean from_boolean;
typedef ACE_OutputCDR::from_octet from_octet;
typedef ACE_OutputCDR::from_char from_char;
typedef ACE_OutputCDR::from_wchar from_wchar;
@@ -240,14 +240,14 @@ public:
void replace (CORBA::TypeCode_ptr type,
const void *value,
CORBA::Boolean any_owns_data,
- CORBA_Environment &TAO_IN_ENV =
+ CORBA_Environment &TAO_IN_ENV =
CORBA::default_environment ());
// Replace the current typecode and data with the specified one -
// unsafe.
void replace (CORBA::TypeCode_ptr type,
const void *value,
- CORBA_Environment &TAO_IN_ENV =
+ CORBA_Environment &TAO_IN_ENV =
CORBA::default_environment ());
// Replace the current typecode and data with the specified one -
// unsafe. This uses a default value for the "any_owns_data" parameter
@@ -256,7 +256,7 @@ public:
// Return TypeCode of the element stored in the Any.
void type (CORBA::TypeCode_ptr type,
- CORBA_Environment &TAO_IN_ENV =
+ CORBA_Environment &TAO_IN_ENV =
CORBA::default_environment ());
// For use along with <<= of a value of aliased type when the alias must
// be preserved.
@@ -287,7 +287,7 @@ public:
void _tao_replace (CORBA::TypeCode_ptr,
const ACE_Message_Block *mb,
CORBA::Boolean any_owns_data,
- CORBA::Environment &TAO_IN_ENV =
+ CORBA::Environment &TAO_IN_ENV =
CORBA::default_environment ());
// Replace via message block instead of <value_>.
@@ -373,7 +373,7 @@ class TAO_Export CORBA_Any_out
// CORBA_Any_out
//
// = DESCRIPTION
- // The _out class for CORBA_Any. This is used to help in
+ // The _out class for CORBA_Any. This is used to help in
// managing the out parameters.
public:
// = operations.
diff --git a/TAO/tao/MProfile.i b/TAO/tao/MProfile.i
index 88cbf1c9879..3ed402c0131 100644
--- a/TAO/tao/MProfile.i
+++ b/TAO/tao/MProfile.i
@@ -89,7 +89,7 @@ TAO_MProfile::get_prev (void)
return 0;
if (current_ > 1)
current_--;
-
+
return pfiles_[current_ - 1];
}
@@ -103,11 +103,11 @@ TAO_MProfile::get_profile (TAO_PHandle handle)
else
return 0;
}
-
+
ACE_INLINE TAO_Profile *
TAO_MProfile::get_current_profile (void)
{
- if (last_ == 0)
+ if (last_ == 0)
return 0;
if (current_ == 0)
// means list has not been read before.
@@ -116,44 +116,44 @@ TAO_MProfile::get_current_profile (void)
return pfiles_[current_ - 1];
}
-ACE_INLINE TAO_PHandle
+ACE_INLINE TAO_PHandle
TAO_MProfile::get_current_handle (void)
{
if (current_ > 0)
return current_ - 1;
- else
+ else
return 0;
}
-ACE_INLINE void
+ACE_INLINE void
TAO_MProfile::rewind (void)
{
current_ = 0;
}
-ACE_INLINE int
+ACE_INLINE int
TAO_MProfile::add_profile (TAO_Profile *pfile)
{
// skip by the used slots
if (last_ == size_) // full!
return -1;
-
+
pfiles_[last_++] = pfile;
- if (pfile && pfile->_incr_refcnt () == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%P|%t) Unable to increment reference count in add_profile!\n"),
+ if (pfile && pfile->_incr_refcnt () == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%P|%t) Unable to increment reference count in add_profile!\n"),
-1);
return last_ - 1;
}
-ACE_INLINE int
+ACE_INLINE int
TAO_MProfile::give_profile (TAO_Profile *pfile)
{
// skip by the used slots
if (last_ == size_) // full!
return -1;
-
+
pfiles_[last_++] = pfile;
return last_ - 1;
@@ -186,7 +186,7 @@ TAO_MProfile::pfiles (void) const
}
ACE_INLINE CORBA::Boolean
-TAO_MProfile::is_equivalent (TAO_MProfile *first,
+TAO_MProfile::is_equivalent (TAO_MProfile *first,
TAO_MProfile *second,
CORBA::Environment &env)
{
@@ -197,12 +197,12 @@ TAO_MProfile::is_equivalent (TAO_MProfile *first,
TAO_Profile_ptr *pfiles2 = second->pfiles ();
TAO_PHandle first_cnt = first->profile_count ();
TAO_PHandle second_cnt = second->profile_count ();
-
+
for (TAO_PHandle h1 = 0; h1 < first_cnt;h1++)
for (TAO_PHandle h2 = 0; h2 < second_cnt; h2++ )
if (pfiles1[h1]->is_equivalent (pfiles2[h2], env))
return 1;
-
+
return 0;
}
@@ -211,13 +211,13 @@ TAO_MProfile::hash (CORBA::ULong max, CORBA::Environment &env)
{
CORBA::ULong hashval = 0;
- if (last_ == 0)
+ if (last_ == 0)
return 0;
- for (TAO_PHandle h=0; h < last_ ; h++)
+ for (TAO_PHandle h=0; h < last_ ; h++)
hashval += pfiles_[h]->hash (max, env);
// The above hash function return an ULong between 0 and max here we
// simply take the average value and round.
- return hashval / last_;
+ return hashval / last_;
}
diff --git a/TAO/tao/orbconf.h b/TAO/tao/orbconf.h
index e71fff43b3a..65a16ac82eb 100644
--- a/TAO/tao/orbconf.h
+++ b/TAO/tao/orbconf.h
@@ -60,6 +60,12 @@
// without timestamps, transient and persistent POA cannot be
// distinguished
+//#define TAO_USE_DOTTED_DECIMAL_ADDRESSES
+//
+// If set the ORB will use dotted decimal addresses in the IORs it
+// exports, this is useful for platforms or environments that cannot
+// depend on a DNS beign available.
+
// The default arguments of the resource factory for the fake service
// configurator
#if !defined (TAO_DEFAULT_RESOURCE_FACTORY_ARGS)