summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Cleeland <chris.cleeland@gmail.com>2006-11-28 22:35:28 +0000
committerChris Cleeland <chris.cleeland@gmail.com>2006-11-28 22:35:28 +0000
commit5555edd071f2d62876144e6a2f7b730266e228a8 (patch)
treea34ff8c76795ae0f0216162ef0ffbdcf670bb229
parenta86ffac9119fde13cad3fa8c2f921d9bc4aa0501 (diff)
downloadATCD-5555edd071f2d62876144e6a2f7b730266e228a8.tar.gz
Mon Nov 27 20:46:57 UTC 2006 Chris Cleeland <cleeland_c@ociweb.com>
Merge back of changes related to RT 8881 and 8449 to branch from DOC.
-rw-r--r--TAO/orbsvcs/Notify_Service/Notify_Service.cpp47
-rw-r--r--TAO/orbsvcs/Notify_Service/Notify_Service.h9
-rw-r--r--TAO/orbsvcs/Notify_Service/README12
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp66
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp10
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Consumer.h1
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp98
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp23
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Properties.cpp14
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Properties.h20
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Properties.inl24
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp11
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h12
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp53
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp57
-rwxr-xr-xTAO/orbsvcs/tests/Notify/Basic/run_test.pl5
-rwxr-xr-xTAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl4
18 files changed, 423 insertions, 48 deletions
diff --git a/TAO/orbsvcs/Notify_Service/Notify_Service.cpp b/TAO/orbsvcs/Notify_Service/Notify_Service.cpp
index e7d95c4dd99..6ca69d871c5 100644
--- a/TAO/orbsvcs/Notify_Service/Notify_Service.cpp
+++ b/TAO/orbsvcs/Notify_Service/Notify_Service.cpp
@@ -24,6 +24,7 @@ TAO_Notify_Service_Driver::TAO_Notify_Service_Driver (void)
, notify_channel_name_ (NOTIFY_CHANNEL_NAME)
, register_event_channel_ (0)
, nthreads_ (1)
+, separate_dispatching_orb_ (false)
{
// No-Op.
}
@@ -73,6 +74,22 @@ TAO_Notify_Service_Driver::init_ORB (int& argc, ACE_TCHAR *argv []
}
int
+TAO_Notify_Service_Driver::init_dispatching_ORB (int& argc, ACE_TCHAR *argv []
+ ACE_ENV_ARG_DECL)
+{
+ // Copy command line parameter.
+ ACE_Argv_Type_Converter command_line(argc, argv);
+
+ this->dispatching_orb_ = CORBA::ORB_init (command_line.get_argc(),
+ command_line.get_ASCII_argv(),
+ "dispatcher"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+int
TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[]
ACE_ENV_ARG_DECL)
{
@@ -97,8 +114,22 @@ TAO_Notify_Service_Driver::init (int argc, ACE_TCHAR *argv[]
return -1;
}
- this->notify_service_->init_service (this->orb_.in () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
+ if (this->separate_dispatching_orb_)
+ {
+ if (this->init_dispatching_ORB (argc, argv
+ ACE_ENV_ARG_PARAMETER) != 0)
+ {
+ return -1;
+ }
+
+ this->notify_service_->init_service2 (this->orb_.in (), this->dispatching_orb_.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ }
+ else
+ {
+ this->notify_service_->init_service (this->orb_.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ }
if (this->nthreads_ > 0) // we have chosen to run in a thread pool.
{
@@ -303,6 +334,12 @@ TAO_Notify_Service_Driver::shutdown (ACE_ENV_SINGLE_ARG_DECL)
// shutdown the ORB.
if (!CORBA::is_nil (this->orb_.in ()))
this->orb_->shutdown ();
+
+ /// Release all the _vars as the ORB is gone now.
+ notify_factory_._retn ();
+ orb_._retn ();
+ poa_._retn ();
+ naming_._retn ();
}
int
@@ -318,6 +355,12 @@ TAO_Notify_Service_Driver::parse_args (int &argc, ACE_TCHAR *argv[])
this->notify_factory_name_.set (ACE_TEXT_ALWAYS_CHAR(current_arg));
arg_shifter.consume_arg ();
}
+ else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-UseSeparateDispatchingORB")) == 0)
+ {
+ ACE_DEBUG((LM_DEBUG, "Using separate dispatching ORB\n"));
+ this->separate_dispatching_orb_ = true;
+ arg_shifter.consume_arg ();
+ }
else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-Boot")) == 0)
{
this->bootstrap_ = 1;
diff --git a/TAO/orbsvcs/Notify_Service/Notify_Service.h b/TAO/orbsvcs/Notify_Service/Notify_Service.h
index a32e6747b2a..ddf7203b146 100644
--- a/TAO/orbsvcs/Notify_Service/Notify_Service.h
+++ b/TAO/orbsvcs/Notify_Service/Notify_Service.h
@@ -90,6 +90,9 @@ protected:
int init_ORB (int& argc, ACE_TCHAR *argv []
ACE_ENV_ARG_DECL);
// initialize the ORB.
+ int init_dispatching_ORB (int& argc, ACE_TCHAR *argv []
+ ACE_ENV_ARG_DECL);
+ // initialize the dispatching ORB.
TAO_Notify_Service* notify_service_;
@@ -129,6 +132,9 @@ protected:
CORBA::ORB_var orb_;
// The ORB that we use.
+ CORBA::ORB_var dispatching_orb_;
+ // separate dispatching orb if needed.
+
PortableServer::POA_var poa_;
// Reference to the root poa.
@@ -140,6 +146,9 @@ protected:
int nthreads_;
// Number of worker threads.
+
+ bool separate_dispatching_orb_;
+ // indicate that a separate ORB is used for dispatching events.
};
#include /**/ "ace/post.h"
diff --git a/TAO/orbsvcs/Notify_Service/README b/TAO/orbsvcs/Notify_Service/README
index b83e208ac12..3dc06134774 100644
--- a/TAO/orbsvcs/Notify_Service/README
+++ b/TAO/orbsvcs/Notify_Service/README
@@ -46,9 +46,14 @@ Command line arguments:
Naming Service.
The default is "NotifyEventChannel".
-"-ORBRunThreads" : Number of threads to run the
+"-ORBRunThreads nthreads" : Number of threads to run the
ORB::run method.
+"-UseSeparateDispatchingORB 1|0"
+ : Indicates whether the service should create and
+ and use a separate ORB dedicated to dispatching of
+ events.
+
!! The -Notify_TPReactor option is deprecated!! use the -ORBRunThreads
option instead.
@@ -85,7 +90,7 @@ if you are using the "-NameSvc" options.
$TAO_ROOT/orbsvcs/Naming_Service/Naming_Service -o naming.ior
- and the CosEvent_Service as
+ and the Notify_Service as
$ Notify_Service -ORBInitRef NameService=file://naming.ior
@@ -113,7 +118,8 @@ The svc.conf options:
The "Notify_Default_Event_Manager_Objects_Factory" service object accepts the following options:
-"-DispatchingThreads [thread_count]" : How many threads for MT dispatching.
+"-DispatchingThreads [thread_count]" : Enables MT dispatching with the specified number
+ of threads.
"-ListenerThreads" : How many threads for listener filter evaluation.
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
index 8048c1dcaa5..4e769fedd26 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Any/PushConsumer.cpp
@@ -7,6 +7,7 @@ ACE_RCSID (Notify,
"$Id$")
#include "ace/Bound_Ptr.h"
+#include "tao/Stub.h" // For debug messages printing out ORBid.
#include "orbsvcs/CosEventCommC.h"
#include "orbsvcs/Notify/Event.h"
#include "orbsvcs/Notify/Properties.h"
@@ -35,14 +36,60 @@ TAO_Notify_PushConsumer::init (CosEventComm::PushConsumer_ptr push_consumer
ACE_THROW (CORBA::BAD_PARAM());
}
- this->push_consumer_ = CosEventComm::PushConsumer::_duplicate (push_consumer);
-
ACE_TRY
+ {
+ if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
+ {
+ this->push_consumer_ = CosEventComm::PushConsumer::_duplicate (push_consumer);
+
+ this->publish_ =
+ CosNotifyComm::NotifyPublish::_narrow (push_consumer ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ else
+ {
+ // "Port" consumer's object reference from receiving ORB to dispatching ORB.
+ CORBA::String_var temp =
+ TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
+
+ CORBA::Object_var obj =
+ TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
+
+ CosEventComm::PushConsumer_var new_cos_comm_pc =
+ CosEventComm::PushConsumer::_unchecked_narrow(obj.in() ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ this->push_consumer_ =
+ CosEventComm::PushConsumer::_duplicate (new_cos_comm_pc.in());
+
+ //
+ // Note that here we do an _unchecked_narrow() in order to avoid
+ // making a call on the consumer b/c the consumer may not have activated
+ // its POA just yet. That means that before we use this reference the first
+ // time, we'll actually need to call _is_a() on it, i.e., the equivalent
+ // of an _narrow(). At the time of this writing, the only use of
+ // this->publish_ is in TAO_NS_Consumer::dispatch_updates_i (the superclass).
+ // If any other use is made of this data member, then the code to validate
+ // the actual type of the target object must be refactored.
+ this->publish_ =
+ CosNotifyComm::NotifyPublish::_unchecked_narrow (obj.in()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Any push init dispatching ORB id is %s.\n",
+ obj->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+ }
+ }
+ ACE_CATCH (CORBA::TRANSIENT, ex)
{
- this->publish_ =
- CosNotifyComm::NotifyPublish::_narrow (push_consumer
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
+ ACE_PRINT_EXCEPTION (ex, "Got a TRANSIENT in NS_PushConsumer::init");
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_PushConsumer %@\n", this));
}
ACE_CATCHANY
{
@@ -61,6 +108,13 @@ TAO_Notify_PushConsumer::release (void)
void
TAO_Notify_PushConsumer::push (const CORBA::Any& payload ACE_ENV_ARG_DECL)
{
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10) {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Any push dispatching ORB id is %s.\n",
+ this->push_consumer_->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+
this->push_consumer_->push (payload ACE_ENV_ARG_PARAMETER);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
index 7b19b418ebc..96a7d9ef8a7 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp
@@ -31,6 +31,7 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy)
: proxy_ (proxy)
, is_suspended_ (0)
+, have_not_yet_verified_publish_ (true)
, pacing_ (proxy->qos_properties_.pacing_interval ())
, max_batch_size_ (CosNotification::MaximumBatchSize, 0)
, timer_id_ (-1)
@@ -683,7 +684,14 @@ void
TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed
ACE_ENV_ARG_DECL)
{
- if (!CORBA::is_nil (this->publish_.in ()))
+ if (this->have_not_yet_verified_publish_)
+ {
+ this->have_not_yet_verified_publish_ = false; // no need to check again
+ if (! this->publish_->_is_a ("IDL:omg.org/CosNotifyComm/NotifyPublish:1.0"
+ ACE_ENV_ARG_PARAMETER))
+ this->publish_ = CosNotifyComm::NotifyPublish::_nil();
+ }
+ if (! CORBA::is_nil (this->publish_.in ()))
this->publish_->offer_change (added, removed ACE_ENV_ARG_PARAMETER);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
index 43b591b51e4..89058b6492e 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h
@@ -165,6 +165,7 @@ protected:
/// Interface that accepts offer_changes
CosNotifyComm::NotifyPublish_var publish_;
+ bool have_not_yet_verified_publish_;
/// The Pacing Interval
const TAO_Notify_Property_Time & pacing_;
diff --git a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp
index 2128c1852a4..c71180e05de 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.cpp
@@ -110,6 +110,12 @@ TAO_CosNotify_Service::init (int argc, ACE_TCHAR *argv[])
task_per_proxy = 1;
arg_shifter.consume_arg ();
}
+ else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-UseSeparateDispatchingORB")) == 0)
+ {
+ properties->separate_dispatching_orb (true);
+ ACE_DEBUG((LM_DEBUG, ACE_TEXT("Using separate Dispatching ORB. \n")));
+ arg_shifter.consume_arg ();
+ }
else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-AllowReconnect")) == 0)
{
arg_shifter.consume_arg ();
@@ -198,7 +204,36 @@ TAO_CosNotify_Service::init_service (CORBA::ORB_ptr orb ACE_ENV_ARG_DECL)
{
ACE_DEBUG ((LM_DEBUG, "Loading the Cos Notification Service...\n"));
- this->init_i (orb ACE_ENV_ARG_PARAMETER);
+ if (TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb())
+ {
+ // got here by way of svc.conf. no second orb supplied so create one
+ if (NULL == TAO_Notify_PROPERTIES::instance()->dispatching_orb())
+ {
+ ACE_DEBUG ((LM_DEBUG, "No dispatching orb supplied. Creating default one.\n"));
+
+ int argc = 0;
+ char *argv0 = 0;
+ char **argv = &argv0; // ansi requires argv be null terminated.
+ CORBA::ORB_var dispatcher = CORBA::ORB_init (argc, argv,
+ "default_dispatcher" ACE_ENV_ARG_PARAMETER);
+ //ACE_CHECK_RETURN (-1);
+
+ TAO_Notify_PROPERTIES::instance()->dispatching_orb(dispatcher.in());
+ }
+
+ this->init_i2 (orb, TAO_Notify_PROPERTIES::instance()->dispatching_orb() ACE_ENV_ARG_PARAMETER);
+
+ }
+ else
+ {
+ this->init_i (orb ACE_ENV_ARG_PARAMETER);
+ }
+}
+
+void
+TAO_CosNotify_Service::init_service2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb ACE_ENV_ARG_DECL)
+{
+ this->init_i2 (orb, dispatching_orb ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
@@ -217,22 +252,57 @@ TAO_CosNotify_Service::init_i (CORBA::ORB_ptr orb ACE_ENV_ARG_DECL)
PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- /// Set the properties
- TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
+ // Set the properties
+ TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
+
+ properties->orb (orb);
+ properties->default_poa (default_poa.in ());
+
+ // Init the factory
+ this->factory_.reset (this->create_factory (ACE_ENV_SINGLE_ARG_PARAMETER));
+ ACE_CHECK;
+ ACE_ASSERT( this->factory_.get() != 0 );
+ TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
+
+ this->builder_.reset (this->create_builder (ACE_ENV_SINGLE_ARG_PARAMETER));
+ ACE_CHECK;
+ ACE_ASSERT( this->builder_.get() != 0 );
+ TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
+}
+
+void
+TAO_CosNotify_Service::init_i2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb ACE_ENV_ARG_DECL)
+{
+ // Obtain the Root POA
+ CORBA::Object_var object =
+ orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ if (CORBA::is_nil (object.in ()))
+ ACE_ERROR ((LM_ERROR, " (%P|%t) Unable to resolve the RootPOA.\n"));
+
+ PortableServer::POA_var default_poa = PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Set the properties
+ TAO_Notify_Properties* properties = TAO_Notify_PROPERTIES::instance();
- properties->orb (orb);
- properties->default_poa (default_poa.in ());
+ properties->orb (orb);
+ properties->dispatching_orb (dispatching_orb);
+ properties->separate_dispatching_orb (true);
- // Init the factory
- this->factory_.reset (this->create_factory (ACE_ENV_SINGLE_ARG_PARAMETER));
- ACE_CHECK;
- ACE_ASSERT( this->factory_.get() != 0 );
- TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
+ properties->default_poa (default_poa.in ());
- this->builder_.reset (this->create_builder (ACE_ENV_SINGLE_ARG_PARAMETER));
- ACE_CHECK;
- ACE_ASSERT( this->builder_.get() != 0 );
- TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
+ // Init the factory and builder
+ this->factory_.reset (this->create_factory (ACE_ENV_SINGLE_ARG_PARAMETER));
+ ACE_CHECK;
+ ACE_ASSERT( this->factory_.get() != 0 );
+ TAO_Notify_PROPERTIES::instance()->factory (this->factory_.get());
+
+ this->builder_.reset (this->create_builder (ACE_ENV_SINGLE_ARG_PARAMETER));
+ ACE_CHECK;
+ ACE_ASSERT( this->builder_.get() != 0 );
+ TAO_Notify_PROPERTIES::instance()->builder (this->builder_.get());
}
TAO_Notify_Factory*
diff --git a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h
index cc91e77d372..c3e2e8ebd47 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/CosNotify_Service.h
@@ -51,6 +51,9 @@ public:
virtual int fini (void);
+ /// separate dispatching orb Init
+ virtual void init_service2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb ACE_ENV_ARG_DECL);
+
/// Create the Channel Factory.
virtual CosNotifyChannelAdmin::EventChannelFactory_ptr create (PortableServer::POA_ptr default_POA ACE_ENV_ARG_DECL);
@@ -60,6 +63,8 @@ public:
protected:
/// Init the data members
virtual void init_i (CORBA::ORB_ptr orb ACE_ENV_ARG_DECL);
+ /// Init the data members separate dispatching orb
+ virtual void init_i2 (CORBA::ORB_ptr orb, CORBA::ORB_ptr dispatching_orb ACE_ENV_ARG_DECL);
private:
diff --git a/TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp b/TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp
index 0137934dd22..8d781c1929e 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/POA_Helper.cpp
@@ -87,7 +87,13 @@ TAO_Notify_POA_Helper::create_i (PortableServer::POA_ptr parent_poa, const char*
ACE_CHECK;
if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, "Created POA : %s\n", this->poa_->the_name ()));
+ {
+ CORBA::String_var the_name = this->poa_->the_name (
+ ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ ACE_DEBUG ((LM_DEBUG, "Created POA : %s\n", the_name.in ()));
+ }
+
/*
// Destroy the policies
for (CORBA::ULong index = 0; index < policy_list.length (); ++index)
@@ -134,7 +140,13 @@ TAO_Notify_POA_Helper::activate (PortableServer::Servant servant, CORBA::Long& i
id = this->id_factory_.id ();
if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, "Activating object with id = %d in POA : %s\n", id, this->poa_->the_name ()));
+ {
+ CORBA::String_var the_name = this->poa_->the_name (
+ ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (0);
+
+ ACE_DEBUG ((LM_DEBUG, "Activating object with id = %d in POA : %s\n", id, the_name.in ()));
+ }
// Convert CORBA::Long to ObjectId
PortableServer::ObjectId_var oid =
@@ -154,7 +166,12 @@ CORBA::Object_ptr
TAO_Notify_POA_Helper::activate_with_id (PortableServer::Servant servant, CORBA::Long id ACE_ENV_ARG_DECL)
{
if (DEBUG_LEVEL > 0)
- ACE_DEBUG ((LM_DEBUG, "Activating object with existing id = %d in POA : %s\n", id, this->poa_->the_name ()));
+ {
+ CORBA::String_var the_name = this->poa_->the_name (
+ ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (0);
+ ACE_DEBUG ((LM_DEBUG, "Activating object with existing id = %d in POA : %s\n", id, the_name.in ()));
+ }
this->id_factory_.set_last_used (id);
// Convert CORBA::Long to ObjectId
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Properties.cpp b/TAO/orbsvcs/orbsvcs/Notify/Properties.cpp
index 798f24fb7f1..cf63ccb0be1 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Properties.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Properties.cpp
@@ -17,8 +17,11 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
TAO_Notify_Properties::TAO_Notify_Properties (void)
: factory_ (0)
, builder_ (0)
+ , orb_(0)
+ , dispatching_orb_ (0)
, asynch_updates_ (0)
, allow_reconnect_ (false)
+ , separate_dispatching_orb_ (false)
, updates_ (1)
{
// In case no conf. file is specified, the EC will default to reactive concurrency.
@@ -37,8 +40,13 @@ TAO_Notify_Properties::~TAO_Notify_Properties ()
{
}
-#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
-template class TAO_Singleton<TAO_Notify_Properties, ACE_Thread_Mutex> *TAO_Singleton<TAO_Notify_Properties, ACE_Thread_Mutex>::singleton_;
-#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */
+TAO_Notify_Properties *
+TAO_Notify_Properties::instance (void)
+{
+ // Hide the template instantiation to prevent multiple instances
+ // from being created.
+ return
+ TAO_Singleton<TAO_Notify_Properties, TAO_SYNCH_MUTEX>::instance ();
+}
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Properties.h b/TAO/orbsvcs/orbsvcs/Notify/Properties.h
index 7ab3ec74637..98e38df5705 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Properties.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Properties.h
@@ -46,6 +46,9 @@ public:
/// Destructor
~TAO_Notify_Properties ();
+ /// Return a singleton instance of this class.
+ static TAO_Notify_Properties * instance (void);
+
// = Property Accessors
TAO_Notify_Factory* factory (void);
void factory (TAO_Notify_Factory* factory);
@@ -55,6 +58,8 @@ public:
CORBA::ORB_ptr orb (void);
void orb (CORBA::ORB_ptr orb);
+ CORBA::ORB_ptr dispatching_orb (void);
+ void dispatching_orb (CORBA::ORB_ptr dispatching_orb);
PortableServer::POA_ptr default_poa (void);
void default_poa (PortableServer::POA_ptr default_poa);
@@ -68,6 +73,8 @@ public:
// Turn on/off update messages.
CORBA::Boolean updates (void);
void updates (CORBA::Boolean updates);
+ bool separate_dispatching_orb (void);
+ void separate_dispatching_orb (bool b);
// The QoS Property that must be applied to each newly created Event Channel
const CosNotification::QoSProperties& default_event_channel_qos_properties (void);
@@ -109,6 +116,9 @@ protected:
/// ORB
CORBA::ORB_var orb_;
+ /// dispatching orb
+ CORBA::ORB_var dispatching_orb_;
+
// POA
PortableServer::POA_var default_poa_;
@@ -118,6 +128,9 @@ protected:
/// True if clients can reconnect to proxies.
bool allow_reconnect_;
+ /// True is separate dispatching orb
+ bool separate_dispatching_orb_;
+
/// True if updates are enabled (default).
CORBA::Boolean updates_;
@@ -140,9 +153,10 @@ protected:
CosNotification::QoSProperties pc_qos_;
};
-TAO_NOTIFY_SERV_SINGLETON_DECLARE (TAO_Singleton, TAO_Notify_Properties, TAO_SYNCH_MUTEX)
-
-typedef TAO_Singleton<TAO_Notify_Properties, TAO_SYNCH_MUTEX> TAO_Notify_PROPERTIES;
+/**
+ * @todo Remove this legacy TAO_Notify_Properties typedef.
+ */
+typedef TAO_Notify_Properties TAO_Notify_PROPERTIES;
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Properties.inl b/TAO/orbsvcs/orbsvcs/Notify/Properties.inl
index c53e7087f8e..e48c65afbed 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Properties.inl
+++ b/TAO/orbsvcs/orbsvcs/Notify/Properties.inl
@@ -34,12 +34,24 @@ TAO_Notify_Properties::orb (void)
return CORBA::ORB::_duplicate (orb_.in ());
}
+ACE_INLINE CORBA::ORB_ptr
+TAO_Notify_Properties::dispatching_orb (void)
+{
+ return CORBA::ORB::_duplicate (dispatching_orb_.in ());
+}
+
ACE_INLINE void
TAO_Notify_Properties::orb (CORBA::ORB_ptr orb)
{
orb_ = CORBA::ORB::_duplicate (orb);
}
+ACE_INLINE void
+TAO_Notify_Properties::dispatching_orb (CORBA::ORB_ptr dispatching_orb)
+{
+ dispatching_orb_ = CORBA::ORB::_duplicate (dispatching_orb);
+}
+
ACE_INLINE PortableServer::POA_ptr
TAO_Notify_Properties::default_poa (void)
{
@@ -76,6 +88,18 @@ TAO_Notify_Properties::allow_reconnect (bool b)
this->allow_reconnect_ = b;
}
+ACE_INLINE bool
+TAO_Notify_Properties::separate_dispatching_orb (void)
+{
+ return this->separate_dispatching_orb_;
+}
+
+ACE_INLINE void
+TAO_Notify_Properties::separate_dispatching_orb (bool b)
+{
+ this->separate_dispatching_orb_ = b;
+}
+
ACE_INLINE CORBA::Boolean
TAO_Notify_Properties::updates (void)
{
diff --git a/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp b/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp
index 6a5aff084e8..20131d3e299 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.cpp
@@ -1,5 +1,6 @@
// $Id$
+#include "tao/TAO_Singleton.h"
#include "orbsvcs/Notify/RT_Properties.h"
#if ! defined (__ACE_INLINE__)
@@ -19,4 +20,14 @@ TAO_Notify_RT_Properties::~TAO_Notify_RT_Properties ()
{
}
+TAO_Notify_RT_Properties *
+TAO_Notify_RT_Properties::instance (void)
+{
+ // Hide the template instantiation to prevent multiple instances
+ // from being created.
+
+ return
+ TAO_Singleton<TAO_Notify_RT_Properties, TAO_SYNCH_MUTEX>::instance ();
+}
+
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h b/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h
index 124f8757145..0dadf2b1533 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/RT_Properties.h
@@ -18,7 +18,6 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#include "tao/TAO_Singleton.h"
#include "tao/RTCORBA/RTCORBA.h"
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
@@ -31,14 +30,14 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
*/
class TAO_RT_Notify_Export TAO_Notify_RT_Properties
{
- friend class TAO_Singleton<TAO_Notify_RT_Properties, TAO_SYNCH_MUTEX>;
-
public:
/// Constuctor
TAO_Notify_RT_Properties (void);
/// Destructor
~TAO_Notify_RT_Properties ();
+ /// Return singleton instance of this class.
+ static TAO_Notify_RT_Properties * instance (void);
RTCORBA::RTORB_ptr rt_orb (void);
void rt_orb (RTCORBA::RTORB_ptr rt_orb);
@@ -54,9 +53,10 @@ protected:
RTCORBA::Current_var current_;
};
-TAO_RT_NOTIFY_SINGLETON_DECLARE (TAO_Singleton, TAO_Notify_RT_Properties, TAO_SYNCH_MUTEX)
-
-typedef TAO_Singleton<TAO_Notify_RT_Properties, TAO_SYNCH_MUTEX> TAO_Notify_RT_PROPERTIES;
+/**
+ * @todo Remove this legacy TAO_Notify_RT_Properties typedef.
+ */
+typedef TAO_Notify_RT_Properties TAO_Notify_RT_PROPERTIES;
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
index 19f972cd0cc..a221f8cde94 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
@@ -6,6 +6,7 @@ ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$")
#include "ace/Reactor.h"
#include "tao/debug.h"
+#include "tao/Stub.h" // For debug messages printing out ORBid.
#include "orbsvcs/Notify/QoSProperties.h"
#include "orbsvcs/Notify/ProxySupplier.h"
#include "orbsvcs/Notify/Worker_Task.h"
@@ -42,8 +43,49 @@ TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr p
ACE_THROW (CORBA::BAD_PARAM());
}
- this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
- this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+ if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
+ {
+ this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
+ this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+ }
+ else
+ {
+ // "Port" consumer's object reference from receiving ORB to dispatching ORB.
+ CORBA::String_var temp =
+ TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
+
+ CORBA::Object_var obj =
+ TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
+
+ ACE_TRY
+ {
+ CosNotifyComm::SequencePushConsumer_var new_push_consumer =
+ CosNotifyComm::SequencePushConsumer::_unchecked_narrow(obj.in());
+ ACE_TRY_CHECK;
+
+ this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (new_push_consumer);
+ this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer);
+
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Sequence push init dispatching ORB id is %s.\n",
+ obj->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+ }
+ ACE_CATCH (CORBA::TRANSIENT, ex)
+ {
+ ACE_PRINT_EXCEPTION (ex, "Got a TRANSIENT in NS_SequencePushConsumer::init");
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_SequencePushConsumer %@\n", this));
+ }
+ ACE_CATCHANY
+ {
+ // _narrow failed
+ }
+ ACE_ENDTRY;
+ }
}
void
@@ -259,6 +301,13 @@ TAO_Notify_SequencePushConsumer::push (const CosNotification::StructuredEvent& /
void
TAO_Notify_SequencePushConsumer::push (const CosNotification::EventBatch& event_batch ACE_ENV_ARG_DECL)
{
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10) {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Sequence push dispatching ORB id is %s.\n",
+ this->push_consumer_->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+
this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
index feb1b56c29c..c9fc25077b6 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Structured/StructuredPushConsumer.cpp
@@ -3,9 +3,10 @@
ACE_RCSID(RT_Notify, TAO_Notify_StructuredPushConsumer, "$Id$")
+#include "ace/Bound_Ptr.h"
+#include "tao/Stub.h" // For debug messages printing out ORBid.
#include "orbsvcs/Notify/Properties.h"
#include "orbsvcs/Notify/Event.h"
-#include "ace/Bound_Ptr.h"
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
@@ -29,10 +30,48 @@ TAO_Notify_StructuredPushConsumer::init (CosNotifyComm::StructuredPushConsumer_p
ACE_THROW (CORBA::BAD_PARAM());
}
- this->push_consumer_ = CosNotifyComm::StructuredPushConsumer::_duplicate (push_consumer);
-
- this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
-
+ if (!TAO_Notify_PROPERTIES::instance()->separate_dispatching_orb ())
+ {
+ this->push_consumer_ = CosNotifyComm::StructuredPushConsumer::_duplicate (push_consumer);
+ this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
+ }
+ else
+ {
+ // "Port" consumer's object reference from receiving ORB to dispatching ORB.
+ CORBA::String_var temp =
+ TAO_Notify_PROPERTIES::instance()->orb()->object_to_string(push_consumer);
+
+ CORBA::Object_var obj =
+ TAO_Notify_PROPERTIES::instance()->dispatching_orb()->string_to_object(temp.in());
+
+ ACE_TRY
+ {
+ CosNotifyComm::StructuredPushConsumer_var new_push_consumer =
+ CosNotifyComm::StructuredPushConsumer::_unchecked_narrow(obj.in() ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ this->push_consumer_ = CosNotifyComm::StructuredPushConsumer::_duplicate (new_push_consumer.in());
+ this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (new_push_consumer.in());
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Structured push init dispatching ORB id is %s.\n",
+ obj->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+ }
+ ACE_CATCH (CORBA::TRANSIENT, ex)
+ {
+ ACE_PRINT_EXCEPTION (ex, "Got a TRANSIENT in NS_StructuredPushConsumer::init");
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) got it for NS_StructuredPushConsumer %@\n", this));
+ }
+ ACE_CATCHANY
+ {
+ // _narrow failed
+ }
+ ACE_ENDTRY;
+ }
}
void
@@ -55,7 +94,15 @@ TAO_Notify_StructuredPushConsumer::push (const CORBA::Any& event ACE_ENV_ARG_DEC
void
TAO_Notify_StructuredPushConsumer::push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL)
{
+ //--cj verify dispatching ORB
+ if (TAO_debug_level >= 10) {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Structured push dispatching ORB id is %s.\n",
+ this->push_consumer_->_stubobj()->orb_core()->orbid()));
+ }
+ //--cj end
+
this->push_consumer_->push_structured_event (event ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
}
/// Push a batch of events to this consumer.
diff --git a/TAO/orbsvcs/tests/Notify/Basic/run_test.pl b/TAO/orbsvcs/tests/Notify/Basic/run_test.pl
index 23c71d27f1a..18fccbc0466 100755
--- a/TAO/orbsvcs/tests/Notify/Basic/run_test.pl
+++ b/TAO/orbsvcs/tests/Notify/Basic/run_test.pl
@@ -91,11 +91,15 @@ if (PerlACE::waitforfile_timed ($namingior, $startup_timeout) == -1) {
exit 1;
}
+for $dispatch_opt ("", "-UseSeparateDispatchingOrb 1")
+{
+
for $config (@test_configs)
{
print STDERR "\nTesting Notification Service with config file = $config ....\n\n";
$Notification = new PerlACE::Process ("../../../Notify_Service/Notify_Service",
+ ' '.$dispatch_opt.' '.
"-ORBInitRef NameService=file://$namingior " .
"-IORoutput $notifyior " .
"-ORBSvcConf $config");
@@ -143,6 +147,7 @@ for $config (@test_configs)
$Notification->Kill ();
}
+}
$Naming->Kill ();
diff --git a/TAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl b/TAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl
index 8a181a0712b..0a37bf4613b 100755
--- a/TAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl
+++ b/TAO/orbsvcs/tests/Notify/Basic/run_test_ipv6.pl
@@ -87,11 +87,14 @@ if (PerlACE::waitforfile_timed ($namingior, $startup_timeout) == -1) {
exit 1;
}
+for $dispatch_opt ("", "-UseSeparateDispatchingOrb 1")
+{
for $config (@test_configs)
{
print STDERR "\nTesting Notification Service with config file = $config ....\n\n";
$Notification = new PerlACE::Process ("../../../Notify_Service/Notify_Service",
+ ' '.$dispatch_opt.' '.
"-ORBInitRef NameService=file://$namingior " .
"-IORoutput $notifyior " .
"-ORBSvcConf $config " .
@@ -136,6 +139,7 @@ for $config (@test_configs)
$Notification->Kill ();
}
+}
$Naming->Kill ();