summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp213
1 files changed, 213 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp
new file mode 100644
index 00000000000..5a222b54ddd
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp
@@ -0,0 +1,213 @@
+// $Id$
+
+#include "orbsvcs/Event/EC_Per_Supplier_Filter.h"
+#include "orbsvcs/Event/EC_Event_Channel_Base.h"
+#include "orbsvcs/Event/EC_ProxySupplier.h"
+#include "orbsvcs/Event/EC_ProxyConsumer.h"
+#include "orbsvcs/Event/EC_Scheduling_Strategy.h"
+#include "orbsvcs/Event/EC_QOS_Info.h"
+
+#include "orbsvcs/ESF/ESF_Proxy_Collection.h"
+
+#include "orbsvcs/Event_Service_Constants.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "orbsvcs/Event/EC_Per_Supplier_Filter.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Event, EC_Per_Supplier_Filter, "$Id$")
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+TAO_EC_Per_Supplier_Filter::
+ TAO_EC_Per_Supplier_Filter (TAO_EC_Event_Channel_Base* ec)
+ : event_channel_ (ec),
+ consumer_ (0),
+ refcnt_ (1)
+{
+ this->event_channel_->create_proxy_collection (this->collection_);
+}
+
+TAO_EC_Per_Supplier_Filter::~TAO_EC_Per_Supplier_Filter (void)
+{
+ this->event_channel_->destroy_proxy_collection (this->collection_);
+ this->collection_ = 0;
+}
+
+void
+TAO_EC_Per_Supplier_Filter::bind (TAO_EC_ProxyPushConsumer* consumer)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (this->consumer_ != 0)
+ return;
+
+ this->consumer_ = consumer;
+}
+
+void
+TAO_EC_Per_Supplier_Filter::unbind (TAO_EC_ProxyPushConsumer* consumer)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (this->consumer_ == 0 || this->consumer_ != consumer)
+ return;
+
+ this->consumer_ = 0;
+
+ ACE_TRY_NEW_ENV
+ {
+ this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // @@ Ignore exceptions
+ }
+ ACE_ENDTRY;
+}
+
+void
+TAO_EC_Per_Supplier_Filter::connected (TAO_EC_ProxyPushSupplier* supplier
+ ACE_ENV_ARG_DECL)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (this->consumer_ == 0)
+ return;
+
+ const RtecEventChannelAdmin::SupplierQOS& pub =
+ this->consumer_->publications_i ();
+
+ for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
+ {
+ const RtecEventComm::Event& event =
+ pub.publications[j].event;
+
+#if TAO_EC_ENABLE_DEBUG_MESSAGES
+ ACE_DEBUG ((LM_DEBUG, "Connecting consumer <%x> to <%x>, "
+ "trying event <%d:%d> ",
+ supplier, this,
+ event.header.source, event.header.type));
+#endif /* TAO_EC_ENABLED_DEBUG_MESSAGES */
+ if (supplier->can_match (event.header))
+ {
+#if TAO_EC_ENABLE_DEBUG_MESSAGES
+ ACE_DEBUG ((LM_DEBUG, " matched\n"));
+#endif /* TAO_EC_ENABLED_DEBUG_MESSAGES */
+ this->collection_->connected (supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ return;
+ }
+#if TAO_EC_ENABLE_DEBUG_MESSAGES
+ ACE_DEBUG ((LM_DEBUG, " not matched\n"));
+#endif /* TAO_EC_ENABLED_DEBUG_MESSAGES */
+ }
+}
+
+void
+TAO_EC_Per_Supplier_Filter::reconnected (TAO_EC_ProxyPushSupplier* supplier
+ ACE_ENV_ARG_DECL)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (this->consumer_ == 0)
+ return;
+
+ const RtecEventChannelAdmin::SupplierQOS& pub =
+ this->consumer_->publications_i ();
+
+ for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
+ {
+ const RtecEventComm::Event& event =
+ pub.publications[j].event;
+
+ // ACE_DEBUG ((LM_DEBUG, "Trying %d:%d in %x\n",
+ // event.header.source, event.header.type,
+ // this));
+ if (supplier->can_match (event.header))
+ {
+ // ACE_DEBUG ((LM_DEBUG, " matched %x\n", supplier));
+ this->collection_->connected (supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ return;
+ }
+ }
+ this->collection_->disconnected (supplier ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_EC_Per_Supplier_Filter::disconnected (TAO_EC_ProxyPushSupplier* supplier
+ ACE_ENV_ARG_DECL)
+{
+ this->collection_->disconnected (supplier ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_EC_Per_Supplier_Filter::shutdown (ACE_ENV_SINGLE_ARG_DECL)
+{
+ this->collection_->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+void
+TAO_EC_Per_Supplier_Filter::push (const RtecEventComm::EventSet& event,
+ TAO_EC_ProxyPushConsumer *consumer
+ ACE_ENV_ARG_DECL)
+{
+ TAO_EC_Scheduling_Strategy* scheduling_strategy =
+ this->event_channel_->scheduling_strategy ();
+ scheduling_strategy->schedule_event (event,
+ consumer,
+ this
+ ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_EC_Per_Supplier_Filter::push_scheduled_event (RtecEventComm::EventSet &event,
+ const TAO_EC_QOS_Info &event_info
+ ACE_ENV_ARG_DECL)
+{
+ TAO_EC_Filter_Worker worker (event, event_info);
+ this->collection_->for_each (&worker ACE_ENV_ARG_PARAMETER);
+}
+
+CORBA::ULong
+TAO_EC_Per_Supplier_Filter::_incr_refcnt (void)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0);
+
+ this->refcnt_++;
+ return this->refcnt_;
+}
+
+CORBA::ULong
+TAO_EC_Per_Supplier_Filter::_decr_refcnt (void)
+{
+ {
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0);
+
+ this->refcnt_--;
+ if (this->refcnt_ != 0)
+ return this->refcnt_;
+ }
+ this->event_channel_->supplier_filter_builder ()->destroy (this);
+ return 0;
+}
+
+// ****************************************************************
+
+TAO_EC_Supplier_Filter*
+TAO_EC_Per_Supplier_Filter_Builder::create (
+ RtecEventChannelAdmin::SupplierQOS&)
+{
+ return new TAO_EC_Per_Supplier_Filter (this->event_channel_);
+}
+
+void
+TAO_EC_Per_Supplier_Filter_Builder::destroy (
+ TAO_EC_Supplier_Filter* x)
+{
+ delete x;
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL