summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TAO/ChangeLog-98c44
-rw-r--r--TAO/orbsvcs/Event_Service/Event_Service.cpp70
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp46
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h19
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i24
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp50
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Event_Channel.h15
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Event_Channel.i18
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp1
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp54
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Task_Manager.h44
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Task_Manager.i7
-rw-r--r--TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl12
-rw-r--r--TAO/orbsvcs/orbsvcs/RtecEventComm.idl4
-rw-r--r--TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp13
-rwxr-xr-xTAO/orbsvcs/tests/EC_Multiple/histo.pl2
-rw-r--r--TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp13
18 files changed, 325 insertions, 115 deletions
diff --git a/TAO/ChangeLog-98c b/TAO/ChangeLog-98c
index c1db9268677..aab12d4f4a9 100644
--- a/TAO/ChangeLog-98c
+++ b/TAO/ChangeLog-98c
@@ -1,3 +1,47 @@
+Tue Jun 23 12:18:39 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
+
+ * orbsvcs/orbsvcs/Event/Task_Manager.h:
+ * orbsvcs/orbsvcs/Event/Task_Manager.i:
+ * orbsvcs/orbsvcs/Event/Task_Manager.cpp:
+ * orbsvcs/orbsvcs/Event/Event_Channel.h:
+ * orbsvcs/orbsvcs/Event/Event_Channel.i:
+ * orbsvcs/orbsvcs/Event/Event_Channel.cpp:
+ * orbsvcs/orbsvcs/Event/Dispatching_Modules.h:
+ * orbsvcs/orbsvcs/Event/Dispatching_Modules.i:
+ * orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp:
+ * orbsvcs/orbsvcs/Event/RT_Task.cpp:
+ * orbsvcs/orbsvcs/Event/ReactorTask.cpp:
+	The Event Channel can be shut down cleanly using the destroy()
+	method. The problem was that two components (the Dispatching
+	Module and the TaskManager, i.e. the handler for Timer threads)
+	were not waiting for their threads to shut down.
+ The modules keep their own Thread_Manager to wait for shutdown.
+ Startup was also changed: the event channel constructor
+ receives a new argument to control the creation of the internal
+ threads; if the argument is FALSE the user must call the
+ activate() method to start the threads.
+
+ * orbsvcs/orbsvcs/RtecEventComm.idl:
+ * orbsvcs/orbsvcs/RtecEventChannelAdmin.idl:
+ Many operations were oneways because early releases of TAO did
+ not support nested upcalls.
+
+ * orbsvcs/tests/EC_Multiple/EC_Multiple.cpp:
+ use the activate method to start the threads.
+
+ * orbsvcs/Event_Service/Event_Service.cpp:
+ Added support for a collocated Scheduling Service; this is the
+ common use case that we wish to implement, improves performance
+ and works around some nested upcall problems in the ORB.
+ The user can select the old behavior using the <-s global>
+ flag.
+
+ * orbsvcs/tests/Event_Latency/Event_Latency.cpp:
+	Only shut down the EC once our event loop exits.
+
+ * orbsvcs/tests/EC_Multiple/histo.pl:
+ Fixed typo in a comment.
+
Tue Jun 23 11:59:12 1998 David L. Levine <levine@cs.wustl.edu>
* tests/Cubit/TAO/IDL_Cubit/Makefile,tests/Thruput/TAO/Makefile,
diff --git a/TAO/orbsvcs/Event_Service/Event_Service.cpp b/TAO/orbsvcs/Event_Service/Event_Service.cpp
index c6741ad7f76..5482176333f 100644
--- a/TAO/orbsvcs/Event_Service/Event_Service.cpp
+++ b/TAO/orbsvcs/Event_Service/Event_Service.cpp
@@ -1,19 +1,28 @@
// $Id$
#include "ace/Get_Opt.h"
+#include "ace/Auto_Ptr.h"
#include "tao/corba.h"
#include "orbsvcs/CosNamingC.h"
#include "orbsvcs/Scheduler_Factory.h"
#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/Sched/Config_Scheduler.h"
#include "orbsvcs/Event/Event_Channel.h"
const char* service_name = "EventService";
+// The name we use to register with the Naming Service.
+
+int global_scheduler = 0;
+// If 0 we instantiate a local Scheduling Service and register it
+// with the Naming Service.
+// Otherwise we just resolve the Scheduling Service using the Naming
+// Service (i.e. we assume there is a global scheduling service running).
int
parse_args (int argc, char *argv [])
{
- ACE_Get_Opt get_opt (argc, argv, "n:");
+ ACE_Get_Opt get_opt (argc, argv, "n:s:");
int opt;
while ((opt = get_opt ()) != EOF)
@@ -23,6 +32,31 @@ parse_args (int argc, char *argv [])
case 'n':
service_name = get_opt.optarg;
break;
+
+ case 's':
+ // It could be just a flag (i.e. no "global" or "local"
+        // argument), but this is consistent with the EC_Multiple
+ // test and also allows for a runtime scheduling service.
+
+ if (ACE_OS::strcasecmp (get_opt.optarg, "global") == 0)
+ {
+ global_scheduler = 1;
+ }
+ else if (ACE_OS::strcasecmp (get_opt.optarg, "local") == 0)
+ {
+ global_scheduler = 0;
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Unknown scheduling type <%s> "
+ "defaulting to local\n",
+ get_opt.optarg));
+ global_scheduler = 0;
+ }
+ break;
+
+
case '?':
default:
ACE_DEBUG ((LM_DEBUG,
@@ -76,6 +110,32 @@ int main (int argc, char *argv[])
CosNaming::NamingContext::_narrow (naming_obj.in (), TAO_TRY_ENV);
TAO_CHECK_ENV;
+ auto_ptr<POA_RtecScheduler::Scheduler> scheduler_impl;
+ RtecScheduler::Scheduler_var scheduler;
+
+ if (global_scheduler == 0)
+ {
+ scheduler_impl =
+ auto_ptr<POA_RtecScheduler::Scheduler>(new ACE_Config_Scheduler);
+ if (scheduler_impl.get () == 0)
+ return 1;
+ scheduler = scheduler_impl->_this (TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+
+ CORBA::String_var str =
+ orb->object_to_string (scheduler.in (), TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+ ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n",
+ str.in ()));
+
+ // Register the servant with the Naming Context....
+ CosNaming::Name schedule_name (1);
+ schedule_name.length (1);
+ schedule_name[0].id = CORBA::string_dup ("ScheduleService");
+ naming_context->bind (schedule_name, scheduler.in (), TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+ }
+
ACE_Scheduler_Factory::use_config (naming_context.in ());
// Register Event_Service with Naming Service.
@@ -113,3 +173,11 @@ int main (int argc, char *argv[])
return 0;
}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Auto_Basic_Ptr<POA_RtecScheduler::Scheduler>;
+template class auto_ptr<POA_RtecScheduler::Scheduler>;
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Auto_Basic_Ptr<POA_RtecScheduler::Scheduler>
+#pragma instantiate auto_ptr<POA_RtecScheduler::Scheduler>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp
index 51e224e8a3c..99c96d272ee 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp
@@ -91,10 +91,24 @@ ACE_ES_Dispatch_Request::operator delete (void *mem)
// ************************************************************
+void
+ACE_ES_Dispatching_Base::activate (void)
+{
+}
+
+void
+ACE_ES_Dispatching_Base::shutdown (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "EC (%t) ACE_ES_Dispatching_Base module shutting down.\n"));
+}
+
+// ************************************************************
+
ACE_ES_Priority_Dispatching::ACE_ES_Priority_Dispatching (ACE_EventChannel *channel,
int threads_per_queue)
: ACE_ES_Dispatching_Base (channel),
- notification_strategy_ (this),
+ notification_strategy_ (this, channel->task_manager ()),
highest_priority_ (0),
shutdown_ (0),
threads_per_queue_ (threads_per_queue)
@@ -110,8 +124,6 @@ ACE_ES_Priority_Dispatching::ACE_ES_Priority_Dispatching (ACE_EventChannel *chan
queues_[x] = 0;
delete_me_queues_[x] = 0;
}
-
- this->initialize_queues ();
}
ACE_ES_Priority_Dispatching::~ACE_ES_Priority_Dispatching (void)
@@ -125,7 +137,7 @@ ACE_ES_Priority_Dispatching::~ACE_ES_Priority_Dispatching (void)
void
ACE_ES_Priority_Dispatching::initialize_queues (void)
{
- for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
+ for (int x = 0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
{
// Convert ACE_Scheduler_Rate (it's really a period, not a rate!)
// to a form we can easily work with.
@@ -135,18 +147,20 @@ ACE_ES_Priority_Dispatching::initialize_queues (void)
RtecScheduler::Period period = period_tv.sec () * 10000000 +
period_tv.usec () * 10;
- queues_[x] = new ACE_ES_Dispatch_Queue (this, &notification_strategy_);
- if (queues_[x] == 0 ||
- queues_[x]->open_queue (period,
- threads_per_queue_) == -1)
+ ACE_NEW (this->queues_[x],
+ ACE_ES_Dispatch_Queue (this, &notification_strategy_));
+ this->queues_[x]->thr_mgr (&this->thr_mgr_);
+
+ if ( this->queues_[x]->open_queue (period,
+ threads_per_queue_) == -1)
{
- ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_ES_Priority_Dispatching::initialize_queues"));
+ ACE_ERROR ((LM_ERROR, "%p.\n",
+ "ACE_ES_Priority_Dispatching::initialize_queues"));
return;
}
- queue_count_[x] = 1;
+ this->queue_count_[x] = 1;
}
-
highest_priority_ = ACE_Scheduler_MAX_PRIORITIES - 1;
}
@@ -361,6 +375,12 @@ ACE_ES_Priority_Dispatching::handle_input (ACE_HANDLE)
return this->handle_signal (0, 0, 0);
}
+void
+ACE_ES_Priority_Dispatching::activate (void)
+{
+ this->initialize_queues ();
+}
+
// Shutdown each queue. When each queue exits, they will call back
// this->dispatch_queue_closed which allows us to free up resources.
// When the last queue has closed, we'll delete ourselves.
@@ -387,6 +407,10 @@ ACE_ES_Priority_Dispatching::shutdown (void)
ACE_DEBUG ((LM_DEBUG, "shutting down dispatch queue %d.\n", x));
queues_[x]->shutdown_task ();
}
+
+ if (this->thr_mgr_.wait () == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n",
+ "Priority_Dispatching::shutdown - waiting"));
}
// This gets called every time a Dispatch Queue closes down. We
diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h
index b61799d7ba1..9df28723559 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h
+++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h
@@ -86,6 +86,10 @@ public:
virtual void dispatch_queue_closed (ACE_ES_Dispatch_Queue *q);
// Called when all the threads of a <q> have exited.
+ virtual void activate (void);
+ // This is called by the Event Channel. It will create all the
+ // threads and only return once they are all up and running.
+
virtual void shutdown (void);
// This is called by the Event Channel. This will attempt to shut
// down all of its threads gracefully. Wish it luck.
@@ -242,7 +246,8 @@ class TAO_ORBSVCS_Export ACE_ES_ReactorEx_NS : public ACE_Notification_Strategy
// the ReactorEx::notify mechanism.
{
public:
- ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh);
+ ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh,
+ ACE_Task_Manager *tm);
// Stores away <eh> for when this->open is called.
int open (void);
@@ -264,6 +269,9 @@ public:
private:
ACE_Auto_Event event_;
// Registered with the ReactorEx.
+
+ ACE_Task_Manager *task_manager_;
+ // To gain access into the Reactor tasks.
};
typedef ACE_ES_ReactorEx_NS ACE_ES_Notification_Strategy;
@@ -279,7 +287,8 @@ class TAO_ORBSVCS_Export ACE_ES_Reactor_NS : public ACE_Reactor_Notification_Str
// version is for non WIN32 platforms.
{
public:
- ACE_ES_Reactor_NS (ACE_Event_Handler *eh);
+ ACE_ES_Reactor_NS (ACE_Event_Handler *eh,
+ ACE_Task_Manager *tm);
// Calls ACE_Reactor_Notification_Strategy with the ORB's reactor
// and signal mask.
@@ -387,6 +396,9 @@ public:
CORBA::Environment &);
// Enqueues the request on the appropriate Dispatch Queue.
+ virtual void activate (void);
+ // Open all queues.
+
virtual void shutdown (void);
// Closes all queues "asynchronously." When all queues are closed,
// deletes them all and then deletes itself.
@@ -431,6 +443,9 @@ protected:
int threads_per_queue_;
// The number of threads to spawn for each dispatch queue.
+
+ ACE_RT_Thread_Manager thr_mgr_;
+ // The thread manager for the threads of this object.
};
// ************************************************************
diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i
index d819d5485c9..93b7b6b5c09 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i
+++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i
@@ -197,32 +197,28 @@ ACE_ES_Dispatching_Base::dispatch_event (ACE_ES_Dispatch_Request *request,
return 0;
}
-ACE_INLINE void
-ACE_ES_Dispatching_Base::shutdown (void)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) ACE_ES_Dispatching_Base module shutting down.\n"));
-}
-
// ************************************************************
#if defined (ACE_WIN32)
ACE_INLINE
-ACE_ES_ReactorEx_NS::ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh)
- : ACE_Notification_Strategy (eh, ACE_Event_Handler::NULL_MASK)
+ACE_ES_ReactorEx_NS::ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh,
+ ACE_Task_Manager* tm)
+ : ACE_Notification_Strategy (eh, ACE_Event_Handler::NULL_MASK),
+ task_manager_ (tm)
{
}
ACE_INLINE int
ACE_ES_ReactorEx_NS::open (void)
{
- return ACE_Task_Manager::instance ()->
- GetReactorTask (0)->get_reactor ().register_handler (eh_, event_.handle ());
+ return this->task_manager_->GetReactorTask (0)->
+ get_reactor ().register_handler (eh_, event_.handle ());
}
ACE_INLINE void
ACE_ES_ReactorEx_NS::shutdown (void)
{
- ACE_Task_Manager::instance ()->GetReactorTask (0)->
+ this->task_manager_->GetReactorTask (0)->
get_reactor ().remove_handler (eh_, ACE_Event_Handler::DONT_CALL);
}
@@ -242,9 +238,9 @@ ACE_ES_ReactorEx_NS::notify (ACE_Event_Handler *eh,
#else /* !defined (ACE_WIN32) */
// This class is only necessary on non-win32 platforms.
ACE_INLINE
-ACE_ES_Reactor_NS::ACE_ES_Reactor_NS (ACE_Event_Handler *eh)
- : ACE_Reactor_Notification_Strategy (&(ACE_Task_Manager::instance ()->
- GetReactorTask (0)->get_reactor ()),
+ACE_ES_Reactor_NS::ACE_ES_Reactor_NS (ACE_Event_Handler *eh,
+ ACE_Task_Manager *tm)
+ : ACE_Reactor_Notification_Strategy (&tm->GetReactorTask (0)->get_reactor (),
eh, ACE_Event_Handler::READ_MASK)
{
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
index 54de0e53077..844f8a002c1 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
@@ -230,7 +230,7 @@ class TAO_ORBSVCS_Export ACE_ES_Priority_Timer : public ACE_Event_Handler
// ReactorEx dispatching the timers for its given priority.
{
public:
- ACE_ES_Priority_Timer (void);
+ ACE_ES_Priority_Timer (ACE_Task_Manager* task_manager);
// Default construction.
int connected (RtecScheduler::handle_t rt_info);
@@ -256,6 +256,9 @@ private:
virtual int handle_timeout (const ACE_Time_Value &tv,
const void *act);
// Casts <act> to ACE_ES_Timer_ACT and calls execute.
+
+ ACE_Task_Manager* task_manager_;
+ // The pointer to the manager for the timer threads.
};
// ************************************************************
@@ -312,7 +315,7 @@ ACE_ES_Priority_Timer::schedule_timer (RtecScheduler::handle_t rt_info,
{
// Add the timer to the task's dependency list.
RtecScheduler::handle_t timer_rtinfo =
- ACE_Task_Manager::instance()->GetReactorTask (preemption_priority)->rt_info ();
+ this->task_manager_->GetReactorTask (preemption_priority)->rt_info ();
TAO_TRY
{
@@ -334,8 +337,7 @@ ACE_ES_Priority_Timer::schedule_timer (RtecScheduler::handle_t rt_info,
ACE_Time_Value tv_interval;
ORBSVCS_Time::TimeT_to_Time_Value (tv_interval, interval);
- return ACE_Task_Manager::instance()->
- GetReactorTask (preemption_priority)->
+ return this->task_manager_->GetReactorTask (preemption_priority)->
get_reactor ().schedule_timer (this,
(void *) act,
tv_delta, tv_interval);
@@ -347,7 +349,7 @@ ACE_ES_Priority_Timer::cancel_timer (RtecScheduler::OS_Priority preemption_prior
{
const void *vp;
- int result = ACE_Task_Manager::instance()->
+ int result = this->task_manager_->
GetReactorTask (preemption_priority)->
get_reactor ().cancel_timer (id, &vp);
@@ -605,7 +607,8 @@ ACE_Push_Consumer_Proxy::shutdown (void)
// ************************************************************
-ACE_EventChannel::ACE_EventChannel (u_long type)
+ACE_EventChannel::ACE_EventChannel (CORBA::Boolean activate_threads,
+ u_long type)
: rtu_manager_ (0),
type_ (type),
state_ (INITIAL_STATE),
@@ -614,19 +617,24 @@ ACE_EventChannel::ACE_EventChannel (u_long type)
consumer_module_ = new ACE_ES_Consumer_Module (this);
// RtecEventChannelAdmin::ConsumerAdmin_duplicate(consumer_module_);
- ACE_NEW(dispatching_module_,
- ACE_ES_Priority_Dispatching(this, THREADS_PER_DISPATCH_QUEUE));
+ ACE_NEW (this->task_manager_, ACE_Task_Manager);
+
+ ACE_NEW (this->dispatching_module_,
+ ACE_ES_Priority_Dispatching(this, THREADS_PER_DISPATCH_QUEUE));
correlation_module_ = new ACE_ES_Correlation_Module (this);
subscription_module_ = new ACE_ES_Subscription_Module (this);
supplier_module_ = new ACE_ES_Supplier_Module (this);
- timer_ = new ACE_ES_Priority_Timer;
+ ACE_NEW (this->timer_, ACE_ES_Priority_Timer (this->task_manager_));
consumer_module_->open (dispatching_module_);
dispatching_module_->open (consumer_module_, correlation_module_);
correlation_module_->open (dispatching_module_, subscription_module_);
subscription_module_->open (correlation_module_, supplier_module_);
supplier_module_->open (subscription_module_);
+
+ if (activate_threads)
+ this->activate ();
}
ACE_EventChannel::~ACE_EventChannel (void)
@@ -643,7 +651,8 @@ ACE_EventChannel::~ACE_EventChannel (void)
ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_EventChannel::~ACE_EventChannel"));
}
TAO_ENDTRY;
- // @@ TODO: Shouldn't we use _release() instead?
+  // @@ TODO: Some of these objects are servants; IMHO we should
+  // deactivate them (there is no implicit deactivation in the POA).
delete rtu_manager_;
delete consumer_module_;
delete dispatching_module_;
@@ -651,12 +660,14 @@ ACE_EventChannel::~ACE_EventChannel (void)
delete subscription_module_;
delete supplier_module_;
delete timer_;
+
+ delete this->task_manager_;
}
void
-ACE_EventChannel::destroy (CORBA::Environment &_env)
+ACE_EventChannel::destroy (CORBA::Environment &)
{
- ACE_UNUSED_ARG (_env);
+ TAO_ORB_Core_instance ()->orb ()->shutdown ();
ACE_ES_GUARD ace_mon (lock_);
if (ace_mon.locked () == 0)
@@ -697,13 +708,21 @@ ACE_EventChannel::destroy (CORBA::Environment &_env)
}
void
+ACE_EventChannel::activate (void)
+{
+ this->dispatching_module_->activate ();
+ this->task_manager_->activate ();
+}
+
+void
ACE_EventChannel::shutdown (void)
{
// @@ TODO: Find a portable way to shutdown the ORB, on Orbix we have
// to call deactive_impl () on a CORBA::POA is that the portable
// way?
// With TAO we need access to the ORB (to call shutdown() on it).
- TAO_ORB_Core_instance ()->orb ()->shutdown ();
+ this->task_manager_->shutdown ();
+ this->dispatching_module_->shutdown ();
}
void
@@ -3156,7 +3175,8 @@ ACE_ES_Supplier_Module::push (ACE_Push_Supplier_Proxy *proxy,
// ************************************************************
-ACE_ES_Priority_Timer::ACE_ES_Priority_Timer (void)
+ACE_ES_Priority_Timer::ACE_ES_Priority_Timer (ACE_Task_Manager* tm)
+ : task_manager_ (tm)
{
}
@@ -3189,7 +3209,7 @@ ACE_ES_Priority_Timer::connected (RtecScheduler::handle_t rt_info)
TAO_ENDTRY;
// Just make sure the ORB allocates resources for this priority.
- if (ACE_Task_Manager::instance()->GetReactorTask (preemption_priority) == 0)
+ if (this->task_manager_->GetReactorTask (preemption_priority) == 0)
ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
"ACE_ES_Priority_Timer::connected"), -1);
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h
index 5978412552a..942e27319f9 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h
+++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h
@@ -177,9 +177,12 @@ public:
CONSUMER = 1, SUPPLIER = 2,
SHUTDOWN = CONSUMER | SUPPLIER };
- ACE_EventChannel (u_long type = ACE_DEFAULT_EVENT_CHANNEL_TYPE);
+ ACE_EventChannel (CORBA::Boolean activate_threads = CORBA::B_TRUE,
+ u_long type = ACE_DEFAULT_EVENT_CHANNEL_TYPE);
// Construction of the given <type>. Check the **_CHANNEL
// enumerations defined below.
+ // By default we activate the threads on construction, but it is
+ // possible to create the EC first and activate the threads later.
virtual ~ACE_EventChannel (void);
// Calls destroy.
@@ -222,6 +225,9 @@ public:
void report_disconnect (u_long);
// Consumer or supplier disconnected.
+ void activate (void);
+ // Activate the internal threads of the EC
+
void shutdown (void);
// Do not call this. The last module has shut down.
@@ -241,6 +247,8 @@ public:
// inform any gateways it has.
// TODO: currently we only support consumer gateways.
+ ACE_Task_Manager* task_manager (void) const;
+
private:
ACE_RTU_Manager *rtu_manager_;
// The RTU manager dude!
@@ -262,6 +270,11 @@ private:
Gateway_Set gwys_;
// Keep the set of Gateways, i.e. connections to peer EC.
+
+ ACE_Task_Manager* task_manager_;
+ // @@ TODO: change that class and object name.
+  // This object handles the threads related to timers; the name is
+  // misleading, but this is not the opportunity to change it.
};
// ************************************************************
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i
index 4474509a8c4..f9689302605 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i
+++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i
@@ -112,6 +112,12 @@ ACE_EventChannel::timer (void)
return timer_;
}
+ACE_INLINE ACE_Task_Manager*
+ACE_EventChannel::task_manager (void) const
+{
+ return this->task_manager_;
+}
+
// ************************************************************
// Makes a temporary Event_var and appends it to the <dest>.
@@ -383,12 +389,20 @@ ACE_RTU_Manager::should_preempt (void)
return 0;
else
{
+#if 0
// Expire any timers. Am I evil for putting this here?
ACE_Time_Value tv;
- if (ACE_Task_Manager::instance ()->
- GetReactorTask (0)->get_reactor ().handle_events (&tv) == -1)
+ if (this->task_manager_->GetReactorTask (0)->
+ get_reactor ().handle_events (&tv) == -1)
ACE_ERROR ((LM_ERROR, "%p.\n",
"ACE_RTU_Manager::should_preempt"));
+#else
+ // This routine was dead-code, but I'll leave it here until I
+ // find out what it is supposed to do.
+ ACE_ERROR ((LM_WARNING,
+ "EC (%t) RTU_Manager::should_preempt - obsolete\n"));
+
+#endif
int should_preempt = should_preempt_;
should_preempt_ = 0;
diff --git a/TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp
index 57ea45cfec6..25a39b016c0 100644
--- a/TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp
@@ -303,10 +303,6 @@ ACE_RT_Task::synch_threads (size_t threads)
threads - this->thr_count (),
thread_priority));
- // This is here so that the constructor does not call it. The
- // ORB has an instance of one of these.
- this->thr_mgr (ACE_Task_Manager::instance ()->ThrMgr ());
-
// Add the difference.
// First try real-time scheduling with specified priority.
long flags = THR_BOUND | THR_SCHED_FIFO;
diff --git a/TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp b/TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp
index b503ce66b11..d30839cf859 100644
--- a/TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp
@@ -117,7 +117,6 @@ int ACE_ES_Reactor_Task::svc_one()
void ACE_ES_Reactor_Task::threads_closed()
{
- delete this;
}
void ACE_ES_Reactor_Task::shutdown_task()
diff --git a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp
index fdbf2202eb1..a1f5b74f0c1 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp
@@ -7,39 +7,57 @@
#include "Task_Manager.i"
#endif /* __ACE_INLINE__ */
-ACE_Task_Manager::ACE_Task_Manager()
+ACE_Task_Manager::ACE_Task_Manager (void)
{
- for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
- {
- reactorTasks[x] = 0;
- }
+ for (int i = 0; i < ACE_Scheduler_MAX_PRIORITIES; ++i)
+ this->reactorTasks[i] = 0;
}
-void ACE_Task_Manager::initialize()
+void ACE_Task_Manager::activate (void)
{
- for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
+ for (int i = 0; i < ACE_Scheduler_MAX_PRIORITIES; ++i)
{
+ if (this->reactorTasks[i] != 0)
+ continue;
+
// Convert ACE_Scheduler_Rate (it's really a period, not a rate!)
// to a form we can easily work with.
ACE_Time_Value period_tv;
- ORBSVCS_Time::TimeT_to_Time_Value (period_tv, ACE_Scheduler_Rates[x]);
+ ORBSVCS_Time::TimeT_to_Time_Value (period_tv, ACE_Scheduler_Rates[i]);
RtecScheduler::Period period = period_tv.sec () * 10000000 +
period_tv.usec () * 10;
- reactorTasks[x] = new ReactorTask;
+ ACE_NEW (this->reactorTasks[i], ReactorTask);
+
+ this->reactorTasks[i]->thr_mgr (this->ThrMgr ());
- if (reactorTasks[x] == 0 ||
- reactorTasks[x]->open_reactor (period) == -1)
+ if (this->reactorTasks[i]->open_reactor (period) == -1)
{
- ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_ORB::initialize_reactors"));
- return;
+ ACE_ERROR ((LM_ERROR, "%p\n",
+ "EC (%t) Task_Manager - open reactor"));
}
}
}
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_Singleton<ACE_Task_Manager,ACE_SYNCH_MUTEX>;
-#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-#pragma instantiate ACE_Singleton<ACE_Task_Manager,ACE_SYNCH_MUTEX>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+void
+ACE_Task_Manager::shutdown (void)
+{
+ for (int i = 0; i < ACE_Scheduler_MAX_PRIORITIES; ++i)
+ {
+ if (this->reactorTasks[i] != 0)
+ this->reactorTasks[i]->shutdown_task ();
+ }
+
+ if (this->ThrMgr ()->wait () == -1)
+ ACE_ERROR ((LM_DEBUG, "%p\n", "EC (%t) Task_Manager wait"));
+
+ for (int j = 0; j < ACE_Scheduler_MAX_PRIORITIES; ++j)
+ {
+ if (this->reactorTasks[j] != 0)
+ {
+ delete this->reactorTasks[j];
+ this->reactorTasks[j] = 0;
+ }
+ }
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.h b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.h
index 9e097a957e8..4332be69071 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.h
+++ b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.h
@@ -14,22 +14,31 @@
class ACE_ES_Reactor_Task;
class TAO_ORBSVCS_Export ACE_Task_Manager
-// = TITLE
-// Singleton class for the pool of ACE_ReactorTask.
-//
-// = DESCRIPTION
-// The EventChannel uses a pool of ACE_ReactorTask to handle the
-// dispatching of Events. In real-time multi-threaded enviroments
-// this maps to a different thread per priority.
-// This class offers a centralized access point to those tasks and
-// some related services.
-//
{
+ //
+ // = TITLE
+ // Manager for the pool of ACE_ReactorTask.
+ //
+ // = DESCRIPTION
+ // The EventChannel uses a pool of ACE_ReactorTask to handle the
+  //     dispatching of timeouts. In real-time multi-threaded environments
+ // this maps to a different thread per priority.
+ // This class offers a centralized access point to those tasks and
+ // some related services.
+ //
public:
- typedef ACE_ES_Reactor_Task ReactorTask;
+ ACE_Task_Manager (void);
+ // Create the Task_Manager.
+
+ void activate (void);
+ // Activate the threads, it waits until the threads are up and
+ // running.
+
+ void shutdown (void);
+ // Deactivate the threads, it waits until all the threads have
+ // terminated.
- static ACE_Task_Manager* instance();
- // Returns the singleton.
+ typedef ACE_ES_Reactor_Task ReactorTask;
ReactorTask* GetReactorTask(RtecScheduler::OS_Priority priority);
// Obtain the ReactorTask for the given priority.
@@ -39,14 +48,11 @@ public:
// Returns a global ThreadManager for the Task pool.
private:
- friend class ACE_Singleton<ACE_Task_Manager,ACE_SYNCH_MUTEX>;
- ACE_Task_Manager();
-
- void initialize();
-
-private:
ReactorTask *reactorTasks[ACE_Scheduler_MAX_PRIORITIES];
+ // The set of ReactorTasks
+
ACE_RT_Thread_Manager thr_mgr;
+ // The thread manager.
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.i b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.i
index 35abcc0268c..f2f392018f8 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.i
+++ b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.i
@@ -7,7 +7,7 @@ ACE_Task_Manager::GetReactorTask(RtecScheduler::OS_Priority priority)
{
if (reactorTasks[priority] == 0)
{
- initialize();
+ this->activate ();
//ACE_ERROR_RETURN ((LM_ERROR,
//"%p no reactor task for priority %d.\n",
//"ACE_Task_Manager::GetReactor",
@@ -22,9 +22,4 @@ ACE_INLINE ACE_RT_Thread_Manager* ACE_Task_Manager::ThrMgr()
return &thr_mgr;
}
-ACE_INLINE ACE_Task_Manager* ACE_Task_Manager::instance()
-{
- return ACE_Singleton<ACE_Task_Manager,ACE_SYNCH_MUTEX>::instance();
-}
-
diff --git a/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl b/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl
index 3bd63cb441a..78668087900 100644
--- a/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl
+++ b/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl
@@ -32,17 +32,15 @@ module RtecEventChannelAdmin {
};
interface ProxyPushConsumer: RtecEventComm::PushConsumer {
- oneway void connect_push_supplier(
+ void connect_push_supplier(
in RtecEventComm::PushSupplier push_supplier,
- in SupplierQOS qos);
- // raises(AlreadyConnected);
+ in SupplierQOS qos) raises(AlreadyConnected);
};
interface ProxyPushSupplier: RtecEventComm::PushSupplier {
- oneway void connect_push_consumer(
+ void connect_push_consumer(
in RtecEventComm::PushConsumer push_consumer,
- in ConsumerQOS qos);
- // raises(AlreadyConnected, TypeError);
+ in ConsumerQOS qos) raises(AlreadyConnected, TypeError);
};
// TODO: Find out the exception specs for the following interface's
@@ -64,7 +62,7 @@ module RtecEventChannelAdmin {
ConsumerAdmin for_consumers();
SupplierAdmin for_suppliers();
- void destroy();
+ void destroy ();
};
};
diff --git a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
index 732a0cfb6ed..de3347b5669 100644
--- a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
+++ b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
@@ -75,11 +75,11 @@ module RtecEventComm {
interface PushConsumer {
oneway void push (in EventSet data);
- oneway void disconnect_push_consumer();
+ void disconnect_push_consumer();
};
interface PushSupplier {
- oneway void disconnect_push_supplier();
+ void disconnect_push_supplier();
};
};
diff --git a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp
index 5ab7de648ba..c4be6fd8a5b 100644
--- a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp
+++ b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp
@@ -348,9 +348,11 @@ Test_ECG::run (int argc, char* argv[])
break;
}
- // Register Event_Service with Naming Service.
- ACE_EventChannel ec_impl;
+ // Create the EventService implementation, but don't start its
+ // internal threads.
+ ACE_EventChannel ec_impl (CORBA::B_FALSE);
+ // Register Event_Service with the Naming Service.
RtecEventChannelAdmin::EventChannel_var ec =
ec_impl._this (TAO_TRY_ENV);
TAO_CHECK_ENV;
@@ -483,7 +485,12 @@ Test_ECG::run (int argc, char* argv[])
ready_mon.release ();
this->ready_cnd_.broadcast ();
- ACE_DEBUG ((LM_DEBUG, "running EC test\n"));
+ ACE_DEBUG ((LM_DEBUG, "activate the EC\n"));
+
+ // Create the EC internal threads
+ ec_impl.activate ();
+
+ ACE_DEBUG ((LM_DEBUG, "running the test\n"));
if (orb->run () == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1);
diff --git a/TAO/orbsvcs/tests/EC_Multiple/histo.pl b/TAO/orbsvcs/tests/EC_Multiple/histo.pl
index f6cc77ce0e4..dcd13f27320 100755
--- a/TAO/orbsvcs/tests/EC_Multiple/histo.pl
+++ b/TAO/orbsvcs/tests/EC_Multiple/histo.pl
@@ -1,7 +1,7 @@
#
# $Id$
#
-# Extract a histogram, minium, maximum and average from a file,
+# Extract a histogram, minimum, maximum and average from a file,
# filtering by a given RE.
#
diff --git a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
index 0aaf3e4b652..bebc59e85fe 100644
--- a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
+++ b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp
@@ -255,9 +255,6 @@ Latency_Consumer::shutdown (void)
// Disconnect from the push supplier.
this->suppliers_->disconnect_push_supplier (TAO_TRY_ENV);
TAO_CHECK_ENV;
-
- ACE_DEBUG ((LM_DEBUG, "@@ we should shutdown here!!!\n"));
- TAO_CHECK_ENV;
}
TAO_CATCHANY
{
@@ -659,11 +656,6 @@ Latency_Supplier::shutdown (void)
if (master_)
{
- // @@ TODO: Do this portably (keeping the ORB_ptr returned from
- // ORB_init)
- channel_admin_->destroy (TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
TAO_ORB_Core_instance ()->orb ()->shutdown ();
}
}
@@ -939,6 +931,11 @@ main (int argc, char *argv [])
}
delete [] consumer;
+ // @@ TODO: Do this portably (keeping the ORB_ptr returned from
+ // ORB_init)
+ ec->destroy (TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+
ACE_TIMEPROBE_PRINT;
}
TAO_CATCHANY