summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp383
1 files changed, 75 insertions, 308 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
index bbc4163bf0c..9d612e50bd1 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
@@ -3,7 +3,6 @@
#include "EC_ProxySupplier.h"
#include "EC_Dispatching.h"
#include "EC_Filter_Builder.h"
-#include "EC_QOS_Info.h"
#include "EC_Event_Channel.h"
#if ! defined (__ACE_INLINE__)
@@ -12,8 +11,6 @@
ACE_RCSID(Event, EC_ProxySupplier, "$Id$")
-typedef ACE_Reverse_Lock<ACE_Lock> TAO_EC_Unlock;
-
TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel* ec)
: event_channel_ (ec),
refcount_ (1),
@@ -22,112 +19,80 @@ TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel* ec)
{
this->lock_ =
this->event_channel_->create_supplier_lock ();
-
- this->default_POA_ =
- this->event_channel_->supplier_poa ();
}
TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void)
{
+ delete this->child_;
+ this->child_ = 0;
this->event_channel_->destroy_supplier_lock (this->lock_);
}
-void
-TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushConsumer*,
- CORBA::Environment &)
+CORBA::ULong
+TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
{
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
+ return this->refcount_++;
}
-void
-TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushConsumer*,
- CORBA::Environment &)
+CORBA::ULong
+TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
{
+ {
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return this->refcount_;
+ }
+
+ // Notify the event channel
+ this->event_channel_->destroy_proxy_push_supplier (this);
+ return 0;
}
void
-TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment &)
+TAO_EC_ProxyPushSupplier::set_default_POA (PortableServer::POA_ptr poa)
{
+ ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
+ this->default_POA_ = PortableServer::POA::_duplicate (poa);
}
-void
-TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment &)
+PortableServer::POA_ptr
+TAO_EC_ProxyPushSupplier::_default_POA_i ()
{
+ return PortableServer::POA::_duplicate (this->default_POA_.in ());
}
-void
-TAO_EC_ProxyPushSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV)
+PortableServer::POA_ptr
+TAO_EC_ProxyPushSupplier::_default_POA (CORBA::Environment&)
{
- RtecEventComm::PushConsumer_var consumer;
-
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- if (this->is_connected_i () == 0)
- return;
-
- consumer = this->consumer_._retn ();
-
- this->cleanup_i ();
- }
-
- this->deactivate (ACE_TRY_ENV);
- ACE_CHECK;
-
- consumer->disconnect_push_consumer (ACE_TRY_ENV);
-
- this->_decr_refcnt ();
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_,
+ PortableServer::POA::_nil ());
+ return this->_default_POA_i ();
}
void
-TAO_EC_ProxyPushSupplier::cleanup_i (void)
+TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushConsumer*,
+ CORBA::Environment &)
{
- this->consumer_ =
- RtecEventComm::PushConsumer::_nil ();
-
- // @@ Why don't we have a destroy() method in the
- // filter_builder?
- delete this->child_;
- this->child_ = 0;
}
void
-TAO_EC_ProxyPushSupplier::deactivate (CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushConsumer*,
+ CORBA::Environment &)
{
- PortableServer::POA_var poa =
- this->_default_POA (ACE_TRY_ENV);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (this, ACE_TRY_ENV);
- ACE_CHECK;
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK;
}
-CORBA::ULong
-TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
+void
+TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment &)
{
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
- return this->refcount_++;
}
-CORBA::ULong
-TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
+void
+TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment &)
{
- {
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
- this->refcount_--;
- if (this->refcount_ != 0)
- return this->refcount_;
- }
-
- // Notify the event channel
- this->event_channel_->destroy_proxy_push_supplier (this);
- return 0;
}
void
@@ -143,55 +108,14 @@ TAO_EC_ProxyPushSupplier::connect_push_consumer (
ACE_CHECK;
if (this->is_connected_i ())
- {
- if (this->event_channel_->consumer_reconnect () == 0)
- ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ());
-
- // Re-connections are allowed, go ahead and disconnect the
- // consumer...
- this->cleanup_i ();
-
- // @@ Are there any race conditions here:
- // + The lock is released, but the object is marked as
- // disconnected already, so:
- // - No events will be pushed
- // - Any disconnects will just return
- // + But another thread could invoke connect_push_consumer()
- // again, notice that by the time the lock is acquired
- // again the connected() call may still be running.
- // It seems like we need delayed operations again, or
- // something similar to what the POA does in this
- // scenario.
- // Meanwhile we can tell the users: "if it hurts don't do
- // it".
- //
- TAO_EC_Unlock reverse_lock (*this->lock_);
-
- {
- ACE_GUARD_THROW_EX (
- TAO_EC_Unlock, ace_mon, reverse_lock,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- this->event_channel_->disconnected (this, ACE_TRY_ENV);
- ACE_CHECK;
- }
-
- // What if a second thread connected us after this?
- if (this->is_connected_i ())
- return;
- }
+ ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ());
this->consumer_ =
RtecEventComm::PushConsumer::_duplicate (push_consumer);
this->qos_ = qos;
this->child_ =
- this->event_channel_->filter_builder ()->build (this,
- this->qos_,
- ACE_TRY_ENV);
- ACE_CHECK;
-
+ this->event_channel_->filter_builder ()->build (this->qos_);
this->adopt_child (this->child_);
}
@@ -209,18 +133,18 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier (
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK;
- if (this->is_connected_i () == 0)
- ACE_THROW (CORBA::BAD_INV_ORDER ());
-
- this->cleanup_i ();
- }
-
- this->deactivate (ACE_TRY_ENV);
- ACE_CHECK;
+ this->consumer_ =
+ RtecEventComm::PushConsumer::_nil ();
- // Notify the event channel....
- this->event_channel_->disconnected (this, ACE_TRY_ENV);
+ PortableServer::POA_var poa =
+ this->_default_POA_i ();
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (this, ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+ }
this->_decr_refcnt ();
}
@@ -251,25 +175,12 @@ TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- int result = 0;
-
- {
- ACE_GUARD_THROW_EX (
+ ACE_GUARD_THROW_EX (
ACE_Lock, ace_mon, *this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK_RETURN (0);
-
- if (this->is_connected_i () == 0)
- return 0;
-
- result =
- this->child_->filter (event, qos_info, ACE_TRY_ENV);
- if (this->refcount_ > 0)
- return result;
- }
+ ACE_CHECK_RETURN (0);
- this->event_channel_->destroy_proxy_push_supplier (this);
- return result;
+ return this->child_->filter (event, qos_info, ACE_TRY_ENV);
}
int
@@ -277,25 +188,12 @@ TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- int result = 0;
-
- {
- ACE_GUARD_THROW_EX (
+ ACE_GUARD_THROW_EX (
ACE_Lock, ace_mon, *this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK_RETURN (0);
-
- if (this->is_connected_i () == 0)
- return 0;
-
- result =
- this->child_->filter_nocopy (event, qos_info, ACE_TRY_ENV);
- if (this->refcount_ > 0)
- return result;
- }
+ ACE_CHECK_RETURN (0);
- this->event_channel_->destroy_proxy_push_supplier (this);
- return result;
+ return this->child_->filter_nocopy (event, qos_info, ACE_TRY_ENV);
}
void
@@ -303,15 +201,15 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- // No need to grab the lock, it is beign held already by the
- // filter() method
- this->refcount_++;
- this->event_channel_->dispatching ()->push (this,
- event,
- qos_info,
- ACE_TRY_ENV);
- if (this->child_ != 0)
- this->child_->clear ();
+ // Do not take a lock, this is a call back from our child filter, so
+ // we are holding the lock already (in the filter() method).
+ if (this->is_connected_i ())
+ this->event_channel_->dispatching ()->push (this,
+ this->consumer_.in (),
+ event,
+ qos_info,
+ ACE_TRY_ENV);
+ this->child_->clear ();
}
void
@@ -319,103 +217,15 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- // No need to grab the lock, it is beign held already by the
- // filter() method
- this->refcount_++;
- this->event_channel_->dispatching ()->push_nocopy (this,
- event,
- qos_info,
- ACE_TRY_ENV);
- if (this->child_ != 0)
- this->child_->clear ();
-}
-
-void
-TAO_EC_ProxyPushSupplier::push_to_consumer (const RtecEventComm::EventSet& event,
- CORBA::Environment& ACE_TRY_ENV)
-{
- RtecEventComm::PushConsumer_var consumer;
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- // The reference count was increased just before pushing to the
- // dispatching module, we must decrease here. But if we get
- // removed then we abort. We don't want to call _decr_refcnt()
- // because that will require two locks.
- this->refcount_--;
- if (this->refcount_ == 0)
- {
- ace_mon.release ();
- this->event_channel_->destroy_proxy_push_supplier (this);
- return;
- }
-
- if (this->is_connected_i () == 0)
- return; // ACE_THROW (RtecEventComm::Disconnected ());????
-
- if (this->suspended_ != 0)
- return;
-
- consumer =
- RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
-
- // The refcount cannot be zero, because we have at least two
- // references,
- }
-
- consumer->push (event, ACE_TRY_ENV);
-}
-
-void
-TAO_EC_ProxyPushSupplier::reactive_push_to_consumer (
- const RtecEventComm::EventSet& event,
- CORBA::Environment& ACE_TRY_ENV)
-{
- if (this->is_connected_i () == 0)
- return; // TAO_THROW (RtecEventComm::Disconnected ());????
-
- if (this->suspended_ != 0)
- return;
-
- RtecEventComm::PushConsumer_var consumer =
- RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
-
- {
- TAO_EC_Unlock reverse_lock (*this->lock_);
-
- ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
- consumer->push (event, ACE_TRY_ENV);
- }
-
- // The reference count was incremented just before delegating on the
- // dispatching strategy, in this can we need to decrement it *now*.
- this->refcount_--;
-}
-
-void
-TAO_EC_ProxyPushSupplier::push_timeout (
- TAO_EC_Filter* timeout_filter,
- const RtecEventComm::EventSet &event,
- TAO_EC_QOS_Info& qos_info,
- CORBA::Environment &ACE_TRY_ENV)
-{
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- timeout_filter->push (event, qos_info, ACE_TRY_ENV);
- if (this->refcount_ > 0)
- return;
- }
-
- this->event_channel_->destroy_proxy_push_supplier (this);
+ // Do not take a lock, this is a call back from our child filter, so
+ // we are holding the lock already (in the filter() method).
+ if (this->is_connected_i ())
+ this->event_channel_->dispatching ()->push_nocopy (this,
+ this->consumer_.in (),
+ event,
+ qos_info,
+ ACE_TRY_ENV);
+ this->child_->clear ();
}
void
@@ -440,48 +250,5 @@ TAO_EC_ProxyPushSupplier::can_match (
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
- if (this->is_connected_i () == 0)
- return 0;
-
return this->child_->can_match (header);
}
-
-int
-TAO_EC_ProxyPushSupplier::add_dependencies (
- const RtecEventComm::EventHeader &header,
- const TAO_EC_QOS_Info &qos_info,
- CORBA::Environment &ACE_TRY_ENV)
-{
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK_RETURN (0);
-
- return this->child_->add_dependencies (header,
- qos_info,
- ACE_TRY_ENV);
-}
-
-PortableServer::POA_ptr
-TAO_EC_ProxyPushSupplier::_default_POA (CORBA::Environment&)
-{
- return PortableServer::POA::_duplicate (this->default_POA_.in ());
-}
-
-void
-TAO_EC_ProxyPushSupplier::_add_ref (CORBA::Environment &)
-{
- this->_incr_refcnt ();
-}
-
-void
-TAO_EC_ProxyPushSupplier::_remove_ref (CORBA::Environment &)
-{
- this->_decr_refcnt ();
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */