diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-04-18 16:12:29 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-04-18 16:12:29 +0000 |
commit | 0e2e2f96605b5c2e8127e6e8becb8ec3b672bcf5 (patch) | |
tree | 02983a2f269a9608879831e8dc037d2259a39deb | |
parent | d46a0d285ecec084ada0b132cf1d0bf09ad0a30e (diff) | |
download | ATCD-0e2e2f96605b5c2e8127e6e8becb8ec3b672bcf5.tar.gz |
ChangeLogTag:Tue Apr 18 08:52:27 2000 Carlos O'Ryan <coryan@uci.edu>
37 files changed, 1731 insertions, 322 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 84857f675f7..06ce057ffd9 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,3 +1,91 @@ +Tue Apr 18 08:52:27 2000 Carlos O'Ryan <coryan@uci.edu> + + * orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h: + * orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i: + * orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp: + * orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h: + * orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i: + * orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp: + New implementations of the Guard idiom that deal with safely + incrementing and decrementing the reference count on a proxy. + + * orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp: + Cosmetic fixes. + + * orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp: + The proxy should be deactivated *before* removing it from the + collection. + + * orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp: + The reconnected() call was leaking resources when the object was + not in the set. + + * orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h: + * orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i: + * orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp: + The Write_Guard was copying not only the contents of the + original collection, but also the reference count value. + There was a subtle race condition, the destructor has to wait + until all the pending writes have completed before removing the + object. + + * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h: + * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i: + * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp: + * orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp: + Use Guard idioms to safely remove reference count objects and + destroy some entities when their reference count reaches 0. + In some cases resources where leaked because the destructor was + assuming that shutdown() or something similar was invoked. In + general this is correct, but in concurrent scenarios a thread + may invoke shutdown() triggering the destruction of the object, + while another thread is still making changes. In such a case + the destructor is responsible for the final cleanup operations, + because the shutdown() method is not invoked again. + + * orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp: + Removed bogus inline. + + * orbsvcs/tests/Event/Event.dsw: + * orbsvcs/tests/Event/Basic/Basic.dsw: + * orbsvcs/tests/Event/Basic/Makefile: + * orbsvcs/tests/Event/Basic/Random.dsp: + * orbsvcs/tests/Event/Basic/Random.h: + * orbsvcs/tests/Event/Basic/Random.cpp: + * orbsvcs/tests/Event/Basic/Random.dsp: + * orbsvcs/tests/Event/Basic/run_test.pl: + New test for the Event service. + The test performs random operations on the event service, adding + consumers, suppliers, then removing them, pushing events, etc. + The operations are performed by multiple concurrent threads, in + many cases in the context of an upcall to some consumer. + This is the worst (or is it best?) torture test for the event + channel. + + * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h: + * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp: + Add an optional argument to the ECG_UDP_EH::open() method, + allowing the user to reuse the same socket address in multiple + calls. Thanks to Tom Ziomek <tomz@cc.comm.mot.com> for + providing the patch for this feature. + + * orbsvcs/orbsvcs/Makefile.CosNotification: + More missing libraries. + + * orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h: + * orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i: + * orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h: + * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i: + * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp: + Systematically use Guards to increment and decrement reference + counts, and to destroy objects once their count reaches 0. + Simplified the memory management of some objects by clearly + delineating who is responsible for their destruction. + Tue Apr 18 08:19:20 2000 Carlos O'Ryan <coryan@uci.edu> * orbsvcs/tests/Concurrency/CC_command.h: @@ -8,19 +96,19 @@ Tue Apr 18 08:19:20 2000 Carlos O'Ryan <coryan@uci.edu> Tue Apr 18 10:14:05 2000 Jeff Parsons <parsons@cs.wustl.edu> - * InterfaceS.h: - * InterfaceS.cpp: - Removed from TAO, code moved to the IFR_Service - directory. - - * Makefile: - * TAO.dsp: - * TAO_static.dsp: - * POA_CORBA.h: - 'TAO_Export' removed from declaration of IRObject, - since it is never instantiated inside TAO, and - declared there only because it must be in the - CORBA namespace. + * InterfaceS.h: + * InterfaceS.cpp: + Removed from TAO, code moved to the IFR_Service + directory. + + * Makefile: + * TAO.dsp: + * TAO_static.dsp: + * POA_CORBA.h: + 'TAO_Export' removed from declaration of IRObject, + since it is never instantiated inside TAO, and + declared there only because it must be in the + CORBA namespace. Mon Apr 17 20:12:13 2000 Vishal <vishal@cs.wustl.edu> diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp index 210e3a91126..b10a3e7974d 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.cpp @@ -1,7 +1,6 @@ // $Id$ #include "CEC_Dispatching_Task.h" -#include "CEC_ProxyPushSupplier.h" #if ! defined (__ACE_INLINE__) #include "CEC_Dispatching_Task.i" @@ -90,6 +89,11 @@ TAO_CEC_Shutdown_Task_Command::execute (CORBA::Environment&) // **************************************************************** +TAO_CEC_Push_Command::~TAO_CEC_Push_Command (void) +{ + this->proxy_->_decr_refcnt (); +} + int TAO_CEC_Push_Command::execute (CORBA::Environment& ACE_TRY_ENV) { diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h index 6acb7ea5260..35752cb37f3 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.h @@ -26,9 +26,7 @@ #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "orbsvcs/CosEvent/event_export.h" -#include "tao/corba.h" - -class TAO_CEC_ProxyPushSupplier; +#include "CEC_ProxyPushSupplier.h" class TAO_Event_Export TAO_CEC_Dispatching_Task : public ACE_Task<ACE_SYNCH> { @@ -99,6 +97,9 @@ public: ACE_Allocator *mb_allocator); // Constructor + virtual ~TAO_CEC_Push_Command (void); + // Destructor + virtual int execute (CORBA::Environment&); // Command callback diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i index cecff7e37f2..577483a03ab 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Dispatching_Task.i @@ -50,4 +50,6 @@ TAO_CEC_Push_Command::TAO_CEC_Push_Command (TAO_CEC_ProxyPushSupplier* proxy, // // @@ TODO this->event_ = event; + + this->proxy_->_incr_refcnt (); } diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp index 3ae07eb5abd..742cc3caf53 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp @@ -209,17 +209,15 @@ TAO_CEC_ProxyPullConsumer::shutdown (CORBA::Environment &ACE_TRY_ENV) // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; - if (this->is_connected_i () == 0) - return; - supplier = this->supplier_._retn (); - - this->cleanup_i (); } this->deactivate (ACE_TRY_ENV); ACE_CHECK; + if (CORBA::is_nil (supplier.in ())) + return; + ACE_TRY { supplier->disconnect_pull_supplier (ACE_TRY_ENV); diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp index f568a5a818a..9134639d795 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullSupplier.cpp @@ -86,17 +86,15 @@ TAO_CEC_ProxyPullSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV) // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; - if (this->is_connected_i () == 0) - return; - consumer = this->consumer_._retn (); - - this->cleanup_i (); } this->deactivate (ACE_TRY_ENV); ACE_CHECK; + if (CORBA::is_nil (consumer.in ())) + return; + ACE_TRY { consumer->disconnect_pull_consumer (ACE_TRY_ENV); diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp index a845882f7d7..77e16b469e3 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.cpp @@ -110,17 +110,15 @@ TAO_CEC_ProxyPushConsumer::shutdown (CORBA::Environment &ACE_TRY_ENV) // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; - if (this->is_connected_i () == 0) - return; - supplier = this->supplier_._retn (); - - this->cleanup_i (); } this->deactivate (ACE_TRY_ENV); ACE_CHECK; + if (CORBA::is_nil (supplier.in ())) + return; + ACE_TRY { supplier->disconnect_push_supplier (ACE_TRY_ENV); @@ -218,39 +216,16 @@ TAO_CEC_ProxyPushConsumer::push (const CORBA::Any& event, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - // @@ The following code is not exception safe, must fix, but the - // canonical tricks don't work: the destroy_push_consumer () method - // must be invoked only once the mutex is released, but after the - // refcount get to zero, seems hard... - - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - - if (this->is_connected_i () == 0) - return; // @@ THROW something??? - - this->refcount_++; - } + TAO_CEC_ProxyPushConsumer_Guard ace_mon (this->lock_, + this->refcount_, + this->event_channel_, + this); + if (!ace_mon.locked ()) + return; this->event_channel_->consumer_admin ()->push (event, ACE_TRY_ENV); ACE_CHECK; - - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - this->refcount_--; - if (this->refcount_ != 0) - return; - } - this->event_channel_->destroy_proxy (this); } void @@ -315,3 +290,61 @@ TAO_CEC_ProxyPushConsumer::_remove_ref (CORBA::Environment &) { this->_decr_refcnt (); } + +// **************************************************************** + +TAO_CEC_ProxyPushConsumer_Guard:: + TAO_CEC_ProxyPushConsumer_Guard (ACE_Lock *lock, + CORBA::ULong &refcount, + TAO_CEC_EventChannel *ec, + TAO_CEC_ProxyPushConsumer *proxy) + : lock_ (lock), + refcount_ (refcount), + event_channel_ (ec), + proxy_ (proxy), + locked_ (0) +{ + ACE_Guard<ACE_Lock> ace_mon (*this->lock_); + // If the guard fails there is not much we can do, raising an + // exception is wrong, the client has *no* way to handle that kind + // of error. Even worse, there is no exception to raise in that + // case. + // @@ Returning something won't work either, the error should be + // logged though! + + if (proxy->is_connected_i () == 0) + return; + + this->locked_ = 1; + this->refcount_++; +} + +TAO_CEC_ProxyPushConsumer_Guard:: + ~TAO_CEC_ProxyPushConsumer_Guard (void) +{ + // This access is safe because guard objects are created on the + // stack, only one thread has access to them + if (!this->locked_) + return; + + { + ACE_Guard<ACE_Lock> ace_mon (*this->lock_); + // If the guard fails there is not much we can do, raising an + // exception is wrong, the client has *no* way to handle that kind + // of error. Even worse, there is no exception to raise in that + // case. + // @@ Returning something won't work either, the error should be + // logged though! + + this->refcount_--; + if (this->refcount_ != 0) + return; + } + this->event_channel_->destroy_proxy (this->proxy_); +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h index dc856f9f783..9cee6bb948a 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.h @@ -108,6 +108,9 @@ protected: // Set the supplier, used by some implementations to change the // policies used when invoking operations on the supplier. + friend class TAO_CEC_ProxyPushConsumer_Guard; + // The guard needs access to the following protected methods. + CORBA::Boolean is_connected_i (void) const; // The private version (without locking) of is_connected(). @@ -131,6 +134,50 @@ private: // Store the default POA. }; +// **************************************************************** + +class TAO_Event_Export TAO_CEC_ProxyPushConsumer_Guard +{ + // = TITLE + // A Guard for the ProxyPushConsumer reference count + // + // = DESCRIPTION + // This is a helper class used in the implementation of + // ProxyPushConumer. It provides a Guard mechanism to increment + // the reference count on the proxy, eliminating the need to hold + // mutexes during long operations. + // +public: + TAO_CEC_ProxyPushConsumer_Guard (ACE_Lock *lock, + CORBA::ULong &refcount, + TAO_CEC_EventChannel *ec, + TAO_CEC_ProxyPushConsumer *proxy); + // Constructor + + ~TAO_CEC_ProxyPushConsumer_Guard (void); + // Destructor + + int locked (void) const; + // Returns 1 if the reference count successfully acquired + +private: + ACE_Lock *lock_; + // The lock used to protect the reference count + + CORBA::ULong &refcount_; + // The reference count + + TAO_CEC_EventChannel *event_channel_; + // The event channel used to destroy the proxy + + TAO_CEC_ProxyPushConsumer *proxy_; + // The proxy whose lifetime is controlled by the reference count + + int locked_; + // This flag is set to 1 if the reference count was successfully + // acquired. +}; + #if defined (__ACE_INLINE__) #include "CEC_ProxyPushConsumer.i" #endif /* __ACE_INLINE__ */ diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i index b6b7f318acc..535b34f58cd 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushConsumer.i @@ -34,3 +34,11 @@ TAO_CEC_ProxyPushConsumer::supplier (CosEventComm::PushSupplier_ptr supplier) this->supplier_i (supplier); } + +// **************************************************************** + +ACE_INLINE int +TAO_CEC_ProxyPushConsumer_Guard::locked (void) const +{ + return this->locked_; +} diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp index 415c877aa0c..77e1fe55db3 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp @@ -4,6 +4,8 @@ #include "CEC_Dispatching.h" #include "CEC_EventChannel.h" #include "CEC_ConsumerControl.h" +#include "orbsvcs/ESF/ESF_RefCount_Guard.h" +#include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h" #if ! defined (__ACE_INLINE__) #include "CEC_ProxyPushSupplier.i" @@ -86,17 +88,15 @@ TAO_CEC_ProxyPushSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV) // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; - if (this->is_connected_i () == 0) - return; - consumer = this->consumer_._retn (); - - this->cleanup_i (); } this->deactivate (ACE_TRY_ENV); ACE_CHECK; + if (CORBA::is_nil (consumer.in ())) + return; + ACE_TRY { consumer->disconnect_push_consumer (ACE_TRY_ENV); @@ -110,28 +110,62 @@ TAO_CEC_ProxyPushSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV) ACE_ENDTRY; } +typedef TAO_ESF_Proxy_RefCount_Guard<TAO_CEC_EventChannel,TAO_CEC_ProxyPushSupplier> Destroy_Guard; + void TAO_CEC_ProxyPushSupplier::push (const CORBA::Any &event, CORBA::Environment &ACE_TRY_ENV) { - ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); + Destroy_Guard auto_destroy (this->refcount_, + this->event_channel_, + this); + + { + ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); - this->refcount_++; - this->event_channel_->dispatching ()->push (this, - event, - ACE_TRY_ENV); + if (this->is_connected_i () == 0) + return; + + TAO_ESF_RefCount_Guard<CORBA::ULong> cnt_mon (this->refcount_); + + { + TAO_CEC_Unlock reverse_lock (*this->lock_); + + ACE_GUARD (TAO_CEC_Unlock, ace_mon, reverse_lock); + this->event_channel_->dispatching ()->push (this, + event, + ACE_TRY_ENV); + ACE_CHECK; + } + } } void TAO_CEC_ProxyPushSupplier::push_nocopy (CORBA::Any &event, CORBA::Environment &ACE_TRY_ENV) { - ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); + Destroy_Guard auto_destroy (this->refcount_, + this->event_channel_, + this); + + { + ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); - this->refcount_++; - this->event_channel_->dispatching ()->push_nocopy (this, - event, - ACE_TRY_ENV); + if (this->is_connected_i () == 0) + return; + + TAO_ESF_RefCount_Guard<CORBA::ULong> cnt_mon (this->refcount_); + + { + TAO_CEC_Unlock reverse_lock (*this->lock_); + + ACE_GUARD (TAO_CEC_Unlock, ace_mon, reverse_lock); + this->event_channel_->dispatching ()->push_nocopy (this, + event, + ACE_TRY_ENV); + ACE_CHECK; + } + } } void @@ -271,26 +305,11 @@ TAO_CEC_ProxyPushSupplier::push_to_consumer (const CORBA::Any& event, // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; - // The reference count was increased just before pushing to the - // dispatching module, we must decrease here. But if we get - // removed then we abort. We don't want to call _decr_refcnt() - // because that will require two locks. - this->refcount_--; - if (this->refcount_ == 0) - { - ace_mon.release (); - this->event_channel_->destroy_proxy (this); - return; - } - if (this->is_connected_i () == 0) return; // ACE_THROW (CosEventComm::Disconnected ());???? consumer = CosEventComm::PushConsumer::_duplicate (this->consumer_.in ()); - - // The refcount cannot be zero, because we have at least two - // references, } ACE_TRY @@ -328,54 +347,44 @@ TAO_CEC_ProxyPushSupplier::reactive_push_to_consumer ( const CORBA::Any& event, CORBA::Environment& ACE_TRY_ENV) { - if (this->is_connected_i () == 0) - return; // TAO_THROW (CosEventComm::Disconnected ());???? - - CosEventComm::PushConsumer_var consumer = - CosEventComm::PushConsumer::_duplicate (this->consumer_.in ()); - + CosEventComm::PushConsumer_var consumer; { - TAO_CEC_Unlock reverse_lock (*this->lock_); - - ACE_GUARD_THROW_EX (TAO_CEC_Unlock, ace_mon, reverse_lock, - CORBA::INTERNAL ()); - // @@ CosEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; + ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); + if (this->is_connected_i () == 0) + return; // TAO_THROW (CosEventComm::Disconnected ());???? - ACE_TRY - { - consumer->push (event, ACE_TRY_ENV); - ACE_TRY_CHECK; - } - ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used) - { - TAO_CEC_ConsumerControl *control = - this->event_channel_->consumer_control (); + consumer = + CosEventComm::PushConsumer::_duplicate (this->consumer_.in ()); + } - control->consumer_not_exist (this, ACE_TRY_ENV); - ACE_TRY_CHECK; - } - ACE_CATCH (CORBA::SystemException, sysex) - { - TAO_CEC_ConsumerControl *control = - this->event_channel_->consumer_control (); + ACE_TRY + { + consumer->push (event, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used) + { + TAO_CEC_ConsumerControl *control = + this->event_channel_->consumer_control (); - control->system_exception (this, - sysex, - ACE_TRY_ENV); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - // Shouldn't happen, but does not hurt - } - ACE_ENDTRY; - } + control->consumer_not_exist (this, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::SystemException, sysex) + { + TAO_CEC_ConsumerControl *control = + this->event_channel_->consumer_control (); - // The reference count was incremented just before delegating on the - // dispatching strategy, in this can we need to decrement it *now*. - this->refcount_--; - // @@ What if it reaches 0??? + control->system_exception (this, + sysex, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Shouldn't happen, but does not hurt + } + ACE_ENDTRY; } CORBA::Boolean @@ -423,6 +432,10 @@ TAO_CEC_ProxyPushSupplier::_remove_ref (CORBA::Environment &) #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class TAO_ESF_Proxy_RefCount_Guard<TAO_CEC_EventChannel,TAO_CEC_ProxyPushSupplier>; + #elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate TAO_ESF_Proxy_RefCount_Guard<TAO_CEC_EventChannel,TAO_CEC_ProxyPushSupplier> + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp index 268b10ab444..f2f32b69fcc 100644 --- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.cpp @@ -43,7 +43,8 @@ TAO_ESF_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void) template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL> TAO_ESF_Copy_On_Write<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>:: TAO_ESF_Copy_On_Write (void) - : writing_ (0), + : pending_writes_ (0), + writing_ (0), cond_ (mutex_) { ACE_NEW (this->collection_, Collection); @@ -53,7 +54,13 @@ template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL> TAO_ESF_Copy_On_Write<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>:: ~TAO_ESF_Copy_On_Write (void) { + ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_); + + while (this->pending_writes_ != 0) + this->cond_.wait (); + this->collection_->_decr_refcnt (); + this->collection_ = 0; } template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL> void @@ -79,6 +86,7 @@ TAO_ESF_Copy_On_Write<PROXY,C,I,ACE_SYNCH_USE>:: { Write_Guard ace_mon (this->mutex_, this->cond_, + this->pending_writes_, this->writing_, this->collection_); @@ -93,6 +101,7 @@ TAO_ESF_Copy_On_Write<PROXY,C,I,ACE_SYNCH_USE>:: { Write_Guard ace_mon (this->mutex_, this->cond_, + this->pending_writes_, this->writing_, this->collection_); @@ -107,6 +116,7 @@ TAO_ESF_Copy_On_Write<PROXY,C,I,ACE_SYNCH_USE>:: { Write_Guard ace_mon (this->mutex_, this->cond_, + this->pending_writes_, this->writing_, this->collection_); @@ -117,10 +127,10 @@ template<class PROXY, class C, class I, ACE_SYNCH_DECL> void TAO_ESF_Copy_On_Write<PROXY,C,I,ACE_SYNCH_USE>:: shutdown (CORBA::Environment &ACE_TRY_ENV) { - // @@ Do we really need to perform a copy here? - // I believe so, but i don't have a good scenario for it. + // We need to perform a copy to follow the protocol. Write_Guard ace_mon (this->mutex_, this->cond_, + this->pending_writes_, this->writing_, this->collection_); diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h index d90b8596a1a..e7fd96273af 100644 --- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.h @@ -62,7 +62,7 @@ class TAO_ESF_Copy_On_Write_Read_Guard public: typedef TAO_ESF_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; TAO_ESF_Copy_On_Write_Read_Guard (ACE_LOCK &mutex, - Collection*& collection); + Collection *&collection); // Constructor ~TAO_ESF_Copy_On_Write_Read_Guard (void); @@ -92,9 +92,10 @@ class TAO_ESF_Copy_On_Write_Write_Guard public: typedef TAO_ESF_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; TAO_ESF_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX_T &mutex, - ACE_SYNCH_CONDITION_T &cond, - int &writing_flag, - Collection*& collection); + ACE_SYNCH_CONDITION_T &cond, + int &pending_writes, + int &writing_flag, + Collection*& collection); // Constructor ~TAO_ESF_Copy_On_Write_Write_Guard (void); @@ -105,6 +106,7 @@ public: private: ACE_SYNCH_MUTEX_T &mutex; ACE_SYNCH_CONDITION_T &cond; + int &pending_writes; int &writing_flag; Collection *&collection; }; @@ -150,6 +152,9 @@ private: ACE_SYNCH_MUTEX_T mutex_; // A mutex to serialize access to the collection pointer. + int pending_writes_; + // Number of pending writes + int writing_; // If non-zero then a thread is changing the collection. Many // threads can use the collection simulatenously, but only one diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i index a4a30b965c1..12f8525ba09 100644 --- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Copy_On_Write.i @@ -12,7 +12,7 @@ TAO_ESF_Copy_On_Write_Collection<COLLECTION,ITERATOR>:: template<class COLLECTION, class ITERATOR, class ACE_LOCK> ACE_INLINE TAO_ESF_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR,ACE_LOCK>:: TAO_ESF_Copy_On_Write_Read_Guard (ACE_LOCK &m, - Collection*& collection_ref) + Collection*& collection_ref) : collection (0), mutex (m) { @@ -37,18 +37,22 @@ TAO_ESF_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR,ACE_LOCK>:: template<class COLLECTION, class ITERATOR, ACE_SYNCH_DECL> ACE_INLINE TAO_ESF_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR,ACE_SYNCH_USE>:: TAO_ESF_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX_T &m, - ACE_SYNCH_CONDITION_T &c, - int &w, - Collection*& cr) + ACE_SYNCH_CONDITION_T &c, + int &p, + int &w, + Collection*& cr) : copy (0), mutex (m), cond (c), + pending_writes (p), writing_flag (w), collection (cr) { { ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex); + this->pending_writes++; + while (this->writing_flag != 0) this->cond.wait (); @@ -58,7 +62,13 @@ TAO_ESF_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR,ACE_SYNCH_USE>:: // Copy outside the mutex, because it may take a long time. // Nobody can change it, because it is protected by the // writing_flag. - ACE_NEW (this->copy, Collection (*this->collection)); + + // First initialize it (with the correct reference count + ACE_NEW (this->copy, Collection); + // Copy the contents + this->copy->collection = this->collection->collection; + + // Increase the reference counts ITERATOR end = this->copy->collection.end (); for (ITERATOR i = this->copy->collection.begin (); i != end; ++i) { @@ -77,6 +87,8 @@ TAO_ESF_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR,ACE_SYNCH_USE>:: tmp = this->collection; this->collection = this->copy; this->writing_flag = 0; + this->pending_writes--; + this->cond.signal (); } // Delete outside the mutex, because it may take a long time. diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp index bc73c7d56cd..61668be4601 100644 --- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Delayed_Changes.cpp @@ -54,7 +54,8 @@ TAO_ESF_Delayed_Changes<PROXY,COLLECTION,ITERATOR,ACE_SYNCH_USE>:: { worker->work (*i, ACE_TRY_ENV); ACE_CHECK; - } + + } } template<class PROXY, class COLLECTION, class ITERATOR, ACE_SYNCH_DECL> int diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp index 144f7ff7756..d44befa55c0 100644 --- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_Admin.cpp @@ -77,6 +77,9 @@ TAO_ESF_Proxy_Admin<EC,P,I>::disconnected (P *proxy, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC (()) { + proxy->deactivate (ACE_TRY_ENV); + ACE_CHECK; // Cannot happen, just following the discipline. + ACE_TRY { this->collection_->disconnected (proxy, ACE_TRY_ENV); @@ -93,9 +96,6 @@ TAO_ESF_Proxy_Admin<EC,P,I>::disconnected (P *proxy, // that has an exception for "could not acquire a mutex". } ACE_ENDTRY; - - proxy->deactivate (ACE_TRY_ENV); - ACE_CHECK; // Cannot happen, just following the discipline. } #endif /* TAO_ESF_PROXY_ADMIN_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp index 5fd6f3adc5b..f1244c91cca 100644 --- a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_List.cpp @@ -43,7 +43,10 @@ TAO_ESF_Proxy_List<PROXY>::reconnected (PROXY *proxy, CORBA::Environment &) { int r = this->impl_.insert (proxy); - if (r == 0 || r == 1) + if (r == 0) + return; + + if (r == 1) { // Reference count is incremented by the callers to [re]connected. // @@ Find out if the protocol could be simplified, and decoupling diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp new file mode 100644 index 00000000000..83a1490530f --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.cpp @@ -0,0 +1,27 @@ +// $Id$ + +#ifndef TAO_ESF_PROXY_REFCOUNT_GUARD_CPP +#define TAO_ESF_PROXY_REFCOUNT_GUARD_CPP + +#include "ESF_Proxy_RefCount_Guard.h" + +#if ! defined (__ACE_INLINE__) +#include "ESF_Proxy_RefCount_Guard.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ESF, ESF_Proxy_RefCount_Guard, "$Id$") + +template<class EC, class P> +TAO_ESF_Proxy_RefCount_Guard<EC,P>::~TAO_ESF_Proxy_RefCount_Guard (void) +{ + // Checking for 0 is safe, once the variable reaches 0 the value + // will stay there. + // @@ But what if the thread is switched to another processor just + // after release the mutex? + if (this->refcount_ == 0) + { + this->event_channel_->destroy_proxy (this->proxy_); + } +} + +#endif /* TAO_ESF_PROXY_REFCOUNT_GUARD_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h new file mode 100644 index 00000000000..4ecc951fc8c --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h @@ -0,0 +1,79 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Event Service Framework +// +// = FILENAME +// ESF_Proxy_RefCount_Guard +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef TAO_ESF_PROXY_REFCOUNT_GUARD_H +#define TAO_ESF_PROXY_REFCOUNT_GUARD_H + +#include "tao/corbafwd.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +template<class EVENT_CHANNEL, class PROXY> +class TAO_ESF_Proxy_RefCount_Guard +{ + // = TITLE + // Reference count based guard. + // + // = DESCRIPTION + // A common idiom used on event services is to increment a + // reference count before starting a long running operation. + // The system can then execute the operation without any risk of + // having the underlying object destroyed. The advantage of using + // a reference count is that no mutex or lock needs to be held + // while the operation is beign executed. + // This class implements that common idiom, but it also adds hooks + // to handle scenarios where more than one operation is performed + // while holding the reference count. + // + // = TODO + // @@ The type of lock could be parametric + // +public: + TAO_ESF_Proxy_RefCount_Guard (CORBA::ULong &refcount, + EVENT_CHANNEL *ec, + PROXY *proxy); + // Constructor + + ~TAO_ESF_Proxy_RefCount_Guard (void); + // Destructor + +protected: + CORBA::ULong &refcount_; + // The reference count, if it gets to zero then the object must be + // destroyed + + EVENT_CHANNEL *event_channel_; + // The event channel used to destroy the proxy + + PROXY *proxy_; + // The proxy whose lifetime is controlled by the reference count +}; + +#if defined (__ACE_INLINE__) +#include "ESF_Proxy_RefCount_Guard.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ESF_Proxy_RefCount_Guard.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("ESF_Proxy_RefCount_Guard.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* TAO_ESF_PROXY_REFCOUNT_GUARD_H */ diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i new file mode 100644 index 00000000000..7639a89ee12 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_Proxy_RefCount_Guard.i @@ -0,0 +1,12 @@ +// $Id$ + +template<class EC, class P> ACE_INLINE +TAO_ESF_Proxy_RefCount_Guard<EC,P>:: + TAO_ESF_Proxy_RefCount_Guard (CORBA::ULong &refcount, + EC *ec, + P *proxy) + : refcount_ (refcount), + event_channel_ (ec), + proxy_ (proxy) +{ +} diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp new file mode 100644 index 00000000000..432189332bb --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.cpp @@ -0,0 +1,14 @@ +// $Id$ + +#ifndef TAO_ESF_REFCOUNT_GUARD_CPP +#define TAO_ESF_REFCOUNT_GUARD_CPP + +#include "ESF_RefCount_Guard.h" + +#if ! defined (__ACE_INLINE__) +#include "ESF_RefCount_Guard.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ESF, ESF_RefCount_Guard, "$Id$") + +#endif /* TAO_ESF_REFCOUNT_GUARD_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h new file mode 100644 index 00000000000..05731bb9fa7 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Event Service Framework +// +// = FILENAME +// ESF_RefCount_Guard +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef TAO_ESF_REFCOUNT_GUARD_H +#define TAO_ESF_REFCOUNT_GUARD_H + +#include "ace/OS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +template<class T> +class TAO_ESF_RefCount_Guard +{ + // = TITLE + // Reference count based guard. + // + // = DESCRIPTION + // A common idiom used on event services is to increment a + // reference count before starting a long running operation. + // The system can then execute the operation without any risk of + // having the underlying object destroyed. The advantage of using + // a reference count is that no mutex or lock needs to be held + // while the operation is beign executed. + // +public: + TAO_ESF_RefCount_Guard (T &refcount); + // Constructor + + ~TAO_ESF_RefCount_Guard (void); + // Destructor + +protected: + T &refcount_; + // The reference count +}; + +#if defined (__ACE_INLINE__) +#include "ESF_RefCount_Guard.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ESF_RefCount_Guard.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("ESF_RefCount_Guard.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* TAO_ESF_REFCOUNT_GUARD_H */ diff --git a/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i new file mode 100644 index 00000000000..b69511a8382 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/ESF/ESF_RefCount_Guard.i @@ -0,0 +1,16 @@ +// $Id$ + +template<class T> ACE_INLINE +TAO_ESF_RefCount_Guard<T>:: + TAO_ESF_RefCount_Guard (T &refcount) + : refcount_ (refcount) +{ + this->refcount_++; +} + +template<class T> ACE_INLINE +TAO_ESF_RefCount_Guard<T>:: + ~TAO_ESF_RefCount_Guard (void) +{ + this->refcount_--; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp index 0c9ba5b26e0..f1d15c62827 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp @@ -100,7 +100,6 @@ TAO_EC_Shutdown_Task_Command::execute (CORBA::Environment&) // **************************************************************** -ACE_INLINE TAO_EC_Push_Command::~TAO_EC_Push_Command (void) { this->proxy_->_decr_refcnt (); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp index 92593b21b5d..a5a1006f2bc 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp @@ -1008,9 +1008,10 @@ TAO_ECG_UDP_EH::TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv) } int -TAO_ECG_UDP_EH::open (const ACE_INET_Addr& ipaddr) +TAO_ECG_UDP_EH::open (const ACE_INET_Addr& ipaddr, + int reuse_addr) { - if (this->dgram_.open (ipaddr) == -1) + if (this->dgram_.open (ipaddr, PF_INET, 0, reuse_addr) == -1) return -1; return this->reactor ()->register_handler (this, ACE_Event_Handler::READ_MASK); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h index 6d1d6cdb9b6..6379b6c7073 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h @@ -524,7 +524,8 @@ class TAO_RTEvent_Export TAO_ECG_UDP_EH : public ACE_Event_Handler public: TAO_ECG_UDP_EH (TAO_ECG_UDP_Receiver *recv); - int open (const ACE_INET_Addr& ipaddr); + int open (const ACE_INET_Addr& ipaddr, + int reuse_addr = 0); // Open the datagram and register with this->reactor() int close (void); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp index 2652eaec36d..f710ff20c03 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp @@ -24,13 +24,17 @@ TAO_EC_ProxyPushConsumer:: this->default_POA_ = this->event_channel_->consumer_poa (); + + this->qos_.is_gateway = 0; } TAO_EC_ProxyPushConsumer::~TAO_EC_ProxyPushConsumer (void) { this->event_channel_->destroy_consumer_lock (this->lock_); + this->cleanup_i (); } + CORBA::Boolean TAO_EC_ProxyPushConsumer::supplier_non_existent ( CORBA::Boolean_out disconnected, @@ -60,91 +64,42 @@ void TAO_EC_ProxyPushConsumer::connected (TAO_EC_ProxyPushSupplier* supplier, CORBA::Environment &ACE_TRY_ENV) { - TAO_EC_Supplier_Filter* filter = 0; - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - - if (this->is_connected_i () == 0) - return; - - filter = this->filter_; - filter->_incr_refcnt (); - } - - filter->connected (supplier, ACE_TRY_ENV); - - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - filter->_decr_refcnt (); - } + TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_, + this->refcount_, + this->event_channel_, + this); + if (!ace_mon.locked ()) + return; + + ace_mon.filter->connected (supplier, ACE_TRY_ENV); } void TAO_EC_ProxyPushConsumer::reconnected (TAO_EC_ProxyPushSupplier* supplier, CORBA::Environment &ACE_TRY_ENV) { - TAO_EC_Supplier_Filter* filter = 0; - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - - if (this->is_connected_i () == 0) - return; - - filter = this->filter_; - filter->_incr_refcnt (); - } - - filter->reconnected (supplier, ACE_TRY_ENV); - - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - filter->_decr_refcnt (); - } + TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_, + this->refcount_, + this->event_channel_, + this); + if (!ace_mon.locked ()) + return; + + ace_mon.filter->reconnected (supplier, ACE_TRY_ENV); } void TAO_EC_ProxyPushConsumer::disconnected (TAO_EC_ProxyPushSupplier* supplier, CORBA::Environment &ACE_TRY_ENV) { - TAO_EC_Supplier_Filter* filter = 0; - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - - if (this->is_connected_i () == 0) - return; - - filter = this->filter_; - filter->_incr_refcnt (); - } - - filter->disconnected (supplier, ACE_TRY_ENV); - - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - filter->_decr_refcnt (); - } + TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_, + this->refcount_, + this->event_channel_, + this); + if (!ace_mon.locked ()) + return; + + ace_mon.filter->disconnected (supplier, ACE_TRY_ENV); } void @@ -176,20 +131,23 @@ TAO_EC_ProxyPushConsumer::shutdown (CORBA::Environment &ACE_TRY_ENV) RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; - if (this->is_connected_i () == 0) - return; - supplier = this->supplier_._retn (); - this->filter_->shutdown (ACE_TRY_ENV); - ACE_CHECK; + if (this->filter_ != 0) + { + this->filter_->shutdown (ACE_TRY_ENV); + ACE_CHECK; - this->cleanup_i (); + this->cleanup_i (); + } } this->deactivate (ACE_TRY_ENV); ACE_CHECK; + if (CORBA::is_nil (supplier.in ())) + return; + ACE_TRY { supplier->disconnect_push_supplier (ACE_TRY_ENV); @@ -209,9 +167,12 @@ TAO_EC_ProxyPushConsumer::cleanup_i (void) this->supplier_ = RtecEventComm::PushSupplier::_nil (); - this->filter_->unbind (this); - this->filter_->_decr_refcnt (); - this->filter_ = 0; + if (this->filter_ != 0) + { + this->filter_->unbind (this); + this->filter_->_decr_refcnt (); + this->filter_ = 0; + } } RtecEventChannelAdmin::ProxyPushConsumer_ptr @@ -273,7 +234,7 @@ TAO_EC_ProxyPushConsumer::_decr_refcnt (void) return this->refcount_; } - // Notify the event channel + // Use the event channel this->event_channel_->destroy_proxy (this); return 0; } @@ -346,44 +307,17 @@ TAO_EC_ProxyPushConsumer::push (const RtecEventComm::EventSet& event, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { - // @@ The following code is not exception safe, must fix, but the - // canonical tricks don't work: the destroy_push_consumer () method - // must be invoked only once the mutex is released, but after the - // refcount get to zero, seems hard... - - TAO_EC_Supplier_Filter* filter = 0; - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - - if (this->is_connected_i () == 0) - return; // @@ THROW something??? - - filter = this->filter_; - filter->_incr_refcnt (); - - this->refcount_++; - } + TAO_EC_ProxyPushConsumer_Guard ace_mon (this->lock_, + this->refcount_, + this->event_channel_, + this); + if (!ace_mon.locked ()) + return; // No need to keep the lock, the filter_ class is supposed to be // thread safe.... - filter->push (event, ACE_TRY_ENV); - - { - ACE_GUARD_THROW_EX ( - ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - // @@ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - filter->_decr_refcnt (); - this->refcount_--; - if (this->refcount_ != 0) - return; - } - this->event_channel_->destroy_proxy (this); + ace_mon.filter->push (event, ACE_TRY_ENV); + ACE_CHECK; } void @@ -408,9 +342,6 @@ TAO_EC_ProxyPushConsumer::disconnect_push_consumer ( this->cleanup_i (); } - this->deactivate (ACE_TRY_ENV); - ACE_CHECK; - // Notify the event channel... this->event_channel_->disconnected (this, ACE_TRY_ENV); ACE_CHECK; @@ -453,3 +384,66 @@ TAO_EC_ProxyPushConsumer::_remove_ref (CORBA::Environment &) { this->_decr_refcnt (); } + +// **************************************************************** + +TAO_EC_ProxyPushConsumer_Guard:: + TAO_EC_ProxyPushConsumer_Guard (ACE_Lock *lock, + CORBA::ULong &refcount, + TAO_EC_Event_Channel *ec, + TAO_EC_ProxyPushConsumer *proxy) + : lock_ (lock), + refcount_ (refcount), + event_channel_ (ec), + proxy_ (proxy), + locked_ (0) +{ + ACE_Guard<ACE_Lock> ace_mon (*this->lock_); + // If the guard fails there is not much we can do, raising an + // exception is wrong, the client has *no* way to handle that kind + // of error. Even worse, there is no exception to raise in that + // case. + // @@ Returning something won't work either, the error should be + // logged though! + + if (proxy->is_connected_i () == 0) + return; + + this->filter = this->proxy_->filter_i (); + this->filter->_incr_refcnt (); + + this->locked_ = 1; + this->refcount_++; +} + +TAO_EC_ProxyPushConsumer_Guard:: + ~TAO_EC_ProxyPushConsumer_Guard (void) +{ + // This access is safe because guard objects are created on the + // stack, only one thread has access to them + if (!this->locked_) + return; + + { + ACE_Guard<ACE_Lock> ace_mon (*this->lock_); + // If the guard fails there is not much we can do, raising an + // exception is wrong, the client has *no* way to handle that kind + // of error. Even worse, there is no exception to raise in that + // case. + // @@ Returning something won't work either, the error should be + // logged though! + + this->filter->_decr_refcnt (); + + this->refcount_--; + if (this->refcount_ != 0) + return; + } + this->event_channel_->destroy_proxy (this->proxy_); +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h index 967db669471..513d923809c 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h @@ -147,9 +147,15 @@ protected: // Set the supplier, used by some implementations to change the // policies used when invoking operations on the supplier. + friend class TAO_EC_ProxyPushConsumer_Guard; + // The guard needs access to the following protected methods. + CORBA::Boolean is_connected_i (void) const; // The private version (without locking) of is_connected(). + TAO_EC_Supplier_Filter *filter_i (void) const; + // Return the current filter, assumes the locks are held. + void cleanup_i (void); // Release the filter and the supplier @@ -176,6 +182,52 @@ private: // The strategy to do filtering close to the supplier }; +// **************************************************************** + +class TAO_RTEvent_Export TAO_EC_ProxyPushConsumer_Guard +{ + // = TITLE + // A Guard for the ProxyPushConsumer reference count + // + // = DESCRIPTION + // This is a helper class used in the implementation of + // ProxyPushConumer. It provides a Guard mechanism to increment + // the reference count on the proxy and its filter, eliminating + // the need to hold mutexes during long operations. + // +public: + TAO_EC_ProxyPushConsumer_Guard (ACE_Lock *lock, + CORBA::ULong &refcount, + TAO_EC_Event_Channel *ec, + TAO_EC_ProxyPushConsumer *proxy); + // Constructor + + ~TAO_EC_ProxyPushConsumer_Guard (void); + // Destructor + + int locked (void) const; + // Returns 1 if the reference count successfully acquired + + TAO_EC_Supplier_Filter *filter; + +private: + ACE_Lock *lock_; + // The lock used to protect the reference count + + CORBA::ULong &refcount_; + // The reference count + + TAO_EC_Event_Channel *event_channel_; + // The event channel used to destroy the proxy + + TAO_EC_ProxyPushConsumer *proxy_; + // The proxy whose lifetime is controlled by the reference count + + int locked_; + // This flag is set to 1 if the reference count was successfully + // acquired. +}; + #if defined (__ACE_INLINE__) #include "EC_ProxyConsumer.i" #endif /* __ACE_INLINE__ */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i index caf77ce2e03..ace563c49e7 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i @@ -49,3 +49,18 @@ TAO_EC_ProxyPushConsumer::publications_i (void) const { return this->qos_; } + +ACE_INLINE TAO_EC_Supplier_Filter * +TAO_EC_ProxyPushConsumer::filter_i (void) const +{ + return this->filter_; +} + +// **************************************************************** + +ACE_INLINE int +TAO_EC_ProxyPushConsumer_Guard::locked (void) const +{ + return this->locked_; +} + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp index 7cc09c11a6a..6200c54beb4 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp @@ -7,6 +7,8 @@ #include "EC_Event_Channel.h" #include "EC_Scheduling_Strategy.h" #include "EC_ConsumerControl.h" +#include "orbsvcs/ESF/ESF_RefCount_Guard.h" +#include "orbsvcs/ESF/ESF_Proxy_RefCount_Guard.h" #if ! defined (__ACE_INLINE__) #include "EC_ProxySupplier.i" @@ -32,6 +34,7 @@ TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel* ec) TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void) { this->event_channel_->destroy_supplier_lock (this->lock_); + this->cleanup_i (); } void @@ -95,17 +98,20 @@ TAO_EC_ProxyPushSupplier::shutdown (CORBA::Environment &ACE_TRY_ENV) RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; - if (this->is_connected_i () == 0) - return; + int connected = this->is_connected_i (); consumer = this->consumer_._retn (); - this->cleanup_i (); + if (connected) + this->cleanup_i (); } this->deactivate (ACE_TRY_ENV); ACE_CHECK; + if (CORBA::is_nil (consumer.in ())) + return; + ACE_TRY { consumer->disconnect_push_consumer (ACE_TRY_ENV); @@ -293,9 +299,6 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier ( this->cleanup_i (); } - this->deactivate (ACE_TRY_ENV); - ACE_CHECK; - // Notify the event channel.... this->event_channel_->disconnected (this, ACE_TRY_ENV); ACE_CHECK; @@ -349,13 +352,18 @@ TAO_EC_ProxyPushSupplier::resume_connection (CORBA::Environment &ACE_TRY_ENV) this->suspended_ = 0; } +typedef TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel,TAO_EC_ProxyPushSupplier> Destroy_Guard; + int TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - int result = 0; + Destroy_Guard auto_destroy (this->refcount_, + this->event_channel_, + this); + int result = 0; { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, @@ -367,11 +375,8 @@ TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event, result = this->child_->filter (event, qos_info, ACE_TRY_ENV); - if (this->refcount_ > 0) - return result; + ACE_CHECK_RETURN (0); } - - this->event_channel_->destroy_proxy (this); return result; } @@ -380,8 +385,11 @@ TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - int result = 0; + Destroy_Guard auto_destroy (this->refcount_, + this->event_channel_, + this); + int result = 0; { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, @@ -393,11 +401,8 @@ TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event, result = this->child_->filter_nocopy (event, qos_info, ACE_TRY_ENV); - if (this->refcount_ > 0) - return result; + ACE_CHECK_RETURN (0); } - - this->event_channel_->destroy_proxy (this); return result; } @@ -406,20 +411,30 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - // No need to grab the lock, it is beign held already by the - // filter() method - this->refcount_++; - + // The mutex is already held by the caller (usually the filter() + // method) if (this->is_connected_i () == 0) return; // TAO_THROW (RtecEventComm::Disconnected ());???? if (this->suspended_ != 0) return; + TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_); + // The guard will decrement the reference count, notice that the + // reference count can become 0, but this is not the right spot to + // check for that and destroy the object. + // If we did so then we would destroy the object, and consequently + // the mutex, but the mutex is used later when the stack unwinds and + // the filter() method tries to destroy the mutex (that originally + // acquired the mutex in the first place). + // So the correct thing to do is to just decrement the reference + // count and let the filter() method do the destruction. + RtecEventComm::PushConsumer_var consumer = RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ()); { + // We have to release the lock to avoid dead-locks. TAO_EC_Unlock reverse_lock (*this->lock_); ACE_GUARD_THROW_EX (TAO_EC_Unlock, ace_mon, reverse_lock, @@ -431,19 +446,11 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event, event, qos_info, ACE_TRY_ENV); + ACE_CHECK; } + if (this->child_ != 0) this->child_->clear (); - - // The reference count was incremented just before delegating on the - // dispatching strategy, in this can we need to decrement it *now*. - this->refcount_--; - // @@ What if it reaches 0??? - if (this->refcount_ == 0) - { - this->lock_->release (); - this->event_channel_->destroy_proxy (this); - } } void @@ -451,16 +458,25 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - // No need to grab the lock, it is beign held already by the - // filter() method - this->refcount_++; - + // The mutex is already held by the caller (usually the filter() + // method) if (this->is_connected_i () == 0) return; // TAO_THROW (RtecEventComm::Disconnected ());???? if (this->suspended_ != 0) return; + TAO_ESF_RefCount_Guard<CORBA::ULong> ace_mon (this->refcount_); + // The guard will decrement the reference count, notice that the + // reference count can become 0, but this is not the right spot to + // check for that and destroy the object. + // If we did so then we would destroy the object, and consequently + // the mutex, but the mutex is used later when the stack unwinds and + // the filter() method tries to destroy the mutex (that originally + // acquired the mutex in the first place). + // So the correct thing to do is to just decrement the reference + // count and let the filter() method do the destruction. + RtecEventComm::PushConsumer_var consumer = RtecEventComm::PushConsumer::_duplicate (this->consumer_.in ()); @@ -476,19 +492,11 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event, event, qos_info, ACE_TRY_ENV); + ACE_CHECK; } + if (this->child_ != 0) this->child_->clear (); - - // The reference count was incremented just before delegating on the - // dispatching strategy, in this can we need to decrement it *now*. - this->refcount_--; - // @@ What if it reaches 0??? - if (this->refcount_ == 0) - { - this->lock_->release (); - this->event_channel_->destroy_proxy (this); - } } void @@ -681,6 +689,12 @@ TAO_EC_ProxyPushSupplier::_remove_ref (CORBA::Environment &) #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class TAO_ESF_RefCount_Guard<CORBA::ULong>; +template class TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel,TAO_EC_ProxyPushSupplier>; + #elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate TAO_ESF_RefCount_Guard<CORBA::ULong> +#pragma instantiate TAO_ESF_Proxy_RefCount_Guard<TAO_EC_Event_Channel,TAO_EC_ProxyPushSupplier> + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Makefile.CosNotification b/TAO/orbsvcs/orbsvcs/Makefile.CosNotification index b6f863a7608..47bce482d26 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile.CosNotification +++ b/TAO/orbsvcs/orbsvcs/Makefile.CosNotification @@ -14,7 +14,7 @@ SHLIB = $(LIBNAME).$(SOEXT) VPATH=.:Notify -ACE_SHLIBS = -lTAO_CosEvent -lTAO_CosTrading -lTAO_Svc_Utils -lTAO -lACE +ACE_SHLIBS = -lTAO_CosEvent -lTAO_RTEvent -lTAO_RTSched -lTAO_CosTrading -lTAO_CosNaming -lTAO_Svc_Utils -lTAO -lACE #---------------------------------------------------------------------------- # Include macros and targets diff --git a/TAO/orbsvcs/tests/Event/Basic/Basic.dsw b/TAO/orbsvcs/tests/Event/Basic/Basic.dsw index f8155f1d14d..47199426b9a 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Basic.dsw +++ b/TAO/orbsvcs/tests/Event/Basic/Basic.dsw @@ -123,6 +123,18 @@ Package=<4> ###############################################################################
+Project: "Random"=.\Random.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
Project: "Reconnect"=.\Reconnect.dsp - Package Owner=<4>
Package=<5>
diff --git a/TAO/orbsvcs/tests/Event/Basic/Makefile b/TAO/orbsvcs/tests/Event/Basic/Makefile index 0a8887466db..0aa8c74c1dc 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Makefile +++ b/TAO/orbsvcs/tests/Event/Basic/Makefile @@ -26,7 +26,8 @@ BIN2 = Reconnect \ Bitmask \ Complex \ Gateway \ - Control + Control \ + Random #### If the orbsvcs library wasn't built with all components, don't #### try to build certain tests. diff --git a/TAO/orbsvcs/tests/Event/Basic/Random.cpp b/TAO/orbsvcs/tests/Event/Basic/Random.cpp new file mode 100644 index 00000000000..330677662e7 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/Basic/Random.cpp @@ -0,0 +1,565 @@ +// $Id$ + +#include "Random.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Time_Utilities.h" +#include "ace/Arg_Shifter.h" + +ACE_RCSID(EC_Tests, Random, "$Id$") + +int +main (int argc, char* argv[]) +{ + RND_Driver driver; + return driver.run (argc, argv); +} + +// **************************************************************** + +const int base_type = 20; + +void +deactivate_servant (PortableServer::Servant servant, + CORBA::Environment &ACE_TRY_ENV) +{ + PortableServer::POA_var poa = + servant->_default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var oid = + poa->servant_to_id (servant, ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (oid.in (), ACE_TRY_ENV); + ACE_CHECK; +} + + +RND_Driver::RND_Driver (void) + : timer_ (this), + nsuppliers_ (4), + nconsumers_ (4), + max_recursion_ (1) +{ + TAO_EC_Default_Factory::init_svcs (); +} + +int +RND_Driver::run (int argc, char *argv[]) +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "", ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************************************************************** + + ACE_Arg_Shifter arg_shifter (argc, argv); + + while (arg_shifter.is_anything_left ()) + { + char *arg = arg_shifter.get_current (); + + if (ACE_OS::strcasecmp (arg, "-suppliers") == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + char* opt = arg_shifter.get_current (); + int n = ACE_OS::atoi (opt); + if (n >= 1) + this->nsuppliers_ = n; + arg_shifter.consume_arg (); + } + } + else if (ACE_OS::strcasecmp (arg, "-consumers") == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + char* opt = arg_shifter.get_current (); + int n = ACE_OS::atoi (opt); + if (n >= 1) + this->nconsumers_ = n; + arg_shifter.consume_arg (); + } + } + else if (ACE_OS::strcasecmp (arg, "-max_recursion") == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + char* opt = arg_shifter.get_current (); + int n = ACE_OS::atoi (opt); + if (n >= 0) + this->max_recursion_ = n; + arg_shifter.consume_arg (); + } + } + else + arg_shifter.ignore_arg (); + } + + // **************************************************************** + + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA", ACE_TRY_ENV); + ACE_TRY_CHECK; + PortableServer::POA_var poa = + PortableServer::POA::_narrow (object.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; + PortableServer::POAManager_var poa_manager = + poa->the_POAManager (ACE_TRY_ENV); + ACE_TRY_CHECK; + poa_manager->activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************************************************************** + + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + attributes.consumer_reconnect = 1; + attributes.supplier_reconnect = 1; + + TAO_EC_Event_Channel ec_impl (attributes); + ec_impl.activate (ACE_TRY_ENV); + ACE_TRY_CHECK; + + RtecEventChannelAdmin::EventChannel_var event_channel = + ec_impl._this (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************************************************************** + + // Obtain the consumer admin.. + this->consumer_admin_ = + event_channel->for_consumers (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // Obtain the supplier admin.. + this->supplier_admin_ = + event_channel->for_suppliers (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************************************************************** + + { + // Let's say that the execution time for event 2 is 1 + // milliseconds... + ACE_Time_Value tv (0, 50000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + // The types int the range [0,ACE_ES_EVENT_UNDEFINED) are + // reserved for the EC... + qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, + time, + 0); + + this->timer_.connect (this->consumer_admin_.in (), + qos.get_ConsumerQOS (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + // **************************************************************** + + { + ACE_SupplierQOS_Factory qos; + qos.insert (0, base_type, 0, 1); + + this->supplier_.connect (this->supplier_admin_.in (), + qos.get_SupplierQOS (), + ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + // **************************************************************** + + ACE_NEW_RETURN (this->consumers_, + RND_Consumer*[this->nconsumers_], + 1); + for (int i = 0; i != this->nconsumers_; ++i) + { + ACE_NEW_RETURN (this->consumers_[i], + RND_Consumer (this), + 1); + + CORBA::Object_var obj = + this->consumers_[i]->_this (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + // **************************************************************** + + ACE_NEW_RETURN (this->suppliers_, + RND_Supplier*[this->nsuppliers_], + 1); + for (int j = 0; j != this->nsuppliers_; ++j) + { + ACE_NEW_RETURN (this->suppliers_[j], + RND_Supplier, + 1); + this->suppliers_[j]->activate (); + + CORBA::Object_var obj = + this->suppliers_[j]->_this (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + + // **************************************************************** + + ACE_Time_Value tv (30, 0); + orb->run (tv); + + ACE_Thread_Manager::instance ()->wait (); + + // **************************************************************** + + { + for (int k = 0; k != this->nsuppliers_; ++k) + { + deactivate_servant (this->suppliers_[k], + ACE_TRY_ENV); + ACE_TRY_CHECK; + this->suppliers_[k]->_remove_ref (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + delete[] this->suppliers_; + this->suppliers_ = 0; + } + + // **************************************************************** + + // We destroy now to verify that the callbacks work and do not + // produce any problems. + event_channel->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************************************************************** + + { + for (int k = 0; k != this->nconsumers_; ++k) + { + deactivate_servant (this->consumers_[k], + ACE_TRY_ENV); + ACE_TRY_CHECK; + this->consumers_[k]->_remove_ref (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + delete[] this->consumers_; + this->consumers_ = 0; + } + + // **************************************************************** + + deactivate_servant (&ec_impl, + ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************************************************************** + + poa->destroy (1, 1, ACE_TRY_ENV); + ACE_TRY_CHECK; + + // **************************************************************** + + orb->destroy (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Random"); + return 1; + } + ACE_ENDTRY; + return 0; +} + +void +RND_Driver::timer (const RtecEventComm::Event &e, + CORBA::Environment &ACE_TRY_ENV) +{ + int r = ACE_OS::rand (); + if (r < 0) + r = -r; + + int n = r% 20; + + switch (n) + { + case 0: + case 1: + { + // ACE_DEBUG ((LM_DEBUG, "Pushing an event\n")); + if (e.header.source < this->max_recursion_) + { + RtecEventComm::EventSet event (1); + event.length (1); + event[0] = e; + event[0].header.source ++; + this->supplier_.push (event, ACE_TRY_ENV); + } + } + break; + + default: + case 2: + case 3: + case 4: + case 5: + // ACE_DEBUG ((LM_DEBUG, "Received event\n")); + break; + + case 6: + { + int n = ACE_OS::rand () % this->nsuppliers_; + + // ACE_DEBUG ((LM_DEBUG, "Connecting supplier %d\n", n)); + + ACE_SupplierQOS_Factory qos; + qos.insert (0, base_type, 0, 1); + + this->suppliers_[n]->connect (this->supplier_admin_.in (), + qos.get_SupplierQOS (), + ACE_TRY_ENV); + ACE_CHECK; + } + break; + + case 7: + { + int n = ACE_OS::rand () % this->nconsumers_; + + // ACE_DEBUG ((LM_DEBUG, "Connecting consumer %d\n", n)); + + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + qos.insert_type (base_type, 0); + + this->consumers_[n]->connect (this->consumer_admin_.in (), + qos.get_ConsumerQOS (), + ACE_TRY_ENV); + ACE_CHECK; + } + break; + + case 8: + { + int n = ACE_OS::rand () % this->nsuppliers_; + + // ACE_DEBUG ((LM_DEBUG, "Disconnecting supplier %d\n", n)); + + this->suppliers_[n]->disconnect (ACE_TRY_ENV); + ACE_CHECK; + } + break; + + case 9: + { + int n = ACE_OS::rand () % this->nconsumers_; + + // ACE_DEBUG ((LM_DEBUG, "Disconnecting consumer %d\n", n)); + + this->consumers_[n]->disconnect (ACE_TRY_ENV); + ACE_CHECK; + } + break; + } +} + +void +RND_Driver::event (const RtecEventComm::Event &e, + CORBA::Environment &ACE_TRY_ENV) +{ + this->timer (e, ACE_TRY_ENV); +} + +// **************************************************************** + +void +RND_Timer::push (const RtecEventComm::EventSet &event, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_TRY + { + this->driver_->timer (event[0], ACE_TRY_ENV); + } + ACE_CATCHANY + { + } + ACE_ENDTRY; +} + +// **************************************************************** + +void +RND_Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr admin, + const RtecEventChannelAdmin::ConsumerQOS &qos, + CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ProxyPushSupplier_var proxy; + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + if (CORBA::is_nil (this->proxy_.in ())) + { + this->proxy_ = admin->obtain_push_supplier (ACE_TRY_ENV); + ACE_CHECK; + } + proxy = + RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(this->proxy_.in ()); + } + RtecEventComm::PushConsumer_var me = + this->_this (ACE_TRY_ENV); + ACE_CHECK; + proxy->connect_push_consumer (me.in (), + qos, + ACE_TRY_ENV); + ACE_CHECK; +} + +void +RND_Consumer::disconnect (CORBA::Environment &ACE_TRY_ENV) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + if (CORBA::is_nil (this->proxy_.in ())) + return; + this->proxy_->disconnect_push_supplier (ACE_TRY_ENV); + ACE_CHECK; + this->proxy_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); +} + +void +RND_Consumer::push (const RtecEventComm::EventSet &event, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + this->driver_->event (event[0], ACE_TRY_ENV); +} + +void +RND_Consumer::disconnect_push_consumer (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +// **************************************************************** + +void +RND_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr admin, + const RtecEventChannelAdmin::SupplierQOS &qos, + CORBA::Environment &ACE_TRY_ENV) +{ + RtecEventChannelAdmin::ProxyPushConsumer_var proxy; + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + if (CORBA::is_nil (this->proxy_.in ())) + { + this->proxy_ = admin->obtain_push_consumer (ACE_TRY_ENV); + ACE_CHECK; + } + + proxy = + RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_.in ()); + } + RtecEventComm::PushSupplier_var me = + this->_this (ACE_TRY_ENV); + ACE_CHECK; + proxy->connect_push_supplier (me.in (), + qos, + ACE_TRY_ENV); + ACE_CHECK; +} + +void +RND_Supplier::disconnect (CORBA::Environment &ACE_TRY_ENV) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + if (CORBA::is_nil (this->proxy_.in ())) + return; + this->proxy_->disconnect_push_consumer (ACE_TRY_ENV); + ACE_CHECK; + this->proxy_ = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); +} + +void +RND_Supplier::push_new_event (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + RtecEventComm::EventSet event (1); + event.length (1); + event[0].header.type = base_type; + event[0].header.source = 0; + + this->push (event, ACE_TRY_ENV); +} + +void +RND_Supplier::push (RtecEventComm::EventSet &event, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + RtecEventChannelAdmin::ProxyPushConsumer_var proxy; + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + if (CORBA::is_nil (this->proxy_.in ())) + return; + + proxy = + RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_.in ()); + } + + proxy->push (event, ACE_TRY_ENV); +} + +void +RND_Supplier::disconnect_push_supplier (CORBA::Environment &) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +int +RND_Supplier::svc (void) +{ + ACE_DEBUG ((LM_DEBUG, "Thread %t started\n")); + int percent = 10; + int niterations = 5000; + for (int i = 0; i != niterations; ++i) + { + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + this->push_new_event (ACE_TRY_ENV); + ACE_TRY_CHECK; + + ACE_Time_Value tv (0, 10000); + ACE_OS::sleep (tv); + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + if (i * 100 / niterations >= percent) + { + ACE_DEBUG ((LM_DEBUG, "Thread %t %d%%\n", percent)); + percent += 10; + } + } + ACE_DEBUG ((LM_DEBUG, "Thread %t completed\n")); + return 0; +} diff --git a/TAO/orbsvcs/tests/Event/Basic/Random.dsp b/TAO/orbsvcs/tests/Event/Basic/Random.dsp new file mode 100644 index 00000000000..51b53b40e3b --- /dev/null +++ b/TAO/orbsvcs/tests/Event/Basic/Random.dsp @@ -0,0 +1,102 @@ +# Microsoft Developer Studio Project File - Name="Random" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=Random - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "Random.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "Random.mak" CFG="Random - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "Random - Win32 Release" (based on "Win32 (x86) Console Application")
+!MESSAGE "Random - Win32 Debug" (based on "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "Random - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir "Release"
+# PROP Intermediate_Dir "Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MD /W3 /GX /O2 /I "..\lib" /I "..\..\.." /I "..\..\..\.." /I "..\..\..\..\.." /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /d "NDEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /machine:I386
+# ADD LINK32 ECTest.lib TAO.lib ace.lib TAO_CosNaming.lib TAO_RTEvent.lib TAO_RTSched.lib TAO_Svc_Utils.lib /nologo /subsystem:console /machine:I386 /libpath:"..\lib" /libpath:"..\..\..\orbsvcs" /libpath:"..\..\..\..\tao" /libpath:"..\..\..\..\..\ace"
+
+!ELSEIF "$(CFG)" == "Random - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Random"
+# PROP BASE Intermediate_Dir "Random"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir ""
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\lib" /I "..\..\.." /I "..\..\..\.." /I "..\..\..\..\.." /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 ECTestd.lib TAOd.lib aced.lib TAO_CosNamingd.lib TAO_RTEventd.lib TAO_RTSchedd.lib TAO_Svc_Utilsd.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept /libpath:"..\lib" /libpath:"..\..\..\orbsvcs" /libpath:"..\..\..\..\tao" /libpath:"..\..\..\..\..\ace"
+
+!ENDIF
+
+# Begin Target
+
+# Name "Random - Win32 Release"
+# Name "Random - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter ".cpp"
+# Begin Source File
+
+SOURCE=.\Random.cpp
+# End Source File
+# End Group
+# Begin Group "Header Files"
+
+# PROP Default_Filter ".h"
+# Begin Source File
+
+SOURCE=.\Random.h
+# End Source File
+# End Group
+# End Target
+# End Project
diff --git a/TAO/orbsvcs/tests/Event/Basic/Random.h b/TAO/orbsvcs/tests/Event/Basic/Random.h new file mode 100644 index 00000000000..54fb8518cfe --- /dev/null +++ b/TAO/orbsvcs/tests/Event/Basic/Random.h @@ -0,0 +1,193 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel tests +// +// = FILENAME +// Random.h +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// ============================================================================ + +#ifndef EC_RANDOM_H +#define EC_RANDOM_H + +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminS.h" +#include "ace/Task.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if defined(_MSC_VER) +#if (_MSC_VER >= 1200) +#pragma warning(push) +#endif /* _MSC_VER >= 1200 */ +#pragma warning(disable:4250) +#endif /* _MSC_VER */ + +class RND_Driver; + +class RND_Consumer + : public POA_RtecEventComm::PushConsumer + , public PortableServer::RefCountServantBase +{ + // = TITLE + // Simple consumer object + // + // = DESCRIPTION + // +public: + RND_Consumer (RND_Driver *driver); + // Constructor + + void push (const RtecEventComm::EventSet &event, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)); + void disconnect_push_consumer (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)); + + void connect (RtecEventChannelAdmin::ConsumerAdmin_ptr admin, + const RtecEventChannelAdmin::ConsumerQOS &qos, + CORBA::Environment &ACE_TRY_ENV); + void disconnect (CORBA::Environment &ACE_TRY_ENV); + +protected: + RND_Driver *driver_; + // The driver + + RtecEventChannelAdmin::ProxyPushSupplier_var proxy_; + // The supplier. + + ACE_SYNCH_MUTEX lock_; + // Synch +}; + +inline +RND_Consumer::RND_Consumer (RND_Driver *driver) + : driver_ (driver) +{ +} + +// **************************************************************** + +class RND_Timer : public RND_Consumer +{ +public: + RND_Timer (RND_Driver *driver); + + void push (const RtecEventComm::EventSet &event, + CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)); +}; + +inline +RND_Timer::RND_Timer (RND_Driver *driver) + : RND_Consumer (driver) +{ +} + +// **************************************************************** + +class RND_Supplier + : public POA_RtecEventComm::PushSupplier + , public PortableServer::RefCountServantBase + , public ACE_Task_Base +{ + // = TITLE + // Simple supplier object + // + // = DESCRIPTION + // +public: + RND_Supplier (void); + // Constructor + + void connect (RtecEventChannelAdmin::SupplierAdmin_ptr admin, + const RtecEventChannelAdmin::SupplierQOS &qos, + CORBA::Environment &ACE_TRY_ENV); + void disconnect (CORBA::Environment &ACE_TRY_ENV); + + void push_new_event (CORBA::Environment &ACE_TRY_ENV); + void push (RtecEventComm::EventSet &event, + CORBA::Environment &ACE_TRY_ENV); + // Push a single event... + + virtual void disconnect_push_supplier (CORBA::Environment &ACE_TRY_ENV) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual int svc (void); + // Active method + +private: + RtecEventChannelAdmin::ProxyPushConsumer_var proxy_; + // The supplier. + + ACE_SYNCH_MUTEX lock_; + // Synch +}; + +inline +RND_Supplier::RND_Supplier (void) +{ +} + +// **************************************************************** + +class RND_Driver +{ +public: + RND_Driver (void); + + int run (int argc, char *argv[]); + // Run the test + + void timer (const RtecEventComm::Event &e, + CORBA::Environment &ACE_TRY_ENV); + // The main timer has expired + + void event (const RtecEventComm::Event &e, + CORBA::Environment &ACE_TRY_ENV); + // One of the consumers has received an event + +private: + RND_Driver (const RND_Driver &); + RND_Driver& operator= (const RND_Driver &); + +private: + RND_Timer timer_; + // The main timer + + RND_Supplier supplier_; + // The supplier + + int nsuppliers_; + // Number of suppliers + + RND_Supplier **suppliers_; + // The suppliers + + int nconsumers_; + // Number of consumers + + RND_Consumer **consumers_; + // The consumers + + int max_recursion_; + // Maximum recursion + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin_; + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin_; +}; + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +#pragma warning(pop) +#endif /* _MSC_VER */ + +#endif /* EC_RANDOM_H */ diff --git a/TAO/orbsvcs/tests/Event/Basic/run_test.pl b/TAO/orbsvcs/tests/Event/Basic/run_test.pl index 406348d38e9..7be2c926b05 100755 --- a/TAO/orbsvcs/tests/Event/Basic/run_test.pl +++ b/TAO/orbsvcs/tests/Event/Basic/run_test.pl @@ -145,4 +145,14 @@ if ($T->TimedWait (60) == -1) { $T->Kill (); $T->TimedWait (1); } +print STDERR "\n\nRandom test\n"; +$T = Process::Create ($EXEPREFIX . "Random".$EXE_EXT, + " -ORBSvcConf $cwd$DIR_SEPARATOR" . "svc.conf" + . " -suppliers 4 -consumers 4 -max_recursion 1"); +if ($T->TimedWait (60) == -1) { + print STDERR "ERROR: Test timedout\n"; + $status = 1; + $T->Kill (); $T->TimedWait (1); +} + exit $status; diff --git a/TAO/orbsvcs/tests/Event/Event.dsw b/TAO/orbsvcs/tests/Event/Event.dsw index 7698b5dc1f1..a329ff8a7d7 100644 --- a/TAO/orbsvcs/tests/Event/Event.dsw +++ b/TAO/orbsvcs/tests/Event/Event.dsw @@ -195,6 +195,21 @@ Package=<4> ###############################################################################
+Project: "Random"=.\Basic\Random.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+ Begin Project Dependency
+ Project_Dep_Name ECTest
+ End Project Dependency
+}}}
+
+###############################################################################
+
Project: "Reconnect"=.\Basic\Reconnect.dsp - Package Owner=<4>
Package=<5>
|