summaryrefslogtreecommitdiff
path: root/TAO/tao/RTCORBA/Thread_Pool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/RTCORBA/Thread_Pool.cpp')
-rw-r--r--TAO/tao/RTCORBA/Thread_Pool.cpp505
1 files changed, 441 insertions, 64 deletions
diff --git a/TAO/tao/RTCORBA/Thread_Pool.cpp b/TAO/tao/RTCORBA/Thread_Pool.cpp
index be3f4bf7f60..aba3f630f2d 100644
--- a/TAO/tao/RTCORBA/Thread_Pool.cpp
+++ b/TAO/tao/RTCORBA/Thread_Pool.cpp
@@ -7,14 +7,71 @@ ACE_RCSID(tao, Thread_Pool, "$Id$")
#include "tao/Exception.h"
#include "ace/Auto_Ptr.h"
#include "tao/ORB_Core.h"
+#include "tao/Acceptor_Registry.h"
+#include "tao/Transport_Cache_Manager.h"
+#include "tao/debug.h"
+#include "tao/RTCORBA/Priority_Mapping_Manager.h"
+#include "tao/Leader_Follower.h"
#if !defined (__ACE_INLINE__)
# include "Thread_Pool.i"
#endif /* ! __ACE_INLINE__ */
-TAO_Thread_Pool_Threads::TAO_Thread_Pool_Threads (TAO_Thread_Lane &lane,
- ACE_Thread_Manager &tm)
- : ACE_Task_Base (&tm),
+TAO_RT_New_Leader_Generator::TAO_RT_New_Leader_Generator (TAO_Thread_Lane &lane)
+ : lane_ (lane)
+{
+}
+
+void
+TAO_RT_New_Leader_Generator::no_leaders_available (void)
+{
+ // Note that we are checking this condition below without the lock
+ // held. The value of <static_threads> and <dynamic_threads> does
+ // not change, but <current_threads> increases when new dynamic
+ // threads are created. Even if we catch <current_threads> in an
+ // inconsistent state, we will double check later with the lock
+ // held. Therefore, this check should not cause any big problems.
+ if (this->lane_.current_threads () ==
+ this->lane_.static_threads () +
+ this->lane_.dynamic_threads ())
+ return;
+
+ TAO_Thread_Pool_Manager &manager =
+ this->lane_.pool ().manager ();
+
+ ACE_GUARD (ACE_SYNCH_MUTEX,
+ mon,
+ manager.lock ());
+
+ if (this->lane_.current_threads () <
+ (this->lane_.static_threads () +
+ this->lane_.dynamic_threads ()) &&
+ !manager.orb_core ().has_shutdown ())
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO Process %P Pool %d Lane %d Thread %t\n")
+ ACE_TEXT ("Current number of threads = %d; static threads = %d; dynamic threads = %d\n")
+ ACE_TEXT ("No leaders available; creating new leader!\n"),
+ this->lane_.pool ().id (),
+ this->lane_.id (),
+ this->lane_.current_threads (),
+ this->lane_.static_threads (),
+ this->lane_.dynamic_threads ()));
+
+ int result =
+ this->lane_.create_dynamic_threads (1);
+
+ if (result != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Pool %d Lane %d Thread %t: cannot create dynamic thread\n",
+ this->lane_.pool ().id (),
+ this->lane_.id ()));
+ }
+}
+
+TAO_Thread_Pool_Threads::TAO_Thread_Pool_Threads (TAO_Thread_Lane &lane)
+ : ACE_Task_Base (lane.pool ().manager ().orb_core ().thr_mgr ()),
lane_ (lane)
{
}
@@ -28,43 +85,167 @@ TAO_Thread_Pool_Threads::lane (void) const
int
TAO_Thread_Pool_Threads::svc (void)
{
- CORBA::ORB_var orb =
- this->lane ().thread_pool ().orb ();
+ TAO_ORB_Core &orb_core =
+ this->lane ().pool ().manager ().orb_core ();
+
+ if (orb_core.has_shutdown ())
+ return 0;
+
+ // Set TSS resources for this thread.
+ TAO_Thread_Pool_Threads::set_tss_resources (orb_core,
+ this->lane_);
- ACE_DECLARE_NEW_CORBA_ENV;
- ACE_TRY
+ CORBA::ORB_ptr orb =
+ orb_core.orb ();
+
+ ACE_TRY_NEW_ENV
{
+ // Run the ORB.
orb->run (ACE_TRY_ENV);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
+ // No point propagating this exception. Print it out.
ACE_ERROR ((LM_ERROR,
"orb->run() raised exception for thread %t\n"));
+
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "");
}
ACE_ENDTRY;
return 0;
}
-TAO_Thread_Lane::TAO_Thread_Lane (TAO_Thread_Pool &thread_pool,
+void
+TAO_Thread_Pool_Threads::set_tss_resources (TAO_ORB_Core &orb_core,
+ TAO_Thread_Lane &thread_lane)
+{
+ /// Get the ORB_Core's TSS resources.
+ TAO_ORB_Core_TSS_Resources &tss =
+ *orb_core.get_tss_resources ();
+
+ /// Set the lane attribute in TSS.
+ tss.lane_ = &thread_lane;
+}
+
+TAO_Thread_Lane::TAO_Thread_Lane (TAO_Thread_Pool &pool,
CORBA::ULong id,
CORBA::Short lane_priority,
CORBA::ULong static_threads,
- CORBA::ULong dynamic_threads)
- : thread_pool_ (thread_pool),
+ CORBA::ULong dynamic_threads,
+ CORBA::Environment &)
+ : pool_ (pool),
id_ (id),
lane_priority_ (lane_priority),
static_threads_ (static_threads),
dynamic_threads_ (dynamic_threads),
- threads_ (*this,
- thread_pool_.thread_manager_)
+ current_threads_ (0),
+ threads_ (*this),
+ new_thread_generator_ (*this),
+ resources_ (pool.manager ().orb_core (),
+ &new_thread_generator_),
+ native_priority_ (TAO_INVALID_PRIORITY)
+{
+}
+
+void
+TAO_Thread_Lane::validate_and_map_priority (CORBA::Environment &ACE_TRY_ENV)
+{
+ // Check that the priority is in bounds.
+ if (this->lane_priority_ < RTCORBA::minPriority ||
+ this->lane_priority_ > RTCORBA::maxPriority)
+ ACE_THROW (CORBA::BAD_PARAM ());
+
+ CORBA::ORB_ptr orb =
+ this->pool_.manager ().orb_core ().orb ();
+
+ // Get the priority mapping manager.
+ CORBA::Object_var obj =
+ orb->resolve_initial_references (TAO_OBJID_PRIORITYMAPPINGMANAGER,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+
+ TAO_Priority_Mapping_Manager_var mapping_manager =
+ TAO_Priority_Mapping_Manager::_narrow (obj.in (),
+ ACE_TRY_ENV);
+ ACE_CHECK;
+
+ RTCORBA::PriorityMapping *pm =
+ mapping_manager.in ()->mapping ();
+
+ // Map CORBA priority to native priority.
+ CORBA::Boolean result =
+ pm->to_native (this->lane_priority_,
+ this->native_priority_);
+
+ if (!result)
+ ACE_THROW (CORBA::DATA_CONVERSION ());
+
+ if (TAO_debug_level > 3)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - creating thread at ")
+ ACE_TEXT ("(corba:native) priority %d:%d\n"),
+ this->lane_priority_,
+ this->native_priority_));
+}
+
+void
+TAO_Thread_Lane::open (CORBA::Environment &ACE_TRY_ENV)
+{
+ // Validate and map priority.
+ this->validate_and_map_priority (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ // Open the acceptor registry.
+ int result = 0;
+ result =
+ this->resources_.open_acceptor_registry (1,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+
+ if (result == -1)
+ ACE_THROW (CORBA::INTERNAL (
+ CORBA_SystemException::_tao_minor_code (
+ TAO_ACCEPTOR_REGISTRY_OPEN_LOCATION_CODE,
+ 0),
+ CORBA::COMPLETED_NO));
+}
+
+TAO_Thread_Lane::~TAO_Thread_Lane (void)
+{
+}
+
+void
+TAO_Thread_Lane::finalize (void)
+{
+ // Finalize resources.
+ this->resources_.finalize ();
+}
+
+void
+TAO_Thread_Lane::shutdown_reactor (void)
+{
+ this->resources_.shutdown_reactor ();
+}
+
+void
+TAO_Thread_Lane::wait (void)
+{
+ this->threads_.wait ();
+}
+
+int
+TAO_Thread_Lane::is_collocated (const TAO_MProfile &mprofile)
{
+ return this->resources_.is_collocated (mprofile);
}
int
TAO_Thread_Lane::create_static_threads (void)
{
+ // Create static threads.
return this->create_dynamic_threads (this->static_threads_);
}
@@ -91,22 +272,40 @@ TAO_Thread_Lane::create_dynamic_threads (CORBA::ULong number_of_threads)
for (index = 0;
index != number_of_threads;
++index)
- stack_size_array[index] = this->thread_pool_.stack_size_;
+ stack_size_array[index] =
+ this->pool ().stack_size_;
// Make sure the dynamically created stack size array is properly
// deleted.
ACE_Auto_Basic_Array_Ptr<size_t> auto_stack_size_array (stack_size_array);
+ TAO_ORB_Core &orb_core =
+ this->pool ().manager ().orb_core ();
+
+ long flags =
+ default_flags |
+ orb_core.orb_params ()->scope_policy () |
+ orb_core.orb_params ()->sched_policy ();
+
// Activate the threads.
- return this->threads_.activate (default_flags,
- number_of_threads,
- force_active,
- this->lane_priority_,
- default_grp_id,
- default_task,
- default_thread_handles,
- default_stack,
- stack_size_array);
+ int result =
+ this->threads_.activate (flags,
+ number_of_threads,
+ force_active,
+ this->native_priority_,
+ default_grp_id,
+ default_task,
+ default_thread_handles,
+ default_stack,
+ stack_size_array);
+
+ if (result != 0)
+ return result;
+
+ this->current_threads_ +=
+ number_of_threads;
+
+ return result;
}
CORBA::ULong
@@ -116,9 +315,9 @@ TAO_Thread_Lane::id (void) const
}
TAO_Thread_Pool &
-TAO_Thread_Lane::thread_pool (void) const
+TAO_Thread_Lane::pool (void) const
{
- return this->thread_pool_;
+ return this->pool_;
}
CORBA::Short
@@ -127,6 +326,12 @@ TAO_Thread_Lane::lane_priority (void) const
return this->lane_priority_;
}
+CORBA::Short
+TAO_Thread_Lane::native_priority (void) const
+{
+ return this->native_priority_;
+}
+
CORBA::ULong
TAO_Thread_Lane::static_threads (void) const
{
@@ -139,13 +344,32 @@ TAO_Thread_Lane::dynamic_threads (void) const
return this->dynamic_threads_;
}
+CORBA::ULong
+TAO_Thread_Lane::current_threads (void) const
+{
+ return this->current_threads_;
+}
+
+void
+TAO_Thread_Lane::current_threads (CORBA::ULong current_threads)
+{
+ this->current_threads_ = current_threads;
+}
+
TAO_Thread_Pool_Threads &
TAO_Thread_Lane::threads (void)
{
return this->threads_;
}
-TAO_Thread_Pool::TAO_Thread_Pool (CORBA::ULong id,
+TAO_Thread_Lane_Resources &
+TAO_Thread_Lane::resources (void)
+{
+ return this->resources_;
+}
+
+TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager,
+ CORBA::ULong id,
CORBA::ULong stack_size,
CORBA::ULong static_threads,
CORBA::ULong dynamic_threads,
@@ -153,8 +377,9 @@ TAO_Thread_Pool::TAO_Thread_Pool (CORBA::ULong id,
CORBA::Boolean allow_request_buffering,
CORBA::ULong max_buffered_requests,
CORBA::ULong max_request_buffer_size,
- CORBA::ORB_ptr orb)
- : id_ (id),
+ CORBA::Environment &ACE_TRY_ENV)
+ : manager_ (manager),
+ id_ (id),
stack_size_ (stack_size),
allow_borrowing_ (0),
allow_request_buffering_ (allow_request_buffering),
@@ -162,26 +387,34 @@ TAO_Thread_Pool::TAO_Thread_Pool (CORBA::ULong id,
max_request_buffer_size_ (max_request_buffer_size),
lanes_ (0),
number_of_lanes_ (1),
- thread_manager_ (*orb->orb_core ()->thr_mgr ()),
- orb_ (CORBA::ORB::_duplicate (orb))
+ with_lanes_ (0)
{
+ // No support for buffering.
+ if (allow_request_buffering)
+ ACE_THROW (CORBA::NO_IMPLEMENT ());
+
+ // Create one lane.
this->lanes_ = new TAO_Thread_Lane *[this->number_of_lanes_];
- this->lanes_[0] = new TAO_Thread_Lane (*this,
- 0,
- default_priority,
- static_threads,
- dynamic_threads);
+ this->lanes_[0] =
+ new TAO_Thread_Lane (*this,
+ 0,
+ default_priority,
+ static_threads,
+ dynamic_threads,
+ ACE_TRY_ENV);
}
-TAO_Thread_Pool::TAO_Thread_Pool (CORBA::ULong id,
+TAO_Thread_Pool::TAO_Thread_Pool (TAO_Thread_Pool_Manager &manager,
+ CORBA::ULong id,
CORBA::ULong stack_size,
const RTCORBA::ThreadpoolLanes &lanes,
CORBA::Boolean allow_borrowing,
CORBA::Boolean allow_request_buffering,
CORBA::ULong max_buffered_requests,
CORBA::ULong max_request_buffer_size,
- CORBA::ORB_ptr orb)
- : id_ (id),
+ CORBA::Environment &ACE_TRY_ENV)
+ : manager_ (manager),
+ id_ (id),
stack_size_ (stack_size),
allow_borrowing_ (allow_borrowing),
allow_request_buffering_ (allow_request_buffering),
@@ -189,22 +422,43 @@ TAO_Thread_Pool::TAO_Thread_Pool (CORBA::ULong id,
max_request_buffer_size_ (max_request_buffer_size),
lanes_ (0),
number_of_lanes_ (lanes.length ()),
- thread_manager_ (*orb->orb_core ()->thr_mgr ()),
- orb_ (CORBA::ORB::_duplicate (orb))
+ with_lanes_ (1)
{
+ // No support for buffering or borrowing.
+ if (allow_borrowing ||
+ allow_request_buffering)
+ ACE_THROW (CORBA::NO_IMPLEMENT ());
+
+ // Create multiple lane.
this->lanes_ = new TAO_Thread_Lane *[this->number_of_lanes_];
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
- this->lanes_[i] = new TAO_Thread_Lane (*this,
- i,
- lanes[i].lane_priority,
- lanes[i].static_threads,
- lanes[i].dynamic_threads);
+ this->lanes_[i] =
+ new TAO_Thread_Lane (*this,
+ i,
+ lanes[i].lane_priority,
+ lanes[i].static_threads,
+ lanes[i].dynamic_threads,
+ ACE_TRY_ENV);
+}
+
+void
+TAO_Thread_Pool::open (CORBA::Environment &ACE_TRY_ENV)
+{
+ // Open all the lanes.
+ for (CORBA::ULong i = 0;
+ i != this->number_of_lanes_;
+ ++i)
+ {
+ this->lanes_[i]->open (ACE_TRY_ENV);
+ ACE_CHECK;
+ }
}
TAO_Thread_Pool::~TAO_Thread_Pool (void)
{
+ // Delete all the lanes.
for (CORBA::ULong i = 0;
i != this->number_of_lanes_;
++i)
@@ -213,6 +467,54 @@ TAO_Thread_Pool::~TAO_Thread_Pool (void)
delete[] this->lanes_;
}
+void
+TAO_Thread_Pool::finalize (void)
+{
+ // Finalize all the lanes.
+ for (CORBA::ULong i = 0;
+ i != this->number_of_lanes_;
+ ++i)
+ this->lanes_[i]->finalize ();
+}
+
+void
+TAO_Thread_Pool::shutdown_reactor (void)
+{
+ // Finalize all the lanes.
+ for (CORBA::ULong i = 0;
+ i != this->number_of_lanes_;
+ ++i)
+ this->lanes_[i]->shutdown_reactor ();
+}
+
+void
+TAO_Thread_Pool::wait (void)
+{
+ // Finalize all the lanes.
+ for (CORBA::ULong i = 0;
+ i != this->number_of_lanes_;
+ ++i)
+ this->lanes_[i]->wait ();
+}
+
+int
+TAO_Thread_Pool::is_collocated (const TAO_MProfile &mprofile)
+{
+ // Finalize all the lanes.
+ for (CORBA::ULong i = 0;
+ i != this->number_of_lanes_;
+ ++i)
+ {
+ int result =
+ this->lanes_[i]->is_collocated (mprofile);
+
+ if (result)
+ return result;
+ }
+
+ return 0;
+}
+
int
TAO_Thread_Pool::create_static_threads (void)
{
@@ -232,6 +534,18 @@ TAO_Thread_Pool::create_static_threads (void)
return 0;
}
+int
+TAO_Thread_Pool::with_lanes (void) const
+{
+ return this->with_lanes_;
+}
+
+TAO_Thread_Pool_Manager &
+TAO_Thread_Pool::manager (void) const
+{
+ return this->manager_;
+}
+
CORBA::ULong
TAO_Thread_Pool::id (void) const
{
@@ -280,18 +594,6 @@ TAO_Thread_Pool::number_of_lanes (void) const
return this->number_of_lanes_;
}
-ACE_Thread_Manager &
-TAO_Thread_Pool::thread_manager (void)
-{
- return this->thread_manager_;
-}
-
-CORBA::ORB_ptr
-TAO_Thread_Pool::orb (void) const
-{
- return CORBA::ORB::_duplicate (this->orb_.in ());
-}
-
#define TAO_THREAD_POOL_MANAGER_GUARD \
ACE_GUARD_THROW_EX ( \
ACE_SYNCH_MUTEX, \
@@ -303,22 +605,71 @@ TAO_Thread_Pool::orb (void) const
0), \
CORBA::COMPLETED_NO));
-TAO_Thread_Pool_Manager::TAO_Thread_Pool_Manager (TAO_ORB_Core *orb_core)
+TAO_Thread_Pool_Manager::TAO_Thread_Pool_Manager (TAO_ORB_Core &orb_core)
: orb_core_ (orb_core),
thread_pools_ (),
- thread_pool_id_counter_ (0),
+ thread_pool_id_counter_ (1),
lock_ ()
{
}
TAO_Thread_Pool_Manager::~TAO_Thread_Pool_Manager (void)
{
+ // Delete all the pools.
for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
iterator != this->thread_pools_.end ();
++iterator)
delete (*iterator).int_id_;
}
+void
+TAO_Thread_Pool_Manager::finalize (void)
+{
+ // Finalize all the pools.
+ for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
+ iterator != this->thread_pools_.end ();
+ ++iterator)
+ (*iterator).int_id_->finalize ();
+}
+
+void
+TAO_Thread_Pool_Manager::shutdown_reactor (void)
+{
+ // Finalize all the pools.
+ for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
+ iterator != this->thread_pools_.end ();
+ ++iterator)
+ (*iterator).int_id_->shutdown_reactor ();
+}
+
+void
+TAO_Thread_Pool_Manager::wait (void)
+{
+ // Finalize all the pools.
+ for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
+ iterator != this->thread_pools_.end ();
+ ++iterator)
+ (*iterator).int_id_->wait ();
+}
+
+int
+TAO_Thread_Pool_Manager::is_collocated (const TAO_MProfile &mprofile)
+{
+ // Finalize all the pools.
+ for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
+ iterator != this->thread_pools_.end ();
+ ++iterator)
+ {
+ int result =
+ (*iterator).int_id_->is_collocated (mprofile);
+
+ if (result)
+ return result;
+ }
+
+ return 0;
+}
+
RTCORBA::ThreadpoolId
TAO_Thread_Pool_Manager::create_threadpool (CORBA::ULong stacksize,
CORBA::ULong static_threads,
@@ -393,7 +744,8 @@ TAO_Thread_Pool_Manager::create_threadpool_i (CORBA::ULong stacksize,
TAO_Thread_Pool *thread_pool = 0;
ACE_NEW_THROW_EX (thread_pool,
- TAO_Thread_Pool (this->thread_pool_id_counter_,
+ TAO_Thread_Pool (*this,
+ this->thread_pool_id_counter_,
stacksize,
static_threads,
dynamic_threads,
@@ -401,7 +753,7 @@ TAO_Thread_Pool_Manager::create_threadpool_i (CORBA::ULong stacksize,
allow_request_buffering,
max_buffered_requests,
max_request_buffer_size,
- this->orb_core_->orb ()),
+ ACE_TRY_ENV),
CORBA::NO_MEMORY ());
ACE_CHECK_RETURN (0);
@@ -423,14 +775,15 @@ TAO_Thread_Pool_Manager::create_threadpool_with_lanes_i (CORBA::ULong stacksize,
TAO_Thread_Pool *thread_pool = 0;
ACE_NEW_THROW_EX (thread_pool,
- TAO_Thread_Pool (this->thread_pool_id_counter_,
+ TAO_Thread_Pool (*this,
+ this->thread_pool_id_counter_,
stacksize,
lanes,
allow_borrowing,
allow_request_buffering,
max_buffered_requests,
max_request_buffer_size,
- this->orb_core_->orb ()),
+ ACE_TRY_ENV),
CORBA::NO_MEMORY ());
ACE_CHECK_RETURN (0);
@@ -446,11 +799,15 @@ TAO_Thread_Pool_Manager::create_threadpool_helper (TAO_Thread_Pool *thread_pool,
// Make sure of safe deletion in case of errors.
auto_ptr<TAO_Thread_Pool> safe_thread_pool (thread_pool);
+ // Open the pool.
+ thread_pool->open (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (0);
+
// Create the static threads.
int result =
thread_pool->create_static_threads ();
- // Throw exceptin in case of errors.
+ // Throw exception in case of errors.
if (result != 0)
ACE_THROW_RETURN (CORBA::INTERNAL (),
result);
@@ -493,10 +850,31 @@ TAO_Thread_Pool_Manager::destroy_threadpool_i (RTCORBA::ThreadpoolId thread_pool
if (result != 0)
ACE_THROW (RTCORBA::RTORB::InvalidThreadpool ());
+ // Shutdown reactor.
+ thread_pool->shutdown_reactor ();
+
+ // Wait for the threads.
+ thread_pool->wait ();
+
+ // Finalize resources.
+ thread_pool->finalize ();
+
// Delete the thread pool.
delete thread_pool;
}
+TAO_ORB_Core &
+TAO_Thread_Pool_Manager::orb_core (void) const
+{
+ return this->orb_core_;
+}
+
+ACE_SYNCH_MUTEX &
+TAO_Thread_Pool_Manager::lock (void)
+{
+ return this->lock_;
+}
+
TAO_Thread_Pool_Manager::THREAD_POOLS &
TAO_Thread_Pool_Manager::thread_pools (void)
{
@@ -529,4 +907,3 @@ template class ACE_Auto_Basic_Array_Ptr<size_t>;
#pragma instantiate ACE_Auto_Basic_Array_Ptr<size_t>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-