diff options
author | wilson_d <wilson_d@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2004-10-29 16:10:47 +0000 |
---|---|---|
committer | wilson_d <wilson_d@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2004-10-29 16:10:47 +0000 |
commit | 50e18994616f637b3d5444594da771f2318cc0d1 (patch) | |
tree | 19f0ddc17f1db668d1c60dd7b7fade30a7b53e7c /TAO | |
parent | 6b91b5bcda830a14261024b894a483d28b290328 (diff) | |
download | ATCD-50e18994616f637b3d5444594da771f2318cc0d1.tar.gz |
ChangeLogTag: Fri Oct 29 10:53:56 2004 Dale Wilson <wilson_d@ociweb.com>pnotify_branch
Diffstat (limited to 'TAO')
24 files changed, 400 insertions, 231 deletions
diff --git a/TAO/ChangeLog_pnotify b/TAO/ChangeLog_pnotify index 4009c8f5089..44b45852664 100644 --- a/TAO/ChangeLog_pnotify +++ b/TAO/ChangeLog_pnotify @@ -1,3 +1,42 @@ +Fri Oct 29 10:53:56 2004 Dale Wilson <wilson_d@ociweb.com> + + * orbsvcs/Notify_Service/Notify_Service.cpp: + Don't write IOR file until Notification Service + is *really* ready to run. + + * orbsvcs/orbsvcs/Notify/Admin.cpp: + * orbsvcs/orbsvcs/Notify/Builder.cpp: + * orbsvcs/orbsvcs/Notify/Consumer.cpp: + * orbsvcs/orbsvcs/Notify/Delivery_Request.h: + * orbsvcs/orbsvcs/Notify/Delivery_Request.inl: + * orbsvcs/orbsvcs/Notify/EventChannelFactory.h: + * orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp: + * orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp: + * orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h: + * orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp: + * orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp: + * orbsvcs/orbsvcs/Notify/Random_File.cpp: + * orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp: + * orbsvcs/orbsvcs/Notify/Routing_Slip.h: + * orbsvcs/orbsvcs/Notify/Routing_Slip.cpp: + * orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp: + + * orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp: + * orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h: + * orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp: + * orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp: + + Add Event reloading and restarting. Passes Reconnection test. + There is a lot of debug output still turned on so some housecleaning + is definately in order. + + + * orbsvcs/tests/Notify/Reconnecting/Consumer.cpp: + * orbsvcs/tests/Notify/Reconnecting/Supplier.cpp: + Add additional diagnostic information to track down problems + revealed by test. + + Wed Oct 27 11:59:01 2004 Dale Wilson <wilson_d@ociweb.com> * orbsvcs/orbsvcs/CosNotification.mpc: diff --git a/TAO/orbsvcs/Notify_Service/Notify_Service.cpp b/TAO/orbsvcs/Notify_Service/Notify_Service.cpp index 0e1eaf79ed2..12f7fdd7ea7 100644 --- a/TAO/orbsvcs/Notify_Service/Notify_Service.cpp +++ b/TAO/orbsvcs/Notify_Service/Notify_Service.cpp @@ -10,7 +10,7 @@ #include "ace/Argv_Type_Converter.h" #include "tao/ORB_Core.h" #include "ace/Dynamic_Service.h" -#include "../orbsvcs/Notify/Service.h" +#include "orbsvcs/orbsvcs/Notify/Service.h" TAO_Notify_Service_Driver::TAO_Notify_Service_Driver (void) : notify_service_ (0), @@ -90,7 +90,7 @@ TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[] if (this->notify_service_ == 0) { - ACE_DEBUG ((LM_DEBUG, "Service not found! check conf. file\n")); + ACE_DEBUG ((LM_DEBUG, "Service not found! check conf.file\n")); return -1; } @@ -139,24 +139,6 @@ TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[] ACE_ASSERT (!CORBA::is_nil (this->notify_factory_.in ())); - // Write IOR to a file, if asked. - CORBA::String_var str = - this->orb_->object_to_string (this->notify_factory_.in () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - if (this->ior_output_file_) - { - ACE_OS::fprintf (this->ior_output_file_, - "%s", - str.in ()); - ACE_OS::fclose (this->ior_output_file_); - this->ior_output_file_ = 0; - } - else if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "The Notification Event Channel Factory IOR is <%s>\n", - str.in ())); - // Make it bootstrappable, if asked. if (this->bootstrap_) { @@ -184,84 +166,103 @@ TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[] } } + // Register with the Name service, if asked if (this->use_name_svc_) - { - // Register the Factory - ACE_ASSERT (!CORBA::is_nil (this->naming_.in ())); + { + // Register the Factory + ACE_ASSERT (!CORBA::is_nil (this->naming_.in ())); #if defined (TAO_NOTIFY_USE_NAMING_CONTEXT) - CosNaming::Name name (1); - name.length (1); - name[0].id = - CORBA::string_dup (this->notify_factory_name_.c_str ()); + CosNaming::Name name (1); + name.length (1); + name[0].id = + CORBA::string_dup (this->notify_factory_name_.c_str ()); #else - CosNaming::Name_var name = - this->naming_->to_name (this->notify_factory_name_.c_str () - ACE_ENV_ARG_PARAMETER); + CosNaming::Name_var name = + this->naming_->to_name (this->notify_factory_name_.c_str () + ACE_ENV_ARG_PARAMETER); #endif /* TAO_NOTIFY_USE_NAMING_CONTEXT */ - ACE_CHECK_RETURN (-1); + ACE_CHECK_RETURN (-1); #if defined (TAO_NOTIFY_USE_NAMING_CONTEXT) - this->naming_->rebind (name, - this->notify_factory_.in () - ACE_ENV_ARG_PARAMETER); + this->naming_->rebind (name, + this->notify_factory_.in () + ACE_ENV_ARG_PARAMETER); #else - this->naming_->rebind (name.in (), - this->notify_factory_.in () - ACE_ENV_ARG_PARAMETER); + this->naming_->rebind (name.in (), + this->notify_factory_.in () + ACE_ENV_ARG_PARAMETER); #endif /* TAO_NOTIFY_USE_NAMING_CONTEXT */ - ACE_CHECK_RETURN (-1); + ACE_CHECK_RETURN (-1); - ACE_DEBUG ((LM_DEBUG, - "Registered with the naming service as: %s\n", - this->notify_factory_name_.c_str())); - - if (this->register_event_channel_ == 1) - { - // create an event channel - CosNotifyChannelAdmin::ChannelID id; - - CosNotification::QoSProperties initial_qos; - CosNotification::AdminProperties initial_admin; - - CosNotifyChannelAdmin::EventChannel_var ec = - this->notify_factory_->create_channel (initial_qos, - initial_admin, - id - ACE_ENV_ARG_PARAMETER); + ACE_DEBUG ((LM_DEBUG, + "Registered with the naming service as: %s\n", + this->notify_factory_name_.c_str())); + + if (this->register_event_channel_ == 1) + { + // create an event channel + CosNotifyChannelAdmin::ChannelID id; + + CosNotification::QoSProperties initial_qos; + CosNotification::AdminProperties initial_admin; + + CosNotifyChannelAdmin::EventChannel_var ec = + this->notify_factory_->create_channel (initial_qos, + initial_admin, + id + ACE_ENV_ARG_PARAMETER); #if defined (TAO_NOTIFY_USE_NAMING_CONTEXT) - name[0].id = - CORBA::string_dup (this->notify_channel_name_.c_str ()); + name[0].id = + CORBA::string_dup (this->notify_channel_name_.c_str ()); #else - name = this->naming_->to_name ( - this->notify_channel_name_.c_str () - ACE_ENV_ARG_PARAMETER); + name = this->naming_->to_name ( + this->notify_channel_name_.c_str () + ACE_ENV_ARG_PARAMETER); #endif /* TAO_NOTIFY_USE_NAMING_CONTEXT */ - ACE_CHECK_RETURN (-1); - + ACE_CHECK_RETURN (-1); #if defined (TAO_NOTIFY_USE_NAMING_CONTEXT) - this->naming_->rebind (name, - ec.in () - ACE_ENV_ARG_PARAMETER); + this->naming_->rebind (name, + ec.in () + ACE_ENV_ARG_PARAMETER); #else - this->naming_->rebind (name.in (), - ec.in () - ACE_ENV_ARG_PARAMETER); + this->naming_->rebind (name.in (), + ec.in () + ACE_ENV_ARG_PARAMETER); #endif /* TAO_NOTIFY_USE_NAMING_CONTEXT */ - ACE_CHECK_RETURN (-1); + ACE_CHECK_RETURN (-1); + + ACE_DEBUG ((LM_DEBUG, + "Registered an Event Channel with the naming service as: %s\n", + this->notify_channel_name_.c_str())); + + } + } - ACE_DEBUG ((LM_DEBUG, - "Registered an Event Channel with the naming service as: %s\n", - this->notify_channel_name_.c_str())); + // Write IOR to a file, if asked. + // Note: do this last to ensure that we're up and running before the file is written + CORBA::String_var str = + this->orb_->object_to_string (this->notify_factory_.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); - } + if (this->ior_output_file_) + { + ACE_OS::fprintf (this->ior_output_file_, + "%s", + str.in ()); + ACE_OS::fclose (this->ior_output_file_); + this->ior_output_file_ = 0; } + else if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "The Notification Event Channel Factory IOR is <%s>\n", + str.in ())); return 0; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp index 04d5b4151d3..c09b42e1bee 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Admin.cpp @@ -159,7 +159,7 @@ TAO_Notify_Admin::load_attrs(const TAO_Notify::NVPList& attrs) } if (attrs.find ("default", value)) { - this->is_default_ = ACE_OS::strcmp (value, "yes"); + this->is_default_ = (ACE_OS::strcmp (value, "yes") == 0); } } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp index 42d97901752..e6958fdb4aa 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp @@ -100,5 +100,9 @@ void TAO_Notify_PushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer ACE_ENV_ARG_DECL) { - int todo_reconnect; + TAO_Notify_PushConsumer* tmp = ACE_dynamic_cast(TAO_Notify_PushConsumer*, old_consumer); + ACE_ASSERT(tmp != 0); + this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + this->schedule_timer(false); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp b/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp index 1c185e7dd4a..5aa8b9c2390 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp @@ -211,12 +211,9 @@ TAO_Notify_Builder::build_event_channel_factory (PortableServer::POA_ptr poa ACE // PortableServer::ServantBase_var servant_var (ecf); - ecf->TAO_Notify_EventChannelFactory::init (poa ACE_ENV_ARG_PARAMETER); - - CORBA::Object_var obj = ecf->activate (ecf ACE_ENV_ARG_PARAMETER); + ecf->init (poa ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (ecf_ret._retn ()); - - ecf_ret = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER); + ecf_ret = ecf->activate_self (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (ecf_ret._retn ()); return (ecf_ret._retn ()); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp index f7b23e7d7b6..ff9b0a58e60 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp @@ -58,14 +58,14 @@ TAO_Notify_Consumer::proxy (void) return this->proxy_supplier (); } -//@@ todo: consider buffering strategy void TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties) { this->max_batch_size_ = qos_properties.maximum_batch_size (); /* -if (this->max_batch_size_.is_valid ()) +//@@ todo: consider buffering strategy + if (this->max_batch_size_.is_valid ()) {// set the max batch size. this->buffering_strategy_->batch_size (this->max_batch_size_.value ()); } @@ -346,6 +346,7 @@ TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch) ex._info ().c_str () )); // @@todo what to return here? + return DISPATCH_RETRY; } ACE_CATCHANY { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h index 6bdf5c08496..235a5a68f85 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.h @@ -17,6 +17,7 @@ // Forward declarations of referenced classes class TAO_Notify_EventChannelFactory; +class TAO_Notify_ProxySupplier; namespace TAO_Notify { @@ -98,6 +99,9 @@ public: /// expose routing slip method bool should_retry () const; + /// expose routing slip method + void dispatch (TAO_Notify_ProxySupplier * proxy_supplier, bool filter ACE_ENV_ARG_DECL); + // Meaningless, but needed by ACE_Vector on some platforms (gcc2.x LynxOS) bool operator == (const Delivery_Request & rhs) const; // Meaningless, but needed by ACE_Vector on some platforms diff --git a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl index c8b279b908d..7ebe5e07392 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Delivery_Request.inl @@ -40,6 +40,15 @@ Delivery_Request::should_retry () const return this->routing_slip_->should_retry (); } +void +Delivery_Request::dispatch ( + TAO_Notify_ProxySupplier * proxy_supplier, + bool filter ACE_ENV_ARG_DECL) +{ + this->routing_slip_->dispatch (proxy_supplier, filter ACE_ENV_ARG_PARAMETER); +} + + } // namespace TAO_Notify diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp index ad1b3a8661f..14f84bb280e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.cpp @@ -288,8 +288,6 @@ TAO_Notify_EventChannelFactory::save_persistent (TAO_Notify::Topology_Saver& sav void TAO_Notify_EventChannelFactory::load_event_persistence (ACE_ENV_SINGLE_ARG_DECL) { -#define EVENT_PERISISTENCE_SUPPORT //@@todo -#ifdef EVENT_PERISISTENCE_SUPPORT //@@todo TAO_Notify::Event_Persistence_Strategy * strategy = ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence"); if (strategy != 0) @@ -329,9 +327,6 @@ TAO_Notify_EventChannelFactory::load_event_persistence (ACE_ENV_SINGLE_ARG_DECL) ACE_CHECK; } } -#else //EVENT_PERISISTENCE_SUPPORT //@@todo -#pragma message ("TODO: EVENT_PERISISTENCE_SUPPORT") -#endif //EVENT_PERISISTENCE_SUPPORT //@@todo } bool @@ -411,13 +406,8 @@ TAO_Notify_EventChannelFactory::reconnect (ACE_ENV_SINGLE_ARG_DECL) ACE_CHECK; // Then send reconnection announcement to registered clients - CORBA::Object_var obj = this->poa()->id_to_reference (this->id () ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - CosNotifyChannelAdmin::EventChannelFactory_var this_reference = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in ()); - ACE_ASSERT (CORBA::is_nil (this_reference.in ())); - // todo: Is there an easier way? - this->reconnect_registry_.send_reconnect (this_reference.in () ACE_ENV_ARG_PARAMETER); + ACE_ASSERT (!CORBA::is_nil (this->channel_factory_.in ())); + this->reconnect_registry_.send_reconnect (this->channel_factory_.in () ACE_ENV_ARG_PARAMETER); ACE_CHECK; // reactivate events in-progress @@ -525,6 +515,39 @@ TAO_Notify_EventChannelFactory::find_proxy_supplier (TAO_Notify::IdVec & id_path return result; } +CosNotifyChannelAdmin::EventChannelFactory_ptr +TAO_Notify_EventChannelFactory::activate_self (ACE_ENV_SINGLE_ARG_DECL) +{ + CORBA::Object_var obj = this->activate (this ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannelFactory::_nil ()); + this->channel_factory_ + = CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER); + CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_NEW_ENV + { + if (DEBUG_LEVEL > 9) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) TAO_Notify_EventChannelFactory::activate_self") )); + } + this->reconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // ignore for now + } + ACE_ENDTRY; + return this->channel_factory_._retn(); +} + + +TAO_Notify_Object::ID +TAO_Notify_EventChannelFactory::get_id () const +{ + return id(); +} + + #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class TAO_Notify_Find_Worker_T<TAO_Notify_EventChannel diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h index c2eecb3c863..41af5f203bc 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h +++ b/TAO/orbsvcs/orbsvcs/Notify/EventChannelFactory.h @@ -100,6 +100,7 @@ public: CORBA::Long id, const TAO_Notify::NVPList& attrs ACE_ENV_ARG_DECL); + CosNotifyChannelAdmin::EventChannelFactory_ptr activate_self (ACE_ENV_SINGLE_ARG_DECL); virtual void reconnect (ACE_ENV_SINGLE_ARG_DECL); /// handle change notifications @@ -113,7 +114,7 @@ public: TAO_Notify_ProxyConsumer * find_proxy_consumer (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL); TAO_Notify_ProxySupplier * find_proxy_supplier (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL); TAO_Notify_Object * follow_id_path (TAO_Notify::IdVec & id_path, size_t position ACE_ENV_ARG_DECL); - virtual TAO_Notify_Object::ID get_id () const {return id();} // @@todo move to INL + virtual TAO_Notify_Object::ID get_id () const; protected: @@ -176,6 +177,9 @@ public: private: TAO_SYNCH_MUTEX topology_save_lock_; + + CosNotifyChannelAdmin::EventChannelFactory_var channel_factory_; + /// change-in-progress detector to avoid duplicates short topology_save_seq_; TAO_Notify::Topology_Factory* topology_factory_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp index 57c4528586b..14c22d871cc 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Lookup.cpp @@ -37,8 +37,16 @@ TAO_Notify_Method_Request_Lookup::work ( TAO_Notify_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL) { - TAO_Notify_Method_Request_Dispatch_No_Copy request (*this, proxy_supplier, true); - proxy_supplier->deliver (request ACE_ENV_ARG_PARAMETER); + if (delivery_request_.get () == 0) + { + TAO_Notify_Method_Request_Dispatch_No_Copy request (*this, proxy_supplier, true); + proxy_supplier->deliver (request ACE_ENV_ARG_PARAMETER); + } + else + { + delivery_request_->dispatch (proxy_supplier, true ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } } TAO_Notify_Method_Request_Lookup::execute_i (ACE_ENV_SINGLE_ARG_DECL) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp index 24f960f74f4..c8e21dac7d9 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.cpp @@ -6,7 +6,9 @@ #include "ace/OS_NS_string.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL namespace TAO_Notify { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h index 15b1e4eb713..97ea0556f36 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Persistent_File_Allocator.h @@ -120,6 +120,7 @@ private: * blocks, reads, and writes. This class also manages a thread that performs * background updating of a Random_File. * @@todo this is too much for one class to do. It should be refactored. + * @@todo: we shouldn't arbitrarily use a thread. */ class TAO_Notify_Serv_Export Persistent_File_Allocator { diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp index 77c6bad51dd..1d95752eea9 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp @@ -21,7 +21,7 @@ ACE_RCSID(RT_Notify, TAO_Notify_ProxyConsumer, "$Id$") #include "SupplierAdmin.h" #include "EventChannel.h" #include "Routing_Slip.h" -#define DEBUG_LEVEL 10 +//#define DEBUG_LEVEL 10 #ifndef DEBUG_LEVEL # define DEBUG_LEVEL TAO_debug_level #endif //DEBUG_LEVEL diff --git a/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp b/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp index 3542a10690d..9fbb6e11c20 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Random_File.cpp @@ -4,8 +4,10 @@ #include "ace/OS.h" #include <tao/debug.h> -//#define DEBUG_LEVEL 9 // uncomment to force debug messages -#define DEBUG_LEVEL TAO_debug_level // coment to force debug messages +//#define DEBUG_LEVEL 9 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL namespace TAO_Notify { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp b/TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp index a074eca09c8..322cc463d45 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Reconnection_Registry.cpp @@ -17,6 +17,11 @@ #include "Properties.h" #include "Topology_Saver.h" #include <ace/Vector_T.h> +//#define DEBUG_LEVEL 10 +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif + namespace TAO_Notify { Reconnection_Registry::Reconnection_Registry (Topology_Parent & parent) @@ -41,7 +46,7 @@ namespace TAO_Notify //@@todo DO WE NEED THREAD SAFENESS? NotifyExt::ReconnectionRegistry::ReconnectionID next_id = ++highest_id_; - if (TAO_debug_level > 0) + if (DEBUG_LEVEL > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Reconnect registry: registering %d\n"), @@ -68,7 +73,7 @@ namespace TAO_Notify Reconnection_Registry::unregister_callback (NotifyExt::ReconnectionRegistry::ReconnectionID id ACE_ENV_ARG_DECL) { - if (TAO_debug_level > 0) + if (DEBUG_LEVEL > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Reconnect registry: unregistering %d\n"), @@ -109,7 +114,7 @@ namespace TAO_Notify iter.advance ()) { NVPList cattrs; - if (TAO_debug_level > 0) + if (DEBUG_LEVEL > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Reconnect registry: saving %d\n"), @@ -150,7 +155,7 @@ namespace TAO_Notify { highest_id_ = id; - if (TAO_debug_level > 0) + if (DEBUG_LEVEL > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Reconnect registry: reloading %d\n"), @@ -188,7 +193,7 @@ namespace TAO_Notify { ACE_TRY_NEW_ENV { - if (TAO_debug_level > 0) + if (DEBUG_LEVEL > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Reconnection Registry: Sending reconnection to client %d\n"), @@ -251,9 +256,6 @@ template class ACE_Hash_Map_Manager_Ex<NotifyExt::ReconnectionRegistry::Reconnec template class ACE_Hash_Map_Iterator_Base_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>; template class ACE_Hash_Map_Iterator_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>; template class ACE_Hash_Map_Reverse_Iterator_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex>; -// vector of int's is instantiated elsewhere -//template class ACE_Vector<NotifyExt::ReconnectionRegistry::ReconnectionID>; -//template class ACE_Array_Base<NotifyExt::ReconnectionRegistry::ReconnectionID>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Hash_Map_Entry<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString > @@ -261,7 +263,5 @@ template class ACE_Hash_Map_Reverse_Iterator_Ex<NotifyExt::ReconnectionRegistry: #pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString ,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex> #pragma instantiate ACE_Hash_Map_Iterator_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString ,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex> #pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<NotifyExt::ReconnectionRegistry::ReconnectionID, ACE_CString ,ACE_Hash<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Equal_To<NotifyExt::ReconnectionRegistry::ReconnectionID>, ACE_Null_Mutex> -// vector of int's is instantiated elsewhere -//#pragma instantiate ACE_Vector<NotifyExt::ReconnectionRegistry::ReconnectionID> -//#pragma instantiate ACE_Array_Base<NotifyExt::ReconnectionRegistry::ReconnectionID> + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp index d64e96d8ecf..1f86e0699d9 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.cpp @@ -19,7 +19,9 @@ #include "Method_Request_Dispatch.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL #define QUEUE_ALLOWED 1 diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h index 9e57fde894a..92f21cb075a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip.h @@ -96,13 +96,6 @@ public: void dispatch (TAO_Notify_ProxySupplier * proxy_supplier, bool filter ACE_ENV_ARG_DECL); -#if 0 - /// \brief Forward delivery to a consumer via a proxy supplier - /// \param proxy_supplier the proxy supplier that will deliver the event - /// \param filter should consumer-based filtering be applied? - void forward (TAO_Notify_ProxySupplier* ps, bool filter); -#endif - ///////////////////////////////////////// /// \brief Wait until the event/routing_slip has /// been saved at least once. diff --git a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp index 93cde74a63e..10f316f4e7c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Routing_Slip_Queue.cpp @@ -10,7 +10,9 @@ #include "ace/Dynamic_Service.h" //#define DEBUG_LEVEL 9 -#define DEBUG_LEVEL TAO_debug_level +#ifndef DEBUG_LEVEL +# define DEBUG_LEVEL TAO_debug_level +#endif //DEBUG_LEVEL namespace TAO_Notify { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp index 99116aa75a2..e21a8dae22a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp @@ -34,21 +34,38 @@ TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer () } void -TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL) +TAO_Notify_SequencePushConsumer::init ( + CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties +#if 1 + ACE_ENV_ARG_DECL_NOT_USED) +#else //1 + ACE_ENV_ARG_DECL) +#endif { - this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); + set_consumer (push_consumer); - this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); +#if 1 //// @@ TODO: use buffering strategy in TAO_Notify_Consumer??? + ACE_UNUSED_ARG ( admin_properties); +#else //1 -/* @@ TODO: use buffering strategy in TAO_Notify_Consumer??? ACE_NEW_THROW_EX (this->buffering_strategy_, TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties, this->max_batch_size_.value ()), CORBA::NO_MEMORY ()); -*/ +#endif // 1 } void +TAO_Notify_SequencePushConsumer::set_consumer ( + CosNotifyComm::SequencePushConsumer_ptr push_consumer) +{ + this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); + this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); + +} + + +void TAO_Notify_SequencePushConsumer::release (void) { delete this; @@ -225,7 +242,10 @@ TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const void TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer - ACE_ENV_ARG_DECL) + ACE_ENV_ARG_DECL_NOT_USED) { - int todo_reconnect; + TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer); + ACE_ASSERT(tmp != 0); + this->set_consumer(tmp->push_consumer_.in()); + this->schedule_timer(false); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h index 7397661c224..e2c39d71b43 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h @@ -51,6 +51,8 @@ public: /// Init the Consumer void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL); + void set_consumer (CosNotifyComm::SequencePushConsumer_ptr push_consumer); + /// TAO_Notify_Destroy_Callback methods. virtual void release (void); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp index ec38cd3203d..01bb530cf57 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp @@ -65,7 +65,11 @@ void TAO_Notify_StructuredPushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer ACE_ENV_ARG_DECL) { - int todo_reconnect; + TAO_Notify_StructuredPushConsumer* tmp = dynamic_cast<TAO_Notify_StructuredPushConsumer *> (old_consumer); + ACE_ASSERT(tmp != 0); + this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + this->schedule_timer(false); } bool diff --git a/TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp b/TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp index f03f1dfa917..3ac4643c225 100644 --- a/TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp +++ b/TAO/orbsvcs/tests/Notify/Reconnecting/Consumer.cpp @@ -1070,11 +1070,11 @@ Consumer_Main::load_ids() { int field = 0; - char buffer[100]; + char buffer[100] = ""; // because ACE fgets doesn't put a null if the file is empty ACE_OS::fgets (buffer, sizeof(buffer), idf); ACE_OS::fclose (idf); char * pb = buffer; - while (*pb != 0) + while (!ok && *pb != 0) { char * eb = ACE_OS::strchr (pb, ','); char * nb = eb + 1; @@ -1083,14 +1083,11 @@ Consumer_Main::load_ids() eb = pb + ACE_OS::strlen (pb); nb = eb; } - else - { - *eb = 0; - } + *eb = 0; if (pb < eb) { int value = ACE_OS::atoi(pb); - switch (field) + switch (++field) { case 1: this->mode_ = static_cast<Mode_T> (value); @@ -1291,7 +1288,7 @@ Consumer_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) if (this->verbose_) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) supplier: Connect to Existing event channel %d\n"), + ACE_TEXT ("(%P|%t) Consumer: Connect to Existing event channel %d\n"), ACE_static_cast (int, this->ec_id_) )); } @@ -1308,8 +1305,8 @@ Consumer_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) if (!ok) { - CosNotification::QoSProperties qosprops(7); - qosprops.length(7); + CosNotification::QoSProperties qosprops (7); + qosprops.length (7); CORBA::ULong i = 0; #ifdef DISABLE_PROPERTIES_TODO qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability); @@ -1329,7 +1326,7 @@ Consumer_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) #endif qosprops.length (i); CosNotification::AdminProperties adminprops(4); - adminprops.length(4); + adminprops.length (4); i = 0; #ifdef DISABLE_PROPERTIES_TODO adminprops[i].name = CORBA::string_dup(CosNotification::MaxQueueLength); @@ -1349,7 +1346,6 @@ Consumer_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) this->ec_id_ ACE_ENV_ARG_PARAMETER); ACE_CHECK; - ok = ! CORBA::is_nil (ec_.in ()); if (ok && this->verbose_) { @@ -1378,35 +1374,63 @@ void Consumer_Main::init_consumer_admin (ACE_ENV_SINGLE_ARG_DECL) { bool ok = false; - if (this->reconnecting_ && this->sa_id_ != default_admin_id) + if (this->reconnecting_) { - ACE_TRY_EX(ONE) + if (this->sa_id_ == default_admin_id) { - this->sa_ = this->ec_->get_consumeradmin( - this->sa_id_ - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK_EX(ONE); - ok = ! CORBA::is_nil (this->sa_.in ()); - if (ok && this->verbose_) + ACE_TRY_EX(TWO) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) Consumer: Reconnect to consumer admin %d\n"), - ACE_static_cast (int, this->sa_id_) - )); + this->sa_ = this->ec_->default_consumer_admin (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK_EX(TWO); + ok = ! CORBA::is_nil (this->sa_.in ()); + this->sa_id_ = default_admin_id; + if (ok && this->verbose_) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer: Using default consumer admin\n") + )); + } + else if (this->verbose_) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer: No default consumer admin\n") + )); + } } + ACE_CATCHALL + { + } + ACE_ENDTRY; } - ACE_CATCHALL + else // not default admin { + ACE_TRY_EX(ONE) + { + this->sa_ = this->ec_->get_consumeradmin( + this->sa_id_ + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK_EX(ONE); + ok = ! CORBA::is_nil (this->sa_.in ()); + if (ok && this->verbose_) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer: Reconnect to consumer admin %d\n"), + ACE_static_cast (int, this->sa_id_) + )); + } + } + ACE_CATCHALL + { + } + ACE_ENDTRY; } - ACE_ENDTRY; } - - if (!ok) + else // !reconnecting { - ACE_TRY_EX(TWO) + ACE_TRY_EX(THREE) { this->sa_ = this->ec_->default_consumer_admin (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK_EX(TWO); + ACE_TRY_CHECK_EX(THREE); ok = ! CORBA::is_nil (this->sa_.in ()); this->sa_id_ = default_admin_id; if (ok && this->verbose_) @@ -1426,45 +1450,51 @@ Consumer_Main::init_consumer_admin (ACE_ENV_SINGLE_ARG_DECL) { } ACE_ENDTRY; - } - if (!ok) - { - this->sa_ = this->ec_->new_for_consumers( - CosNotifyChannelAdmin::OR_OP, - this->sa_id_ - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - ok = ! CORBA::is_nil (this->sa_.in ()); - -#ifdef TEST_SET_QOS - // temporary: be sure we can set qos properties here - if (ok) + if (!ok) { - CosNotification::QoSProperties qosprops(2); - CORBA::ULong i = 0; - qosprops.length(2); - - qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability); - qosprops[i++].value <<= CosNotification::Persistent; - qosprops[i].name = CORBA::string_dup(CosNotification::ConnectionReliability); - qosprops[i++].value <<= CosNotification::Persistent; // Required, or we won't persist much - qosprops.length(i); - this->sa_->set_qos (qosprops ACE_ENV_ARG_PARAMETER); + this->sa_ = this->ec_->new_for_consumers( + CosNotifyChannelAdmin::OR_OP, + this->sa_id_ + ACE_ENV_ARG_PARAMETER); ACE_CHECK; - } + ok = ! CORBA::is_nil (this->sa_.in ()); + +#ifdef TEST_SET_QOS + // temporary: be sure we can set qos properties here + if (ok) + { + CosNotification::QoSProperties qosprops(2); + CORBA::ULong i = 0; + qosprops.length(2); + + qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability); + qosprops[i++].value <<= CosNotification::Persistent; + qosprops[i].name = CORBA::string_dup(CosNotification::ConnectionReliability); + qosprops[i++].value <<= CosNotification::Persistent; // Required, or we won't persist much + qosprops.length(i); + this->sa_->set_qos (qosprops ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } #endif - if (ok && this->verbose_) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) Consumer: Create new consumer admin %d\n"), - ACE_static_cast (int, this->sa_id_) - )); + if (ok && this->verbose_) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer: Create new consumer admin %d\n"), + ACE_static_cast (int, this->sa_id_) + )); + } } } + if (!ok) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer: connect to consumer admin failed %d\n"), + ACE_static_cast (int, this->sa_id_) + )); + } } - void Consumer_Main::init_structured_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL) { @@ -1505,7 +1535,7 @@ Consumer_Main::init_structured_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL) if (ok && this->verbose_) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) Consumer: Create new proxy %d\n"), + ACE_TEXT ("(%P|%t) Consumer: Create new structured proxy %d\n"), ACE_static_cast (int, this->structured_proxy_id_) )); } @@ -1601,7 +1631,7 @@ Consumer_Main::init_sequence_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL) if (ok && this->verbose_) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) Consumer: Create new proxy %d\n"), + ACE_TEXT ("(%P|%t) Consumer: Create new sequence proxy %d\n"), ACE_static_cast (int, this->sequence_proxy_id_) )); } @@ -1692,9 +1722,28 @@ Consumer_Main::init_any_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL) ACE_static_cast (int, this->any_proxy_id_) )); } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer: Get proxy supplier %d returned nil\n"), + ACE_static_cast (int, this->any_proxy_id_) + )); + } + } + ACE_CATCHANY + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer: Get proxy supplier %d threw exception\n"), + ACE_static_cast (int, this->any_proxy_id_) + )); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, ACE_TEXT ("To wit:")); } ACE_CATCHALL { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Consumer: Get proxy supplier %d threw exception\n"), + ACE_static_cast (int, this->any_proxy_id_) + )); } ACE_ENDTRY; } @@ -1708,18 +1757,10 @@ Consumer_Main::init_any_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL) ACE_CHECK; ok = ! CORBA::is_nil (proxy.in ()); -#ifdef TEST_SET_QOS - // temporary - if (ok) - { - set_proxy_qos (proxy.in () ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - } -#endif // TEST_SET_QOS if (ok && this->verbose_) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) Consumer: Create new proxy\n"), + ACE_TEXT ("(%P|%t) Consumer: Create new Any proxy %d\n"), ACE_static_cast (int, this->any_proxy_id_) )); } @@ -1732,7 +1773,7 @@ Consumer_Main::init_any_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Consumer: Received wrong type of push supplier proxy %d\n"), - ACE_static_cast (int, this->sequence_proxy_id_) + ACE_static_cast (int, this->any_proxy_id_) )); ACE_THROW (CORBA::BAD_PARAM()); } @@ -1764,6 +1805,7 @@ Consumer_Main::init_any_proxy_supplier (ACE_ENV_SINGLE_ARG_DECL) this->any_push_consumer_ref_.in () ACE_ENV_ARG_PARAMETER); ACE_CHECK; + this->any_push_consumer_.set_connected(true); } diff --git a/TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp b/TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp index 5c0f2757a5b..88bfd7cdb3a 100644 --- a/TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp +++ b/TAO/orbsvcs/tests/Notify/Reconnecting/Supplier.cpp @@ -452,13 +452,13 @@ Supplier_Main::save_ids() int imode = ACE_static_cast (int, this->mode_); ACE_OS::fprintf (idf, "%d,%d,%d,%d,%d,%d,%d,\n", - static_cast <int> (imode), - static_cast <int> (ec_id_), - static_cast <int> (sa_id_), - static_cast <int> (structured_proxy_id_), - static_cast <int> (sequence_proxy_id_), - static_cast <int> (any_proxy_id_), - static_cast <int> (endflag) ); + static_cast<int> (imode), + static_cast<int> (ec_id_), + static_cast<int> (sa_id_), + static_cast<int> (structured_proxy_id_), + static_cast<int> (sequence_proxy_id_), + static_cast<int> (any_proxy_id_), + static_cast<int> (endflag) ); ACE_OS::fclose (idf); } } @@ -474,14 +474,12 @@ Supplier_Main::load_ids() { int field = 0; - char buffer[100] = ""; + char buffer[100] = ""; // because ACE fgets doesn't put a null if the file is empty ACE_OS::fgets (buffer, sizeof(buffer), idf); ACE_OS::fclose (idf); - char * pb = buffer; while (!ok && *pb != 0) { -fprintf (stderr, "Buffer: %s\n" , pb); char * eb = ACE_OS::strchr (pb, ','); char * nb = eb + 1; if (eb == 0) @@ -490,11 +488,9 @@ fprintf (stderr, "Buffer: %s\n" , pb); nb = eb; } *eb = 0; -fprintf (stderr, "PARSE: [%s]\n", pb); if (pb < eb) { int value = ACE_OS::atoi(pb); -fprintf (stderr, "field[%d], %d\n", field, value); switch (++field) { case 1: @@ -657,6 +653,8 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) ACE_ENDTRY; } + // if we don't have a channel yet, and a channel id file was specified + // try to read from it if (!ok && this->channel_file_.length () > 0) { FILE * chf = ACE_OS::fopen (this->channel_file_.c_str (), "r"); @@ -675,12 +673,17 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK_EX (unique_label_1) ok = ! CORBA::is_nil (this->ec_.in ()); - if (ok && this->verbose_) + if (ok) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) supplier: Connect to Consumer's event channel %d\n"), - ACE_static_cast (int, this->ec_id_) - )); + if (this->verbose_) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Supplier: Connect to Existing event channel %d\n"), + ACE_static_cast (int, this->ec_id_) + )); + } + // kill the channel filename so we don't overwrite the file + this->channel_file_ = ""; } } ACE_CATCHALL @@ -692,11 +695,10 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) if (!ok) { - CosNotification::QoSProperties qosprops(7); - qosprops.length(7); - + CosNotification::QoSProperties qosprops (7); + qosprops.length (7); CORBA::ULong i = 0; -#ifdef DISABLE_PROPERITIES_TODO +#ifdef DISABLE_PROPERTIES_TODO qosprops[i].name = CORBA::string_dup(CosNotification::EventReliability); qosprops[i++].value <<= CosNotification::Persistent; qosprops[i].name = CORBA::string_dup(CosNotification::ConnectionReliability); @@ -712,10 +714,9 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) qosprops[i].name = CORBA::string_dup(CosNotification::PacingInterval); qosprops[i++].value <<= (TimeBase::TimeT) 50 * 10000; // 50ms #endif - qosprops.length(i); - + qosprops.length (i); CosNotification::AdminProperties adminprops(4); - adminprops.length(4); + adminprops.length (4); i = 0; #ifdef DISABLE_PROPERTIES_TODO adminprops[i].name = CORBA::string_dup(CosNotification::MaxQueueLength); @@ -740,10 +741,21 @@ Supplier_Main::init_event_channel (ACE_ENV_SINGLE_ARG_DECL) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Supplier: Create event channel %d\n"), - ACE_static_cast (int, this->ec_id_) + static_cast<int> (this->ec_id_) )); } } + + // save channel id + if (ok && this->channel_file_.length() > 0) + { + FILE * chf = ACE_OS::fopen (this->channel_file_.c_str (), "w"); + if (chf != 0) + { + fprintf (chf, "%d\n", static_cast<int> (this->ec_id_)); + fclose (chf); + } + } } CosNotifyChannelAdmin::AdminID default_admin_id = ACE_static_cast (CosNotifyChannelAdmin::AdminID, -1); @@ -1033,7 +1045,7 @@ Supplier_Main::init_any_proxy_consumer (ACE_ENV_SINGLE_ARG_DECL) if (ok && this->verbose_) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) Supplier: Create new proxy\n"), + ACE_TEXT ("(%P|%t) Supplier: Create new proxy %d\n"), ACE_static_cast (int, this->any_proxy_id_) )); } @@ -1083,9 +1095,6 @@ Supplier_Main::init_any_proxy_consumer (ACE_ENV_SINGLE_ARG_DECL) int Supplier_Main::fini (ACE_ENV_SINGLE_ARG_DECL) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) supplier::fini\n") - )); if (this->disconnect_on_exit_) { this->reconnection_callback_.fini (ACE_ENV_SINGLE_ARG_PARAMETER); |