diff options
Diffstat (limited to 'TAO/tao/RTCORBA/Thread_Pool.cpp')
-rw-r--r-- | TAO/tao/RTCORBA/Thread_Pool.cpp | 219 |
1 files changed, 67 insertions, 152 deletions
diff --git a/TAO/tao/RTCORBA/Thread_Pool.cpp b/TAO/tao/RTCORBA/Thread_Pool.cpp index f63d8809bb5..f274fb38a18 100644 --- a/TAO/tao/RTCORBA/Thread_Pool.cpp +++ b/TAO/tao/RTCORBA/Thread_Pool.cpp @@ -1,4 +1,4 @@ -#include "tao/RTCORBA/Thread_Pool.h" +#include "Thread_Pool.h" #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 @@ -7,7 +7,7 @@ ACE_RCSID (RTCORBA, "$Id$") #if ! defined (__ACE_INLINE__) -#include "tao/RTCORBA/Thread_Pool.inl" +#include "Thread_Pool.inl" #endif /* __ACE_INLINE__ */ #include "tao/Exception.h" @@ -22,8 +22,6 @@ ACE_RCSID (RTCORBA, #include "tao/Leader_Follower.h" #include "ace/Auto_Ptr.h" -TAO_BEGIN_VERSIONED_NAMESPACE_DECL - TAO_RT_New_Leader_Generator::TAO_RT_New_Leader_Generator ( TAO_Thread_Lane &lane) : lane_ (lane) @@ -56,10 +54,14 @@ TAO_Thread_Pool_Threads::svc (void) TAO_Thread_Pool_Threads::set_tss_resources (orb_core, this->lane_); + CORBA::ORB_ptr orb = + orb_core.orb (); + ACE_TRY_NEW_ENV { - // Do the work - this->run (orb_core); + // Run the ORB. + orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; } ACE_CATCHANY { @@ -75,18 +77,6 @@ TAO_Thread_Pool_Threads::svc (void) return 0; } -int -TAO_Thread_Pool_Threads::run (TAO_ORB_Core &orb_core ACE_ENV_ARG_PARAMETER) -{ - CORBA::ORB_ptr orb = orb_core.orb (); - - // Run the ORB. - orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - return 0; -} - void TAO_Thread_Pool_Threads::set_tss_resources (TAO_ORB_Core &orb_core, TAO_Thread_Lane &thread_lane) @@ -99,67 +89,23 @@ TAO_Thread_Pool_Threads::set_tss_resources (TAO_ORB_Core &orb_core, tss.lane_ = &thread_lane; } -TAO_Dynamic_Thread_Pool_Threads::TAO_Dynamic_Thread_Pool_Threads (TAO_Thread_Lane &lane) - : TAO_Thread_Pool_Threads (lane) -{ -} - -int -TAO_Dynamic_Thread_Pool_Threads::run (TAO_ORB_Core &orb_core ACE_ENV_ARG_PARAMETER) -{ - CORBA::ORB_ptr orb = orb_core.orb (); - - if (this->lane_.dynamic_thread_idle_timeout () == ACE_Time_Value::zero) - { - // No timeout specified, run the ORB until it shutdowns - orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - } - else - { - // A timeout is specified, run the ORB in an idle loop, if we - // don't handle any operations for the given timeout we just - // exit the loop and this thread ends itself. - ACE_Time_Value tv (this->lane_.dynamic_thread_idle_timeout ()); - while (!orb_core.has_shutdown () && orb->work_pending (tv)) - { - orb->perform_work (); - tv = this->lane_.dynamic_thread_idle_timeout (); - } - - if (TAO_debug_level > 7) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO Process %P Pool %d Lane %d Thread %t\n") - ACE_TEXT ("Current number of dynamic threads left = %d; ") - ACE_TEXT ("RTCorba worker thread is ending!\n"), - this->lane_.pool ().id (), - this->lane_.id (), - this->thr_count () - 1)); - } - - return 0; -} - TAO_Thread_Lane::TAO_Thread_Lane (TAO_Thread_Pool &pool, CORBA::ULong id, CORBA::Short lane_priority, CORBA::ULong static_threads, - CORBA::ULong dynamic_threads, - ACE_Time_Value const &dynamic_thread_idle_timeout + CORBA::ULong dynamic_threads ACE_ENV_ARG_DECL_NOT_USED) : pool_ (pool), id_ (id), lane_priority_ (lane_priority), - shutdown_ (false), - static_threads_number_ (static_threads), - dynamic_threads_number_ (dynamic_threads), - static_threads_ (*this), - dynamic_threads_ (*this), + static_threads_ (static_threads), + dynamic_threads_ (dynamic_threads), + current_threads_ (0), + threads_ (*this), new_thread_generator_ (*this), resources_ (pool.manager ().orb_core (), &new_thread_generator_), - native_priority_ (TAO_INVALID_PRIORITY), - dynamic_thread_idle_timeout_ (dynamic_thread_idle_timeout) + native_priority_ (TAO_INVALID_PRIORITY) { } @@ -167,8 +113,14 @@ bool TAO_Thread_Lane::new_dynamic_thread (void) { // Note that we are checking this condition below without the lock - // held. - if (this->dynamic_threads_.thr_count () >= this->dynamic_threads_number_) + // held. The value of <static_threads> and <dynamic_threads> does + // not change, but <current_threads> increases when new dynamic + // threads are created. Even if we catch <current_threads> in an + // inconsistent state, we will double check later with the lock + // held. Therefore, this check should not cause any big problems. + if (this->current_threads_ >= + this->static_threads_ + + this->dynamic_threads_) return false; ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, @@ -179,25 +131,25 @@ TAO_Thread_Lane::new_dynamic_thread (void) TAO_Thread_Pool_Manager &manager = this->pool_.manager (); - if (!manager.orb_core ().has_shutdown () && !this->shutdown_&& - this->dynamic_threads_.thr_count () < this->dynamic_threads_number_) + if (this->current_threads_ < + (this->static_threads_ + + this->dynamic_threads_) && + !manager.orb_core ().has_shutdown ()) { if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO Process %P Pool %d Lane %d Thread %t\n") - ACE_TEXT ("Current number of dynamic threads = %d; ") - ACE_TEXT ("static threads = %d; max dynamic threads = %d\n") + ACE_TEXT ("Current number of threads = %d; ") + ACE_TEXT ("static threads = %d; dynamic threads = %d\n") ACE_TEXT ("No leaders available; creating new leader!\n"), this->pool_.id (), this->id_, - this->dynamic_threads_.thr_count (), - this->static_threads_number_, - this->dynamic_threads_number_)); + this->current_threads_, + this->static_threads_, + this->dynamic_threads_)); int result = - this->create_threads_i (this->dynamic_threads_, - 1, - THR_BOUND | THR_DETACHED); + this->create_dynamic_threads_i (1); if (result != 0) ACE_ERROR_RETURN ((LM_ERROR, @@ -218,16 +170,16 @@ TAO_Thread_Lane::shutting_down (void) mon, this->lock_); - // We are shutting down, this way we are not creating any more new dynamic - // threads - this->shutdown_ = true; + // Just set the number of dynamic threads to 0, this means we just can't + // create any new one + this->dynamic_threads_ = 0; } void TAO_Thread_Lane::validate_and_map_priority (ACE_ENV_SINGLE_ARG_DECL) { - // Make sure that static_threads_number_ is not zero. - if (this->static_threads_number_ == 0) + // Make sure that <static_threads_> is not zero. + if (this->static_threads_ == 0) ACE_THROW (CORBA::BAD_PARAM ()); // Check that the priority is in bounds. @@ -353,8 +305,7 @@ TAO_Thread_Lane::shutdown_reactor (void) void TAO_Thread_Lane::wait (void) { - this->static_threads_.wait (); - this->dynamic_threads_.wait (); + this->threads_.wait (); } int @@ -363,31 +314,11 @@ TAO_Thread_Lane::is_collocated (const TAO_MProfile &mprofile) return this->resources_.is_collocated (mprofile); } -CORBA::ULong -TAO_Thread_Lane::current_threads (void) const -{ - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, - mon, - this->lock_, - 0); - - return (this->static_threads_.thr_count () + - this->dynamic_threads_.thr_count ()); -} - - int TAO_Thread_Lane::create_static_threads (void) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, - mon, - this->lock_, - 0); - // Create static threads. - return this->create_threads_i (this->static_threads_, - this->static_threads_number_, - THR_NEW_LWP | THR_JOINABLE); + return this->create_dynamic_threads (this->static_threads_); } int @@ -398,20 +329,17 @@ TAO_Thread_Lane::create_dynamic_threads (CORBA::ULong number_of_threads) this->lock_, 0); - return this->create_threads_i (this->dynamic_threads_, - number_of_threads, - THR_BOUND | THR_DETACHED); + return this->create_dynamic_threads_i (number_of_threads); } int -TAO_Thread_Lane::create_threads_i (TAO_Thread_Pool_Threads &thread_pool, - CORBA::ULong number_of_threads, - long thread_flags) +TAO_Thread_Lane::create_dynamic_threads_i (CORBA::ULong number_of_threads) { // Overwritten parameters. int force_active = 1; // Default parameters. + long default_flags = THR_NEW_LWP | THR_JOINABLE; int default_grp_id = -1; ACE_Task_Base *default_task = 0; ACE_hthread_t *default_thread_handles = 0; @@ -437,24 +365,27 @@ TAO_Thread_Lane::create_threads_i (TAO_Thread_Pool_Threads &thread_pool, this->pool ().manager ().orb_core (); long flags = - thread_flags | + default_flags | orb_core.orb_params ()->thread_creation_flags (); // Activate the threads. int result = - thread_pool.activate (flags, - number_of_threads, - force_active, - this->native_priority_, - default_grp_id, - default_task, - default_thread_handles, - default_stack, - stack_size_array); + this->threads_.activate (flags, + number_of_threads, + force_active, + this->native_priority_, + default_grp_id, + default_task, + default_thread_handles, + default_stack, + stack_size_array); if (result != 0) return result; + this->current_threads_ += + number_of_threads; + return result; } @@ -466,8 +397,7 @@ TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager, CORBA::Short default_priority, CORBA::Boolean allow_request_buffering, CORBA::ULong max_buffered_requests, - CORBA::ULong max_request_buffer_size, - ACE_Time_Value const &dynamic_thread_idle_timeout + CORBA::ULong max_request_buffer_size ACE_ENV_ARG_DECL) : manager_ (manager), id_ (id), @@ -476,7 +406,6 @@ TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager, allow_request_buffering_ (allow_request_buffering), max_buffered_requests_ (max_buffered_requests), max_request_buffer_size_ (max_request_buffer_size), - dynamic_thread_idle_timeout_ (dynamic_thread_idle_timeout), lanes_ (0), number_of_lanes_ (1), with_lanes_ (false) @@ -492,8 +421,7 @@ TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager, 0, default_priority, static_threads, - dynamic_threads, - dynamic_thread_idle_timeout + dynamic_threads ACE_ENV_ARG_PARAMETER); } @@ -504,8 +432,7 @@ TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager, CORBA::Boolean allow_borrowing, CORBA::Boolean allow_request_buffering, CORBA::ULong max_buffered_requests, - CORBA::ULong max_request_buffer_size, - ACE_Time_Value const &dynamic_thread_idle_timeout + CORBA::ULong max_request_buffer_size ACE_ENV_ARG_DECL) : manager_ (manager), id_ (id), @@ -514,7 +441,6 @@ TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager, allow_request_buffering_ (allow_request_buffering), max_buffered_requests_ (max_buffered_requests), max_request_buffer_size_ (max_request_buffer_size), - dynamic_thread_idle_timeout_ (dynamic_thread_idle_timeout), lanes_ (0), number_of_lanes_ (lanes.length ()), with_lanes_ (true) @@ -534,8 +460,7 @@ TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager, i, lanes[i].lane_priority, lanes[i].static_threads, - lanes[i].dynamic_threads, - dynamic_thread_idle_timeout + lanes[i].dynamic_threads ACE_ENV_ARG_PARAMETER); } @@ -724,8 +649,7 @@ TAO_Thread_Pool_Manager::create_threadpool (CORBA::ULong stacksize, RTCORBA::Priority default_priority, CORBA::Boolean allow_request_buffering, CORBA::ULong max_buffered_requests, - CORBA::ULong max_request_buffer_size, - ACE_Time_Value const &dynamic_thread_idle_timeout + CORBA::ULong max_request_buffer_size ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { @@ -738,8 +662,7 @@ TAO_Thread_Pool_Manager::create_threadpool (CORBA::ULong stacksize, default_priority, allow_request_buffering, max_buffered_requests, - max_request_buffer_size, - dynamic_thread_idle_timeout + max_request_buffer_size ACE_ENV_ARG_PARAMETER); } @@ -749,8 +672,7 @@ TAO_Thread_Pool_Manager::create_threadpool_with_lanes (CORBA::ULong stacksize, CORBA::Boolean allow_borrowing, CORBA::Boolean allow_request_buffering, CORBA::ULong max_buffered_requests, - CORBA::ULong max_request_buffer_size, - ACE_Time_Value const &dynamic_thread_idle_timeout + CORBA::ULong max_request_buffer_size ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { @@ -762,8 +684,7 @@ TAO_Thread_Pool_Manager::create_threadpool_with_lanes (CORBA::ULong stacksize, allow_borrowing, allow_request_buffering, max_buffered_requests, - max_request_buffer_size, - dynamic_thread_idle_timeout + max_request_buffer_size ACE_ENV_ARG_PARAMETER); } @@ -817,8 +738,7 @@ TAO_Thread_Pool_Manager::create_threadpool_i (CORBA::ULong stacksize, RTCORBA::Priority default_priority, CORBA::Boolean allow_request_buffering, CORBA::ULong max_buffered_requests, - CORBA::ULong max_request_buffer_size, - ACE_Time_Value const &dynamic_thread_idle_timeout + CORBA::ULong max_request_buffer_size ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { @@ -834,8 +754,7 @@ TAO_Thread_Pool_Manager::create_threadpool_i (CORBA::ULong stacksize, default_priority, allow_request_buffering, max_buffered_requests, - max_request_buffer_size, - dynamic_thread_idle_timeout + max_request_buffer_size ACE_ENV_ARG_PARAMETER), CORBA::NO_MEMORY ()); ACE_CHECK_RETURN (0); @@ -850,8 +769,7 @@ TAO_Thread_Pool_Manager::create_threadpool_with_lanes_i (CORBA::ULong stacksize, CORBA::Boolean allow_borrowing, CORBA::Boolean allow_request_buffering, CORBA::ULong max_buffered_requests, - CORBA::ULong max_request_buffer_size, - ACE_Time_Value const &dynamic_thread_idle_timeout + CORBA::ULong max_request_buffer_size ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { @@ -866,8 +784,7 @@ TAO_Thread_Pool_Manager::create_threadpool_with_lanes_i (CORBA::ULong stacksize, allow_borrowing, allow_request_buffering, max_buffered_requests, - max_request_buffer_size, - dynamic_thread_idle_timeout + max_request_buffer_size ACE_ENV_ARG_PARAMETER), CORBA::NO_MEMORY ()); ACE_CHECK_RETURN (0); @@ -950,6 +867,4 @@ TAO_Thread_Pool_Manager::orb_core (void) const return this->orb_core_; } -TAO_END_VERSIONED_NAMESPACE_DECL - #endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */ |