diff options
-rw-r--r-- | TAO/ChangeLog-98c | 44 | ||||
-rw-r--r-- | TAO/orbsvcs/Event_Service/Event_Service.cpp | 70 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp | 46 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h | 19 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i | 24 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp | 50 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Event_Channel.h | 15 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Event_Channel.i | 18 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp | 4 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp | 1 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp | 54 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Task_Manager.h | 44 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Task_Manager.i | 7 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl | 12 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/RtecEventComm.idl | 4 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp | 13 | ||||
-rwxr-xr-x | TAO/orbsvcs/tests/EC_Multiple/histo.pl | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp | 13 |
18 files changed, 325 insertions, 115 deletions
diff --git a/TAO/ChangeLog-98c b/TAO/ChangeLog-98c index c1db9268677..aab12d4f4a9 100644 --- a/TAO/ChangeLog-98c +++ b/TAO/ChangeLog-98c @@ -1,3 +1,47 @@ +Tue Jun 23 12:18:39 1998 Carlos O'Ryan <coryan@cs.wustl.edu> + + * orbsvcs/orbsvcs/Event/Task_Manager.h: + * orbsvcs/orbsvcs/Event/Task_Manager.i: + * orbsvcs/orbsvcs/Event/Task_Manager.cpp: + * orbsvcs/orbsvcs/Event/Event_Channel.h: + * orbsvcs/orbsvcs/Event/Event_Channel.i: + * orbsvcs/orbsvcs/Event/Event_Channel.cpp: + * orbsvcs/orbsvcs/Event/Dispatching_Modules.h: + * orbsvcs/orbsvcs/Event/Dispatching_Modules.i: + * orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp: + * orbsvcs/orbsvcs/Event/RT_Task.cpp: + * orbsvcs/orbsvcs/Event/ReactorTask.cpp: + The Event Channel can be shutdown cleanly using the destroy() + method. The problem was that two components (the Dispatching + Module and the TaskManager or the handler for Timer threads) + were not waiting for their threads to shutdown. + The modules keep their own Thread_Manager to wait for shutdown. + Startup was also changed: the event channel constructor + receives a new argument to control the creation of the internal + threads; if the argument is FALSE the user must call the + activate() method to start the threads. + + * orbsvcs/orbsvcs/RtecEventComm.idl: + * orbsvcs/orbsvcs/RtecEventChannelAdmin.idl: + Many operations were oneways because early releases of TAO did + not support nested upcalls. + + * orbsvcs/tests/EC_Multiple/EC_Multiple.cpp: + use the activate method to start the threads. + + * orbsvcs/Event_Service/Event_Service.cpp: + Added support for a collocated Scheduling Service; this is the + common use case that we wish to implement, improves performance + and works around some nested upcall problems in the ORB. + The user can select the old behavior using the <-s global> + flag. + + * orbsvcs/tests/Event_Latency/Event_Latency.cpp: + Only shutdown the EC once our event loop exits + + * orbsvcs/tests/EC_Multiple/histo.pl: + Fixed typo in a comment. + Tue Jun 23 11:59:12 1998 David L. Levine <levine@cs.wustl.edu> * tests/Cubit/TAO/IDL_Cubit/Makefile,tests/Thruput/TAO/Makefile, diff --git a/TAO/orbsvcs/Event_Service/Event_Service.cpp b/TAO/orbsvcs/Event_Service/Event_Service.cpp index c6741ad7f76..5482176333f 100644 --- a/TAO/orbsvcs/Event_Service/Event_Service.cpp +++ b/TAO/orbsvcs/Event_Service/Event_Service.cpp @@ -1,19 +1,28 @@ // $Id$ #include "ace/Get_Opt.h" +#include "ace/Auto_Ptr.h" #include "tao/corba.h" #include "orbsvcs/CosNamingC.h" #include "orbsvcs/Scheduler_Factory.h" #include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Sched/Config_Scheduler.h" #include "orbsvcs/Event/Event_Channel.h" const char* service_name = "EventService"; +// The name we use to register with the Naming Service. + +int global_scheduler = 0; +// If 0 we instantiante a local Scheduling Service and register it +// with the Naming Service. +// Otherwise we just resolve the Scheduling Service using the Naming +// Service (i.e. we assume there is a global scheduling service running. int parse_args (int argc, char *argv []) { - ACE_Get_Opt get_opt (argc, argv, "n:"); + ACE_Get_Opt get_opt (argc, argv, "n:s:"); int opt; while ((opt = get_opt ()) != EOF) @@ -23,6 +32,31 @@ parse_args (int argc, char *argv []) case 'n': service_name = get_opt.optarg; break; + + case 's': + // It could be just a flag (i.e. no "global" or "local" + // argument, but this is consistent with the EC_Multiple + // test and also allows for a runtime scheduling service. + + if (ACE_OS::strcasecmp (get_opt.optarg, "global") == 0) + { + global_scheduler = 1; + } + else if (ACE_OS::strcasecmp (get_opt.optarg, "local") == 0) + { + global_scheduler = 0; + } + else + { + ACE_DEBUG ((LM_DEBUG, + "Unknown scheduling type <%s> " + "defaulting to local\n", + get_opt.optarg)); + global_scheduler = 0; + } + break; + + case '?': default: ACE_DEBUG ((LM_DEBUG, @@ -76,6 +110,32 @@ int main (int argc, char *argv[]) CosNaming::NamingContext::_narrow (naming_obj.in (), TAO_TRY_ENV); TAO_CHECK_ENV; + auto_ptr<POA_RtecScheduler::Scheduler> scheduler_impl; + RtecScheduler::Scheduler_var scheduler; + + if (global_scheduler == 0) + { + scheduler_impl = + auto_ptr<POA_RtecScheduler::Scheduler>(new ACE_Config_Scheduler); + if (scheduler_impl.get () == 0) + return 1; + scheduler = scheduler_impl->_this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + CORBA::String_var str = + orb->object_to_string (scheduler.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n", + str.in ())); + + // Register the servant with the Naming Context.... + CosNaming::Name schedule_name (1); + schedule_name.length (1); + schedule_name[0].id = CORBA::string_dup ("ScheduleService"); + naming_context->bind (schedule_name, scheduler.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + } + ACE_Scheduler_Factory::use_config (naming_context.in ()); // Register Event_Service with Naming Service. @@ -113,3 +173,11 @@ int main (int argc, char *argv[]) return 0; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Auto_Basic_Ptr<POA_RtecScheduler::Scheduler>; +template class auto_ptr<POA_RtecScheduler::Scheduler>; +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Auto_Basic_Ptr<POA_RtecScheduler::Scheduler> +#pragma instantiate auto_ptr<POA_RtecScheduler::Scheduler> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp index 51e224e8a3c..99c96d272ee 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.cpp @@ -91,10 +91,24 @@ ACE_ES_Dispatch_Request::operator delete (void *mem) // ************************************************************ +void +ACE_ES_Dispatching_Base::activate (void) +{ +} + +void +ACE_ES_Dispatching_Base::shutdown (void) +{ + ACE_DEBUG ((LM_DEBUG, + "EC (%t) ACE_ES_Dispatching_Base module shutting down.\n")); +} + +// ************************************************************ + ACE_ES_Priority_Dispatching::ACE_ES_Priority_Dispatching (ACE_EventChannel *channel, int threads_per_queue) : ACE_ES_Dispatching_Base (channel), - notification_strategy_ (this), + notification_strategy_ (this, channel->task_manager ()), highest_priority_ (0), shutdown_ (0), threads_per_queue_ (threads_per_queue) @@ -110,8 +124,6 @@ ACE_ES_Priority_Dispatching::ACE_ES_Priority_Dispatching (ACE_EventChannel *chan queues_[x] = 0; delete_me_queues_[x] = 0; } - - this->initialize_queues (); } ACE_ES_Priority_Dispatching::~ACE_ES_Priority_Dispatching (void) @@ -125,7 +137,7 @@ ACE_ES_Priority_Dispatching::~ACE_ES_Priority_Dispatching (void) void ACE_ES_Priority_Dispatching::initialize_queues (void) { - for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++) + for (int x = 0; x < ACE_Scheduler_MAX_PRIORITIES; x++) { // Convert ACE_Scheduler_Rate (it's really a period, not a rate!) // to a form we can easily work with. @@ -135,18 +147,20 @@ ACE_ES_Priority_Dispatching::initialize_queues (void) RtecScheduler::Period period = period_tv.sec () * 10000000 + period_tv.usec () * 10; - queues_[x] = new ACE_ES_Dispatch_Queue (this, ¬ification_strategy_); - if (queues_[x] == 0 || - queues_[x]->open_queue (period, - threads_per_queue_) == -1) + ACE_NEW (this->queues_[x], + ACE_ES_Dispatch_Queue (this, ¬ification_strategy_)); + this->queues_[x]->thr_mgr (&this->thr_mgr_); + + if ( this->queues_[x]->open_queue (period, + threads_per_queue_) == -1) { - ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_ES_Priority_Dispatching::initialize_queues")); + ACE_ERROR ((LM_ERROR, "%p.\n", + "ACE_ES_Priority_Dispatching::initialize_queues")); return; } - queue_count_[x] = 1; + this->queue_count_[x] = 1; } - highest_priority_ = ACE_Scheduler_MAX_PRIORITIES - 1; } @@ -361,6 +375,12 @@ ACE_ES_Priority_Dispatching::handle_input (ACE_HANDLE) return this->handle_signal (0, 0, 0); } +void +ACE_ES_Priority_Dispatching::activate (void) +{ + this->initialize_queues (); +} + // Shutdown each queue. When each queue exits, they will call back // this->dispatch_queue_closed which allows us to free up resources. // When the last queue has closed, we'll delete ourselves. @@ -387,6 +407,10 @@ ACE_ES_Priority_Dispatching::shutdown (void) ACE_DEBUG ((LM_DEBUG, "shutting down dispatch queue %d.\n", x)); queues_[x]->shutdown_task (); } + + if (this->thr_mgr_.wait () == -1) + ACE_ERROR ((LM_ERROR, "%p\n", + "Priority_Dispatching::shutdown - waiting")); } // This gets called every time a Dispatch Queue closes down. We diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h index b61799d7ba1..9df28723559 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h +++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h @@ -86,6 +86,10 @@ public: virtual void dispatch_queue_closed (ACE_ES_Dispatch_Queue *q); // Called when all the threads of a <q> have exited. + virtual void activate (void); + // This is called by the Event Channel. It will create all the + // threads and only return once they are all up and running. + virtual void shutdown (void); // This is called by the Event Channel. This will attempt to shut // down all of its threads gracefully. Wish it luck. @@ -242,7 +246,8 @@ class TAO_ORBSVCS_Export ACE_ES_ReactorEx_NS : public ACE_Notification_Strategy // the ReactorEx::notify mechanism. { public: - ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh); + ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh, + ACE_Task_Manager *tm); // Stores away <eh> for when this->open is called. int open (void); @@ -264,6 +269,9 @@ public: private: ACE_Auto_Event event_; // Registered with the ReactorEx. + + ACE_Task_Manager *task_manager_; + // To gain access into the Reactor tasks. }; typedef ACE_ES_ReactorEx_NS ACE_ES_Notification_Strategy; @@ -279,7 +287,8 @@ class TAO_ORBSVCS_Export ACE_ES_Reactor_NS : public ACE_Reactor_Notification_Str // version is for non WIN32 platforms. { public: - ACE_ES_Reactor_NS (ACE_Event_Handler *eh); + ACE_ES_Reactor_NS (ACE_Event_Handler *eh, + ACE_Task_Manager *tm); // Calls ACE_Reactor_Notification_Strategy with the ORB's reactor // and signal mask. @@ -387,6 +396,9 @@ public: CORBA::Environment &); // Enqueues the request on the appropriate Dispatch Queue. + virtual void activate (void); + // Open all queues. + virtual void shutdown (void); // Closes all queues "asynchronously." When all queues are closed, // deletes them all and then deletes itself. @@ -431,6 +443,9 @@ protected: int threads_per_queue_; // The number of threads to spawn for each dispatch queue. + + ACE_RT_Thread_Manager thr_mgr_; + // The thread manager for the threads of this object. }; // ************************************************************ diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i index d819d5485c9..93b7b6b5c09 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i +++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i @@ -197,32 +197,28 @@ ACE_ES_Dispatching_Base::dispatch_event (ACE_ES_Dispatch_Request *request, return 0; } -ACE_INLINE void -ACE_ES_Dispatching_Base::shutdown (void) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) ACE_ES_Dispatching_Base module shutting down.\n")); -} - // ************************************************************ #if defined (ACE_WIN32) ACE_INLINE -ACE_ES_ReactorEx_NS::ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh) - : ACE_Notification_Strategy (eh, ACE_Event_Handler::NULL_MASK) +ACE_ES_ReactorEx_NS::ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh, + ACE_Task_Manager* tm) + : ACE_Notification_Strategy (eh, ACE_Event_Handler::NULL_MASK), + task_manager_ (tm) { } ACE_INLINE int ACE_ES_ReactorEx_NS::open (void) { - return ACE_Task_Manager::instance ()-> - GetReactorTask (0)->get_reactor ().register_handler (eh_, event_.handle ()); + return this->task_manager_->GetReactorTask (0)-> + get_reactor ().register_handler (eh_, event_.handle ()); } ACE_INLINE void ACE_ES_ReactorEx_NS::shutdown (void) { - ACE_Task_Manager::instance ()->GetReactorTask (0)-> + this->task_manager_->GetReactorTask (0)-> get_reactor ().remove_handler (eh_, ACE_Event_Handler::DONT_CALL); } @@ -242,9 +238,9 @@ ACE_ES_ReactorEx_NS::notify (ACE_Event_Handler *eh, #else /* !defined (ACE_WIN32) */ // This class is only necessary on non-win32 platforms. ACE_INLINE -ACE_ES_Reactor_NS::ACE_ES_Reactor_NS (ACE_Event_Handler *eh) - : ACE_Reactor_Notification_Strategy (&(ACE_Task_Manager::instance ()-> - GetReactorTask (0)->get_reactor ()), +ACE_ES_Reactor_NS::ACE_ES_Reactor_NS (ACE_Event_Handler *eh, + ACE_Task_Manager *tm) + : ACE_Reactor_Notification_Strategy (&tm->GetReactorTask (0)->get_reactor (), eh, ACE_Event_Handler::READ_MASK) { } diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp index 54de0e53077..844f8a002c1 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp @@ -230,7 +230,7 @@ class TAO_ORBSVCS_Export ACE_ES_Priority_Timer : public ACE_Event_Handler // ReactorEx dispatching the timers for its given priority. { public: - ACE_ES_Priority_Timer (void); + ACE_ES_Priority_Timer (ACE_Task_Manager* task_manager); // Default construction. int connected (RtecScheduler::handle_t rt_info); @@ -256,6 +256,9 @@ private: virtual int handle_timeout (const ACE_Time_Value &tv, const void *act); // Casts <act> to ACE_ES_Timer_ACT and calls execute. + + ACE_Task_Manager* task_manager_; + // The pointer to the manager for the timer threads. }; // ************************************************************ @@ -312,7 +315,7 @@ ACE_ES_Priority_Timer::schedule_timer (RtecScheduler::handle_t rt_info, { // Add the timer to the task's dependency list. RtecScheduler::handle_t timer_rtinfo = - ACE_Task_Manager::instance()->GetReactorTask (preemption_priority)->rt_info (); + this->task_manager_->GetReactorTask (preemption_priority)->rt_info (); TAO_TRY { @@ -334,8 +337,7 @@ ACE_ES_Priority_Timer::schedule_timer (RtecScheduler::handle_t rt_info, ACE_Time_Value tv_interval; ORBSVCS_Time::TimeT_to_Time_Value (tv_interval, interval); - return ACE_Task_Manager::instance()-> - GetReactorTask (preemption_priority)-> + return this->task_manager_->GetReactorTask (preemption_priority)-> get_reactor ().schedule_timer (this, (void *) act, tv_delta, tv_interval); @@ -347,7 +349,7 @@ ACE_ES_Priority_Timer::cancel_timer (RtecScheduler::OS_Priority preemption_prior { const void *vp; - int result = ACE_Task_Manager::instance()-> + int result = this->task_manager_-> GetReactorTask (preemption_priority)-> get_reactor ().cancel_timer (id, &vp); @@ -605,7 +607,8 @@ ACE_Push_Consumer_Proxy::shutdown (void) // ************************************************************ -ACE_EventChannel::ACE_EventChannel (u_long type) +ACE_EventChannel::ACE_EventChannel (CORBA::Boolean activate_threads, + u_long type) : rtu_manager_ (0), type_ (type), state_ (INITIAL_STATE), @@ -614,19 +617,24 @@ ACE_EventChannel::ACE_EventChannel (u_long type) consumer_module_ = new ACE_ES_Consumer_Module (this); // RtecEventChannelAdmin::ConsumerAdmin_duplicate(consumer_module_); - ACE_NEW(dispatching_module_, - ACE_ES_Priority_Dispatching(this, THREADS_PER_DISPATCH_QUEUE)); + ACE_NEW (this->task_manager_, ACE_Task_Manager); + + ACE_NEW (this->dispatching_module_, + ACE_ES_Priority_Dispatching(this, THREADS_PER_DISPATCH_QUEUE)); correlation_module_ = new ACE_ES_Correlation_Module (this); subscription_module_ = new ACE_ES_Subscription_Module (this); supplier_module_ = new ACE_ES_Supplier_Module (this); - timer_ = new ACE_ES_Priority_Timer; + ACE_NEW (this->timer_, ACE_ES_Priority_Timer (this->task_manager_)); consumer_module_->open (dispatching_module_); dispatching_module_->open (consumer_module_, correlation_module_); correlation_module_->open (dispatching_module_, subscription_module_); subscription_module_->open (correlation_module_, supplier_module_); supplier_module_->open (subscription_module_); + + if (activate_threads) + this->activate (); } ACE_EventChannel::~ACE_EventChannel (void) @@ -643,7 +651,8 @@ ACE_EventChannel::~ACE_EventChannel (void) ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_EventChannel::~ACE_EventChannel")); } TAO_ENDTRY; - // @@ TODO: Shouldn't we use _release() instead? + // @@ TODO: Some of this objects are servants, IMHO we should + // deactivate them (there is no implicit deactivation in the POA). delete rtu_manager_; delete consumer_module_; delete dispatching_module_; @@ -651,12 +660,14 @@ ACE_EventChannel::~ACE_EventChannel (void) delete subscription_module_; delete supplier_module_; delete timer_; + + delete this->task_manager_; } void -ACE_EventChannel::destroy (CORBA::Environment &_env) +ACE_EventChannel::destroy (CORBA::Environment &) { - ACE_UNUSED_ARG (_env); + TAO_ORB_Core_instance ()->orb ()->shutdown (); ACE_ES_GUARD ace_mon (lock_); if (ace_mon.locked () == 0) @@ -697,13 +708,21 @@ ACE_EventChannel::destroy (CORBA::Environment &_env) } void +ACE_EventChannel::activate (void) +{ + this->dispatching_module_->activate (); + this->task_manager_->activate (); +} + +void ACE_EventChannel::shutdown (void) { // @@ TODO: Find a portable way to shutdown the ORB, on Orbix we have // to call deactive_impl () on a CORBA::POA is that the portable // way? // With TAO we need access to the ORB (to call shutdown() on it). - TAO_ORB_Core_instance ()->orb ()->shutdown (); + this->task_manager_->shutdown (); + this->dispatching_module_->shutdown (); } void @@ -3156,7 +3175,8 @@ ACE_ES_Supplier_Module::push (ACE_Push_Supplier_Proxy *proxy, // ************************************************************ -ACE_ES_Priority_Timer::ACE_ES_Priority_Timer (void) +ACE_ES_Priority_Timer::ACE_ES_Priority_Timer (ACE_Task_Manager* tm) + : task_manager_ (tm) { } @@ -3189,7 +3209,7 @@ ACE_ES_Priority_Timer::connected (RtecScheduler::handle_t rt_info) TAO_ENDTRY; // Just make sure the ORB allocates resources for this priority. - if (ACE_Task_Manager::instance()->GetReactorTask (preemption_priority) == 0) + if (this->task_manager_->GetReactorTask (preemption_priority) == 0) ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "ACE_ES_Priority_Timer::connected"), -1); diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h index 5978412552a..942e27319f9 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h @@ -177,9 +177,12 @@ public: CONSUMER = 1, SUPPLIER = 2, SHUTDOWN = CONSUMER | SUPPLIER }; - ACE_EventChannel (u_long type = ACE_DEFAULT_EVENT_CHANNEL_TYPE); + ACE_EventChannel (CORBA::Boolean activate_threads = CORBA::B_TRUE, + u_long type = ACE_DEFAULT_EVENT_CHANNEL_TYPE); // Construction of the given <type>. Check the **_CHANNEL // enumerations defined below. + // By default we activate the threads on construction, but it is + // possible to create the EC first and activate the threads later. virtual ~ACE_EventChannel (void); // Calls destroy. @@ -222,6 +225,9 @@ public: void report_disconnect (u_long); // Consumer or supplier disconnected. + void activate (void); + // Activate the internal threads of the EC + void shutdown (void); // Do not call this. The last module has shut down. @@ -241,6 +247,8 @@ public: // inform any gateways it has. // TODO: currently we only support consumer gateways. + ACE_Task_Manager* task_manager (void) const; + private: ACE_RTU_Manager *rtu_manager_; // The RTU manager dude! @@ -262,6 +270,11 @@ private: Gateway_Set gwys_; // Keep the set of Gateways, i.e. connections to peer EC. + + ACE_Task_Manager* task_manager_; + // @@ TODO: change that class and object name. + // This object handles the threads related to timers, is a bad name, + // but this is not the opportunity to change it. }; // ************************************************************ diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i index 4474509a8c4..f9689302605 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i @@ -112,6 +112,12 @@ ACE_EventChannel::timer (void) return timer_; } +ACE_INLINE ACE_Task_Manager* +ACE_EventChannel::task_manager (void) const +{ + return this->task_manager_; +} + // ************************************************************ // Makes a temporary Event_var and appends it to the <dest>. @@ -383,12 +389,20 @@ ACE_RTU_Manager::should_preempt (void) return 0; else { +#if 0 // Expire any timers. Am I evil for putting this here? ACE_Time_Value tv; - if (ACE_Task_Manager::instance ()-> - GetReactorTask (0)->get_reactor ().handle_events (&tv) == -1) + if (this->task_manager_->GetReactorTask (0)-> + get_reactor ().handle_events (&tv) == -1) ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_RTU_Manager::should_preempt")); +#else + // This routine was dead-code, but I'll leave it here until I + // find out what it is supposed to do. + ACE_ERROR ((LM_WARNING, + "EC (%t) RTU_Manager::should_preempt - obsolete\n")); + +#endif int should_preempt = should_preempt_; should_preempt_ = 0; diff --git a/TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp index 57ea45cfec6..25a39b016c0 100644 --- a/TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/RT_Task.cpp @@ -303,10 +303,6 @@ ACE_RT_Task::synch_threads (size_t threads) threads - this->thr_count (), thread_priority)); - // This is here so that the constructor does not call it. The - // ORB has an instance of one of these. - this->thr_mgr (ACE_Task_Manager::instance ()->ThrMgr ()); - // Add the difference. // First try real-time scheduling with specified priority. long flags = THR_BOUND | THR_SCHED_FIFO; diff --git a/TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp b/TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp index b503ce66b11..d30839cf859 100644 --- a/TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/ReactorTask.cpp @@ -117,7 +117,6 @@ int ACE_ES_Reactor_Task::svc_one() void ACE_ES_Reactor_Task::threads_closed() { - delete this; } void ACE_ES_Reactor_Task::shutdown_task() diff --git a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp index fdbf2202eb1..a1f5b74f0c1 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.cpp @@ -7,39 +7,57 @@ #include "Task_Manager.i" #endif /* __ACE_INLINE__ */ -ACE_Task_Manager::ACE_Task_Manager() +ACE_Task_Manager::ACE_Task_Manager (void) { - for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++) - { - reactorTasks[x] = 0; - } + for (int i = 0; i < ACE_Scheduler_MAX_PRIORITIES; ++i) + this->reactorTasks[i] = 0; } -void ACE_Task_Manager::initialize() +void ACE_Task_Manager::activate (void) { - for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++) + for (int i = 0; i < ACE_Scheduler_MAX_PRIORITIES; ++i) { + if (this->reactorTasks[i] != 0) + continue; + // Convert ACE_Scheduler_Rate (it's really a period, not a rate!) // to a form we can easily work with. ACE_Time_Value period_tv; - ORBSVCS_Time::TimeT_to_Time_Value (period_tv, ACE_Scheduler_Rates[x]); + ORBSVCS_Time::TimeT_to_Time_Value (period_tv, ACE_Scheduler_Rates[i]); RtecScheduler::Period period = period_tv.sec () * 10000000 + period_tv.usec () * 10; - reactorTasks[x] = new ReactorTask; + ACE_NEW (this->reactorTasks[i], ReactorTask); + + this->reactorTasks[i]->thr_mgr (this->ThrMgr ()); - if (reactorTasks[x] == 0 || - reactorTasks[x]->open_reactor (period) == -1) + if (this->reactorTasks[i]->open_reactor (period) == -1) { - ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_ORB::initialize_reactors")); - return; + ACE_ERROR ((LM_ERROR, "%p\n", + "EC (%t) Task_Manager - open reactor")); } } } -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Singleton<ACE_Task_Manager,ACE_SYNCH_MUTEX>; -#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Singleton<ACE_Task_Manager,ACE_SYNCH_MUTEX> -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ +void +ACE_Task_Manager::shutdown (void) +{ + for (int i = 0; i < ACE_Scheduler_MAX_PRIORITIES; ++i) + { + if (this->reactorTasks[i] != 0) + this->reactorTasks[i]->shutdown_task (); + } + + if (this->ThrMgr ()->wait () == -1) + ACE_ERROR ((LM_DEBUG, "%p\n", "EC (%t) Task_Manager wait")); + + for (int j = 0; j < ACE_Scheduler_MAX_PRIORITIES; ++j) + { + if (this->reactorTasks[j] != 0) + { + delete this->reactorTasks[j]; + this->reactorTasks[j] = 0; + } + } +} diff --git a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.h b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.h index 9e097a957e8..4332be69071 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.h +++ b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.h @@ -14,22 +14,31 @@ class ACE_ES_Reactor_Task; class TAO_ORBSVCS_Export ACE_Task_Manager -// = TITLE -// Singleton class for the pool of ACE_ReactorTask. -// -// = DESCRIPTION -// The EventChannel uses a pool of ACE_ReactorTask to handle the -// dispatching of Events. In real-time multi-threaded enviroments -// this maps to a different thread per priority. -// This class offers a centralized access point to those tasks and -// some related services. -// { + // + // = TITLE + // Manager for the pool of ACE_ReactorTask. + // + // = DESCRIPTION + // The EventChannel uses a pool of ACE_ReactorTask to handle the + // dispatching of timeouts. In real-time multi-threaded enviroments + // this maps to a different thread per priority. + // This class offers a centralized access point to those tasks and + // some related services. + // public: - typedef ACE_ES_Reactor_Task ReactorTask; + ACE_Task_Manager (void); + // Create the Task_Manager. + + void activate (void); + // Activate the threads, it waits until the threads are up and + // running. + + void shutdown (void); + // Deactivate the threads, it waits until all the threads have + // terminated. - static ACE_Task_Manager* instance(); - // Returns the singleton. + typedef ACE_ES_Reactor_Task ReactorTask; ReactorTask* GetReactorTask(RtecScheduler::OS_Priority priority); // Obtain the ReactorTask for the given priority. @@ -39,14 +48,11 @@ public: // Returns a global ThreadManager for the Task pool. private: - friend class ACE_Singleton<ACE_Task_Manager,ACE_SYNCH_MUTEX>; - ACE_Task_Manager(); - - void initialize(); - -private: ReactorTask *reactorTasks[ACE_Scheduler_MAX_PRIORITIES]; + // The set of ReactorTasks + ACE_RT_Thread_Manager thr_mgr; + // The thread manager. }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.i b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.i index 35abcc0268c..f2f392018f8 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Task_Manager.i +++ b/TAO/orbsvcs/orbsvcs/Event/Task_Manager.i @@ -7,7 +7,7 @@ ACE_Task_Manager::GetReactorTask(RtecScheduler::OS_Priority priority) { if (reactorTasks[priority] == 0) { - initialize(); + this->activate (); //ACE_ERROR_RETURN ((LM_ERROR, //"%p no reactor task for priority %d.\n", //"ACE_Task_Manager::GetReactor", @@ -22,9 +22,4 @@ ACE_INLINE ACE_RT_Thread_Manager* ACE_Task_Manager::ThrMgr() return &thr_mgr; } -ACE_INLINE ACE_Task_Manager* ACE_Task_Manager::instance() -{ - return ACE_Singleton<ACE_Task_Manager,ACE_SYNCH_MUTEX>::instance(); -} - diff --git a/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl b/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl index 3bd63cb441a..78668087900 100644 --- a/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl +++ b/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl @@ -32,17 +32,15 @@ module RtecEventChannelAdmin { }; interface ProxyPushConsumer: RtecEventComm::PushConsumer { - oneway void connect_push_supplier( + void connect_push_supplier( in RtecEventComm::PushSupplier push_supplier, - in SupplierQOS qos); - // raises(AlreadyConnected); + in SupplierQOS qos) raises(AlreadyConnected); }; interface ProxyPushSupplier: RtecEventComm::PushSupplier { - oneway void connect_push_consumer( + void connect_push_consumer( in RtecEventComm::PushConsumer push_consumer, - in ConsumerQOS qos); - // raises(AlreadyConnected, TypeError); + in ConsumerQOS qos) raises(AlreadyConnected, TypeError); }; // TODO: Find out the exception specs for the following interface's @@ -64,7 +62,7 @@ module RtecEventChannelAdmin { ConsumerAdmin for_consumers(); SupplierAdmin for_suppliers(); - void destroy(); + void destroy (); }; }; diff --git a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl index 732a0cfb6ed..de3347b5669 100644 --- a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl +++ b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl @@ -75,11 +75,11 @@ module RtecEventComm { interface PushConsumer { oneway void push (in EventSet data); - oneway void disconnect_push_consumer(); + void disconnect_push_consumer(); }; interface PushSupplier { - oneway void disconnect_push_supplier(); + void disconnect_push_supplier(); }; }; diff --git a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp index 5ab7de648ba..c4be6fd8a5b 100644 --- a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp +++ b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp @@ -348,9 +348,11 @@ Test_ECG::run (int argc, char* argv[]) break; } - // Register Event_Service with Naming Service. - ACE_EventChannel ec_impl; + // Create the EventService implementation, but don't start its + // internal threads. + ACE_EventChannel ec_impl (CORBA::B_FALSE); + // Register Event_Service with the Naming Service. RtecEventChannelAdmin::EventChannel_var ec = ec_impl._this (TAO_TRY_ENV); TAO_CHECK_ENV; @@ -483,7 +485,12 @@ Test_ECG::run (int argc, char* argv[]) ready_mon.release (); this->ready_cnd_.broadcast (); - ACE_DEBUG ((LM_DEBUG, "running EC test\n")); + ACE_DEBUG ((LM_DEBUG, "activate the EC\n")); + + // Create the EC internal threads + ec_impl.activate (); + + ACE_DEBUG ((LM_DEBUG, "running the test\n")); if (orb->run () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); diff --git a/TAO/orbsvcs/tests/EC_Multiple/histo.pl b/TAO/orbsvcs/tests/EC_Multiple/histo.pl index f6cc77ce0e4..dcd13f27320 100755 --- a/TAO/orbsvcs/tests/EC_Multiple/histo.pl +++ b/TAO/orbsvcs/tests/EC_Multiple/histo.pl @@ -1,7 +1,7 @@ # # $Id$ # -# Extract a histogram, minium, maximum and average from a file, +# Extract a histogram, minimum, maximum and average from a file, # filtering by a given RE. # diff --git a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp index 0aaf3e4b652..bebc59e85fe 100644 --- a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp +++ b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp @@ -255,9 +255,6 @@ Latency_Consumer::shutdown (void) // Disconnect from the push supplier. this->suppliers_->disconnect_push_supplier (TAO_TRY_ENV); TAO_CHECK_ENV; - - ACE_DEBUG ((LM_DEBUG, "@@ we should shutdown here!!!\n")); - TAO_CHECK_ENV; } TAO_CATCHANY { @@ -659,11 +656,6 @@ Latency_Supplier::shutdown (void) if (master_) { - // @@ TODO: Do this portably (keeping the ORB_ptr returned from - // ORB_init) - channel_admin_->destroy (TAO_TRY_ENV); - TAO_CHECK_ENV; - TAO_ORB_Core_instance ()->orb ()->shutdown (); } } @@ -939,6 +931,11 @@ main (int argc, char *argv []) } delete [] consumer; + // @@ TODO: Do this portably (keeping the ORB_ptr returned from + // ORB_init) + ec->destroy (TAO_TRY_ENV); + TAO_CHECK_ENV; + ACE_TIMEPROBE_PRINT; } TAO_CATCHANY |