diff options
24 files changed, 476 insertions, 79 deletions
diff --git a/TAO/ChangeLogs/ChangeLog-02a b/TAO/ChangeLogs/ChangeLog-02a index 01f5cdd6e8c..f57c259a907 100644 --- a/TAO/ChangeLogs/ChangeLog-02a +++ b/TAO/ChangeLogs/ChangeLog-02a @@ -1,8 +1,58 @@ +Wed Feb 23 18:35:41 2000 Carlos O'Ryan <coryan@uci.edu> + + * orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp: + Invoke the consumer control strategy as soon as an + exception is raised while pushing to a consumer. + This fixes [BUGID:434] + + * orbsvcs/orbsvcs/Event/EC_Reactive_ConsumerControl.cpp: + * orbsvcs/orbsvcs/Event/EC_Reactive_SupplierControl.cpp: + The comments explaining what we do when we detect a system + exception while pushing an event where very confusing, i think + it is better now. + + * orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.h: + * orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.h: + * orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h: + * orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.i: + * orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h: + * orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.i: + * orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.cpp: + Include support for detecting misbehaving pull suppliers and + pull consumers. + This fixes [BUGID:433] + + * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp: + * orbsvcs/orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.cpp: + Invoke the consumer or supplier control strategies as soon as an + exception is raised while pushing an event to a push consumer or + while pulling an event from a pull supplier. + + * orbsvcs/orbsvcs/Event/EC_Event_Channel.h: + * orbsvcs/orbsvcs/Event/EC_Event_Channel.i: + Provide access to the ConsumerControl and SupplierControl + interfaces. + + * orbsvcs/tests/CosEvent/Basic/run_test.pl: + There Pull_Push_Test had no header explaining its output. + + * orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp: + * orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp: + * orbsvcs/examples/RtEC/Schedule/Service.cpp: + Workaround apparent bugs in the GHS compiler. + It needs some extra #includes to instantiate the templates. + Thanks to Bill Tovrea <gwtovrea@west.raytheon.com> for reporting + this problem and testing potential fixes. + Wed Feb 23 20:36:30 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> - * tests/README: Updated this file to include brief descriptions of - all tests in the subdirectories. Thanks to Rick Hess - <rick.hess@lmco.com> for reporting this. + * tests/README: Updated this file to include brief descriptions of + all tests in the subdirectories. Thanks to Rick Hess + <rick.hess@lmco.com> for reporting this. Wed Feb 23 14:42:16 2000 Carlos O'Ryan <coryan@uci.edu> diff --git a/TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp b/TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp index 839ebaccdad..8341839c1e9 100644 --- a/TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp +++ b/TAO/orbsvcs/examples/RtEC/Schedule/Service.cpp @@ -15,6 +15,7 @@ #include "ace/Get_Opt.h" #include "ace/Sched_Params.h" +#include "ace/Auto_Ptr.h" ACE_RCSID(EC_Examples, Service, "$Id$") @@ -227,8 +228,10 @@ main (int argc, char* argv[]) // lie to the scheduler to obtain right priorities; but we // don't care if the set is schedulable. tv.set (0, 10000); - TimeBase::TimeT rate; - ORBSVCS_Time::Time_Value_to_TimeT (rate, tv); + TimeBase::TimeT tmp; + ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); + RtecScheduler::Period_t rate = ACE_U64_TO_U32(tmp); + scheduler->set (supplier_rt_info1, RtecScheduler::VERY_HIGH_CRITICALITY, 0, 0, 0, @@ -249,7 +252,9 @@ main (int argc, char* argv[]) // lie to the scheduler to obtain right priorities; but we // don't care if the set is schedulable. tv.set (0, 20000); - ORBSVCS_Time::Time_Value_to_TimeT (rate, tv); + ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); + rate = ACE_U64_TO_U32(tmp); + scheduler->set (supplier_rt_info2, RtecScheduler::VERY_HIGH_CRITICALITY, 0, 0, 0, @@ -399,7 +404,7 @@ int parse_args (int argc, char *argv[]) default: ACE_ERROR_RETURN ((LM_ERROR, "usage: %s " - "-c (config run)" + "-c (config run)" "\n", argv [0]), -1); diff --git a/TAO/orbsvcs/examples/RtEC/Simple/Service.cpp b/TAO/orbsvcs/examples/RtEC/Simple/Service.cpp index 82d7280c387..5df20ad8a45 100644 --- a/TAO/orbsvcs/examples/RtEC/Simple/Service.cpp +++ b/TAO/orbsvcs/examples/RtEC/Simple/Service.cpp @@ -44,7 +44,7 @@ main (int argc, char* argv[]) TAO_EC_Event_Channel_Attributes attributes (poa.in (), poa.in ()); - + TAO_EC_Event_Channel ec_impl (attributes); ec_impl.activate (ACE_TRY_ENV); ACE_TRY_CHECK; @@ -54,25 +54,25 @@ main (int argc, char* argv[]) ACE_TRY_CHECK; CORBA::String_var ior = - orb->object_to_string (event_channel.in (), ACE_TRY_ENV); + orb->object_to_string (event_channel.in (), ACE_TRY_ENV); ACE_TRY_CHECK; ACE_DEBUG ((LM_DEBUG, "Activated as <%s>\n", ior.in ())); // If the ior_output_file exists, output the ior to it if (ior_output_file != 0) - { - FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); - if (output_file == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot open output file for writing IOR: %s", - ior_output_file), - 1); - ACE_OS::fprintf (output_file, "%s", ior.in ()); - ACE_OS::fclose (output_file); - } - - // Wait for events, using work_pending()/perform_work() may help + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + // Wait for events, using work_pending()/perform_work() may help // or using another thread, this example is too simple for that. orb->run (); @@ -103,14 +103,14 @@ int parse_args (int argc, char *argv[]) switch (c) { case 'o': - ior_output_file = get_opts.optarg; - break; + ior_output_file = get_opts.optarg; + break; case '?': default: ACE_ERROR_RETURN ((LM_ERROR, "usage: %s " - "-o <iorfile>" + "-o <iorfile>" "\n", argv [0]), -1); diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.cpp index 2ec38673573..3be084955b5 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.cpp @@ -35,6 +35,12 @@ TAO_CEC_ConsumerControl::consumer_not_exist (TAO_CEC_ProxyPushSupplier *, } void +TAO_CEC_ConsumerControl::consumer_not_exist (TAO_CEC_ProxyPullSupplier *, + CORBA::Environment &) +{ +} + +void TAO_CEC_ConsumerControl::system_exception (TAO_CEC_ProxyPushSupplier *, CORBA::SystemException &, CORBA::Environment &) diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.h index dea3154fb23..d9e4026b33d 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.h +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ConsumerControl.h @@ -30,6 +30,7 @@ class TAO_CEC_EventChannel; class TAO_CEC_ProxyPushSupplier; +class TAO_CEC_ProxyPullSupplier; class TAO_ORBSVCS_Export TAO_CEC_ConsumerControl { @@ -66,6 +67,12 @@ public: // has been destroyed. The strategy has to (at the very least), // reclaim all the resources attached to that object. + virtual void consumer_not_exist (TAO_CEC_ProxyPullSupplier *proxy, + CORBA::Environment &); + // Invoked by helper classes when they detect that a consumer no + // longer exists (i.e. _non_existent() returns true and/or the + // CORBA::OBJECT_NOT_EXIST exception has been raised). + virtual void system_exception (TAO_CEC_ProxyPushSupplier *proxy, CORBA::SystemException &, CORBA::Environment &); diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp index 3e654e2f33f..1ade08a3c72 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Default_Factory.cpp @@ -13,6 +13,7 @@ #include "CEC_Reactive_ConsumerControl.h" #include "CEC_Reactive_SupplierControl.h" #include "orbsvcs/Event/EC_Proxy_Collection.h" +#include "orbsvcs/Event/EC_Command.h" // Work around bug in GHS compiler #include "orbsvcs/Event/EC_Concrete_Proxy_Set.h" #include "ace/Arg_Shifter.h" #include "ace/Sched_Params.h" diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp index 6efef9ba2f4..46f2df0eb7d 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPullConsumer.cpp @@ -3,6 +3,7 @@ #include "CEC_ProxyPullConsumer.h" #include "CEC_EventChannel.h" #include "CEC_ConsumerAdmin.h" +#include "CEC_SupplierControl.h" #include "CEC_ProxyPullSupplier.h" #if ! defined (__ACE_INLINE__) @@ -65,10 +66,27 @@ TAO_CEC_ProxyPullConsumer::try_pull_from_supplier ( any = supplier->try_pull (has_event, ACE_TRY_ENV); ACE_TRY_CHECK; } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) + { + TAO_CEC_SupplierControl *control = + this->event_channel_->supplier_control (); + + control->supplier_not_exist (this, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::SystemException, sysex) + { + TAO_CEC_SupplierControl *control = + this->event_channel_->supplier_control (); + + control->system_exception (this, + sysex, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } ACE_CATCHANY { - // @@ This is where the policies for misbehaving suppliers - // should kick in.... for the moment just ignore them. + // @@ Should not happen } ACE_ENDTRY; return any._retn (); diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp index 6b9f603e3f9..301f331589f 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_ProxyPushSupplier.cpp @@ -3,6 +3,7 @@ #include "CEC_ProxyPushSupplier.h" #include "CEC_Dispatching.h" #include "CEC_EventChannel.h" +#include "CEC_ConsumerControl.h" #if ! defined (__ACE_INLINE__) #include "CEC_ProxyPushSupplier.i" @@ -271,10 +272,27 @@ TAO_CEC_ProxyPushSupplier::push_to_consumer (const CORBA::Any& event, consumer->push (event, ACE_TRY_ENV); ACE_TRY_CHECK; } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used) + { + TAO_CEC_ConsumerControl *control = + this->event_channel_->consumer_control (); + + control->consumer_not_exist (this, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::SystemException, sysex) + { + TAO_CEC_ConsumerControl *control = + this->event_channel_->consumer_control (); + + control->system_exception (this, + sysex, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } ACE_CATCHANY { - // @@ This is where the policies for misbehaving consumers - // should kick in.... for the moment just ignore them. + // Shouldn't happen, but does not hurt } ACE_ENDTRY; } @@ -303,10 +321,27 @@ TAO_CEC_ProxyPushSupplier::reactive_push_to_consumer ( consumer->push (event, ACE_TRY_ENV); ACE_TRY_CHECK; } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used) + { + TAO_CEC_ConsumerControl *control = + this->event_channel_->consumer_control (); + + control->consumer_not_exist (this, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::SystemException, sysex) + { + TAO_CEC_ConsumerControl *control = + this->event_channel_->consumer_control (); + + control->system_exception (this, + sysex, + ACE_TRY_ENV); + ACE_TRY_CHECK; + } ACE_CATCHANY { - // @@ This is where the policies for misbehaving consumers - // should kick in.... for the moment just ignore them. + // Shouldn't happen, but does not hurt } ACE_ENDTRY; } diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.cpp index ca7b80e8864..16fbaa6d94c 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.cpp @@ -4,6 +4,7 @@ #include "CEC_EventChannel.h" #include "CEC_ConsumerAdmin.h" #include "CEC_ProxyPushSupplier.h" +#include "CEC_ProxyPullSupplier.h" #if ! defined (__ACE_INLINE__) #include "CEC_Reactive_ConsumerControl.i" @@ -32,8 +33,13 @@ void TAO_CEC_Reactive_ConsumerControl::query_consumers ( CORBA::Environment &ACE_TRY_ENV) { - TAO_CEC_Ping_Consumer worker (this); - this->event_channel_->consumer_admin ()->for_each (&worker, + TAO_CEC_Ping_Push_Consumer push_worker (this); + this->event_channel_->consumer_admin ()->for_each (&push_worker, + ACE_TRY_ENV); + ACE_CHECK; + + TAO_CEC_Ping_Pull_Consumer pull_worker (this); + this->event_channel_->consumer_admin ()->for_each (&pull_worker, ACE_TRY_ENV); ACE_CHECK; } @@ -156,6 +162,25 @@ TAO_CEC_Reactive_ConsumerControl::consumer_not_exist ( } void +TAO_CEC_Reactive_ConsumerControl::consumer_not_exist ( + TAO_CEC_ProxyPullSupplier *proxy, + CORBA::Environment &ACE_TRY_ENV) +{ + ACE_TRY + { + proxy->disconnect_pull_supplier (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Reactive_ConsumerControl::consumer_not_exist"); + // Ignore all exceptions.. + } + ACE_ENDTRY; +} + +void TAO_CEC_Reactive_ConsumerControl::system_exception ( TAO_CEC_ProxyPushSupplier *proxy, CORBA::SystemException & /* exception */, @@ -163,8 +188,11 @@ TAO_CEC_Reactive_ConsumerControl::system_exception ( { ACE_TRY { - // This is TAO's minor code for a failed connection, we may - // want to be more lenient in the future.. + // The current implementation is very strict, and kicks out a + // client on the first system exception. We may + // want to be more lenient in the future, for example, + // this is TAO's minor code for a failed connection. + // // if (CORBA::TRANSIENT::_narrow (&exception) != 0 // && exception->minor () == 0x54410085) // return; @@ -200,8 +228,53 @@ TAO_CEC_ConsumerControl_Adapter::handle_timeout ( // **************************************************************** void -TAO_CEC_Ping_Consumer::work (TAO_CEC_ProxyPushSupplier *supplier, - CORBA::Environment &ACE_TRY_ENV) +TAO_CEC_Ping_Push_Consumer::work (TAO_CEC_ProxyPushSupplier *supplier, + CORBA::Environment &ACE_TRY_ENV) +{ + ACE_TRY + { + CORBA::Boolean disconnected; + CORBA::Boolean non_existent = + supplier->consumer_non_existent (disconnected, + ACE_TRY_ENV); + ACE_TRY_CHECK; + if (non_existent && !disconnected) + { + this->control_->consumer_not_exist (supplier, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) + { + this->control_->consumer_not_exist (supplier, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::TRANSIENT, transient) + { + // The current implementation is very strict, and kicks out a + // client on the first system exception. We may + // want to be more lenient in the future, for example, + // this is TAO's minor code for a failed connection. + // + // if (CORBA::TRANSIENT::_narrow (&exception) != 0 + // && exception->minor () == 0x54410085) + // return; + + this->control_->consumer_not_exist (supplier, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Ignore all exceptions + } + ACE_ENDTRY; +} + +// **************************************************************** + +void +TAO_CEC_Ping_Pull_Consumer::work (TAO_CEC_ProxyPullSupplier *supplier, + CORBA::Environment &ACE_TRY_ENV) { ACE_TRY { @@ -223,9 +296,15 @@ TAO_CEC_Ping_Consumer::work (TAO_CEC_ProxyPushSupplier *supplier, } ACE_CATCH (CORBA::TRANSIENT, transient) { - // This is TAO's minor code for a failed connection, we may - // want to be more lenient in the future.. - // if (transient.minor () == 0x54410085) + // The current implementation is very strict, and kicks out a + // client on the first system exception. We may + // want to be more lenient in the future, for example, + // this is TAO's minor code for a failed connection. + // + // if (CORBA::TRANSIENT::_narrow (&exception) != 0 + // && exception->minor () == 0x54410085) + // return; + this->control_->consumer_not_exist (supplier, ACE_TRY_ENV); ACE_TRY_CHECK; } diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h index 5c14cc1f1cf..58322f6e8c2 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.h @@ -90,6 +90,8 @@ public: virtual int shutdown (void); virtual void consumer_not_exist (TAO_CEC_ProxyPushSupplier *proxy, CORBA::Environment &); + virtual void consumer_not_exist (TAO_CEC_ProxyPullSupplier *proxy, + CORBA::Environment &); virtual void system_exception (TAO_CEC_ProxyPushSupplier *proxy, CORBA::SystemException &, CORBA::Environment &); @@ -124,10 +126,10 @@ private: // **************************************************************** -class TAO_CEC_Ping_Consumer : public TAO_EC_Worker<TAO_CEC_ProxyPushSupplier> +class TAO_CEC_Ping_Push_Consumer : public TAO_EC_Worker<TAO_CEC_ProxyPushSupplier> { public: - TAO_CEC_Ping_Consumer (TAO_CEC_ConsumerControl *control); + TAO_CEC_Ping_Push_Consumer (TAO_CEC_ConsumerControl *control); virtual void work (TAO_CEC_ProxyPushSupplier *supplier, CORBA::Environment &ACE_TRY_ENV); @@ -136,6 +138,20 @@ private: TAO_CEC_ConsumerControl *control_; }; +// **************************************************************** + +class TAO_CEC_Ping_Pull_Consumer : public TAO_EC_Worker<TAO_CEC_ProxyPullSupplier> +{ +public: + TAO_CEC_Ping_Pull_Consumer (TAO_CEC_ConsumerControl *control); + + virtual void work (TAO_CEC_ProxyPullSupplier *supplier, + CORBA::Environment &ACE_TRY_ENV); + +private: + TAO_CEC_ConsumerControl *control_; +}; + #if defined (__ACE_INLINE__) #include "CEC_Reactive_ConsumerControl.i" #endif /* __ACE_INLINE__ */ diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.i b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.i index a914ecd036e..ce73a433eba 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.i +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_ConsumerControl.i @@ -1,7 +1,17 @@ // $Id$ ACE_INLINE -TAO_CEC_Ping_Consumer::TAO_CEC_Ping_Consumer (TAO_CEC_ConsumerControl *control) +TAO_CEC_Ping_Push_Consumer:: + TAO_CEC_Ping_Push_Consumer (TAO_CEC_ConsumerControl *control) + : control_ (control) +{ +} + +// **************************************************************** + +ACE_INLINE +TAO_CEC_Ping_Pull_Consumer:: + TAO_CEC_Ping_Pull_Consumer (TAO_CEC_ConsumerControl *control) : control_ (control) { } diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.cpp index 095f1294357..41b626ad958 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_Pulling_Strategy.cpp @@ -162,19 +162,6 @@ TAO_CEC_Pull_Event::work (TAO_CEC_ProxyPullConsumer *consumer, ACE_TRY_ENV); ACE_TRY_CHECK; } - ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) - { - // @@ this->supplier_control_->supplier_not_exist (consumer, ACE_TRY_ENV); - // @@ ACE_TRY_CHECK; - } - ACE_CATCH (CORBA::TRANSIENT, transient) - { - // This is TAO's minor code for a failed connection, we may - // want to be more lenient in the future.. - // if (transient.minor () == 0x54410085) - // @@ this->control_->supplier_not_exist (consumer, ACE_TRY_ENV); - // @@ ACE_TRY_CHECK; - } ACE_CATCHANY { // Ignore all exceptions diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.cpp index 818b8d812f0..914e2ea5281 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.cpp @@ -4,6 +4,7 @@ #include "CEC_EventChannel.h" #include "CEC_SupplierAdmin.h" #include "CEC_ProxyPushConsumer.h" +#include "CEC_ProxyPullConsumer.h" #if ! defined (__ACE_INLINE__) #include "CEC_Reactive_SupplierControl.i" @@ -32,8 +33,13 @@ void TAO_CEC_Reactive_SupplierControl::query_suppliers ( CORBA::Environment &ACE_TRY_ENV) { - TAO_CEC_Ping_Supplier worker (this); - this->event_channel_->supplier_admin ()->for_each (&worker, + TAO_CEC_Ping_Push_Supplier push_worker (this); + this->event_channel_->supplier_admin ()->for_each (&push_worker, + ACE_TRY_ENV); + ACE_CHECK; + + TAO_CEC_Ping_Pull_Supplier pull_worker (this); + this->event_channel_->supplier_admin ()->for_each (&pull_worker, ACE_TRY_ENV); ACE_CHECK; } @@ -154,21 +160,41 @@ TAO_CEC_Reactive_SupplierControl::supplier_not_exist ( } void +TAO_CEC_Reactive_SupplierControl::supplier_not_exist ( + TAO_CEC_ProxyPullConsumer *proxy, + CORBA::Environment &ACE_TRY_ENV) +{ + ACE_TRY + { + proxy->disconnect_pull_consumer (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Ignore all exceptions.. + } + ACE_ENDTRY; +} + +void TAO_CEC_Reactive_SupplierControl::system_exception ( - TAO_CEC_ProxyPushConsumer *proxy, + TAO_CEC_ProxyPullConsumer *proxy, CORBA::SystemException & /* exception */, CORBA::Environment &ACE_TRY_ENV) { ACE_TRY { - // This is TAO's minor code for a failed connection, we may - // want to be more lenient in the future.. + // The current implementation is very strict, and kicks out a + // client on the first system exception. We may + // want to be more lenient in the future, for example, + // this is TAO's minor code for a failed connection. + // // if (CORBA::TRANSIENT::_narrow (&exception) != 0 // && exception->minor () == 0x54410085) // return; // Anything else is serious, including timeouts... - proxy->disconnect_push_consumer (ACE_TRY_ENV); + proxy->disconnect_pull_consumer (ACE_TRY_ENV); ACE_TRY_CHECK; } ACE_CATCHANY @@ -198,8 +224,54 @@ TAO_CEC_SupplierControl_Adapter::handle_timeout ( // **************************************************************** void -TAO_CEC_Ping_Supplier::work (TAO_CEC_ProxyPushConsumer *consumer, - CORBA::Environment &ACE_TRY_ENV) +TAO_CEC_Ping_Push_Supplier::work (TAO_CEC_ProxyPushConsumer *consumer, + CORBA::Environment &ACE_TRY_ENV) +{ + ACE_TRY + { + CORBA::Boolean disconnected; + CORBA::Boolean non_existent = + consumer->supplier_non_existent (disconnected, + ACE_TRY_ENV); + ACE_TRY_CHECK; + if (non_existent && !disconnected) + { + this->control_->supplier_not_exist (consumer, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex) + { + this->control_->supplier_not_exist (consumer, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::TRANSIENT, transient) + { + // The current implementation is very strict, and kicks out a + // client on the first system exception. We may + // want to be more lenient in the future, for example, + // this is TAO's minor code for a failed connection. + // + // if (CORBA::TRANSIENT::_narrow (&exception) != 0 + // && exception->minor () == 0x54410085) + // return; + + // Anything else is serious, including timeouts... + this->control_->supplier_not_exist (consumer, ACE_TRY_ENV); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Ignore all exceptions + } + ACE_ENDTRY; +} + +// **************************************************************** + +void +TAO_CEC_Ping_Pull_Supplier::work (TAO_CEC_ProxyPullConsumer *consumer, + CORBA::Environment &ACE_TRY_ENV) { ACE_TRY { @@ -221,9 +293,16 @@ TAO_CEC_Ping_Supplier::work (TAO_CEC_ProxyPushConsumer *consumer, } ACE_CATCH (CORBA::TRANSIENT, transient) { - // This is TAO's minor code for a failed connection, we may - // want to be more lenient in the future.. - // if (transient.minor () == 0x54410085) + // The current implementation is very strict, and kicks out a + // client on the first system exception. We may + // want to be more lenient in the future, for example, + // this is TAO's minor code for a failed connection. + // + // if (CORBA::TRANSIENT::_narrow (&exception) != 0 + // && exception->minor () == 0x54410085) + // return; + + // Anything else is serious, including timeouts... this->control_->supplier_not_exist (consumer, ACE_TRY_ENV); ACE_TRY_CHECK; } diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h index ec385efb896..c3412f790ba 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.h @@ -90,7 +90,9 @@ public: virtual int shutdown (void); virtual void supplier_not_exist (TAO_CEC_ProxyPushConsumer *proxy, CORBA::Environment &); - virtual void system_exception (TAO_CEC_ProxyPushConsumer *proxy, + virtual void supplier_not_exist (TAO_CEC_ProxyPullConsumer *proxy, + CORBA::Environment &); + virtual void system_exception (TAO_CEC_ProxyPullConsumer *proxy, CORBA::SystemException &, CORBA::Environment &); @@ -124,10 +126,10 @@ private: // **************************************************************** -class TAO_CEC_Ping_Supplier : public TAO_EC_Worker<TAO_CEC_ProxyPushConsumer> +class TAO_CEC_Ping_Push_Supplier : public TAO_EC_Worker<TAO_CEC_ProxyPushConsumer> { public: - TAO_CEC_Ping_Supplier (TAO_CEC_SupplierControl *control); + TAO_CEC_Ping_Push_Supplier (TAO_CEC_SupplierControl *control); virtual void work (TAO_CEC_ProxyPushConsumer *consumer, CORBA::Environment &ACE_TRY_ENV); @@ -136,6 +138,20 @@ private: TAO_CEC_SupplierControl *control_; }; +// **************************************************************** + +class TAO_CEC_Ping_Pull_Supplier : public TAO_EC_Worker<TAO_CEC_ProxyPullConsumer> +{ +public: + TAO_CEC_Ping_Pull_Supplier (TAO_CEC_SupplierControl *control); + + virtual void work (TAO_CEC_ProxyPullConsumer *consumer, + CORBA::Environment &ACE_TRY_ENV); + +private: + TAO_CEC_SupplierControl *control_; +}; + #if defined (__ACE_INLINE__) #include "CEC_Reactive_SupplierControl.i" #endif /* __ACE_INLINE__ */ diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.i b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.i index 3f3cb871495..ba080617bce 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.i +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_Reactive_SupplierControl.i @@ -1,8 +1,17 @@ // $Id$ ACE_INLINE -TAO_CEC_Ping_Supplier::TAO_CEC_Ping_Supplier (TAO_CEC_SupplierControl *control) +TAO_CEC_Ping_Push_Supplier:: + TAO_CEC_Ping_Push_Supplier (TAO_CEC_SupplierControl *control) : control_ (control) { } +// **************************************************************** + +ACE_INLINE +TAO_CEC_Ping_Pull_Supplier:: + TAO_CEC_Ping_Pull_Supplier (TAO_CEC_SupplierControl *control) + : control_ (control) +{ +} diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.cpp b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.cpp index 612d8586265..c3953e03ebb 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.cpp +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.cpp @@ -34,6 +34,19 @@ TAO_CEC_SupplierControl::supplier_not_exist (TAO_CEC_ProxyPushConsumer *, { } +void +TAO_CEC_SupplierControl::supplier_not_exist (TAO_CEC_ProxyPullConsumer *, + CORBA::Environment &) +{ +} + +void +TAO_CEC_SupplierControl::system_exception (TAO_CEC_ProxyPullConsumer *, + CORBA::SystemException &, + CORBA::Environment &) +{ +} + #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) #elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) diff --git a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.h b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.h index 268a7df473f..a20b5ba3b27 100644 --- a/TAO/orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.h +++ b/TAO/orbsvcs/orbsvcs/CosEvent/CEC_SupplierControl.h @@ -30,6 +30,7 @@ class TAO_CEC_EventChannel; class TAO_CEC_ProxyPushConsumer; +class TAO_CEC_ProxyPullConsumer; class TAO_ORBSVCS_Export TAO_CEC_SupplierControl { @@ -64,6 +65,17 @@ public: // Invoked by helper classes when they detect that a supplier does // not exists (i.e. _non_existent() returns true and/or the // CORBA::OBJECT_NOT_EXIST exception has been raised). + + virtual void supplier_not_exist (TAO_CEC_ProxyPullConsumer *proxy, + CORBA::Environment &); + // Invoked by helper classes when they detect that a supplier does + // not exists (i.e. _non_existent() returns true and/or the + // CORBA::OBJECT_NOT_EXIST exception has been raised). + + virtual void system_exception (TAO_CEC_ProxyPullConsumer *proxy, + CORBA::SystemException &, + CORBA::Environment &); + // Some system exception was rasied while trying to push an event. }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp index c12ca6d43ea..c1a8963b7c3 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp @@ -17,6 +17,7 @@ #include "EC_Priority_Scheduling.h" #include "EC_Proxy_Collection.h" #include "EC_Concrete_Proxy_Set.h" +#include "EC_Command.h" // Work around bug in GHS compiler #include "EC_Reactive_Timeout_Generator.h" #include "EC_Event_Channel.h" #include "EC_Reactive_ConsumerControl.h" diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h index 1231331cb9c..21a22caefaf 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h @@ -149,6 +149,10 @@ public: TAO_EC_Scheduling_Strategy* scheduling_strategy (void) const; // Access the scheduling strategy + TAO_EC_ConsumerControl *consumer_control (void) const; + TAO_EC_SupplierControl *supplier_control (void) const; + // Access the client control strategies. + // = The factory methods, they delegate on the EC_Factory. TAO_EC_ProxyPushSupplier* create_proxy_push_supplier (void); void destroy_proxy_push_supplier (TAO_EC_ProxyPushSupplier*); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i index b06d45ecba1..93c1b7585e4 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i @@ -57,6 +57,18 @@ TAO_EC_Event_Channel::scheduling_strategy (void) const return this->scheduling_strategy_; } +ACE_INLINE TAO_EC_ConsumerControl* +TAO_EC_Event_Channel::consumer_control (void) const +{ + return this->consumer_control_; +} + +ACE_INLINE TAO_EC_SupplierControl* +TAO_EC_Event_Channel::supplier_control (void) const +{ + return this->supplier_control_; +} + ACE_INLINE TAO_EC_ProxyPushSupplier* TAO_EC_Event_Channel::create_proxy_push_supplier (void) { diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp index 2de191d2487..d93a9327b9a 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp @@ -6,6 +6,7 @@ #include "EC_QOS_Info.h" #include "EC_Event_Channel.h" #include "EC_Scheduling_Strategy.h" +#include "EC_ConsumerControl.h" #if ! defined (__ACE_INLINE__) #include "EC_ProxySupplier.i" @@ -438,10 +439,25 @@ TAO_EC_ProxyPushSupplier::push_to_consumer (const RtecEventComm::EventSet& event consumer->push (event, ACE_TRY_ENV); ACE_TRY_CHECK; } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used) + { + TAO_EC_ConsumerControl *control = + this->event_channel_->consumer_control (); + + control->consumer_not_exist (this, ACE_TRY_ENV); + } + ACE_CATCH (CORBA::SystemException, sysex) + { + TAO_EC_ConsumerControl *control = + this->event_channel_->consumer_control (); + + control->system_exception (this, + sysex, + ACE_TRY_ENV); + } ACE_CATCHANY { - // @@ This is where the policies for misbehaving consumers - // should kick in.... for the moment just ignore them. + // Shouldn't happen, but does not hurt } ACE_ENDTRY; } @@ -472,10 +488,25 @@ TAO_EC_ProxyPushSupplier::reactive_push_to_consumer ( consumer->push (event, ACE_TRY_ENV); ACE_TRY_CHECK; } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_used) + { + TAO_EC_ConsumerControl *control = + this->event_channel_->consumer_control (); + + control->consumer_not_exist (this, ACE_TRY_ENV); + } + ACE_CATCH (CORBA::SystemException, sysex) + { + TAO_EC_ConsumerControl *control = + this->event_channel_->consumer_control (); + + control->system_exception (this, + sysex, + ACE_TRY_ENV); + } ACE_CATCHANY { - // @@ This is where the policies for misbehaving consumers - // should kick in.... for the moment just ignore them. + // Shouldn't happen } ACE_ENDTRY; } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Reactive_ConsumerControl.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Reactive_ConsumerControl.cpp index 1952164eb4c..d16e11d5d93 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Reactive_ConsumerControl.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Reactive_ConsumerControl.cpp @@ -163,8 +163,11 @@ TAO_EC_Reactive_ConsumerControl::system_exception ( { ACE_TRY { - // This is TAO's minor code for a failed connection, we may - // want to be more lenient in the future.. + // The current implementation is very strict, and kicks out a + // client on the first system exception. We may + // want to be more lenient in the future, for example, + // this is TAO's minor code for a failed connection. + // // if (CORBA::TRANSIENT::_narrow (&exception) != 0 // && exception->minor () == 0x54410085) // return; diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Reactive_SupplierControl.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Reactive_SupplierControl.cpp index 8e6f94564df..8e192d8d7ef 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Reactive_SupplierControl.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Reactive_SupplierControl.cpp @@ -161,8 +161,11 @@ TAO_EC_Reactive_SupplierControl::system_exception ( { ACE_TRY { - // This is TAO's minor code for a failed connection, we may - // want to be more lenient in the future.. + // The current implementation is very strict, and kicks out a + // client on the first system exception. We may + // want to be more lenient in the future, for example, + // this is TAO's minor code for a failed connection. + // // if (CORBA::TRANSIENT::_narrow (&exception) != 0 // && exception->minor () == 0x54410085) // return; diff --git a/TAO/orbsvcs/tests/CosEvent/Basic/run_test.pl b/TAO/orbsvcs/tests/CosEvent/Basic/run_test.pl index 748213cbfd4..6b861e791de 100755 --- a/TAO/orbsvcs/tests/CosEvent/Basic/run_test.pl +++ b/TAO/orbsvcs/tests/CosEvent/Basic/run_test.pl @@ -47,7 +47,7 @@ if ($T->TimedWait (60) == -1) { $T->Kill (); $T->TimedWait (1); } -#print STDERR "\n\nPull-Push Events\n"; +print STDERR "\n\nPull-Push Events\n"; $T = Process::Create ($prefix . "Pull_Push_Event".$EXE_EXT, " -ORBSvcConf svc.pull.conf"); if ($T->TimedWait (60) == -1) { |