summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp181
1 files changed, 97 insertions, 84 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
index 01a7a51711f..a8299c2ecfa 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp
@@ -1,22 +1,21 @@
// $Id$
-#include "orbsvcs/Notify/ThreadPool_Task.h"
+#include "ThreadPool_Task.h"
ACE_RCSID(Notify, TAO_Notify_ThreadPool_Task, "$Id$")
-#include "orbsvcs/Notify/Properties.h"
-#include "orbsvcs/Notify/Timer_Queue.h"
-#include "orbsvcs/Notify/Buffering_Strategy.h"
+#include "Properties.h"
+#include "Timer_Queue.h"
+#include "Buffering_Strategy.h"
#include "tao/debug.h"
#include "tao/ORB_Core.h"
#include "ace/OS_NS_errno.h"
-TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-
TAO_Notify_ThreadPool_Task::TAO_Notify_ThreadPool_Task (void)
-: shutdown_ (false)
+ : shutdown_ (false)
+ , shutdown_handler_(this)
{
}
@@ -37,27 +36,27 @@ TAO_Notify_ThreadPool_Task::timer (void)
}
void
-TAO_Notify_ThreadPool_Task::init (const NotifyExt::ThreadPoolParams& tp_params,
- const TAO_Notify_AdminProperties::Ptr& admin_properties ACE_ENV_ARG_DECL)
+TAO_Notify_ThreadPool_Task::init (const NotifyExt::ThreadPoolParams& tp_params, TAO_Notify_AdminProperties::Ptr& admin_properties ACE_ENV_ARG_DECL)
{
ACE_ASSERT (this->timer_.get() == 0);
TAO_Notify_Timer_Queue* timer = 0;
ACE_NEW_THROW_EX (timer,
- TAO_Notify_Timer_Queue (),
- CORBA::NO_MEMORY ());
+ TAO_Notify_Timer_Queue (),
+ CORBA::NO_MEMORY ());
ACE_CHECK;
this->timer_.reset (timer);
TAO_Notify_Buffering_Strategy* buffering_strategy = 0;
ACE_NEW_THROW_EX (buffering_strategy,
- TAO_Notify_Buffering_Strategy (*msg_queue (), admin_properties),
- CORBA::NO_MEMORY ());
+ TAO_Notify_Buffering_Strategy (*msg_queue (), admin_properties),
+ CORBA::NO_MEMORY ());
this->buffering_strategy_.reset (buffering_strategy);
ACE_CHECK;
- long flags = THR_NEW_LWP | THR_DETACHED;
+ long flags = THR_NEW_LWP | THR_JOINABLE;
+
CORBA::ORB_var orb =
TAO_Notify_PROPERTIES::instance()->orb ();
@@ -74,27 +73,28 @@ TAO_Notify_ThreadPool_Task::init (const NotifyExt::ThreadPoolParams& tp_params,
// Become an active object.
if (this->ACE_Task <ACE_NULL_SYNCH>::activate (flags,
- tp_params.static_threads,
- 0,
- ACE_THR_PRI_OTHER_DEF) == -1)
- {
- // Undo the ref counts on error
- for ( CORBA::ULong i = 0; i < tp_params.static_threads; ++i )
+ tp_params.static_threads,
+ 0,
+ ACE_THR_PRI_OTHER_DEF) == -1)
{
- this->_decr_refcnt();
- }
+ // Undo the ref counts on error
+ for ( CORBA::ULong i = 0; i < tp_params.static_threads; ++i )
+ {
+ this->_decr_refcnt();
+ }
- if (TAO_debug_level > 0)
- {
- if (ACE_OS::last_error () == EPERM)
- ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Insufficient privilege.\n")));
- else
- ACE_DEBUG ((LM_ERROR,
- ACE_TEXT ("(%t) task activation at priority %d failed\n")
- ACE_TEXT ("exiting!\n%a"),
- tp_params.default_priority));
- }
- ACE_THROW (CORBA::BAD_PARAM ());
+ if (TAO_debug_level > 0)
+ {
+ if (ACE_OS::last_error () == EPERM)
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Insufficient privilege.\n")));
+ else
+ ACE_DEBUG ((LM_ERROR,
+ ACE_TEXT ("(%t) task activation at priority %d failed\n")
+ ACE_TEXT ("exiting!\n%a"),
+ tp_params.default_priority));
+ }
+
+ ACE_THROW (CORBA::BAD_PARAM ());
}
}
@@ -103,15 +103,11 @@ TAO_Notify_ThreadPool_Task::execute (TAO_Notify_Method_Request& method_request A
{
if (!shutdown_)
{
- TAO_Notify_Method_Request_Queueable* request_copy = method_request.copy (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+ TAO_Notify_Method_Request_Queueable& request_copy = *method_request.copy (ACE_ENV_SINGLE_ARG_PARAMETER);
- if (this->buffering_strategy_->enqueue (request_copy) == -1)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "NS_ThreadPool_Task (%P|%t) - "
- "failed to enqueue\n"));
- }
+ // Ignore the return value, because we already print an error
+ // message if enqueing fails.
+ this->buffering_strategy_->enqueue (request_copy);
}
}
@@ -121,45 +117,45 @@ TAO_Notify_ThreadPool_Task::svc (void)
TAO_Notify_Method_Request_Queueable* method_request;
while (!shutdown_)
- {
- ACE_TRY_NEW_ENV
{
- ACE_Time_Value* dequeue_blocking_time = 0;
- ACE_Time_Value earliest_time;
-
- if (!this->timer_->impl().is_empty ())
- {
- earliest_time = this->timer_->impl().earliest_time ();
- dequeue_blocking_time = &earliest_time;
- }
-
- // Dequeue 1 item
- int result = buffering_strategy_->dequeue (method_request, dequeue_blocking_time);
-
- if (result > 0)
- {
- method_request->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- ACE_Message_Block::release (method_request);
- }
- else if (errno == ETIME)
- {
- this->timer_->impl ().expire ();
- }
- else
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "ThreadPool_Task dequeue failed\n"));
- }
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
- "ThreadPool_Task (%P|%t) exception in method request\n");
- }
- ACE_ENDTRY;
- } /* while */
+ ACE_TRY_NEW_ENV
+ {
+ ACE_Time_Value* dequeue_blocking_time = 0;
+ ACE_Time_Value earliest_time;
+
+ if (!this->timer_->impl().is_empty ())
+ {
+ earliest_time = this->timer_->impl().earliest_time ();
+ dequeue_blocking_time = &earliest_time;
+ }
+
+ // Dequeue 1 item
+ int result = buffering_strategy_->dequeue (method_request, dequeue_blocking_time);
+
+ if (result > 0)
+ {
+ method_request->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_Message_Block::release (method_request);
+ }
+ else if (errno == ETIME)
+ {
+ this->timer_->impl ().expire ();
+ }
+ else if (result == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "ThreadPool_Task dequeue failed\n"));
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "ThreadPool_Task (%P|%t) exception in method request\n");
+ }
+ ACE_ENDTRY;
+ } /* while */
return 0;
}
@@ -169,12 +165,21 @@ TAO_Notify_ThreadPool_Task::shutdown (void)
{
if (this->shutdown_)
{
- return;
+ return;
}
this->shutdown_ = true;
this->buffering_strategy_->shutdown ();
+
+ // be sure this object is not deleted until wait() returns
+ this->_incr_refcnt ();
+
+ // get another thread to wait() for the thread(s) running svc() to exit
+ // otherwise the thread is a zombie on Solaris and just hangs around
+ // on windows.
+ TAO_Notify_PROPERTIES::instance()
+ ->orb ()->orb_core ()->reactor ()->notify (&shutdown_handler_);
}
void
@@ -184,17 +189,25 @@ TAO_Notify_ThreadPool_Task::release (void)
}
int
-TAO_Notify_ThreadPool_Task::close (u_long)
+TAO_Notify_ThreadPool_Task::close (u_long /*flags*/)
{
- // _incr_refcnt() for each spawned thread in init()
+ // Undo the thread spawn guard. close is called per thread spawned.
this->_decr_refcnt();
return 0;
}
void
+TAO_Notify_ThreadPool_Task::wait_for_shutdown ()
+{
+ // wait for thread(s) running svc() to return.
+ this->wait ();
+
+ // Undo the shutdown request guard.
+ this->_decr_refcnt ();
+}
+
+void
TAO_Notify_ThreadPool_Task::update_qos_properties (const TAO_Notify_QoSProperties& qos_properties)
{
this->buffering_strategy_->update_qos_properties (qos_properties);
}
-
-TAO_END_VERSIONED_NAMESPACE_DECL