summaryrefslogtreecommitdiff
path: root/TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp')
-rw-r--r--TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp313
1 files changed, 0 insertions, 313 deletions
diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp
deleted file mode 100644
index adc18ab7c7f..00000000000
--- a/TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp
+++ /dev/null
@@ -1,313 +0,0 @@
-// $Id$
-
-#include "tao/CSD_ThreadPool/CSD_TP_Task.h"
-#include "tao/CSD_ThreadPool/CSD_TP_Request.h"
-#include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
-#include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h"
-
-ACE_RCSID (CSD_ThreadPool,
- TP_Task,
- "$Id$")
-
-#if !defined (__ACE_INLINE__)
-# include "tao/CSD_ThreadPool/CSD_TP_Task.inl"
-#endif /* ! __ACE_INLINE__ */
-
-TAO_BEGIN_VERSIONED_NAMESPACE_DECL
-
-TAO::CSD::TP_Task::~TP_Task()
-{
-}
-
-
-bool
-TAO::CSD::TP_Task::add_request(TP_Request* request)
-{
- GuardType guard(this->lock_);
-
- if (!this->accepting_requests_)
- {
- ACE_DEBUG((LM_DEBUG,"(%P|%t) TP_Task::add_request() - "
- "not accepting requests\n"));
- return false;
- }
-
- // We have made the decision that the request is going to be placed upon
- // the queue_. Inform the request that it is about to be placed into
- // a request queue. Some requests may not need to do anything in
- // preparation of being placed into a queue. Others, however, may need
- // to perfom a "clone" operation on some underlying request data before
- // the request can be properly placed into a queue.
- request->prepare_for_queue();
-
- this->queue_.put(request);
-
- this->work_available_.signal();
-
- return true;
-}
-
-
-int
-TAO::CSD::TP_Task::open(void* num_threads_ptr)
-{
- Thread_Counter num = 1;
-
- if (num_threads_ptr != 0)
- {
- Thread_Counter* tmp = static_cast<Thread_Counter*> (num_threads_ptr);
-
- if (tmp == 0)
- {
- ACE_ERROR_RETURN((LM_ERROR,
- "(%P|%t) TP_Task failed to open. "
- "Invalid argument type passed to open().\n"),
- -1);
- }
-
- num = *tmp;
- }
-
- // We can't activate 0 threads. Make sure this isn't the case.
- if (num < 1)
- {
- ACE_ERROR_RETURN((LM_ERROR,
- "(%P|%t) TP_Task failed to open. "
- "num_threads (%u) is less-than 1.\n",
- num),
- -1);
- }
-
- // Likewise, we can't activate too many. Make sure this isn't the case.
- if (num > MAX_THREADPOOL_TASK_WORKER_THREADS)
- {
- ACE_ERROR_RETURN((LM_ERROR,
- "(%P|%t) TP_Task failed to open. "
- "num_threads (%u) is too large. Max is %d.\n",
- num, MAX_THREADPOOL_TASK_WORKER_THREADS),
- -1);
- }
-
- // We need the lock acquired from here on out.
- GuardType guard(this->lock_);
-
- // We can assume that we are in the proper state to handle this open()
- // call as long as we haven't been open()'ed before.
- if (this->opened_)
- {
- ACE_ERROR_RETURN((LM_ERROR,
- "(%P|%t) TP_Task failed to open. "
- "Task has previously been open()'ed.\n"),
- -1);
- }
-
- // Activate this task object with 'num' worker threads.
- if (this->activate(THR_NEW_LWP | THR_JOINABLE, num) != 0)
- {
- // Assumes that when activate returns non-zero return code that
- // no threads were activated.
- ACE_ERROR_RETURN((LM_ERROR,
- "(%P|%t) TP_Task failed to activate "
- "(%d) worker threads.\n",
- num),
- -1);
- }
-
- // Now we have past the point where we can say we've been open()'ed before.
- this->opened_ = true;
-
- // Now we wait until all of the threads have started.
- while (this->num_threads_ != num)
- {
- this->active_workers_.wait();
- }
-
- // We can now accept requests (via our add_request() method).
- this->accepting_requests_ = true;
-
- return 0;
-}
-
-
-int
-TAO::CSD::TP_Task::svc()
-{
- // Account for this current worker thread having started the
- // execution of this svc() method.
- {
- GuardType guard(this->lock_);
- // Put the thread id into a collection which is used to check whether
- // the orb shutdown is called by one of the threads in the pool.
- ACE_thread_t thr_id = ACE_OS::thr_self ();
- if (this->activated_threads_.set(thr_id, this->num_threads_) == -1)
- {
- ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT("(%P|%t)TP_Task::svc: number of threads is out of range \n")),
- 0);
- }
- ++this->num_threads_;
- this->active_workers_.signal();
- }
-
- // This visitor object will be re-used over and over again as part of
- // the "GetWork" logic below.
- TP_Dispatchable_Visitor dispatchable_visitor;
-
- // Start the "GetWork-And-PerformWork" loop for the current worker thread.
- while (1)
- {
- TP_Request_Handle request;
-
- // Do the "GetWork" step.
- {
- // Acquire the lock until just before we decide to "PerformWork".
- GuardType guard(this->lock_);
-
- // Start the "GetWork" loop.
- while (request.is_nil())
- {
- if (this->shutdown_initiated_)
- {
- // This breaks us out of all loops with one fell swoop.
- return 0;
- }
-
- // There is no need to visit the queue if it is empty.
- if (!this->queue_.is_empty())
- {
- // Reset the visitor since we use it over and over. This
- // will cause the visitor to drop any reference to
- // a request that it may still be holding from a prior
- // call to accept_visitor().
- dispatchable_visitor.reset();
-
- // Visit the requests in the queue in hopes of
- // locating the first "dispatchable" (ie, not busy) request.
- // If a dispatchable request is located, it is extracted
- // from the queue and saved in a handle data member in the
- // visitor object.
- this->queue_.accept_visitor(dispatchable_visitor);
-
- // If a dispatchable request is located, it is extracted
- // from the queue and saved in a handle data member in the
- // visitor object. Let's get a "copy" (or a NULL pointer
- // if the visitor didn't locate/extract one).
- request = dispatchable_visitor.request();
- }
-
- // Either the queue is empty or we couldn't find any dispatchable
- // requests in the queue at this time.
- if (request.is_nil())
- {
- // Let's wait until we hear about the possibility of
- // work before we go look again.
- this->work_available_.wait();
- }
- }
-
- // We have dropped out of the "while (request.is_nil())" loop.
- // We only get here is we located/extracted a dispatchable request
- // from the queue. Note that the visitor will have already
- // marked the target servant as now being busy (because of us).
- // We can now safely release the lock.
- }
-
- // Do the "PerformWork" step. We don't need the lock_ to do this.
- request->dispatch();
-
- // Now that the request has been dispatched, we need to mark the target
- // servant as no longer being busy, and we need to signal any wait()'ing
- // worker threads that there may be some dispatchable requests in the
- // queue now for this not-busy servant. We need the lock_ to do this.
- {
- GuardType guard(this->lock_);
- request->mark_as_ready();
- this->work_available_.signal();
- }
-
- // Note that the request will be "released" here when the request
- // handle falls out of scope and its destructor performs the
- // _remove_ref() call on the underlying TP_Request object.
- }
-
- // This will never get executed.
- return 0;
-}
-
-
-int
-TAO::CSD::TP_Task::close(u_long flag)
-{
- GuardType guard(this->lock_);
-
- if (flag == 0)
- {
- // Worker thread is closing.
- --this->num_threads_;
- this->active_workers_.signal();
- }
- else
- {
- // Strategy object is shutting down the task.
-
- // Do nothing if this task has never been open()'ed.
- if (!this->opened_)
- {
- return 0;
- }
-
- // Set the shutdown flag to true.
- this->shutdown_initiated_ = true;
-
- // Stop accepting requests.
- this->accepting_requests_ = false;
-
- // Signal all worker threads waiting on the work_available_ condition.
- this->work_available_.broadcast();
-
- size_t num_waiting_threads = 0;
-
- ACE_thread_t my_thr_id = ACE_OS::thr_self ();
-
- // Check whether the calling thread(calling orb shutdown) is one of the
- // threads in the pool. If it is then it should not wait itself.
- size_t size = this->activated_threads_.size ();
-
- for (size_t i = 0; i < size; i ++)
- {
- ACE_thread_t thr_id = 0;
- if (activated_threads_.get (thr_id, i) == 0 && thr_id == my_thr_id)
- {
- num_waiting_threads = 1;
- break;
- }
- }
-
- // Wait until all worker threads have shutdown.
- while (this->num_threads_ != num_waiting_threads)
- {
- this->active_workers_.wait();
- }
-
- // Cancel all requests.
- TP_Cancel_Visitor cancel_visitor;
- this->queue_.accept_visitor(cancel_visitor);
- }
-
- return 0;
-}
-
-
-
-void
-TAO::CSD::TP_Task::cancel_servant (PortableServer::Servant servant
- ACE_ENV_ARG_DECL)
-{
- GuardType guard(this->lock_);
-
- // Cancel the requests targeted for the provided servant.
- TP_Cancel_Visitor cancel_visitor(servant);
- this->queue_.accept_visitor(cancel_visitor);
-}
-
-TAO_END_VERSIONED_NAMESPACE_DECL