summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp
diff options
context:
space:
mode:
authorpradeep <pradeep@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-02-26 07:48:31 +0000
committerpradeep <pradeep@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-02-26 07:48:31 +0000
commit4f38ae570ea7355fb227ba3698b96082fd23aa76 (patch)
treefc70c8de3615f50f096ad7007037be237f95887a /TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp
parent1586f6fd7343bf3b818e8c4b1eb83112a0c08ade (diff)
downloadATCD-4f38ae570ea7355fb227ba3698b96082fd23aa76.tar.gz
Mon Feb 26 01:47:04 2001 Pradeep Gore <pradeep@cs.wustl.edu>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp277
1 files changed, 192 insertions, 85 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp
index 2231bb97ff3..490a1ce7968 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp
@@ -13,6 +13,26 @@
ACE_RCSID(Notify, Notify_Default_EMO_Factory, "$Id$")
+#define ASYNCH_UPDATES_THREADS 1
+ // The number of threads to send subscription/publication updates.
+ // As this is a relatively adminstrative function. let's just use 1
+ // thread to send the updates asynchronously.
+
+
+ TAO_EMO_Options::TAO_EMO_Options (void)
+ : mt_dispatching_ (0),
+ mt_source_eval_ (0),
+ mt_lookup_ (0),
+ mt_listener_eval_ (0),
+ asynch_updates_ (0),
+ alloc_task_per_proxy_ (0),
+ dispatching_threads_ (1),
+ source_threads_ (1),
+ lookup_threads_ (1),
+ listener_threads_ (1)
+{
+}
+
int
TAO_Notify_Default_EMO_Factory::init_svc (void)
{
@@ -21,25 +41,23 @@ TAO_Notify_Default_EMO_Factory::init_svc (void)
}
TAO_Notify_Default_EMO_Factory::TAO_Notify_Default_EMO_Factory (void)
- : mt_dispatching_ (0),
- mt_source_eval_ (0),
- mt_lookup_ (0),
- mt_listener_eval_ (0),
- dispatching_threads_ (1),
- source_threads_ (1),
- lookup_threads_ (1),
- listener_threads_ (1)
+ :prealloc_source_eval_task_ (0),
+ prealloc_listener_eval_task_ (0),
+ prealloc_dispatching_task_ (0)
{
}
TAO_Notify_Default_EMO_Factory::~TAO_Notify_Default_EMO_Factory ()
{
+ delete prealloc_source_eval_task_;
+ delete prealloc_listener_eval_task_;
+ delete prealloc_dispatching_task_;
}
int
TAO_Notify_Default_EMO_Factory::init (int argc, char* argv[])
{
- ACE_DEBUG ((LM_DEBUG, "TAO_Notify_Default_EMO_Factory::init\n"));
+ // ACE_DEBUG ((LM_DEBUG, "TAO_Notify_Default_EMO_Factory::init\n"));
ACE_Arg_Shifter arg_shifter (argc, argv);
@@ -50,56 +68,126 @@ TAO_Notify_Default_EMO_Factory::init (int argc, char* argv[])
if (ACE_OS::strcasecmp (arg, "-MTDispatching") == 0)
{
- this->mt_dispatching_ = 1;
+ EMO_OPTIONS::instance ()->mt_dispatching_ = 1;
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-DispatchingThreads")))
{
- this->dispatching_threads_ = ACE_OS::atoi (current_arg);
+ EMO_OPTIONS::instance ()->dispatching_threads_ = ACE_OS::atoi (current_arg);
arg_shifter.consume_arg ();
}
else if (ACE_OS::strcasecmp (arg, "-MTSourceEval") == 0)
{
- this->mt_source_eval_ = 1;
+ EMO_OPTIONS::instance ()->mt_source_eval_ = 1;
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-SourceThreads")))
{
- this->source_threads_ = ACE_OS::atoi (current_arg);
+ EMO_OPTIONS::instance ()->source_threads_ = ACE_OS::atoi (current_arg);
arg_shifter.consume_arg ();
}
else if (ACE_OS::strcasecmp (arg, "-MTLookup") == 0)
{
- this->mt_lookup_ = 1;
+ EMO_OPTIONS::instance ()->mt_lookup_ = 1;
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-LookupThreads")))
{
- this->lookup_threads_ = ACE_OS::atoi (current_arg);
+ EMO_OPTIONS::instance ()->lookup_threads_ = ACE_OS::atoi (current_arg);
arg_shifter.consume_arg ();
}
else if (ACE_OS::strcasecmp (arg, "-MTListenerEval") == 0)
{
- this->mt_listener_eval_ = 1;
+ EMO_OPTIONS::instance ()->mt_listener_eval_ = 1;
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-ListenerThreads")))
{
- this->listener_threads_ = ACE_OS::atoi (current_arg);
+ EMO_OPTIONS::instance ()->listener_threads_ = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ else if (ACE_OS::strcasecmp (arg, "-AsynchUpdates") == 0)
+ {
+ EMO_OPTIONS::instance ()->asynch_updates_ = 1;
+ arg_shifter.consume_arg ();
+ }
+ else if (ACE_OS::strcasecmp (arg, "-AllocateTaskperProxy") == 0)
+ {
+ EMO_OPTIONS::instance ()->alloc_task_per_proxy_ = 1;
arg_shifter.consume_arg ();
}
else
{
- arg_shifter.ignore_arg ();
+ ACE_DEBUG ((LM_DEBUG,"EMO Factory did not understand %s",arg));
+ arg_shifter.ignore_arg ();
}
}
+
+ return 0;
+}
+
+TAO_Notify_Worker_Task*
+TAO_Notify_Default_EMO_Factory::create_task (int mt, int tp_size,CORBA::Environment &ACE_TRY_ENV)
+{
+ TAO_Notify_Worker_Task* task;
+
+ int threads_flags =
+ THR_SCHED_DEFAULT|THR_BOUND|THR_NEW_LWP;
+
+ // int dispatching_threads_priority
+
+ // Later: give the user options to specify threads flags and thread priority for each task.
+
+ if (mt == 1)
+ ACE_NEW_THROW_EX (task, TAO_Notify_MT_Worker_Task (tp_size,
+ threads_flags,
+ 0),
+ CORBA::NO_MEMORY ());
+ else
+ ACE_NEW_THROW_EX (task,
+ TAO_Notify_Worker_Task (),
+ CORBA::NO_MEMORY ());
+ return task;
+}
+
+
+int
+TAO_Notify_Default_EMO_Factory::init_instance (void)
+{
+ if (EMO_OPTIONS::instance ()->alloc_task_per_proxy_ == 0) // preallocate all tasks.
+ {
+ return this->preallocate_tasks ();
+ }
+ return 0;
+}
+
+int
+TAO_Notify_Default_EMO_Factory::preallocate_tasks (void)
+{
+ ACE_DECLARE_NEW_CORBA_ENV;
+
+ this->prealloc_source_eval_task_ =
+ create_task (EMO_OPTIONS::instance ()->mt_source_eval_, EMO_OPTIONS::instance ()->source_threads_,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ this->prealloc_listener_eval_task_ =
+ create_task (EMO_OPTIONS::instance ()->mt_listener_eval_, EMO_OPTIONS::instance ()->listener_threads_,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
+ this->prealloc_dispatching_task_ =
+ create_task (EMO_OPTIONS::instance ()->mt_dispatching_, EMO_OPTIONS::instance ()->dispatching_threads_,
+ ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
+
return 0;
}
int
TAO_Notify_Default_EMO_Factory::fini (void)
{
- ACE_DEBUG ((LM_DEBUG, "TAO_Notify_Default_EMO_Factory::fini\n"));
+ // ACE_DEBUG ((LM_DEBUG, "TAO_Notify_Default_EMO_Factory::fini\n"));
return 0;
}
@@ -108,7 +196,7 @@ TAO_Notify_Default_EMO_Factory::create_event_manager (TAO_Notify_EventChannel_i*
{
TAO_Notify_Event_Manager* event_manager;
ACE_NEW_THROW_EX (event_manager,
- TAO_Notify_Event_Manager (channel),
+ TAO_Notify_Event_Manager (channel, this),
CORBA::NO_MEMORY ());
return event_manager;
}
@@ -118,7 +206,7 @@ TAO_Notify_Default_EMO_Factory::create_event_map (CORBA::Environment &ACE_TRY_EN
{
TAO_Notify_Event_Map* event_map;
ACE_NEW_THROW_EX (event_map,
- TAO_Notify_Event_Map (),
+ TAO_Notify_Event_Map (this),
CORBA::NO_MEMORY ());
return event_map;
}
@@ -137,96 +225,109 @@ TAO_Notify_Default_EMO_Factory::create_event_processor (TAO_Notify_Event_Manager
TAO_Notify_Worker_Task*
TAO_Notify_Default_EMO_Factory::create_source_eval_task (CORBA::Environment &ACE_TRY_ENV)
{
- // @@ pass the correct option to initialize this as passive/active object.
- TAO_Notify_Worker_Task* task;
-
- if (this->mt_source_eval_ == 1)
- ACE_NEW_THROW_EX (task, TAO_Notify_MT_Worker_Task (this->source_threads_),
- CORBA::NO_MEMORY ());
+ if (EMO_OPTIONS::instance ()->alloc_task_per_proxy_ == 1)
+ return create_task (EMO_OPTIONS::instance ()->mt_source_eval_,
+ EMO_OPTIONS::instance ()->source_threads_,
+ ACE_TRY_ENV);
else
- ACE_NEW_THROW_EX (task,
- TAO_Notify_Worker_Task (),
- CORBA::NO_MEMORY ());
- return task;
+ return prealloc_source_eval_task_;
}
TAO_Notify_Worker_Task*
TAO_Notify_Default_EMO_Factory::create_lookup_task (CORBA::Environment &ACE_TRY_ENV)
{
- // @@ pass the correct option to initialize this as passive/active object.
- TAO_Notify_Worker_Task* task;
-
- if (this->mt_lookup_ == 1)
- ACE_NEW_THROW_EX (task, TAO_Notify_MT_Worker_Task (this->lookup_threads_),
- CORBA::NO_MEMORY ());
- else
- ACE_NEW_THROW_EX (task,
- TAO_Notify_Worker_Task (),
- CORBA::NO_MEMORY ());
- return task;
+ return create_task (EMO_OPTIONS::instance ()->mt_lookup_,
+ EMO_OPTIONS::instance ()->lookup_threads_, ACE_TRY_ENV);
}
TAO_Notify_Worker_Task*
TAO_Notify_Default_EMO_Factory::create_listener_eval_task (CORBA::Environment &ACE_TRY_ENV)
{
- // @@ pass the correct option to initialize this as passive/active object.
- TAO_Notify_Worker_Task* task;
-
- if (this->mt_listener_eval_ == 1)
- ACE_NEW_THROW_EX (task, TAO_Notify_MT_Worker_Task (this->listener_threads_),
- CORBA::NO_MEMORY ());
+ if (EMO_OPTIONS::instance ()->alloc_task_per_proxy_ == 1)
+ return create_task (EMO_OPTIONS::instance ()->mt_listener_eval_,
+ EMO_OPTIONS::instance ()->listener_threads_,
+ ACE_TRY_ENV);
else
- ACE_NEW_THROW_EX (task,
- TAO_Notify_Worker_Task (),
- CORBA::NO_MEMORY ());
- return task;
+ return prealloc_listener_eval_task_;
}
TAO_Notify_Worker_Task*
TAO_Notify_Default_EMO_Factory::create_dispatching_task (CORBA::Environment &ACE_TRY_ENV)
{
- TAO_Notify_Worker_Task* task;
+ if (EMO_OPTIONS::instance ()->alloc_task_per_proxy_ == 1)
+ return create_task (EMO_OPTIONS::instance ()->mt_dispatching_,
+ EMO_OPTIONS::instance ()->dispatching_threads_,
+ ACE_TRY_ENV);
+ else
+ return prealloc_dispatching_task_;
+}
- int dispatching_threads_flags =
- THR_SCHED_DEFAULT|THR_BOUND|THR_NEW_LWP;
+TAO_Notify_Worker_Task*
+TAO_Notify_Default_EMO_Factory::create_updates_task (CORBA::Environment &ACE_TRY_ENV)
+{
+ return create_task (EMO_OPTIONS::instance ()->asynch_updates_, ASYNCH_UPDATES_THREADS, ACE_TRY_ENV);
+}
- // int dispatching_threads_priority
+void
+TAO_Notify_Default_EMO_Factory::destroy_source_eval_task (TAO_Notify_Worker_Task* task)
+{
+ if (EMO_OPTIONS::instance ()->alloc_task_per_proxy_ == 1)
+ delete task;
+}
- // Later: give the user options to specify threads flags and thread priority for each task.
+void
+TAO_Notify_Default_EMO_Factory::destroy_listener_eval_task (TAO_Notify_Worker_Task* task)
+{
+ if (EMO_OPTIONS::instance ()->alloc_task_per_proxy_ == 1)
+ delete task;
+}
- if (this->mt_dispatching_ == 1)
- ACE_NEW_THROW_EX (task, TAO_Notify_MT_Worker_Task (this->dispatching_threads_,
- dispatching_threads_flags,
- 0),
- CORBA::NO_MEMORY ());
- else
- ACE_NEW_THROW_EX (task,
- TAO_Notify_Worker_Task (),
- CORBA::NO_MEMORY ());
- return task;
+void
+TAO_Notify_Default_EMO_Factory::destroy_dispatching_task (TAO_Notify_Worker_Task* task)
+{
+ if (EMO_OPTIONS::instance ()->alloc_task_per_proxy_ == 1)
+ delete task;
+}
+
+void
+TAO_Notify_Default_EMO_Factory::destroy_lookup_task (TAO_Notify_Worker_Task* task)
+{
+ delete task;
}
void
+TAO_Notify_Default_EMO_Factory::destroy_updates_task (TAO_Notify_Worker_Task* task)
+{
+ delete task;
+}
+
+
+void
TAO_Notify_Default_EMO_Factory::print_values (void)
{
- ACE_DEBUG ((LM_DEBUG,
- "EMO Factory = "
- "mt_dispatching %d "
- "mt_source_eval %d "
- " mt_lookup %d "
- " mt_listener_eval %d"
- " dispatching_threads %d "
- " source_threads %d "
- " lookup_threads %d "
- " listener_threads_ %d \n",
- mt_dispatching_,
- mt_source_eval_,
- mt_lookup_,
- mt_listener_eval_,
- dispatching_threads_,
- source_threads_,
- lookup_threads_,
- listener_threads_
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "EMO Factory = "
+ "mt_dispatching %d "
+ "mt_source_eval %d "
+ " mt_lookup %d "
+ " mt_listener_eval %d"
+ " dispatching_threads %d "
+ " source_threads %d "
+ " lookup_threads %d "
+ " listener_threads_ %d ",
+ " AsynchUpdates %d",
+ " AllocateTaskperProxy %d",
+ EMO_OPTIONS::instance ()->mt_dispatching_,
+ EMO_OPTIONS::instance ()->mt_source_eval_,
+ EMO_OPTIONS::instance ()->mt_lookup_,
+ EMO_OPTIONS::instance ()->mt_listener_eval_,
+ EMO_OPTIONS::instance ()->dispatching_threads_,
+ EMO_OPTIONS::instance ()->source_threads_,
+ EMO_OPTIONS::instance ()->lookup_threads_,
+ EMO_OPTIONS::instance ()->listener_threads_,
+ EMO_OPTIONS::instance ()->asynch_updates_,
+ EMO_OPTIONS::instance ()->alloc_task_per_proxy_
));
}
@@ -241,3 +342,9 @@ ACE_STATIC_SVC_DEFINE (TAO_Notify_Default_EMO_Factory,
ACE_FACTORY_DEFINE (TAO_Notify, TAO_Notify_Default_EMO_Factory)
// ****************************************************************
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Singleton<TAO_EMO_Options, ACE_Null_Mutex>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Singleton<TAO_EMO_Options, ACE_Null_Mutex>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */