diff options
author | Chris Cleeland <chris.cleeland@gmail.com> | 2006-11-28 22:35:28 +0000 |
---|---|---|
committer | Chris Cleeland <chris.cleeland@gmail.com> | 2006-11-28 22:35:28 +0000 |
commit | 5555edd071f2d62876144e6a2f7b730266e228a8 (patch) | |
tree | a34ff8c76795ae0f0216162ef0ffbdcf670bb229 | |
parent | a86ffac9119fde13cad3fa8c2f921d9bc4aa0501 (diff) | |
download | ATCD-5555edd071f2d62876144e6a2f7b730266e228a8.tar.gz |
Mon Nov 27 20:46:57 UTC 2006 Chris Cleeland <cleeland_c@ociweb.com>
Merge back of changes related to RT 8881 and 8449 to branch from DOC.
18 files changed, 423 insertions, 48 deletions
diff --git a/TAO/orbsvcs/Notify_Service/Notify_Service.cpp b/TAO/orbsvcs/Notify_Service/Notify_Service.cpp index e7d95c4dd99..6ca69d871c5 100644 --- a/TAO/orbsvcs/Notify_Service/Notify_Service.cpp +++ b/TAO/orbsvcs/Notify_Service/Notify_Service.cpp @@ -24,6 +24,7 @@ TAO_Notify_Service_Driver::TAO_Notify_Service_Driver (void) , notify_channel_name_ (NOTIFY_CHANNEL_NAME) , register_event_channel_ (0) , nthreads_ (1) +, separate_dispatching_orb_ (false) { // No-Op. } @@ -73,6 +74,22 @@ TAO_Notify_Service_Driver::init_ORB (int& argc, ACE_TCHAR *argv [] } int +TAO_Notify_Service_Driver::init_dispatching_ORB (int& argc, ACE_TCHAR *argv [] + ACE_ENV_ARG_DECL) +{ + // Copy command line parameter. + ACE_Argv_Type_Converter command_line(argc, argv); + + this->dispatching_orb_ = CORBA::ORB_init (command_line.get_argc(), + command_line.get_ASCII_argv(), + "dispatcher" + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + return 0; +} + +int TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[] ACE_ENV_ARG_DECL) { @@ -97,8 +114,22 @@ TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[] return -1; } - this->notify_service_->init_service (this->orb_.in () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); + if (this->separate_dispatching_orb_) + { + if (this->init_dispatching_ORB (argc, argv + ACE_ENV_ARG_PARAMETER) != 0) + { + return -1; + } + + this->notify_service_->init_service2 (this->orb_.in (), this->dispatching_orb_.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + } + else + { + this->notify_service_->init_service (this->orb_.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + } if (this->nthreads_ > 0) // we have chosen to run in a thread pool. { @@ -303,6 +334,12 @@ TAO_Notify_Service_Driver::shutdown (ACE_ENV_SINGLE_ARG_DECL) // shutdown the ORB. if (!CORBA::is_nil (this->orb_.in ())) this->orb_->shutdown (); + + /// Release all the _vars as the ORB is gone now. + notify_factory_._retn (); + orb_._retn (); + poa_._retn (); + naming_._retn (); } int @@ -318,6 +355,12 @@ TAO_Notify_Service_Driver::parse_args (int &argc, ACE_TCHAR *argv[]) this->notify_factory_name_.set (ACE_TEXT_ALWAYS_CHAR(current_arg)); arg_shifter.consume_arg (); } + else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-UseSeparateDispatchingORB")) == 0) + { + ACE_DEBUG((LM_DEBUG, "Using separate dispatching ORB\n")); + this->separate_dispatching_orb_ = true; + arg_shifter.consume_arg (); + } else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-Boot")) == 0) { this->bootstrap_ = 1; diff --git a/TAO/orbsvcs/Notify_Service/Notify_Service.h b/TAO/orbsvcs/Notify_Service/Notify_Service.h index a32e6747b2a..ddf7203b146 100644 --- a/TAO/orbsvcs/Notify_Service/Notify_Service.h +++ b/TAO/orbsvcs/Notify_Service/Notify_Service.h @@ -90,6 +90,9 @@ protected: int init_ORB (int& argc, ACE_TCHAR *argv [] ACE_ENV_ARG_DECL); // initialize the ORB. + int init_dispatching_ORB (int& argc, ACE_TCHAR *argv [] + ACE_ENV_ARG_DECL); + // initialize the dispatching ORB. TAO_Notify_Service* notify_service_; @@ -129,6 +132,9 @@ protected: CORBA::ORB_var orb_; // The ORB that we use. + CORBA::ORB_var dispatching_orb_; + // separate dispatching orb if needed. + PortableServer::POA_var poa_; // Reference to the root poa. @@ -140,6 +146,9 @@ protected: int nthreads_; // Number of worker threads. + + bool separate_dispatching_orb_; + // indicate that a separate ORB is used for dispatching events. }; #include /**/ "ace/post.h" diff --git a/TAO/orbsvcs/Notify_Service/README b/TAO/orbsvcs/Notify_Service/README index b83e208ac12..3dc06134774 100644 --- a/TAO/orbsvcs/Notify_Service/README +++ b/TAO/orbsvcs/Notify_Service/README @@ -46,9 +46,14 @@ Command line arguments: Naming Service. The default is "NotifyEventChannel". -"-ORBRunThreads" : Number of threads to run the +"-ORBRunThreads nthreads" : Number of threads to run the ORB::run method. +"-UseSeparateDispatchingORB 1|0" + : Indicates whether the service should create and + and use a separate ORB dedicated to dispatching of + events. + !! The -Notify_TPReactor option is deprecated!! use the -ORBRunThreads option instead. @@ -85,7 +90,7 @@ if you are using the "-NameSvc" options. $TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o naming.ior - and the CosEvent_Service as + and the Notify_Service as $ Notify_Service -ORBInitRef NameService=file://naming.ior @@ -113,7 +118,8 @@ The svc.conf options: The "Notify_Default_Event_Manager_Objects_Factory" service object accepts the following options: -"-DispatchingThreads [thread_count]" : How many threads for MT dispatching. +"-DispatchingThreads [thread_count]" : Enables MT dispatching with the specified number + of threads. "-ListenerThreads" : How many threads for listener filter evaluation. diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp index 8048c1dcaa5..4e769fedd26 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp @@ -7,6 +7,7 @@ ACE_RCSID (Notify, "$Id$") #include "ace/Bound_Ptr.h" +#include "tao/Stub.h" // For debug messages printing out ORBid. #include "orbsvcs/CosEventCommC.h" #include "orbsvcs/Notify/Event.h" #include "orbsvcs/Notify/Properties.h" @@ -35,14 +36,60 @@ TAO_Notify_PushConsumer::init (CosEventComm::PushConsumer_ptr push_consumer ACE_THROW (CORBA::BAD_PARAM()); } - this->push_consumer_ = CosEventComm::PushConsumer::_duplicate (push_consumer); - ACE_TRY + { + if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ()) + { + this->push_consumer_ = CosEventComm::PushConsumer::_duplicate (push_consumer); + + this->publish_ = + CosNotifyComm::NotifyPublish::_narrow (push_consumer ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else + { + // "Port" consumer's object reference from receiving ORB to dispatching ORB. + CORBA::String_var temp = + TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer); + + CORBA::Object_var obj = + TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in()); + + CosEventComm::PushConsumer_var new_cos_comm_pc = + CosEventComm::PushConsumer::_unchecked_narrow(obj.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + this->push_consumer_ = + CosEventComm::PushConsumer::_duplicate (new_cos_comm_pc.in()); + + // + // Note that here we do an _unchecked_narrow() in order to avoid + // making a call on the consumer b/c the consumer may not have activated + // its POA just yet. That means that before we use this reference the first + // time, we'll actually need to call _is_a() on it, i.e., the equivalent + // of an _narrow(). At the time of this writing, the only use of + // this->publish_ is in TAO_NS_Consumer::dispatch_updates_i (the superclass). + // If any other use is made of this data member, then the code to validate + // the actual type of the target object must be refactored. + this->publish_ = + CosNotifyComm::NotifyPublish::_unchecked_narrow (obj.in() + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + + //--cj verify dispatching ORB + if (TAO_debug_level >= 10) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Any push init dispatching ORB id is %s.\n", + obj->_stubobj()->orb_core()->orbid())); + } + //--cj end + } + } + ACE_CATCH (CORBA::TRANSIENT, ex) { - this->publish_ = - CosNotifyComm::NotifyPublish::_narrow (push_consumer - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_PRINT_EXCEPTION (ex, "Got a TRANSIENT in NS_PushConsumer::init"); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_PushConsumer %@\n", this)); } ACE_CATCHANY { @@ -61,6 +108,13 @@ TAO_Notify_PushConsumer::release (void) void TAO_Notify_PushConsumer::push (const CORBA::Any& payload ACE_ENV_ARG_DECL) { + //--cj verify dispatching ORB + if (TAO_debug_level >= 10) { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Any push dispatching ORB id is %s.\n", + this->push_consumer_->_stubobj()->orb_core()->orbid())); + } + //--cj end + this->push_consumer_->push (payload ACE_ENV_ARG_PARAMETER); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp index 7b19b418ebc..96a7d9ef8a7 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp @@ -31,6 +31,7 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy) : proxy_ (proxy) , is_suspended_ (0) +, have_not_yet_verified_publish_ (true) , pacing_ (proxy->qos_properties_.pacing_interval ()) , max_batch_size_ (CosNotification::MaximumBatchSize, 0) , timer_id_ (-1) @@ -683,7 +684,14 @@ void TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed ACE_ENV_ARG_DECL) { - if (!CORBA::is_nil (this->publish_.in ())) + if (this->have_not_yet_verified_publish_) + { + this->have_not_yet_verified_publish_ = false; // no need to check again + if (! this->publish_->_is_a ("IDL:omg.org/CosNotifyComm/NotifyPublish:1.0" + ACE_ENV_ARG_PARAMETER)) + this->publish_ = CosNotifyComm::NotifyPublish::_nil(); + } + if (! CORBA::is_nil (this->publish_.in ())) this->publish_->offer_change (added, removed ACE_ENV_ARG_PARAMETER); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h index 43b591b51e4..89058b6492e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h @@ -165,6 +165,7 @@ protected: /// Interface that accepts offer_changes CosNotifyComm::NotifyPublish_var publish_; + bool have_not_yet_verified_publish_; /// The Pacing Interval const TAO_Notify_Property_Time & pacing_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp index 2128c1852a4..c71180e05de 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp @@ -110,6 +110,12 @@ TAO_CosNotify_Service::init (int argc, ACE_TCHAR *argv[]) task_per_proxy = 1; arg_shifter.consume_arg (); } + else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-UseSeparateDispatchingORB")) == 0) + { + properties->separate_dispatching_orb (true); + ACE_DEBUG((LM_DEBUG, ACE_TEXT("Using separate Dispatching ORB. \n"))); + arg_shifter.consume_arg (); + } else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AllowReconnect")) == 0) { arg_shifter.consume_arg (); @@ -198,7 +204,36 @@ TAO_CosNotify_Service::init_service (CORBA::ORB_ptr orb ACE_ENV_ARG_DECL) { ACE_DEBUG ((LM_DEBUG, "Loading the Cos Notification Service...\n")); - this->init_i (orb ACE_ENV_ARG_PARAMETER); + if (TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb()) + { + // got here by way of svc.conf. no second orb supplied so create one + if (NULL == TAO_Notify_PROPERTIES::instance()->dispatching_orb()) + { + ACE_DEBUG ((LM_DEBUG, "No dispatching orb supplied. Creating default one.\n")); + + int argc = 0; + char *argv0 = 0; + char **argv = &argv0; // ansi requires argv be null terminated. + CORBA::ORB_var dispatcher = CORBA::ORB_init (argc, argv, + "default_dispatcher" ACE_ENV_ARG_PARAMETER); + //ACE_CHECK_RETURN (-1); + + TAO_Notify_PROPERTIES::instance()->dispatching_orb(dispatcher.in()); + } + + this->init_i2 (orb, TAO_Notify_PROPERTIES::instance()->dispatching_orb() ACE_ENV_ARG_PARAMETER); + + } + else + { + this->init_i (orb ACE_ENV_ARG_PARAMETER); + } +} + +void +TAO_CosNotify_Service::init_service2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb ACE_ENV_ARG_DECL) +{ + this->init_i2 (orb, dispatching_orb ACE_ENV_ARG_PARAMETER); ACE_CHECK; } @@ -217,22 +252,57 @@ TAO_CosNotify_Service::init_i (CORBA::ORB_ptr orb ACE_ENV_ARG_DECL) PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); ACE_CHECK; - /// Set the properties - TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance(); + // Set the properties + TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance(); + + properties->orb (orb); + properties->default_poa (default_poa.in ()); + + // Init the factory + this->factory_.reset (this->create_factory (ACE_ENV_SINGLE_ARG_PARAMETER)); + ACE_CHECK; + ACE_ASSERT( this->factory_.get() != 0 ); + TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get()); + + this->builder_.reset (this->create_builder (ACE_ENV_SINGLE_ARG_PARAMETER)); + ACE_CHECK; + ACE_ASSERT( this->builder_.get() != 0 ); + TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get()); +} + +void +TAO_CosNotify_Service::init_i2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb ACE_ENV_ARG_DECL) +{ + // Obtain the Root POA + CORBA::Object_var object = + orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + if (CORBA::is_nil (object.in ())) + ACE_ERROR ((LM_ERROR, " (%P|%t) Unable to resolve the RootPOA.\n")); + + PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Set the properties + TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance(); - properties->orb (orb); - properties->default_poa (default_poa.in ()); + properties->orb (orb); + properties->dispatching_orb (dispatching_orb); + properties->separate_dispatching_orb (true); - // Init the factory - this->factory_.reset (this->create_factory (ACE_ENV_SINGLE_ARG_PARAMETER)); - ACE_CHECK; - ACE_ASSERT( this->factory_.get() != 0 ); - TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get()); + properties->default_poa (default_poa.in ()); - this->builder_.reset (this->create_builder (ACE_ENV_SINGLE_ARG_PARAMETER)); - ACE_CHECK; - ACE_ASSERT( this->builder_.get() != 0 ); - TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get()); + // Init the factory and builder + this->factory_.reset (this->create_factory (ACE_ENV_SINGLE_ARG_PARAMETER)); + ACE_CHECK; + ACE_ASSERT( this->factory_.get() != 0 ); + TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get()); + + this->builder_.reset (this->create_builder (ACE_ENV_SINGLE_ARG_PARAMETER)); + ACE_CHECK; + ACE_ASSERT( this->builder_.get() != 0 ); + TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get()); } TAO_Notify_Factory* diff --git a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h index cc91e77d372..c3e2e8ebd47 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h +++ b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h @@ -51,6 +51,9 @@ public: virtual int fini (void); + /// separate dispatching orb Init + virtual void init_service2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb ACE_ENV_ARG_DECL); + /// Create the Channel Factory. virtual CosNotifyChannelAdmin::EventChannelFactory_ptr create (PortableServer::POA_ptr default_POA ACE_ENV_ARG_DECL); @@ -60,6 +63,8 @@ public: protected: /// Init the data members virtual void init_i (CORBA::ORB_ptr orb ACE_ENV_ARG_DECL); + /// Init the data members separate dispatching orb + virtual void init_i2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb ACE_ENV_ARG_DECL); private: diff --git a/TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp b/TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp index 0137934dd22..8d781c1929e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp @@ -87,7 +87,13 @@ TAO_Notify_POA_Helper::create_i (PortableServer::POA_ptr parent_poa, const char* ACE_CHECK; if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, "Created POA : %s\n", this->poa_->the_name ())); + { + CORBA::String_var the_name = this->poa_->the_name ( + ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + ACE_DEBUG ((LM_DEBUG, "Created POA : %s\n", the_name.in ())); + } + /* // Destroy the policies for (CORBA::ULong index = 0; index < policy_list.length (); ++index) @@ -134,7 +140,13 @@ TAO_Notify_POA_Helper::activate (PortableServer::Servant servant, CORBA::Long& i id = this->id_factory_.id (); if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, "Activating object with id = %d in POA : %s\n", id, this->poa_->the_name ())); + { + CORBA::String_var the_name = this->poa_->the_name ( + ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (0); + + ACE_DEBUG ((LM_DEBUG, "Activating object with id = %d in POA : %s\n", id, the_name.in ())); + } // Convert CORBA::Long to ObjectId PortableServer::ObjectId_var oid = @@ -154,7 +166,12 @@ CORBA::Object_ptr TAO_Notify_POA_Helper::activate_with_id (PortableServer::Servant servant, CORBA::Long id ACE_ENV_ARG_DECL) { if (DEBUG_LEVEL > 0) - ACE_DEBUG ((LM_DEBUG, "Activating object with existing id = %d in POA : %s\n", id, this->poa_->the_name ())); + { + CORBA::String_var the_name = this->poa_->the_name ( + ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (0); + ACE_DEBUG ((LM_DEBUG, "Activating object with existing id = %d in POA : %s\n", id, the_name.in ())); + } this->id_factory_.set_last_used (id); // Convert CORBA::Long to ObjectId diff --git a/TAO/orbsvcs/orbsvcs/Notify/Properties.cpp b/TAO/orbsvcs/orbsvcs/Notify/Properties.cpp index 798f24fb7f1..cf63ccb0be1 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Properties.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Properties.cpp @@ -17,8 +17,11 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Notify_Properties::TAO_Notify_Properties (void) : factory_ (0) , builder_ (0) + , orb_(0) + , dispatching_orb_ (0) , asynch_updates_ (0) , allow_reconnect_ (false) + , separate_dispatching_orb_ (false) , updates_ (1) { // In case no conf. file is specified, the EC will default to reactive concurrency. @@ -37,8 +40,13 @@ TAO_Notify_Properties::~TAO_Notify_Properties () { } -#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION) -template class TAO_Singleton<TAO_Notify_Properties, ACE_Thread_Mutex> *TAO_Singleton<TAO_Notify_Properties, ACE_Thread_Mutex>::singleton_; -#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */ +TAO_Notify_Properties * +TAO_Notify_Properties::instance (void) +{ + // Hide the template instantiation to prevent multiple instances + // from being created. + return + TAO_Singleton<TAO_Notify_Properties, TAO_SYNCH_MUTEX>::instance (); +} TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/Properties.h b/TAO/orbsvcs/orbsvcs/Notify/Properties.h index 7ab3ec74637..98e38df5705 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Properties.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Properties.h @@ -46,6 +46,9 @@ public: /// Destructor ~TAO_Notify_Properties (); + /// Return a singleton instance of this class. + static TAO_Notify_Properties * instance (void); + // = Property Accessors TAO_Notify_Factory* factory (void); void factory (TAO_Notify_Factory* factory); @@ -55,6 +58,8 @@ public: CORBA::ORB_ptr orb (void); void orb (CORBA::ORB_ptr orb); + CORBA::ORB_ptr dispatching_orb (void); + void dispatching_orb (CORBA::ORB_ptr dispatching_orb); PortableServer::POA_ptr default_poa (void); void default_poa (PortableServer::POA_ptr default_poa); @@ -68,6 +73,8 @@ public: // Turn on/off update messages. CORBA::Boolean updates (void); void updates (CORBA::Boolean updates); + bool separate_dispatching_orb (void); + void separate_dispatching_orb (bool b); // The QoS Property that must be applied to each newly created Event Channel const CosNotification::QoSProperties& default_event_channel_qos_properties (void); @@ -109,6 +116,9 @@ protected: /// ORB CORBA::ORB_var orb_; + /// dispatching orb + CORBA::ORB_var dispatching_orb_; + // POA PortableServer::POA_var default_poa_; @@ -118,6 +128,9 @@ protected: /// True if clients can reconnect to proxies. bool allow_reconnect_; + /// True is separate dispatching orb + bool separate_dispatching_orb_; + /// True if updates are enabled (default). CORBA::Boolean updates_; @@ -140,9 +153,10 @@ protected: CosNotification::QoSProperties pc_qos_; }; -TAO_NOTIFY_SERV_SINGLETON_DECLARE (TAO_Singleton, TAO_Notify_Properties, TAO_SYNCH_MUTEX) - -typedef TAO_Singleton<TAO_Notify_Properties, TAO_SYNCH_MUTEX> TAO_Notify_PROPERTIES; +/** + * @todo Remove this legacy TAO_Notify_Properties typedef. + */ +typedef TAO_Notify_Properties TAO_Notify_PROPERTIES; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/Properties.inl b/TAO/orbsvcs/orbsvcs/Notify/Properties.inl index c53e7087f8e..e48c65afbed 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Properties.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Properties.inl @@ -34,12 +34,24 @@ TAO_Notify_Properties::orb (void) return CORBA::ORB::_duplicate (orb_.in ()); } +ACE_INLINE CORBA::ORB_ptr +TAO_Notify_Properties::dispatching_orb (void) +{ + return CORBA::ORB::_duplicate (dispatching_orb_.in ()); +} + ACE_INLINE void TAO_Notify_Properties::orb (CORBA::ORB_ptr orb) { orb_ = CORBA::ORB::_duplicate (orb); } +ACE_INLINE void +TAO_Notify_Properties::dispatching_orb (CORBA::ORB_ptr dispatching_orb) +{ + dispatching_orb_ = CORBA::ORB::_duplicate (dispatching_orb); +} + ACE_INLINE PortableServer::POA_ptr TAO_Notify_Properties::default_poa (void) { @@ -76,6 +88,18 @@ TAO_Notify_Properties::allow_reconnect (bool b) this->allow_reconnect_ = b; } +ACE_INLINE bool +TAO_Notify_Properties::separate_dispatching_orb (void) +{ + return this->separate_dispatching_orb_; +} + +ACE_INLINE void +TAO_Notify_Properties::separate_dispatching_orb (bool b) +{ + this->separate_dispatching_orb_ = b; +} + ACE_INLINE CORBA::Boolean TAO_Notify_Properties::updates (void) { diff --git a/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp b/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp index 6a5aff084e8..20131d3e299 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp @@ -1,5 +1,6 @@ // $Id$ +#include "tao/TAO_Singleton.h" #include "orbsvcs/Notify/RT_Properties.h" #if ! defined (__ACE_INLINE__) @@ -19,4 +20,14 @@ TAO_Notify_RT_Properties::~TAO_Notify_RT_Properties () { } +TAO_Notify_RT_Properties * +TAO_Notify_RT_Properties::instance (void) +{ + // Hide the template instantiation to prevent multiple instances + // from being created. + + return + TAO_Singleton<TAO_Notify_RT_Properties, TAO_SYNCH_MUTEX>::instance (); +} + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h b/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h index 124f8757145..0dadf2b1533 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h +++ b/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h @@ -18,7 +18,6 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "tao/TAO_Singleton.h" #include "tao/RTCORBA/RTCORBA.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL @@ -31,14 +30,14 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL */ class TAO_RT_Notify_Export TAO_Notify_RT_Properties { - friend class TAO_Singleton<TAO_Notify_RT_Properties, TAO_SYNCH_MUTEX>; - public: /// Constuctor TAO_Notify_RT_Properties (void); /// Destructor ~TAO_Notify_RT_Properties (); + /// Return singleton instance of this class. + static TAO_Notify_RT_Properties * instance (void); RTCORBA::RTORB_ptr rt_orb (void); void rt_orb (RTCORBA::RTORB_ptr rt_orb); @@ -54,9 +53,10 @@ protected: RTCORBA::Current_var current_; }; -TAO_RT_NOTIFY_SINGLETON_DECLARE (TAO_Singleton, TAO_Notify_RT_Properties, TAO_SYNCH_MUTEX) - -typedef TAO_Singleton<TAO_Notify_RT_Properties, TAO_SYNCH_MUTEX> TAO_Notify_RT_PROPERTIES; +/** + * @todo Remove this legacy TAO_Notify_RT_Properties typedef. + */ +typedef TAO_Notify_RT_Properties TAO_Notify_RT_PROPERTIES; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp index 19f972cd0cc..a221f8cde94 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp @@ -6,6 +6,7 @@ ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$") #include "ace/Reactor.h" #include "tao/debug.h" +#include "tao/Stub.h" // For debug messages printing out ORBid. #include "orbsvcs/Notify/QoSProperties.h" #include "orbsvcs/Notify/ProxySupplier.h" #include "orbsvcs/Notify/Worker_Task.h" @@ -42,8 +43,49 @@ TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr p ACE_THROW (CORBA::BAD_PARAM()); } - this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); - this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); + if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ()) + { + this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); + this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); + } + else + { + // "Port" consumer's object reference from receiving ORB to dispatching ORB. + CORBA::String_var temp = + TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer); + + CORBA::Object_var obj = + TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in()); + + ACE_TRY + { + CosNotifyComm::SequencePushConsumer_var new_push_consumer = + CosNotifyComm::SequencePushConsumer::_unchecked_narrow(obj.in()); + ACE_TRY_CHECK; + + this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (new_push_consumer); + this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer); + + //--cj verify dispatching ORB + if (TAO_debug_level >= 10) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Sequence push init dispatching ORB id is %s.\n", + obj->_stubobj()->orb_core()->orbid())); + } + //--cj end + } + ACE_CATCH (CORBA::TRANSIENT, ex) + { + ACE_PRINT_EXCEPTION (ex, "Got a TRANSIENT in NS_SequencePushConsumer::init"); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_SequencePushConsumer %@\n", this)); + } + ACE_CATCHANY + { + // _narrow failed + } + ACE_ENDTRY; + } } void @@ -259,6 +301,13 @@ TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& / void TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL) { + //--cj verify dispatching ORB + if (TAO_debug_level >= 10) { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sequence push dispatching ORB id is %s.\n", + this->push_consumer_->_stubobj()->orb_core()->orbid())); + } + //--cj end + this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER); ACE_CHECK; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp index feb1b56c29c..c9fc25077b6 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp @@ -3,9 +3,10 @@ ACE_RCSID(RT_Notify, TAO_Notify_StructuredPushConsumer, "$Id$") +#include "ace/Bound_Ptr.h" +#include "tao/Stub.h" // For debug messages printing out ORBid. #include "orbsvcs/Notify/Properties.h" #include "orbsvcs/Notify/Event.h" -#include "ace/Bound_Ptr.h" TAO_BEGIN_VERSIONED_NAMESPACE_DECL @@ -29,10 +30,48 @@ TAO_Notify_StructuredPushConsumer::init (CosNotifyComm::StructuredPushConsumer_p ACE_THROW (CORBA::BAD_PARAM()); } - this->push_consumer_ = CosNotifyComm::StructuredPushConsumer::_duplicate (push_consumer); - - this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); - + if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ()) + { + this->push_consumer_ = CosNotifyComm::StructuredPushConsumer::_duplicate (push_consumer); + this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); + } + else + { + // "Port" consumer's object reference from receiving ORB to dispatching ORB. + CORBA::String_var temp = + TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer); + + CORBA::Object_var obj = + TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in()); + + ACE_TRY + { + CosNotifyComm::StructuredPushConsumer_var new_push_consumer = + CosNotifyComm::StructuredPushConsumer::_unchecked_narrow(obj.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + this->push_consumer_ = CosNotifyComm::StructuredPushConsumer::_duplicate (new_push_consumer.in()); + this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in()); + //--cj verify dispatching ORB + if (TAO_debug_level >= 10) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Structured push init dispatching ORB id is %s.\n", + obj->_stubobj()->orb_core()->orbid())); + } + //--cj end + } + ACE_CATCH (CORBA::TRANSIENT, ex) + { + ACE_PRINT_EXCEPTION (ex, "Got a TRANSIENT in NS_StructuredPushConsumer::init"); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_StructuredPushConsumer %@\n", this)); + } + ACE_CATCHANY + { + // _narrow failed + } + ACE_ENDTRY; + } } void @@ -55,7 +94,15 @@ TAO_Notify_StructuredPushConsumer::push (const CORBA::Any& event ACE_ENV_ARG_DEC void TAO_Notify_StructuredPushConsumer::push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL) { + //--cj verify dispatching ORB + if (TAO_debug_level >= 10) { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Structured push dispatching ORB id is %s.\n", + this->push_consumer_->_stubobj()->orb_core()->orbid())); + } + //--cj end + this->push_consumer_->push_structured_event (event ACE_ENV_ARG_PARAMETER); + ACE_CHECK; } /// Push a batch of events to this consumer. diff --git a/TAO/orbsvcs/tests/Notify/Basic/run_test.pl b/TAO/orbsvcs/tests/Notify/Basic/run_test.pl index 23c71d27f1a..18fccbc0466 100755 --- a/TAO/orbsvcs/tests/Notify/Basic/run_test.pl +++ b/TAO/orbsvcs/tests/Notify/Basic/run_test.pl @@ -91,11 +91,15 @@ if (PerlACE::waitforfile_timed ($namingior, $startup_timeout) == -1) { exit 1; } +for $dispatch_opt ("", "-UseSeparateDispatchingOrb 1") +{ + for $config (@test_configs) { print STDERR "\nTesting Notification Service with config file = $config ....\n\n"; $Notification = new PerlACE::Process ("../../../Notify_Service/Notify_Service", + ' '.$dispatch_opt.' '. "-ORBInitRef NameService=file://$namingior " . "-IORoutput $notifyior " . "-ORBSvcConf $config"); @@ -143,6 +147,7 @@ for $config (@test_configs) $Notification->Kill (); } +} $Naming->Kill (); diff --git a/TAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl b/TAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl index 8a181a0712b..0a37bf4613b 100755 --- a/TAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl +++ b/TAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl @@ -87,11 +87,14 @@ if (PerlACE::waitforfile_timed ($namingior, $startup_timeout) == -1) { exit 1; } +for $dispatch_opt ("", "-UseSeparateDispatchingOrb 1") +{ for $config (@test_configs) { print STDERR "\nTesting Notification Service with config file = $config ....\n\n"; $Notification = new PerlACE::Process ("../../../Notify_Service/Notify_Service", + ' '.$dispatch_opt.' '. "-ORBInitRef NameService=file://$namingior " . "-IORoutput $notifyior " . "-ORBSvcConf $config " . @@ -136,6 +139,7 @@ for $config (@test_configs) $Notification->Kill (); } +} $Naming->Kill (); |