summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-04-18 16:12:29 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-04-18 16:12:29 +0000
commit0e2e2f96605b5c2e8127e6e8becb8ec3b672bcf5 (patch)
tree02983a2f269a9608879831e8dc037d2259a39deb
parentd46a0d285ecec084ada0b132cf1d0bf09ad0a30e (diff)
downloadATCD-0e2e2f96605b5c2e8127e6e8becb8ec3b672bcf5.tar.gz
ChangeLogTag:Tue Apr 18 08:52:27 2000 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--TAO/ChangeLogs/ChangeLog-02a114
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h7
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i2
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp8
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp8
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp101
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h47
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i8
-rw-r--r--TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp159
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp16
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h13
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i22
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp3
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp27
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h79
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i12
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp14
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h64
-rw-r--r--TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i16
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp1
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h3
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp236
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h52
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i15
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp102
-rw-r--r--TAO/orbsvcs/orbsvcs/Makefile.CosNotification2
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Basic.dsw12
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Makefile3
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Random.cpp565
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Random.dsp102
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Random.h193
-rwxr-xr-xTAO/orbsvcs/tests/Event/Basic/run_test.pl10
-rw-r--r--TAO/orbsvcs/tests/Event/Event.dsw15
37 files changed, 1731 insertions, 322 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a
index 84857f675f7..06ce057ffd9 100644
--- a/TAO/ChangeLogs/ChangeLog-02a
+++ b/TAO/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,91 @@
+Tue Apr 18 08:52:27 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h:
+ * orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i:
+ * orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp:
+ * orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h:
+ * orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i:
+ * orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp:
+ New implementations of the Guard idiom that deal with safely
+ incrementing and decrementing the reference count on a proxy.
+
+ * orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp:
+ Cosmetic fixes.
+
+ * orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp:
+ The proxy should be deactivated *before* removing it from the
+ collection.
+
+ * orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp:
+ The reconnected() call was leaking resources when the object was
+ not in the set.
+
+ * orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h:
+ * orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i:
+ * orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp:
+ The Write_Guard was copying not only the contents of the
+ original collection, but also the reference count value.
+ There was a subtle race condition, the destructor has to wait
+ until all the pending writes have completed before removing the
+ object.
+
+ * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h:
+ * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i:
+ * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp:
+ * orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp:
+ Use Guard idioms to safely remove reference count objects and
+ destroy some entities when their reference count reaches 0.
+ In some cases resources where leaked because the destructor was
+ assuming that shutdown() or something similar was invoked. In
+ general this is correct, but in concurrent scenarios a thread
+ may invoke shutdown() triggering the destruction of the object,
+ while another thread is still making changes. In such a case
+ the destructor is responsible for the final cleanup operations,
+ because the shutdown() method is not invoked again.
+
+ * orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp:
+ Removed bogus inline.
+
+ * orbsvcs/tests/Event/Event.dsw:
+ * orbsvcs/tests/Event/Basic/Basic.dsw:
+ * orbsvcs/tests/Event/Basic/Makefile:
+ * orbsvcs/tests/Event/Basic/Random.dsp:
+ * orbsvcs/tests/Event/Basic/Random.h:
+ * orbsvcs/tests/Event/Basic/Random.cpp:
+ * orbsvcs/tests/Event/Basic/Random.dsp:
+ * orbsvcs/tests/Event/Basic/run_test.pl:
+ New test for the Event service.
+ The test performs random operations on the event service, adding
+ consumers, suppliers, then removing them, pushing events, etc.
+ The operations are performed by multiple concurrent threads, in
+ many cases in the context of an upcall to some consumer.
+ This is the worst (or is it best?) torture test for the event
+ channel.
+
+ * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h:
+ * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp:
+ Add an optional argument to the ECG_UDP_EH::open() method,
+ allowing the user to reuse the same socket address in multiple
+ calls. Thanks to Tom Ziomek <tomz@cc.comm.mot.com> for
+ providing the patch for this feature.
+
+ * orbsvcs/orbsvcs/Makefile.CosNotification:
+ More missing libraries.
+
+ * orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i:
+ * orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp:
+ * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp:
+ Systematically use Guards to increment and decrement reference
+ counts, and to destroy objects once their count reaches 0.
+ Simplified the memory management of some objects by clearly
+ delineating who is responsible for their destruction.
+
Tue Apr 18 08:19:20 2000 Carlos O'Ryan <coryan@uci.edu>
* orbsvcs/tests/Concurrency/CC_command.h:
@@ -8,19 +96,19 @@ Tue Apr 18 08:19:20 2000 Carlos O'Ryan <coryan@uci.edu>
Tue Apr 18 10:14:05 2000 Jeff Parsons <parsons@cs.wustl.edu>
- * InterfaceS.h:
- * InterfaceS.cpp:
- Removed from TAO, code moved to the IFR_Service
- directory.
-
- * Makefile:
- * TAO.dsp:
- * TAO_static.dsp:
- * POA_CORBA.h:
- 'TAO_Export' removed from declaration of IRObject,
- since it is never instantiated inside TAO, and
- declared there only because it must be in the
- CORBA namespace.
+ * InterfaceS.h:
+ * InterfaceS.cpp:
+ Removed from TAO, code moved to the IFR_Service
+ directory.
+
+ * Makefile:
+ * TAO.dsp:
+ * TAO_static.dsp:
+ * POA_CORBA.h:
+ 'TAO_Export' removed from declaration of IRObject,
+ since it is never instantiated inside TAO, and
+ declared there only because it must be in the
+ CORBA namespace.
Mon Apr 17 20:12:13 2000 Vishal <vishal@cs.wustl.edu>
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp
index 210e3a91126..b10a3e7974d 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp
@@ -1,7 +1,6 @@
// $Id$
#include "CEC_Dispatching_Task.h"
-#include "CEC_ProxyPushSupplier.h"
#if ! defined (__ACE_INLINE__)
#include "CEC_Dispatching_Task.i"
@@ -90,6 +89,11 @@ TAO_CEC_Shutdown_Task_Command::execute (CORBA::Environment&)
// ****************************************************************
+TAO_CEC_Push_Command::~TAO_CEC_Push_Command (void)
+{
+ this->proxy_->_decr_refcnt ();
+}
+
int
TAO_CEC_Push_Command::execute (CORBA::Environment& ACE_TRY_ENV)
{
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h
index 6acb7ea5260..35752cb37f3 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h
@@ -26,9 +26,7 @@
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "orbsvcs/CosEvent/event_export.h"
-#include "tao/corba.h"
-
-class TAO_CEC_ProxyPushSupplier;
+#include "CEC_ProxyPushSupplier.h"
class TAO_Event_Export TAO_CEC_Dispatching_Task : public ACE_Task<ACE_SYNCH>
{
@@ -99,6 +97,9 @@ public:
ACE_Allocator *mb_allocator);
// Constructor
+ virtual ~TAO_CEC_Push_Command (void);
+ // Destructor
+
virtual int execute (CORBA::Environment&);
// Command callback
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i
index cecff7e37f2..577483a03ab 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i
@@ -50,4 +50,6 @@ TAO_CEC_Push_Command::TAO_CEC_Push_Command (TAO_CEC_ProxyPushSupplier* proxy,
//
// @@ TODO
this->event_ = event;
+
+ this->proxy_->_incr_refcnt ();
}
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp
index 3ae07eb5abd..742cc3caf53 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp
@@ -209,17 +209,15 @@ TAO_CEC_ProxyPullConsumer::shutdown (CORBA::Environment &ACE_TRY_ENV)
// @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK;
- if (this->is_connected_i () == 0)
- return;
-
supplier = this->supplier_._retn ();
-
- this->cleanup_i ();
}
this->deactivate (ACE_TRY_ENV);
ACE_CHECK;
+ if (CORBA::is_nil (supplier.in ()))
+ return;
+
ACE_TRY
{
supplier->disconnect_pull_supplier (ACE_TRY_ENV);
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp
index f568a5a818a..9134639d795 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp
@@ -86,17 +86,15 @@ TAO_CEC_ProxyPullSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV)
// @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK;
- if (this->is_connected_i () == 0)
- return;
-
consumer = this->consumer_._retn ();
-
- this->cleanup_i ();
}
this->deactivate (ACE_TRY_ENV);
ACE_CHECK;
+ if (CORBA::is_nil (consumer.in ()))
+ return;
+
ACE_TRY
{
consumer->disconnect_pull_consumer (ACE_TRY_ENV);
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp
index a845882f7d7..77e16b469e3 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp
@@ -110,17 +110,15 @@ TAO_CEC_ProxyPushConsumer::shutdown (CORBA::Environment &ACE_TRY_ENV)
// @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK;
- if (this->is_connected_i () == 0)
- return;
-
supplier = this->supplier_._retn ();
-
- this->cleanup_i ();
}
this->deactivate (ACE_TRY_ENV);
ACE_CHECK;
+ if (CORBA::is_nil (supplier.in ()))
+ return;
+
ACE_TRY
{
supplier->disconnect_push_supplier (ACE_TRY_ENV);
@@ -218,39 +216,16 @@ TAO_CEC_ProxyPushConsumer::push (const CORBA::Any& event,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- // @@ The following code is not exception safe, must fix, but the
- // canonical tricks don't work: the destroy_push_consumer () method
- // must be invoked only once the mutex is released, but after the
- // refcount get to zero, seems hard...
-
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- if (this->is_connected_i () == 0)
- return; // @@ THROW something???
-
- this->refcount_++;
- }
+ TAO_CEC_ProxyPushConsumer_Guard ace_mon (this->lock_,
+ this->refcount_,
+ this->event_channel_,
+ this);
+ if (!ace_mon.locked ())
+ return;
this->event_channel_->consumer_admin ()->push (event,
ACE_TRY_ENV);
ACE_CHECK;
-
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
- this->refcount_--;
- if (this->refcount_ != 0)
- return;
- }
- this->event_channel_->destroy_proxy (this);
}
void
@@ -315,3 +290,61 @@ TAO_CEC_ProxyPushConsumer::_remove_ref (CORBA::Environment &)
{
this->_decr_refcnt ();
}
+
+// ****************************************************************
+
+TAO_CEC_ProxyPushConsumer_Guard::
+ TAO_CEC_ProxyPushConsumer_Guard (ACE_Lock *lock,
+ CORBA::ULong &refcount,
+ TAO_CEC_EventChannel *ec,
+ TAO_CEC_ProxyPushConsumer *proxy)
+ : lock_ (lock),
+ refcount_ (refcount),
+ event_channel_ (ec),
+ proxy_ (proxy),
+ locked_ (0)
+{
+ ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
+ // If the guard fails there is not much we can do, raising an
+ // exception is wrong, the client has *no* way to handle that kind
+ // of error. Even worse, there is no exception to raise in that
+ // case.
+ // @@ Returning something won't work either, the error should be
+ // logged though!
+
+ if (proxy->is_connected_i () == 0)
+ return;
+
+ this->locked_ = 1;
+ this->refcount_++;
+}
+
+TAO_CEC_ProxyPushConsumer_Guard::
+ ~TAO_CEC_ProxyPushConsumer_Guard (void)
+{
+ // This access is safe because guard objects are created on the
+ // stack, only one thread has access to them
+ if (!this->locked_)
+ return;
+
+ {
+ ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
+ // If the guard fails there is not much we can do, raising an
+ // exception is wrong, the client has *no* way to handle that kind
+ // of error. Even worse, there is no exception to raise in that
+ // case.
+ // @@ Returning something won't work either, the error should be
+ // logged though!
+
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return;
+ }
+ this->event_channel_->destroy_proxy (this->proxy_);
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h
index dc856f9f783..9cee6bb948a 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h
@@ -108,6 +108,9 @@ protected:
// Set the supplier, used by some implementations to change the
// policies used when invoking operations on the supplier.
+ friend class TAO_CEC_ProxyPushConsumer_Guard;
+ // The guard needs access to the following protected methods.
+
CORBA::Boolean is_connected_i (void) const;
// The private version (without locking) of is_connected().
@@ -131,6 +134,50 @@ private:
// Store the default POA.
};
+// ****************************************************************
+
+class TAO_Event_Export TAO_CEC_ProxyPushConsumer_Guard
+{
+ // = TITLE
+ // A Guard for the ProxyPushConsumer reference count
+ //
+ // = DESCRIPTION
+ // This is a helper class used in the implementation of
+ // ProxyPushConumer. It provides a Guard mechanism to increment
+ // the reference count on the proxy, eliminating the need to hold
+ // mutexes during long operations.
+ //
+public:
+ TAO_CEC_ProxyPushConsumer_Guard (ACE_Lock *lock,
+ CORBA::ULong &refcount,
+ TAO_CEC_EventChannel *ec,
+ TAO_CEC_ProxyPushConsumer *proxy);
+ // Constructor
+
+ ~TAO_CEC_ProxyPushConsumer_Guard (void);
+ // Destructor
+
+ int locked (void) const;
+ // Returns 1 if the reference count successfully acquired
+
+private:
+ ACE_Lock *lock_;
+ // The lock used to protect the reference count
+
+ CORBA::ULong &refcount_;
+ // The reference count
+
+ TAO_CEC_EventChannel *event_channel_;
+ // The event channel used to destroy the proxy
+
+ TAO_CEC_ProxyPushConsumer *proxy_;
+ // The proxy whose lifetime is controlled by the reference count
+
+ int locked_;
+ // This flag is set to 1 if the reference count was successfully
+ // acquired.
+};
+
#if defined (__ACE_INLINE__)
#include "CEC_ProxyPushConsumer.i"
#endif /* __ACE_INLINE__ */
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i
index b6b7f318acc..535b34f58cd 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i
@@ -34,3 +34,11 @@ TAO_CEC_ProxyPushConsumer::supplier (CosEventComm::PushSupplier_ptr supplier)
this->supplier_i (supplier);
}
+
+// ****************************************************************
+
+ACE_INLINE int
+TAO_CEC_ProxyPushConsumer_Guard::locked (void) const
+{
+ return this->locked_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp
index 415c877aa0c..77e1fe55db3 100644
--- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp
@@ -4,6 +4,8 @@
#include "CEC_Dispatching.h"
#include "CEC_EventChannel.h"
#include "CEC_ConsumerControl.h"
+#include "orbsvcs/ESF/ESF_RefCount_Guard.h"
+#include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h"
#if ! defined (__ACE_INLINE__)
#include "CEC_ProxyPushSupplier.i"
@@ -86,17 +88,15 @@ TAO_CEC_ProxyPushSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV)
// @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK;
- if (this->is_connected_i () == 0)
- return;
-
consumer = this->consumer_._retn ();
-
- this->cleanup_i ();
}
this->deactivate (ACE_TRY_ENV);
ACE_CHECK;
+ if (CORBA::is_nil (consumer.in ()))
+ return;
+
ACE_TRY
{
consumer->disconnect_push_consumer (ACE_TRY_ENV);
@@ -110,28 +110,62 @@ TAO_CEC_ProxyPushSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV)
ACE_ENDTRY;
}
+typedef TAO_ESF_Proxy_RefCount_Guard<TAO_CEC_EventChannel,TAO_CEC_ProxyPushSupplier> Destroy_Guard;
+
void
TAO_CEC_ProxyPushSupplier::push (const CORBA::Any &event,
CORBA::Environment &ACE_TRY_ENV)
{
- ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
+ Destroy_Guard auto_destroy (this->refcount_,
+ this->event_channel_,
+ this);
+
+ {
+ ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
- this->refcount_++;
- this->event_channel_->dispatching ()->push (this,
- event,
- ACE_TRY_ENV);
+ if (this->is_connected_i () == 0)
+ return;
+
+ TAO_ESF_RefCount_Guard<CORBA::ULong> cnt_mon (this->refcount_);
+
+ {
+ TAO_CEC_Unlock reverse_lock (*this->lock_);
+
+ ACE_GUARD (TAO_CEC_Unlock, ace_mon, reverse_lock);
+ this->event_channel_->dispatching ()->push (this,
+ event,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ }
}
void
TAO_CEC_ProxyPushSupplier::push_nocopy (CORBA::Any &event,
CORBA::Environment &ACE_TRY_ENV)
{
- ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
+ Destroy_Guard auto_destroy (this->refcount_,
+ this->event_channel_,
+ this);
+
+ {
+ ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
- this->refcount_++;
- this->event_channel_->dispatching ()->push_nocopy (this,
- event,
- ACE_TRY_ENV);
+ if (this->is_connected_i () == 0)
+ return;
+
+ TAO_ESF_RefCount_Guard<CORBA::ULong> cnt_mon (this->refcount_);
+
+ {
+ TAO_CEC_Unlock reverse_lock (*this->lock_);
+
+ ACE_GUARD (TAO_CEC_Unlock, ace_mon, reverse_lock);
+ this->event_channel_->dispatching ()->push_nocopy (this,
+ event,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ }
}
void
@@ -271,26 +305,11 @@ TAO_CEC_ProxyPushSupplier::push_to_consumer (const CORBA::Any& event,
// @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK;
- // The reference count was increased just before pushing to the
- // dispatching module, we must decrease here. But if we get
- // removed then we abort. We don't want to call _decr_refcnt()
- // because that will require two locks.
- this->refcount_--;
- if (this->refcount_ == 0)
- {
- ace_mon.release ();
- this->event_channel_->destroy_proxy (this);
- return;
- }
-
if (this->is_connected_i () == 0)
return; // ACE_THROW (CosEventComm::Disconnected ());????
consumer =
CosEventComm::PushConsumer::_duplicate (this->consumer_.in ());
-
- // The refcount cannot be zero, because we have at least two
- // references,
}
ACE_TRY
@@ -328,54 +347,44 @@ TAO_CEC_ProxyPushSupplier::reactive_push_to_consumer (
const CORBA::Any& event,
CORBA::Environment& ACE_TRY_ENV)
{
- if (this->is_connected_i () == 0)
- return; // TAO_THROW (CosEventComm::Disconnected ());????
-
- CosEventComm::PushConsumer_var consumer =
- CosEventComm::PushConsumer::_duplicate (this->consumer_.in ());
-
+ CosEventComm::PushConsumer_var consumer;
{
- TAO_CEC_Unlock reverse_lock (*this->lock_);
-
- ACE_GUARD_THROW_EX (TAO_CEC_Unlock, ace_mon, reverse_lock,
- CORBA::INTERNAL ());
- // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
+ ACE_GUARD (ACE_Lock, ace_mon, *this->lock_);
+ if (this->is_connected_i () == 0)
+ return; // TAO_THROW (CosEventComm::Disconnected ());????
- ACE_TRY
- {
- consumer->push (event, ACE_TRY_ENV);
- ACE_TRY_CHECK;
- }
- ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
- {
- TAO_CEC_ConsumerControl *control =
- this->event_channel_->consumer_control ();
+ consumer =
+ CosEventComm::PushConsumer::_duplicate (this->consumer_.in ());
+ }
- control->consumer_not_exist (this, ACE_TRY_ENV);
- ACE_TRY_CHECK;
- }
- ACE_CATCH (CORBA::SystemException, sysex)
- {
- TAO_CEC_ConsumerControl *control =
- this->event_channel_->consumer_control ();
+ ACE_TRY
+ {
+ consumer->push (event, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used)
+ {
+ TAO_CEC_ConsumerControl *control =
+ this->event_channel_->consumer_control ();
- control->system_exception (this,
- sysex,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- }
- ACE_CATCHANY
- {
- // Shouldn't happen, but does not hurt
- }
- ACE_ENDTRY;
- }
+ control->consumer_not_exist (this, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCH (CORBA::SystemException, sysex)
+ {
+ TAO_CEC_ConsumerControl *control =
+ this->event_channel_->consumer_control ();
- // The reference count was incremented just before delegating on the
- // dispatching strategy, in this can we need to decrement it *now*.
- this->refcount_--;
- // @@ What if it reaches 0???
+ control->system_exception (this,
+ sysex,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ // Shouldn't happen, but does not hurt
+ }
+ ACE_ENDTRY;
}
CORBA::Boolean
@@ -423,6 +432,10 @@ TAO_CEC_ProxyPushSupplier::_remove_ref (CORBA::Environment &)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class TAO_ESF_Proxy_RefCount_Guard<TAO_CEC_EventChannel,TAO_CEC_ProxyPushSupplier>;
+
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate TAO_ESF_Proxy_RefCount_Guard<TAO_CEC_EventChannel,TAO_CEC_ProxyPushSupplier>
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp
index 268b10ab444..f2f32b69fcc 100644
--- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp
@@ -43,7 +43,8 @@ TAO_ESF_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void)
template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL>
TAO_ESF_Copy_On_Write<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>::
TAO_ESF_Copy_On_Write (void)
- : writing_ (0),
+ : pending_writes_ (0),
+ writing_ (0),
cond_ (mutex_)
{
ACE_NEW (this->collection_, Collection);
@@ -53,7 +54,13 @@ template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL>
TAO_ESF_Copy_On_Write<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>::
~TAO_ESF_Copy_On_Write (void)
{
+ ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_);
+
+ while (this->pending_writes_ != 0)
+ this->cond_.wait ();
+
this->collection_->_decr_refcnt ();
+ this->collection_ = 0;
}
template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL> void
@@ -79,6 +86,7 @@ TAO_ESF_Copy_On_Write<PROXY,C,I,ACE_SYNCH_USE>::
{
Write_Guard ace_mon (this->mutex_,
this->cond_,
+ this->pending_writes_,
this->writing_,
this->collection_);
@@ -93,6 +101,7 @@ TAO_ESF_Copy_On_Write<PROXY,C,I,ACE_SYNCH_USE>::
{
Write_Guard ace_mon (this->mutex_,
this->cond_,
+ this->pending_writes_,
this->writing_,
this->collection_);
@@ -107,6 +116,7 @@ TAO_ESF_Copy_On_Write<PROXY,C,I,ACE_SYNCH_USE>::
{
Write_Guard ace_mon (this->mutex_,
this->cond_,
+ this->pending_writes_,
this->writing_,
this->collection_);
@@ -117,10 +127,10 @@ template<class PROXY, class C, class I, ACE_SYNCH_DECL> void
TAO_ESF_Copy_On_Write<PROXY,C,I,ACE_SYNCH_USE>::
shutdown (CORBA::Environment &ACE_TRY_ENV)
{
- // @@ Do we really need to perform a copy here?
- // I believe so, but i don't have a good scenario for it.
+ // We need to perform a copy to follow the protocol.
Write_Guard ace_mon (this->mutex_,
this->cond_,
+ this->pending_writes_,
this->writing_,
this->collection_);
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h
index d90b8596a1a..e7fd96273af 100644
--- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h
@@ -62,7 +62,7 @@ class TAO_ESF_Copy_On_Write_Read_Guard
public:
typedef TAO_ESF_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
TAO_ESF_Copy_On_Write_Read_Guard (ACE_LOCK &mutex,
- Collection*& collection);
+ Collection *&collection);
// Constructor
~TAO_ESF_Copy_On_Write_Read_Guard (void);
@@ -92,9 +92,10 @@ class TAO_ESF_Copy_On_Write_Write_Guard
public:
typedef TAO_ESF_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
TAO_ESF_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX_T &mutex,
- ACE_SYNCH_CONDITION_T &cond,
- int &writing_flag,
- Collection*& collection);
+ ACE_SYNCH_CONDITION_T &cond,
+ int &pending_writes,
+ int &writing_flag,
+ Collection*& collection);
// Constructor
~TAO_ESF_Copy_On_Write_Write_Guard (void);
@@ -105,6 +106,7 @@ public:
private:
ACE_SYNCH_MUTEX_T &mutex;
ACE_SYNCH_CONDITION_T &cond;
+ int &pending_writes;
int &writing_flag;
Collection *&collection;
};
@@ -150,6 +152,9 @@ private:
ACE_SYNCH_MUTEX_T mutex_;
// A mutex to serialize access to the collection pointer.
+ int pending_writes_;
+ // Number of pending writes
+
int writing_;
// If non-zero then a thread is changing the collection. Many
// threads can use the collection simulatenously, but only one
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i
index a4a30b965c1..12f8525ba09 100644
--- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i
@@ -12,7 +12,7 @@ TAO_ESF_Copy_On_Write_Collection<COLLECTION,ITERATOR>::
template<class COLLECTION, class ITERATOR, class ACE_LOCK> ACE_INLINE
TAO_ESF_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR,ACE_LOCK>::
TAO_ESF_Copy_On_Write_Read_Guard (ACE_LOCK &m,
- Collection*& collection_ref)
+ Collection*& collection_ref)
: collection (0),
mutex (m)
{
@@ -37,18 +37,22 @@ TAO_ESF_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR,ACE_LOCK>::
template<class COLLECTION, class ITERATOR, ACE_SYNCH_DECL> ACE_INLINE
TAO_ESF_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR,ACE_SYNCH_USE>::
TAO_ESF_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX_T &m,
- ACE_SYNCH_CONDITION_T &c,
- int &w,
- Collection*& cr)
+ ACE_SYNCH_CONDITION_T &c,
+ int &p,
+ int &w,
+ Collection*& cr)
: copy (0),
mutex (m),
cond (c),
+ pending_writes (p),
writing_flag (w),
collection (cr)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex);
+ this->pending_writes++;
+
while (this->writing_flag != 0)
this->cond.wait ();
@@ -58,7 +62,13 @@ TAO_ESF_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR,ACE_SYNCH_USE>::
// Copy outside the mutex, because it may take a long time.
// Nobody can change it, because it is protected by the
// writing_flag.
- ACE_NEW (this->copy, Collection (*this->collection));
+
+ // First initialize it (with the correct reference count
+ ACE_NEW (this->copy, Collection);
+ // Copy the contents
+ this->copy->collection = this->collection->collection;
+
+ // Increase the reference counts
ITERATOR end = this->copy->collection.end ();
for (ITERATOR i = this->copy->collection.begin (); i != end; ++i)
{
@@ -77,6 +87,8 @@ TAO_ESF_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR,ACE_SYNCH_USE>::
tmp = this->collection;
this->collection = this->copy;
this->writing_flag = 0;
+ this->pending_writes--;
+
this->cond.signal ();
}
// Delete outside the mutex, because it may take a long time.
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp
index bc73c7d56cd..61668be4601 100644
--- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp
@@ -54,7 +54,8 @@ TAO_ESF_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>::
{
worker->work (*i, ACE_TRY_ENV);
ACE_CHECK;
- }
+
+ }
}
template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL> int
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp
index 144f7ff7756..d44befa55c0 100644
--- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp
@@ -77,6 +77,9 @@ TAO_ESF_Proxy_Admin<EC,P,I>::disconnected (P *proxy,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC (())
{
+ proxy->deactivate (ACE_TRY_ENV);
+ ACE_CHECK; // Cannot happen, just following the discipline.
+
ACE_TRY
{
this->collection_->disconnected (proxy, ACE_TRY_ENV);
@@ -93,9 +96,6 @@ TAO_ESF_Proxy_Admin<EC,P,I>::disconnected (P *proxy,
// that has an exception for "could not acquire a mutex".
}
ACE_ENDTRY;
-
- proxy->deactivate (ACE_TRY_ENV);
- ACE_CHECK; // Cannot happen, just following the discipline.
}
#endif /* TAO_ESF_PROXY_ADMIN_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp
index 5fd6f3adc5b..f1244c91cca 100644
--- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp
@@ -43,7 +43,10 @@ TAO_ESF_Proxy_List<PROXY>::reconnected (PROXY *proxy,
CORBA::Environment &)
{
int r = this->impl_.insert (proxy);
- if (r == 0 || r == 1)
+ if (r == 0)
+ return;
+
+ if (r == 1)
{
// Reference count is incremented by the callers to [re]connected.
// @@ Find out if the protocol could be simplified, and decoupling
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp
new file mode 100644
index 00000000000..83a1490530f
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp
@@ -0,0 +1,27 @@
+// $Id$
+
+#ifndef TAO_ESF_PROXY_REFCOUNT_GUARD_CPP
+#define TAO_ESF_PROXY_REFCOUNT_GUARD_CPP
+
+#include "ESF_Proxy_RefCount_Guard.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "ESF_Proxy_RefCount_Guard.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(ESF, ESF_Proxy_RefCount_Guard, "$Id$")
+
+template<class EC, class P>
+TAO_ESF_Proxy_RefCount_Guard<EC,P>::~TAO_ESF_Proxy_RefCount_Guard (void)
+{
+ // Checking for 0 is safe, once the variable reaches 0 the value
+ // will stay there.
+ // @@ But what if the thread is switched to another processor just
+ // after release the mutex?
+ if (this->refcount_ == 0)
+ {
+ this->event_channel_->destroy_proxy (this->proxy_);
+ }
+}
+
+#endif /* TAO_ESF_PROXY_REFCOUNT_GUARD_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h
new file mode 100644
index 00000000000..4ecc951fc8c
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h
@@ -0,0 +1,79 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Event Service Framework
+//
+// = FILENAME
+// ESF_Proxy_RefCount_Guard
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// ============================================================================
+
+#ifndef TAO_ESF_PROXY_REFCOUNT_GUARD_H
+#define TAO_ESF_PROXY_REFCOUNT_GUARD_H
+
+#include "tao/corbafwd.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<class EVENT_CHANNEL, class PROXY>
+class TAO_ESF_Proxy_RefCount_Guard
+{
+ // = TITLE
+ // Reference count based guard.
+ //
+ // = DESCRIPTION
+ // A common idiom used on event services is to increment a
+ // reference count before starting a long running operation.
+ // The system can then execute the operation without any risk of
+ // having the underlying object destroyed. The advantage of using
+ // a reference count is that no mutex or lock needs to be held
+ // while the operation is beign executed.
+ // This class implements that common idiom, but it also adds hooks
+ // to handle scenarios where more than one operation is performed
+ // while holding the reference count.
+ //
+ // = TODO
+ // @@ The type of lock could be parametric
+ //
+public:
+ TAO_ESF_Proxy_RefCount_Guard (CORBA::ULong &refcount,
+ EVENT_CHANNEL *ec,
+ PROXY *proxy);
+ // Constructor
+
+ ~TAO_ESF_Proxy_RefCount_Guard (void);
+ // Destructor
+
+protected:
+ CORBA::ULong &refcount_;
+ // The reference count, if it gets to zero then the object must be
+ // destroyed
+
+ EVENT_CHANNEL *event_channel_;
+ // The event channel used to destroy the proxy
+
+ PROXY *proxy_;
+ // The proxy whose lifetime is controlled by the reference count
+};
+
+#if defined (__ACE_INLINE__)
+#include "ESF_Proxy_RefCount_Guard.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "ESF_Proxy_RefCount_Guard.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("ESF_Proxy_RefCount_Guard.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* TAO_ESF_PROXY_REFCOUNT_GUARD_H */
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i
new file mode 100644
index 00000000000..7639a89ee12
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i
@@ -0,0 +1,12 @@
+// $Id$
+
+template<class EC, class P> ACE_INLINE
+TAO_ESF_Proxy_RefCount_Guard<EC,P>::
+ TAO_ESF_Proxy_RefCount_Guard (CORBA::ULong &refcount,
+ EC *ec,
+ P *proxy)
+ : refcount_ (refcount),
+ event_channel_ (ec),
+ proxy_ (proxy)
+{
+}
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp
new file mode 100644
index 00000000000..432189332bb
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp
@@ -0,0 +1,14 @@
+// $Id$
+
+#ifndef TAO_ESF_REFCOUNT_GUARD_CPP
+#define TAO_ESF_REFCOUNT_GUARD_CPP
+
+#include "ESF_RefCount_Guard.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "ESF_RefCount_Guard.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(ESF, ESF_RefCount_Guard, "$Id$")
+
+#endif /* TAO_ESF_REFCOUNT_GUARD_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h
new file mode 100644
index 00000000000..05731bb9fa7
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h
@@ -0,0 +1,64 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Event Service Framework
+//
+// = FILENAME
+// ESF_RefCount_Guard
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// ============================================================================
+
+#ifndef TAO_ESF_REFCOUNT_GUARD_H
+#define TAO_ESF_REFCOUNT_GUARD_H
+
+#include "ace/OS.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<class T>
+class TAO_ESF_RefCount_Guard
+{
+ // = TITLE
+ // Reference count based guard.
+ //
+ // = DESCRIPTION
+ // A common idiom used on event services is to increment a
+ // reference count before starting a long running operation.
+ // The system can then execute the operation without any risk of
+ // having the underlying object destroyed. The advantage of using
+ // a reference count is that no mutex or lock needs to be held
+ // while the operation is beign executed.
+ //
+public:
+ TAO_ESF_RefCount_Guard (T &refcount);
+ // Constructor
+
+ ~TAO_ESF_RefCount_Guard (void);
+ // Destructor
+
+protected:
+ T &refcount_;
+ // The reference count
+};
+
+#if defined (__ACE_INLINE__)
+#include "ESF_RefCount_Guard.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "ESF_RefCount_Guard.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("ESF_RefCount_Guard.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* TAO_ESF_REFCOUNT_GUARD_H */
diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i
new file mode 100644
index 00000000000..b69511a8382
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i
@@ -0,0 +1,16 @@
+// $Id$
+
+template<class T> ACE_INLINE
+TAO_ESF_RefCount_Guard<T>::
+ TAO_ESF_RefCount_Guard (T &refcount)
+ : refcount_ (refcount)
+{
+ this->refcount_++;
+}
+
+template<class T> ACE_INLINE
+TAO_ESF_RefCount_Guard<T>::
+ ~TAO_ESF_RefCount_Guard (void)
+{
+ this->refcount_--;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp
index 0c9ba5b26e0..f1d15c62827 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp
@@ -100,7 +100,6 @@ TAO_EC_Shutdown_Task_Command::execute (CORBA::Environment&)
// ****************************************************************
-ACE_INLINE
TAO_EC_Push_Command::~TAO_EC_Push_Command (void)
{
this->proxy_->_decr_refcnt ();
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
index 92593b21b5d..a5a1006f2bc 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp
@@ -1008,9 +1008,10 @@ TAO_ECG_UDP_EH::TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv)
}
int
-TAO_ECG_UDP_EH::open (const ACE_INET_Addr& ipaddr)
+TAO_ECG_UDP_EH::open (const ACE_INET_Addr& ipaddr,
+ int reuse_addr)
{
- if (this->dgram_.open (ipaddr) == -1)
+ if (this->dgram_.open (ipaddr, PF_INET, 0, reuse_addr) == -1)
return -1;
return this->reactor ()->register_handler (this,
ACE_Event_Handler::READ_MASK);
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
index 6d1d6cdb9b6..6379b6c7073 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h
@@ -524,7 +524,8 @@ class TAO_RTEvent_Export TAO_ECG_UDP_EH : public ACE_Event_Handler
public:
TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv);
- int open (const ACE_INET_Addr& ipaddr);
+ int open (const ACE_INET_Addr& ipaddr,
+ int reuse_addr = 0);
// Open the datagram and register with this->reactor()
int close (void);
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp
index 2652eaec36d..f710ff20c03 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp
@@ -24,13 +24,17 @@ TAO_EC_ProxyPushConsumer::
this->default_POA_ =
this->event_channel_->consumer_poa ();
+
+ this->qos_.is_gateway = 0;
}
TAO_EC_ProxyPushConsumer::~TAO_EC_ProxyPushConsumer (void)
{
this->event_channel_->destroy_consumer_lock (this->lock_);
+ this->cleanup_i ();
}
+
CORBA::Boolean
TAO_EC_ProxyPushConsumer::supplier_non_existent (
CORBA::Boolean_out disconnected,
@@ -60,91 +64,42 @@ void
TAO_EC_ProxyPushConsumer::connected (TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment &ACE_TRY_ENV)
{
- TAO_EC_Supplier_Filter* filter = 0;
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- if (this->is_connected_i () == 0)
- return;
-
- filter = this->filter_;
- filter->_incr_refcnt ();
- }
-
- filter->connected (supplier, ACE_TRY_ENV);
-
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
- filter->_decr_refcnt ();
- }
+ TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_,
+ this->refcount_,
+ this->event_channel_,
+ this);
+ if (!ace_mon.locked ())
+ return;
+
+ ace_mon.filter->connected (supplier, ACE_TRY_ENV);
}
void
TAO_EC_ProxyPushConsumer::reconnected (TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment &ACE_TRY_ENV)
{
- TAO_EC_Supplier_Filter* filter = 0;
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- if (this->is_connected_i () == 0)
- return;
-
- filter = this->filter_;
- filter->_incr_refcnt ();
- }
-
- filter->reconnected (supplier, ACE_TRY_ENV);
-
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
- filter->_decr_refcnt ();
- }
+ TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_,
+ this->refcount_,
+ this->event_channel_,
+ this);
+ if (!ace_mon.locked ())
+ return;
+
+ ace_mon.filter->reconnected (supplier, ACE_TRY_ENV);
}
void
TAO_EC_ProxyPushConsumer::disconnected (TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment &ACE_TRY_ENV)
{
- TAO_EC_Supplier_Filter* filter = 0;
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- if (this->is_connected_i () == 0)
- return;
-
- filter = this->filter_;
- filter->_incr_refcnt ();
- }
-
- filter->disconnected (supplier, ACE_TRY_ENV);
-
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
- filter->_decr_refcnt ();
- }
+ TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_,
+ this->refcount_,
+ this->event_channel_,
+ this);
+ if (!ace_mon.locked ())
+ return;
+
+ ace_mon.filter->disconnected (supplier, ACE_TRY_ENV);
}
void
@@ -176,20 +131,23 @@ TAO_EC_ProxyPushConsumer::shutdown (CORBA::Environment &ACE_TRY_ENV)
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK;
- if (this->is_connected_i () == 0)
- return;
-
supplier = this->supplier_._retn ();
- this->filter_->shutdown (ACE_TRY_ENV);
- ACE_CHECK;
+ if (this->filter_ != 0)
+ {
+ this->filter_->shutdown (ACE_TRY_ENV);
+ ACE_CHECK;
- this->cleanup_i ();
+ this->cleanup_i ();
+ }
}
this->deactivate (ACE_TRY_ENV);
ACE_CHECK;
+ if (CORBA::is_nil (supplier.in ()))
+ return;
+
ACE_TRY
{
supplier->disconnect_push_supplier (ACE_TRY_ENV);
@@ -209,9 +167,12 @@ TAO_EC_ProxyPushConsumer::cleanup_i (void)
this->supplier_ =
RtecEventComm::PushSupplier::_nil ();
- this->filter_->unbind (this);
- this->filter_->_decr_refcnt ();
- this->filter_ = 0;
+ if (this->filter_ != 0)
+ {
+ this->filter_->unbind (this);
+ this->filter_->_decr_refcnt ();
+ this->filter_ = 0;
+ }
}
RtecEventChannelAdmin::ProxyPushConsumer_ptr
@@ -273,7 +234,7 @@ TAO_EC_ProxyPushConsumer::_decr_refcnt (void)
return this->refcount_;
}
- // Notify the event channel
+ // Use the event channel
this->event_channel_->destroy_proxy (this);
return 0;
}
@@ -346,44 +307,17 @@ TAO_EC_ProxyPushConsumer::push (const RtecEventComm::EventSet& event,
CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- // @@ The following code is not exception safe, must fix, but the
- // canonical tricks don't work: the destroy_push_consumer () method
- // must be invoked only once the mutex is released, but after the
- // refcount get to zero, seems hard...
-
- TAO_EC_Supplier_Filter* filter = 0;
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- if (this->is_connected_i () == 0)
- return; // @@ THROW something???
-
- filter = this->filter_;
- filter->_incr_refcnt ();
-
- this->refcount_++;
- }
+ TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_,
+ this->refcount_,
+ this->event_channel_,
+ this);
+ if (!ace_mon.locked ())
+ return;
// No need to keep the lock, the filter_ class is supposed to be
// thread safe....
- filter->push (event, ACE_TRY_ENV);
-
- {
- ACE_GUARD_THROW_EX (
- ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
- filter->_decr_refcnt ();
- this->refcount_--;
- if (this->refcount_ != 0)
- return;
- }
- this->event_channel_->destroy_proxy (this);
+ ace_mon.filter->push (event, ACE_TRY_ENV);
+ ACE_CHECK;
}
void
@@ -408,9 +342,6 @@ TAO_EC_ProxyPushConsumer::disconnect_push_consumer (
this->cleanup_i ();
}
- this->deactivate (ACE_TRY_ENV);
- ACE_CHECK;
-
// Notify the event channel...
this->event_channel_->disconnected (this, ACE_TRY_ENV);
ACE_CHECK;
@@ -453,3 +384,66 @@ TAO_EC_ProxyPushConsumer::_remove_ref (CORBA::Environment &)
{
this->_decr_refcnt ();
}
+
+// ****************************************************************
+
+TAO_EC_ProxyPushConsumer_Guard::
+ TAO_EC_ProxyPushConsumer_Guard (ACE_Lock *lock,
+ CORBA::ULong &refcount,
+ TAO_EC_Event_Channel *ec,
+ TAO_EC_ProxyPushConsumer *proxy)
+ : lock_ (lock),
+ refcount_ (refcount),
+ event_channel_ (ec),
+ proxy_ (proxy),
+ locked_ (0)
+{
+ ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
+ // If the guard fails there is not much we can do, raising an
+ // exception is wrong, the client has *no* way to handle that kind
+ // of error. Even worse, there is no exception to raise in that
+ // case.
+ // @@ Returning something won't work either, the error should be
+ // logged though!
+
+ if (proxy->is_connected_i () == 0)
+ return;
+
+ this->filter = this->proxy_->filter_i ();
+ this->filter->_incr_refcnt ();
+
+ this->locked_ = 1;
+ this->refcount_++;
+}
+
+TAO_EC_ProxyPushConsumer_Guard::
+ ~TAO_EC_ProxyPushConsumer_Guard (void)
+{
+ // This access is safe because guard objects are created on the
+ // stack, only one thread has access to them
+ if (!this->locked_)
+ return;
+
+ {
+ ACE_Guard<ACE_Lock> ace_mon (*this->lock_);
+ // If the guard fails there is not much we can do, raising an
+ // exception is wrong, the client has *no* way to handle that kind
+ // of error. Even worse, there is no exception to raise in that
+ // case.
+ // @@ Returning something won't work either, the error should be
+ // logged though!
+
+ this->filter->_decr_refcnt ();
+
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return;
+ }
+ this->event_channel_->destroy_proxy (this->proxy_);
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h
index 967db669471..513d923809c 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h
@@ -147,9 +147,15 @@ protected:
// Set the supplier, used by some implementations to change the
// policies used when invoking operations on the supplier.
+ friend class TAO_EC_ProxyPushConsumer_Guard;
+ // The guard needs access to the following protected methods.
+
CORBA::Boolean is_connected_i (void) const;
// The private version (without locking) of is_connected().
+ TAO_EC_Supplier_Filter *filter_i (void) const;
+ // Return the current filter, assumes the locks are held.
+
void cleanup_i (void);
// Release the filter and the supplier
@@ -176,6 +182,52 @@ private:
// The strategy to do filtering close to the supplier
};
+// ****************************************************************
+
+class TAO_RTEvent_Export TAO_EC_ProxyPushConsumer_Guard
+{
+ // = TITLE
+ // A Guard for the ProxyPushConsumer reference count
+ //
+ // = DESCRIPTION
+ // This is a helper class used in the implementation of
+ // ProxyPushConumer. It provides a Guard mechanism to increment
+ // the reference count on the proxy and its filter, eliminating
+ // the need to hold mutexes during long operations.
+ //
+public:
+ TAO_EC_ProxyPushConsumer_Guard (ACE_Lock *lock,
+ CORBA::ULong &refcount,
+ TAO_EC_Event_Channel *ec,
+ TAO_EC_ProxyPushConsumer *proxy);
+ // Constructor
+
+ ~TAO_EC_ProxyPushConsumer_Guard (void);
+ // Destructor
+
+ int locked (void) const;
+ // Returns 1 if the reference count successfully acquired
+
+ TAO_EC_Supplier_Filter *filter;
+
+private:
+ ACE_Lock *lock_;
+ // The lock used to protect the reference count
+
+ CORBA::ULong &refcount_;
+ // The reference count
+
+ TAO_EC_Event_Channel *event_channel_;
+ // The event channel used to destroy the proxy
+
+ TAO_EC_ProxyPushConsumer *proxy_;
+ // The proxy whose lifetime is controlled by the reference count
+
+ int locked_;
+ // This flag is set to 1 if the reference count was successfully
+ // acquired.
+};
+
#if defined (__ACE_INLINE__)
#include "EC_ProxyConsumer.i"
#endif /* __ACE_INLINE__ */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i
index caf77ce2e03..ace563c49e7 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i
@@ -49,3 +49,18 @@ TAO_EC_ProxyPushConsumer::publications_i (void) const
{
return this->qos_;
}
+
+ACE_INLINE TAO_EC_Supplier_Filter *
+TAO_EC_ProxyPushConsumer::filter_i (void) const
+{
+ return this->filter_;
+}
+
+// ****************************************************************
+
+ACE_INLINE int
+TAO_EC_ProxyPushConsumer_Guard::locked (void) const
+{
+ return this->locked_;
+}
+
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
index 7cc09c11a6a..6200c54beb4 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
@@ -7,6 +7,8 @@
#include "EC_Event_Channel.h"
#include "EC_Scheduling_Strategy.h"
#include "EC_ConsumerControl.h"
+#include "orbsvcs/ESF/ESF_RefCount_Guard.h"
+#include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h"
#if ! defined (__ACE_INLINE__)
#include "EC_ProxySupplier.i"
@@ -32,6 +34,7 @@ TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel* ec)
TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void)
{
this->event_channel_->destroy_supplier_lock (this->lock_);
+ this->cleanup_i ();
}
void
@@ -95,17 +98,20 @@ TAO_EC_ProxyPushSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV)
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK;
- if (this->is_connected_i () == 0)
- return;
+ int connected = this->is_connected_i ();
consumer = this->consumer_._retn ();
- this->cleanup_i ();
+ if (connected)
+ this->cleanup_i ();
}
this->deactivate (ACE_TRY_ENV);
ACE_CHECK;
+ if (CORBA::is_nil (consumer.in ()))
+ return;
+
ACE_TRY
{
consumer->disconnect_push_consumer (ACE_TRY_ENV);
@@ -293,9 +299,6 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier (
this->cleanup_i ();
}
- this->deactivate (ACE_TRY_ENV);
- ACE_CHECK;
-
// Notify the event channel....
this->event_channel_->disconnected (this, ACE_TRY_ENV);
ACE_CHECK;
@@ -349,13 +352,18 @@ TAO_EC_ProxyPushSupplier::resume_connection (CORBA::Environment &ACE_TRY_ENV)
this->suspended_ = 0;
}
+typedef TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel,TAO_EC_ProxyPushSupplier> Destroy_Guard;
+
int
TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- int result = 0;
+ Destroy_Guard auto_destroy (this->refcount_,
+ this->event_channel_,
+ this);
+ int result = 0;
{
ACE_GUARD_THROW_EX (
ACE_Lock, ace_mon, *this->lock_,
@@ -367,11 +375,8 @@ TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
result =
this->child_->filter (event, qos_info, ACE_TRY_ENV);
- if (this->refcount_ > 0)
- return result;
+ ACE_CHECK_RETURN (0);
}
-
- this->event_channel_->destroy_proxy (this);
return result;
}
@@ -380,8 +385,11 @@ TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- int result = 0;
+ Destroy_Guard auto_destroy (this->refcount_,
+ this->event_channel_,
+ this);
+ int result = 0;
{
ACE_GUARD_THROW_EX (
ACE_Lock, ace_mon, *this->lock_,
@@ -393,11 +401,8 @@ TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
result =
this->child_->filter_nocopy (event, qos_info, ACE_TRY_ENV);
- if (this->refcount_ > 0)
- return result;
+ ACE_CHECK_RETURN (0);
}
-
- this->event_channel_->destroy_proxy (this);
return result;
}
@@ -406,20 +411,30 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- // No need to grab the lock, it is beign held already by the
- // filter() method
- this->refcount_++;
-
+ // The mutex is already held by the caller (usually the filter()
+ // method)
if (this->is_connected_i () == 0)
return; // TAO_THROW (RtecEventComm::Disconnected ());????
if (this->suspended_ != 0)
return;
+ TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
+ // The guard will decrement the reference count, notice that the
+ // reference count can become 0, but this is not the right spot to
+ // check for that and destroy the object.
+ // If we did so then we would destroy the object, and consequently
+ // the mutex, but the mutex is used later when the stack unwinds and
+ // the filter() method tries to destroy the mutex (that originally
+ // acquired the mutex in the first place).
+ // So the correct thing to do is to just decrement the reference
+ // count and let the filter() method do the destruction.
+
RtecEventComm::PushConsumer_var consumer =
RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
{
+ // We have to release the lock to avoid dead-locks.
TAO_EC_Unlock reverse_lock (*this->lock_);
ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock,
@@ -431,19 +446,11 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
event,
qos_info,
ACE_TRY_ENV);
+ ACE_CHECK;
}
+
if (this->child_ != 0)
this->child_->clear ();
-
- // The reference count was incremented just before delegating on the
- // dispatching strategy, in this can we need to decrement it *now*.
- this->refcount_--;
- // @@ What if it reaches 0???
- if (this->refcount_ == 0)
- {
- this->lock_->release ();
- this->event_channel_->destroy_proxy (this);
- }
}
void
@@ -451,16 +458,25 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- // No need to grab the lock, it is beign held already by the
- // filter() method
- this->refcount_++;
-
+ // The mutex is already held by the caller (usually the filter()
+ // method)
if (this->is_connected_i () == 0)
return; // TAO_THROW (RtecEventComm::Disconnected ());????
if (this->suspended_ != 0)
return;
+ TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_);
+ // The guard will decrement the reference count, notice that the
+ // reference count can become 0, but this is not the right spot to
+ // check for that and destroy the object.
+ // If we did so then we would destroy the object, and consequently
+ // the mutex, but the mutex is used later when the stack unwinds and
+ // the filter() method tries to destroy the mutex (that originally
+ // acquired the mutex in the first place).
+ // So the correct thing to do is to just decrement the reference
+ // count and let the filter() method do the destruction.
+
RtecEventComm::PushConsumer_var consumer =
RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ());
@@ -476,19 +492,11 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
event,
qos_info,
ACE_TRY_ENV);
+ ACE_CHECK;
}
+
if (this->child_ != 0)
this->child_->clear ();
-
- // The reference count was incremented just before delegating on the
- // dispatching strategy, in this can we need to decrement it *now*.
- this->refcount_--;
- // @@ What if it reaches 0???
- if (this->refcount_ == 0)
- {
- this->lock_->release ();
- this->event_channel_->destroy_proxy (this);
- }
}
void
@@ -681,6 +689,12 @@ TAO_EC_ProxyPushSupplier::_remove_ref (CORBA::Environment &)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class TAO_ESF_RefCount_Guard<CORBA::ULong>;
+template class TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel,TAO_EC_ProxyPushSupplier>;
+
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate TAO_ESF_RefCount_Guard<CORBA::ULong>
+#pragma instantiate TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel,TAO_EC_ProxyPushSupplier>
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Makefile.CosNotification b/TAO/orbsvcs/orbsvcs/Makefile.CosNotification
index b6f863a7608..47bce482d26 100644
--- a/TAO/orbsvcs/orbsvcs/Makefile.CosNotification
+++ b/TAO/orbsvcs/orbsvcs/Makefile.CosNotification
@@ -14,7 +14,7 @@ SHLIB = $(LIBNAME).$(SOEXT)
VPATH=.:Notify
-ACE_SHLIBS = -lTAO_CosEvent -lTAO_CosTrading -lTAO_Svc_Utils -lTAO -lACE
+ACE_SHLIBS = -lTAO_CosEvent -lTAO_RTEvent -lTAO_RTSched -lTAO_CosTrading -lTAO_CosNaming -lTAO_Svc_Utils -lTAO -lACE
#----------------------------------------------------------------------------
# Include macros and targets
diff --git a/TAO/orbsvcs/tests/Event/Basic/Basic.dsw b/TAO/orbsvcs/tests/Event/Basic/Basic.dsw
index f8155f1d14d..47199426b9a 100644
--- a/TAO/orbsvcs/tests/Event/Basic/Basic.dsw
+++ b/TAO/orbsvcs/tests/Event/Basic/Basic.dsw
@@ -123,6 +123,18 @@ Package=<4>
###############################################################################
+Project: "Random"=.\Random.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
Project: "Reconnect"=.\Reconnect.dsp - Package Owner=<4>
Package=<5>
diff --git a/TAO/orbsvcs/tests/Event/Basic/Makefile b/TAO/orbsvcs/tests/Event/Basic/Makefile
index 0a8887466db..0aa8c74c1dc 100644
--- a/TAO/orbsvcs/tests/Event/Basic/Makefile
+++ b/TAO/orbsvcs/tests/Event/Basic/Makefile
@@ -26,7 +26,8 @@ BIN2 = Reconnect \
Bitmask \
Complex \
Gateway \
- Control
+ Control \
+ Random
#### If the orbsvcs library wasn't built with all components, don't
#### try to build certain tests.
diff --git a/TAO/orbsvcs/tests/Event/Basic/Random.cpp b/TAO/orbsvcs/tests/Event/Basic/Random.cpp
new file mode 100644
index 00000000000..330677662e7
--- /dev/null
+++ b/TAO/orbsvcs/tests/Event/Basic/Random.cpp
@@ -0,0 +1,565 @@
+// $Id$
+
+#include "Random.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+#include "orbsvcs/Event/EC_Default_Factory.h"
+#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/Time_Utilities.h"
+#include "ace/Arg_Shifter.h"
+
+ACE_RCSID(EC_Tests, Random, "$Id$")
+
+int
+main (int argc, char* argv[])
+{
+ RND_Driver driver;
+ return driver.run (argc, argv);
+}
+
+// ****************************************************************
+
+const int base_type = 20;
+
+void
+deactivate_servant (PortableServer::Servant servant,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ PortableServer::POA_var poa =
+ servant->_default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var oid =
+ poa->servant_to_id (servant, ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (oid.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+
+RND_Driver::RND_Driver (void)
+ : timer_ (this),
+ nsuppliers_ (4),
+ nconsumers_ (4),
+ max_recursion_ (1)
+{
+ TAO_EC_Default_Factory::init_svcs ();
+}
+
+int
+RND_Driver::run (int argc, char *argv[])
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ ACE_Arg_Shifter arg_shifter (argc, argv);
+
+ while (arg_shifter.is_anything_left ())
+ {
+ char *arg = arg_shifter.get_current ();
+
+ if (ACE_OS::strcasecmp (arg, "-suppliers") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ char* opt = arg_shifter.get_current ();
+ int n = ACE_OS::atoi (opt);
+ if (n >= 1)
+ this->nsuppliers_ = n;
+ arg_shifter.consume_arg ();
+ }
+ }
+ else if (ACE_OS::strcasecmp (arg, "-consumers") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ char* opt = arg_shifter.get_current ();
+ int n = ACE_OS::atoi (opt);
+ if (n >= 1)
+ this->nconsumers_ = n;
+ arg_shifter.consume_arg ();
+ }
+ }
+ else if (ACE_OS::strcasecmp (arg, "-max_recursion") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ char* opt = arg_shifter.get_current ();
+ int n = ACE_OS::atoi (opt);
+ if (n >= 0)
+ this->max_recursion_ = n;
+ arg_shifter.consume_arg ();
+ }
+ }
+ else
+ arg_shifter.ignore_arg ();
+ }
+
+ // ****************************************************************
+
+ CORBA::Object_var object =
+ orb->resolve_initial_references ("RootPOA", ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ PortableServer::POA_var poa =
+ PortableServer::POA::_narrow (object.in (), ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ PortableServer::POAManager_var poa_manager =
+ poa->the_POAManager (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ poa_manager->activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ TAO_EC_Event_Channel_Attributes attributes (poa.in (),
+ poa.in ());
+ attributes.consumer_reconnect = 1;
+ attributes.supplier_reconnect = 1;
+
+ TAO_EC_Event_Channel ec_impl (attributes);
+ ec_impl.activate (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ RtecEventChannelAdmin::EventChannel_var event_channel =
+ ec_impl._this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ // Obtain the consumer admin..
+ this->consumer_admin_ =
+ event_channel->for_consumers (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // Obtain the supplier admin..
+ this->supplier_admin_ =
+ event_channel->for_suppliers (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ {
+ // Let's say that the execution time for event 2 is 1
+ // milliseconds...
+ ACE_Time_Value tv (0, 50000);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+
+ ACE_ConsumerQOS_Factory qos;
+ qos.start_disjunction_group ();
+ // The types int the range [0,ACE_ES_EVENT_UNDEFINED) are
+ // reserved for the EC...
+ qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
+ time,
+ 0);
+
+ this->timer_.connect (this->consumer_admin_.in (),
+ qos.get_ConsumerQOS (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // ****************************************************************
+
+ {
+ ACE_SupplierQOS_Factory qos;
+ qos.insert (0, base_type, 0, 1);
+
+ this->supplier_.connect (this->supplier_admin_.in (),
+ qos.get_SupplierQOS (),
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // ****************************************************************
+
+ ACE_NEW_RETURN (this->consumers_,
+ RND_Consumer*[this->nconsumers_],
+ 1);
+ for (int i = 0; i != this->nconsumers_; ++i)
+ {
+ ACE_NEW_RETURN (this->consumers_[i],
+ RND_Consumer (this),
+ 1);
+
+ CORBA::Object_var obj =
+ this->consumers_[i]->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // ****************************************************************
+
+ ACE_NEW_RETURN (this->suppliers_,
+ RND_Supplier*[this->nsuppliers_],
+ 1);
+ for (int j = 0; j != this->nsuppliers_; ++j)
+ {
+ ACE_NEW_RETURN (this->suppliers_[j],
+ RND_Supplier,
+ 1);
+ this->suppliers_[j]->activate ();
+
+ CORBA::Object_var obj =
+ this->suppliers_[j]->_this (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+
+ // ****************************************************************
+
+ ACE_Time_Value tv (30, 0);
+ orb->run (tv);
+
+ ACE_Thread_Manager::instance ()->wait ();
+
+ // ****************************************************************
+
+ {
+ for (int k = 0; k != this->nsuppliers_; ++k)
+ {
+ deactivate_servant (this->suppliers_[k],
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->suppliers_[k]->_remove_ref (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ delete[] this->suppliers_;
+ this->suppliers_ = 0;
+ }
+
+ // ****************************************************************
+
+ // We destroy now to verify that the callbacks work and do not
+ // produce any problems.
+ event_channel->destroy (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ {
+ for (int k = 0; k != this->nconsumers_; ++k)
+ {
+ deactivate_servant (this->consumers_[k],
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ this->consumers_[k]->_remove_ref (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ delete[] this->consumers_;
+ this->consumers_ = 0;
+ }
+
+ // ****************************************************************
+
+ deactivate_servant (&ec_impl,
+ ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ poa->destroy (1, 1, ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ // ****************************************************************
+
+ orb->destroy (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Random");
+ return 1;
+ }
+ ACE_ENDTRY;
+ return 0;
+}
+
+void
+RND_Driver::timer (const RtecEventComm::Event &e,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ int r = ACE_OS::rand ();
+ if (r < 0)
+ r = -r;
+
+ int n = r% 20;
+
+ switch (n)
+ {
+ case 0:
+ case 1:
+ {
+ // ACE_DEBUG ((LM_DEBUG, "Pushing an event\n"));
+ if (e.header.source < this->max_recursion_)
+ {
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+ event[0] = e;
+ event[0].header.source ++;
+ this->supplier_.push (event, ACE_TRY_ENV);
+ }
+ }
+ break;
+
+ default:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ // ACE_DEBUG ((LM_DEBUG, "Received event\n"));
+ break;
+
+ case 6:
+ {
+ int n = ACE_OS::rand () % this->nsuppliers_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Connecting supplier %d\n", n));
+
+ ACE_SupplierQOS_Factory qos;
+ qos.insert (0, base_type, 0, 1);
+
+ this->suppliers_[n]->connect (this->supplier_admin_.in (),
+ qos.get_SupplierQOS (),
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ break;
+
+ case 7:
+ {
+ int n = ACE_OS::rand () % this->nconsumers_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Connecting consumer %d\n", n));
+
+ ACE_ConsumerQOS_Factory qos;
+ qos.start_disjunction_group ();
+ qos.insert_type (base_type, 0);
+
+ this->consumers_[n]->connect (this->consumer_admin_.in (),
+ qos.get_ConsumerQOS (),
+ ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ break;
+
+ case 8:
+ {
+ int n = ACE_OS::rand () % this->nsuppliers_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Disconnecting supplier %d\n", n));
+
+ this->suppliers_[n]->disconnect (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ break;
+
+ case 9:
+ {
+ int n = ACE_OS::rand () % this->nconsumers_;
+
+ // ACE_DEBUG ((LM_DEBUG, "Disconnecting consumer %d\n", n));
+
+ this->consumers_[n]->disconnect (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ break;
+ }
+}
+
+void
+RND_Driver::event (const RtecEventComm::Event &e,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ this->timer (e, ACE_TRY_ENV);
+}
+
+// ****************************************************************
+
+void
+RND_Timer::push (const RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_TRY
+ {
+ this->driver_->timer (event[0], ACE_TRY_ENV);
+ }
+ ACE_CATCHANY
+ {
+ }
+ ACE_ENDTRY;
+}
+
+// ****************************************************************
+
+void
+RND_Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr admin,
+ const RtecEventChannelAdmin::ConsumerQOS &qos,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ {
+ this->proxy_ = admin->obtain_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+ proxy =
+ RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(this->proxy_.in ());
+ }
+ RtecEventComm::PushConsumer_var me =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+ proxy->connect_push_consumer (me.in (),
+ qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+RND_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ return;
+ this->proxy_->disconnect_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
+ this->proxy_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+}
+
+void
+RND_Consumer::push (const RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ this->driver_->event (event[0], ACE_TRY_ENV);
+}
+
+void
+RND_Consumer::disconnect_push_consumer (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+// ****************************************************************
+
+void
+RND_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr admin,
+ const RtecEventChannelAdmin::SupplierQOS &qos,
+ CORBA::Environment &ACE_TRY_ENV)
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ {
+ this->proxy_ = admin->obtain_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
+
+ proxy =
+ RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_.in ());
+ }
+ RtecEventComm::PushSupplier_var me =
+ this->_this (ACE_TRY_ENV);
+ ACE_CHECK;
+ proxy->connect_push_supplier (me.in (),
+ qos,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+}
+
+void
+RND_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ return;
+ this->proxy_->disconnect_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
+ this->proxy_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+}
+
+void
+RND_Supplier::push_new_event (CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+ event[0].header.type = base_type;
+ event[0].header.source = 0;
+
+ this->push (event, ACE_TRY_ENV);
+}
+
+void
+RND_Supplier::push (RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (CORBA::is_nil (this->proxy_.in ()))
+ return;
+
+ proxy =
+ RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_.in ());
+ }
+
+ proxy->push (event, ACE_TRY_ENV);
+}
+
+void
+RND_Supplier::disconnect_push_supplier (CORBA::Environment &)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+int
+RND_Supplier::svc (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "Thread %t started\n"));
+ int percent = 10;
+ int niterations = 5000;
+ for (int i = 0; i != niterations; ++i)
+ {
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ this->push_new_event (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+
+ ACE_Time_Value tv (0, 10000);
+ ACE_OS::sleep (tv);
+ }
+ ACE_CATCHANY
+ {
+ }
+ ACE_ENDTRY;
+ if (i * 100 / niterations >= percent)
+ {
+ ACE_DEBUG ((LM_DEBUG, "Thread %t %d%%\n", percent));
+ percent += 10;
+ }
+ }
+ ACE_DEBUG ((LM_DEBUG, "Thread %t completed\n"));
+ return 0;
+}
diff --git a/TAO/orbsvcs/tests/Event/Basic/Random.dsp b/TAO/orbsvcs/tests/Event/Basic/Random.dsp
new file mode 100644
index 00000000000..51b53b40e3b
--- /dev/null
+++ b/TAO/orbsvcs/tests/Event/Basic/Random.dsp
@@ -0,0 +1,102 @@
+# Microsoft Developer Studio Project File - Name="Random" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=Random - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "Random.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "Random.mak" CFG="Random - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "Random - Win32 Release" (based on "Win32 (x86) Console Application")
+!MESSAGE "Random - Win32 Debug" (based on "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "Random - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir "Release"
+# PROP Intermediate_Dir "Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MD /W3 /GX /O2 /I "..\lib" /I "..\..\.." /I "..\..\..\.." /I "..\..\..\..\.." /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /d "NDEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /machine:I386
+# ADD LINK32 ECTest.lib TAO.lib ace.lib TAO_CosNaming.lib TAO_RTEvent.lib TAO_RTSched.lib TAO_Svc_Utils.lib /nologo /subsystem:console /machine:I386 /libpath:"..\lib" /libpath:"..\..\..\orbsvcs" /libpath:"..\..\..\..\tao" /libpath:"..\..\..\..\..\ace"
+
+!ELSEIF "$(CFG)" == "Random - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Random"
+# PROP BASE Intermediate_Dir "Random"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir ""
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\lib" /I "..\..\.." /I "..\..\..\.." /I "..\..\..\..\.." /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 ECTestd.lib TAOd.lib aced.lib TAO_CosNamingd.lib TAO_RTEventd.lib TAO_RTSchedd.lib TAO_Svc_Utilsd.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept /libpath:"..\lib" /libpath:"..\..\..\orbsvcs" /libpath:"..\..\..\..\tao" /libpath:"..\..\..\..\..\ace"
+
+!ENDIF
+
+# Begin Target
+
+# Name "Random - Win32 Release"
+# Name "Random - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter ".cpp"
+# Begin Source File
+
+SOURCE=.\Random.cpp
+# End Source File
+# End Group
+# Begin Group "Header Files"
+
+# PROP Default_Filter ".h"
+# Begin Source File
+
+SOURCE=.\Random.h
+# End Source File
+# End Group
+# End Target
+# End Project
diff --git a/TAO/orbsvcs/tests/Event/Basic/Random.h b/TAO/orbsvcs/tests/Event/Basic/Random.h
new file mode 100644
index 00000000000..54fb8518cfe
--- /dev/null
+++ b/TAO/orbsvcs/tests/Event/Basic/Random.h
@@ -0,0 +1,193 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Real-time Event Channel tests
+//
+// = FILENAME
+// Random.h
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// ============================================================================
+
+#ifndef EC_RANDOM_H
+#define EC_RANDOM_H
+
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventChannelAdminS.h"
+#include "ace/Task.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#if defined(_MSC_VER)
+#if (_MSC_VER >= 1200)
+#pragma warning(push)
+#endif /* _MSC_VER >= 1200 */
+#pragma warning(disable:4250)
+#endif /* _MSC_VER */
+
+class RND_Driver;
+
+class RND_Consumer
+ : public POA_RtecEventComm::PushConsumer
+ , public PortableServer::RefCountServantBase
+{
+ // = TITLE
+ // Simple consumer object
+ //
+ // = DESCRIPTION
+ //
+public:
+ RND_Consumer (RND_Driver *driver);
+ // Constructor
+
+ void push (const RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ void disconnect_push_consumer (CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ void connect (RtecEventChannelAdmin::ConsumerAdmin_ptr admin,
+ const RtecEventChannelAdmin::ConsumerQOS &qos,
+ CORBA::Environment &ACE_TRY_ENV);
+ void disconnect (CORBA::Environment &ACE_TRY_ENV);
+
+protected:
+ RND_Driver *driver_;
+ // The driver
+
+ RtecEventChannelAdmin::ProxyPushSupplier_var proxy_;
+ // The supplier.
+
+ ACE_SYNCH_MUTEX lock_;
+ // Synch
+};
+
+inline
+RND_Consumer::RND_Consumer (RND_Driver *driver)
+ : driver_ (driver)
+{
+}
+
+// ****************************************************************
+
+class RND_Timer : public RND_Consumer
+{
+public:
+ RND_Timer (RND_Driver *driver);
+
+ void push (const RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+};
+
+inline
+RND_Timer::RND_Timer (RND_Driver *driver)
+ : RND_Consumer (driver)
+{
+}
+
+// ****************************************************************
+
+class RND_Supplier
+ : public POA_RtecEventComm::PushSupplier
+ , public PortableServer::RefCountServantBase
+ , public ACE_Task_Base
+{
+ // = TITLE
+ // Simple supplier object
+ //
+ // = DESCRIPTION
+ //
+public:
+ RND_Supplier (void);
+ // Constructor
+
+ void connect (RtecEventChannelAdmin::SupplierAdmin_ptr admin,
+ const RtecEventChannelAdmin::SupplierQOS &qos,
+ CORBA::Environment &ACE_TRY_ENV);
+ void disconnect (CORBA::Environment &ACE_TRY_ENV);
+
+ void push_new_event (CORBA::Environment &ACE_TRY_ENV);
+ void push (RtecEventComm::EventSet &event,
+ CORBA::Environment &ACE_TRY_ENV);
+ // Push a single event...
+
+ virtual void disconnect_push_supplier (CORBA::Environment &ACE_TRY_ENV)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual int svc (void);
+ // Active method
+
+private:
+ RtecEventChannelAdmin::ProxyPushConsumer_var proxy_;
+ // The supplier.
+
+ ACE_SYNCH_MUTEX lock_;
+ // Synch
+};
+
+inline
+RND_Supplier::RND_Supplier (void)
+{
+}
+
+// ****************************************************************
+
+class RND_Driver
+{
+public:
+ RND_Driver (void);
+
+ int run (int argc, char *argv[]);
+ // Run the test
+
+ void timer (const RtecEventComm::Event &e,
+ CORBA::Environment &ACE_TRY_ENV);
+ // The main timer has expired
+
+ void event (const RtecEventComm::Event &e,
+ CORBA::Environment &ACE_TRY_ENV);
+ // One of the consumers has received an event
+
+private:
+ RND_Driver (const RND_Driver &);
+ RND_Driver& operator= (const RND_Driver &);
+
+private:
+ RND_Timer timer_;
+ // The main timer
+
+ RND_Supplier supplier_;
+ // The supplier
+
+ int nsuppliers_;
+ // Number of suppliers
+
+ RND_Supplier **suppliers_;
+ // The suppliers
+
+ int nconsumers_;
+ // Number of consumers
+
+ RND_Consumer **consumers_;
+ // The consumers
+
+ int max_recursion_;
+ // Maximum recursion
+
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin_;
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin_;
+};
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma warning(pop)
+#endif /* _MSC_VER */
+
+#endif /* EC_RANDOM_H */
diff --git a/TAO/orbsvcs/tests/Event/Basic/run_test.pl b/TAO/orbsvcs/tests/Event/Basic/run_test.pl
index 406348d38e9..7be2c926b05 100755
--- a/TAO/orbsvcs/tests/Event/Basic/run_test.pl
+++ b/TAO/orbsvcs/tests/Event/Basic/run_test.pl
@@ -145,4 +145,14 @@ if ($T->TimedWait (60) == -1) {
$T->Kill (); $T->TimedWait (1);
}
+print STDERR "\n\nRandom test\n";
+$T = Process::Create ($EXEPREFIX . "Random".$EXE_EXT,
+ " -ORBSvcConf $cwd$DIR_SEPARATOR" . "svc.conf"
+ . " -suppliers 4 -consumers 4 -max_recursion 1");
+if ($T->TimedWait (60) == -1) {
+ print STDERR "ERROR: Test timedout\n";
+ $status = 1;
+ $T->Kill (); $T->TimedWait (1);
+}
+
exit $status;
diff --git a/TAO/orbsvcs/tests/Event/Event.dsw b/TAO/orbsvcs/tests/Event/Event.dsw
index 7698b5dc1f1..a329ff8a7d7 100644
--- a/TAO/orbsvcs/tests/Event/Event.dsw
+++ b/TAO/orbsvcs/tests/Event/Event.dsw
@@ -195,6 +195,21 @@ Package=<4>
###############################################################################
+Project: "Random"=.\Basic\Random.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+ Begin Project Dependency
+ Project_Dep_Name ECTest
+ End Project Dependency
+}}}
+
+###############################################################################
+
Project: "Reconnect"=.\Basic\Reconnect.dsp - Package Owner=<4>
Package=<5>