summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp250
1 files changed, 57 insertions, 193 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
index bbc4163bf0c..1677ffc9ad0 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
@@ -3,7 +3,6 @@
#include "EC_ProxySupplier.h"
#include "EC_Dispatching.h"
#include "EC_Filter_Builder.h"
-#include "EC_QOS_Info.h"
#include "EC_Event_Channel.h"
#if ! defined (__ACE_INLINE__)
@@ -32,102 +31,56 @@ TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void)
this->event_channel_->destroy_supplier_lock (this->lock_);
}
-void
-TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushConsumer*,
- CORBA::Environment &)
+CORBA::ULong
+TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
{
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
+ return this->refcount_++;
}
-void
-TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushConsumer*,
- CORBA::Environment &)
+CORBA::ULong
+TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
{
-}
+ {
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return this->refcount_;
+ }
-void
-TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment &)
-{
+ // Notify the event channel
+ this->event_channel_->destroy_proxy_push_supplier (this);
+ return 0;
}
-void
-TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment &)
+PortableServer::POA_ptr
+TAO_EC_ProxyPushSupplier::_default_POA (CORBA::Environment&)
{
+ return PortableServer::POA::_duplicate (this->default_POA_.in ());
}
void
-TAO_EC_ProxyPushSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushConsumer*,
+ CORBA::Environment &)
{
- RtecEventComm::PushConsumer_var consumer;
-
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- if (this->is_connected_i () == 0)
- return;
-
- consumer = this->consumer_._retn ();
-
- this->cleanup_i ();
- }
-
- this->deactivate (ACE_TRY_ENV);
- ACE_CHECK;
-
- consumer->disconnect_push_consumer (ACE_TRY_ENV);
-
- this->_decr_refcnt ();
}
void
-TAO_EC_ProxyPushSupplier::cleanup_i (void)
+TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushConsumer*,
+ CORBA::Environment &)
{
- this->consumer_ =
- RtecEventComm::PushConsumer::_nil ();
-
- // @@ Why don't we have a destroy() method in the
- // filter_builder?
- delete this->child_;
- this->child_ = 0;
}
void
-TAO_EC_ProxyPushSupplier::deactivate (CORBA::Environment &ACE_TRY_ENV)
-{
- PortableServer::POA_var poa =
- this->_default_POA (ACE_TRY_ENV);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (this, ACE_TRY_ENV);
- ACE_CHECK;
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK;
-}
-
-CORBA::ULong
-TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
+TAO_EC_ProxyPushSupplier::connected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment &)
{
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
- return this->refcount_++;
}
-CORBA::ULong
-TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
+void
+TAO_EC_ProxyPushSupplier::disconnected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment &)
{
- {
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
- this->refcount_--;
- if (this->refcount_ != 0)
- return this->refcount_;
- }
-
- // Notify the event channel
- this->event_channel_->destroy_proxy_push_supplier (this);
- return 0;
}
void
@@ -143,44 +96,7 @@ TAO_EC_ProxyPushSupplier::connect_push_consumer (
ACE_CHECK;
if (this->is_connected_i ())
- {
- if (this->event_channel_->consumer_reconnect () == 0)
- ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ());
-
- // Re-connections are allowed, go ahead and disconnect the
- // consumer...
- this->cleanup_i ();
-
- // @@ Are there any race conditions here:
- // + The lock is released, but the object is marked as
- // disconnected already, so:
- // - No events will be pushed
- // - Any disconnects will just return
- // + But another thread could invoke connect_push_consumer()
- // again, notice that by the time the lock is acquired
- // again the connected() call may still be running.
- // It seems like we need delayed operations again, or
- // something similar to what the POA does in this
- // scenario.
- // Meanwhile we can tell the users: "if it hurts don't do
- // it".
- //
- TAO_EC_Unlock reverse_lock (*this->lock_);
-
- {
- ACE_GUARD_THROW_EX (
- TAO_EC_Unlock, ace_mon, reverse_lock,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- this->event_channel_->disconnected (this, ACE_TRY_ENV);
- ACE_CHECK;
- }
-
- // What if a second thread connected us after this?
- if (this->is_connected_i ())
- return;
- }
+ ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ());
this->consumer_ =
RtecEventComm::PushConsumer::_duplicate (push_consumer);
@@ -188,10 +104,7 @@ TAO_EC_ProxyPushSupplier::connect_push_consumer (
this->child_ =
this->event_channel_->filter_builder ()->build (this,
- this->qos_,
- ACE_TRY_ENV);
- ACE_CHECK;
-
+ this->qos_);
this->adopt_child (this->child_);
}
@@ -212,13 +125,24 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier (
if (this->is_connected_i () == 0)
ACE_THROW (CORBA::BAD_INV_ORDER ());
- this->cleanup_i ();
+ this->consumer_ =
+ RtecEventComm::PushConsumer::_nil ();
+
+ // @@ Why don't we have a destroy() method in the filter_builder?
+ delete this->child_;
+ this->child_ = 0;
}
- this->deactivate (ACE_TRY_ENV);
+ PortableServer::POA_var poa =
+ this->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (this, ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
ACE_CHECK;
- // Notify the event channel....
+ // Notify the event channel...
this->event_channel_->disconnected (this, ACE_TRY_ENV);
this->_decr_refcnt ();
@@ -251,25 +175,12 @@ TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- int result = 0;
-
- {
- ACE_GUARD_THROW_EX (
+ ACE_GUARD_THROW_EX (
ACE_Lock, ace_mon, *this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK_RETURN (0);
-
- if (this->is_connected_i () == 0)
- return 0;
-
- result =
- this->child_->filter (event, qos_info, ACE_TRY_ENV);
- if (this->refcount_ > 0)
- return result;
- }
+ ACE_CHECK_RETURN (0);
- this->event_channel_->destroy_proxy_push_supplier (this);
- return result;
+ return this->child_->filter (event, qos_info, ACE_TRY_ENV);
}
int
@@ -277,25 +188,12 @@ TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- int result = 0;
-
- {
- ACE_GUARD_THROW_EX (
+ ACE_GUARD_THROW_EX (
ACE_Lock, ace_mon, *this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK_RETURN (0);
-
- if (this->is_connected_i () == 0)
- return 0;
-
- result =
- this->child_->filter_nocopy (event, qos_info, ACE_TRY_ENV);
- if (this->refcount_ > 0)
- return result;
- }
+ ACE_CHECK_RETURN (0);
- this->event_channel_->destroy_proxy_push_supplier (this);
- return result;
+ return this->child_->filter_nocopy (event, qos_info, ACE_TRY_ENV);
}
void
@@ -310,8 +208,7 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
event,
qos_info,
ACE_TRY_ENV);
- if (this->child_ != 0)
- this->child_->clear ();
+ this->child_->clear ();
}
void
@@ -326,8 +223,7 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
event,
qos_info,
ACE_TRY_ENV);
- if (this->child_ != 0)
- this->child_->clear ();
+ this->child_->clear ();
}
void
@@ -374,6 +270,9 @@ TAO_EC_ProxyPushSupplier::reactive_push_to_consumer (
const RtecEventComm::EventSet& event,
CORBA::Environment& ACE_TRY_ENV)
{
+ // Just reset the refcount, increased by the push() method.
+ this->refcount_--;
+
if (this->is_connected_i () == 0)
return; // TAO_THROW (RtecEventComm::Disconnected ());????
@@ -391,10 +290,6 @@ TAO_EC_ProxyPushSupplier::reactive_push_to_consumer (
ACE_CHECK;
consumer->push (event, ACE_TRY_ENV);
}
-
- // The reference count was incremented just before delegating on the
- // dispatching strategy, in this can we need to decrement it *now*.
- this->refcount_--;
}
void
@@ -404,18 +299,12 @@ TAO_EC_ProxyPushSupplier::push_timeout (
TAO_EC_QOS_Info& qos_info,
CORBA::Environment &ACE_TRY_ENV)
{
- {
- ACE_GUARD_THROW_EX (
+ ACE_GUARD_THROW_EX (
ACE_Lock, ace_mon, *this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- timeout_filter->push (event, qos_info, ACE_TRY_ENV);
- if (this->refcount_ > 0)
- return;
- }
+ ACE_CHECK;
- this->event_channel_->destroy_proxy_push_supplier (this);
+ timeout_filter->push (event, qos_info, ACE_TRY_ENV);
}
void
@@ -440,34 +329,9 @@ TAO_EC_ProxyPushSupplier::can_match (
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
- if (this->is_connected_i () == 0)
- return 0;
-
return this->child_->can_match (header);
}
-int
-TAO_EC_ProxyPushSupplier::add_dependencies (
- const RtecEventComm::EventHeader &header,
- const TAO_EC_QOS_Info &qos_info,
- CORBA::Environment &ACE_TRY_ENV)
-{
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK_RETURN (0);
-
- return this->child_->add_dependencies (header,
- qos_info,
- ACE_TRY_ENV);
-}
-
-PortableServer::POA_ptr
-TAO_EC_ProxyPushSupplier::_default_POA (CORBA::Environment&)
-{
- return PortableServer::POA::_duplicate (this->default_POA_.in ());
-}
-
void
TAO_EC_ProxyPushSupplier::_add_ref (CORBA::Environment &)
{