summaryrefslogtreecommitdiff
path: root/TAO/tao
diff options
context:
space:
mode:
authorPhil Mesnier <mesnier_p@ociweb.com>2014-05-09 16:56:15 +0000
committerPhil Mesnier <mesnier_p@ociweb.com>2014-05-09 16:56:15 +0000
commitb857acbb868bf9c3d4ce689a5df9fd92f146de29 (patch)
treec9ae9b11bc5b792fe04d34e0a2929d8716ea7bde /TAO/tao
parent23bd5c7d4b5a4edecfeccd27ee7620ddf6965c3f (diff)
downloadATCD-b857acbb868bf9c3d4ce689a5df9fd92f146de29.tar.gz
Fri May 9 16:53:27 UTC 2014 Phil Mesnier <mesnier_p@ociweb.com>
* tao/Dynamic_TP/DTP_Task.h: * tao/Dynamic_TP/DTP_Task.cpp: Clean up a race condition that could allow too many threads to exit if they were started concurrently.
Diffstat (limited to 'TAO/tao')
-rw-r--r--TAO/tao/Dynamic_TP/DTP_Task.cpp129
-rw-r--r--TAO/tao/Dynamic_TP/DTP_Task.h12
2 files changed, 99 insertions, 42 deletions
diff --git a/TAO/tao/Dynamic_TP/DTP_Task.cpp b/TAO/tao/Dynamic_TP/DTP_Task.cpp
index 0f32ad86680..e13dff7ad8d 100644
--- a/TAO/tao/Dynamic_TP/DTP_Task.cpp
+++ b/TAO/tao/Dynamic_TP/DTP_Task.cpp
@@ -20,6 +20,7 @@ TAO_DTP_Task::TAO_DTP_Task ()
work_lock_ (),
work_available_ (this->work_lock_),
active_workers_ (this->aw_lock_),
+ active_count_ (0),
accepting_requests_ (false),
shutdown_ (false),
opened_ (false),
@@ -94,36 +95,36 @@ TAO_DTP_Task::add_request (TAO::CSD::TP_Request* request)
size_t
TAO_DTP_Task::get_init_pool_threads ()
{
- return (this->init_pool_threads_);
+ return this->init_pool_threads_;
}
size_t
TAO_DTP_Task::get_min_pool_threads ()
{
- return(this->min_pool_threads_);
+ return this->min_pool_threads_;
}
size_t TAO_DTP_Task::get_max_pool_threads ()
{
- return(this->max_pool_threads_);
+ return this->max_pool_threads_;
}
size_t
TAO_DTP_Task::get_max_request_queue_depth ()
{
- return(this->max_request_queue_depth_);
+ return this->max_request_queue_depth_;
}
size_t
TAO_DTP_Task::get_thread_stack_size ()
{
- return(this->thread_stack_size_);
+ return this->thread_stack_size_;
}
time_t
TAO_DTP_Task::get_thread_idle_time ()
{
- return(this->thread_idle_time_.sec());
+ return this->thread_idle_time_.sec();
}
int
@@ -277,17 +278,67 @@ TAO_DTP_Task::clear_request (TAO::CSD::TP_Request_Handle &r)
r->mark_as_ready ();
}
+void
+TAO_DTP_Task::add_busy (void)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->aw_lock_);
+ ++this->busy_threads_;
+}
+
+void
+TAO_DTP_Task::remove_busy (void)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->aw_lock_);
+ --this->busy_threads_;
+}
+
+void
+TAO_DTP_Task::add_active (void)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->aw_lock_);
+ ++this->active_count_;
+}
+
+bool
+TAO_DTP_Task::remove_active (bool force)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->aw_lock_, false);
+ if (force || this->above_minimum())
+ {
+ --this->active_count_;
+ this->active_workers_.signal ();
+ return true;
+ }
+ return false;
+}
+
+bool
+TAO_DTP_Task::need_active (void)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->aw_lock_, false);
+ return ((this->busy_threads_ == this->active_count_) &&
+ ((this->max_pool_threads_ < 1) ||
+ (this->active_count_ < this->max_pool_threads_)));
+}
+
+bool
+TAO_DTP_Task::above_minimum (void)
+{
+ return this->min_pool_threads_ > 0 &&
+ this->active_count_ > this->min_pool_threads_;
+}
+
int
TAO_DTP_Task::svc (void)
{
- ++this->busy_threads_;
+ this->add_active ();
+ this->add_busy ();
if (TAO_debug_level > 4)
{
TAOLIB_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
ACE_TEXT ("New thread created.\n")));
}
-
TAO::CSD::TP_Dispatchable_Visitor dispatchable_visitor;
while (!this->shutdown_)
{
@@ -297,7 +348,7 @@ TAO_DTP_Task::svc (void)
{
if (!this->request_ready (dispatchable_visitor, request))
{
- --this->busy_threads_;
+ this->remove_busy ();
if (TAO_debug_level > 4)
{
@@ -321,9 +372,7 @@ TAO_DTP_Task::svc (void)
return 0;
if (wait_state == -1)
{
- if (errno != ETIME ||
- (this->thr_count() > this->min_pool_threads_ &&
- this->min_pool_threads_ > 0))
+ if (errno != ETIME || this->remove_active (false))
{
if (TAO_debug_level > 4)
{
@@ -336,7 +385,7 @@ TAO_DTP_Task::svc (void)
}
}
- ++this->busy_threads_;
+ this->add_busy ();
if (TAO_debug_level > 4)
{
TAOLIB_DEBUG ((LM_DEBUG,
@@ -348,49 +397,47 @@ TAO_DTP_Task::svc (void)
}
}
- size_t count = this->thr_count ();
- if ((this->busy_threads_ == count) &&
- ((this->max_pool_threads_ == 0) ||
- (count < this->max_pool_threads_)))
+ if (this->need_active ())
{
- if (this->activate(THR_NEW_LWP | THR_DETACHED,
- 1,
- 1,
- ACE_DEFAULT_THREAD_PRIORITY,
- -1,
- 0,
- 0,
- 0,
- this->thread_stack_size_ == 0 ? 0 :
- &this->thread_stack_size_) != 0)
+ if (this->activate (THR_NEW_LWP | THR_DETACHED,
+ 1,
+ 1,
+ ACE_DEFAULT_THREAD_PRIORITY,
+ -1,
+ 0,
+ 0,
+ 0,
+ this->thread_stack_size_ == 0 ? 0 :
+ &this->thread_stack_size_) != 0)
{
TAOLIB_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P|%t) DTP_Task::svc() failed to grow ")
- ACE_TEXT ("to %d worker threads.\n"), count));
+ ACE_TEXT ("(%P|%t) DTP_Task::svc() failed to ")
+ ACE_TEXT ("grow thread pool.\n")));
}
else
- {
- if (TAO_debug_level > 4)
- {
- TAOLIB_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
- ACE_TEXT ("Growing threadcount. ")
- ACE_TEXT ("New thread count:%d\n"),
- this->thr_count ()));
- }
- }
+ {
+ if (TAO_debug_level > 4)
+ {
+ TAOLIB_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
+ ACE_TEXT ("Growing threadcount. ")
+ ACE_TEXT ("New thread count:%d\n"),
+ this->thr_count ()));
+ }
+ }
}
request->dispatch ();
this->clear_request (request);
dispatchable_visitor.reset ();
}
+ this->remove_active (true);
return 0;
}
int
-TAO_DTP_Task::close(u_long flag)
+TAO_DTP_Task::close (u_long flag)
{
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->aw_lock_, 0);
@@ -423,7 +470,7 @@ TAO_DTP_Task::close(u_long flag)
in_task, this->thr_count ()));
}
- while (this->thr_count () != in_task)
+ while (this->thr_count () > in_task)
{
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->aw_lock_, 0);
this->active_workers_.wait ();
diff --git a/TAO/tao/Dynamic_TP/DTP_Task.h b/TAO/tao/Dynamic_TP/DTP_Task.h
index ace179e295a..ceed9a9d44d 100644
--- a/TAO/tao/Dynamic_TP/DTP_Task.h
+++ b/TAO/tao/Dynamic_TP/DTP_Task.h
@@ -126,7 +126,12 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
/// release the request, reset the accepting flag if necessary
void clear_request (TAO::CSD::TP_Request_Handle &r);
-
+ void add_busy (void);
+ void remove_busy (void);
+ void add_active (void);
+ bool remove_active (bool);
+ bool need_active (void);
+ bool above_minimum (void);
typedef TAO_SYNCH_MUTEX LockType;
typedef TAO_Condition<LockType> ConditionType;
@@ -151,6 +156,11 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
/// of the worker threads have stopped running.
ConditionType active_workers_;
+ /// The number of threads that are currently active. This may be
+ /// different than the total number of threads since the latter
+ /// may include threads that are shutting down but not reaped.
+ size_t active_count_;
+
/// Flag used to indicate when this task will (or will not) accept
/// requests via the the add_request() method.
bool accepting_requests_;