diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:21 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:21 +0000 |
commit | 0e49389337be86641451a5c36c24bf742fe97523 (patch) | |
tree | 197c810e5f5bce17b1233a7cb8d7b50c0bcd25e2 /TAO/tao/CSD_ThreadPool | |
parent | 8008dd09ccf88d4edef237a184a698cac42f2952 (diff) | |
download | ATCD-0e49389337be86641451a5c36c24bf742fe97523.tar.gz |
Repo restructuring
Diffstat (limited to 'TAO/tao/CSD_ThreadPool')
63 files changed, 4464 insertions, 0 deletions
diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.cpp new file mode 100644 index 00000000000..35b6b316012 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.cpp @@ -0,0 +1,48 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h" +#include "tao/CSD_ThreadPool/CSD_TP_Request.h" + +ACE_RCSID (CSD_TP, + Cancel_Visitor, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.inl" +#endif /* ! __ACE_INLINE__ */ + + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Cancel_Visitor::~TP_Cancel_Visitor() +{ +} + + +bool +TAO::CSD::TP_Cancel_Visitor::visit_request(TP_Request* request, + bool& remove_flag) +{ + // If our servant_ data member is in the 'nil' state, then + // we are supposed to cancel *ALL* requests that we visit. + // + // Otherwise, if our servant_ data member is not in the 'nil' state, + // we are supposed to cancel only requests that target our specific + // servant_. + + if ((this->servant_.in() == 0) || (request->is_target(this->servant_.in()))) + { + // Set the remove_flag to true so that this request is removed + // (and released) from the queue when we finish our visit. + remove_flag = true; + + // Cancel the request + request->cancel(); + } + + // Since we are either cancelling requests to any servant or a + // specific servant, always continue visitation. + return true; +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h new file mode 100644 index 00000000000..d9b2472e507 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h @@ -0,0 +1,87 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Cancel_Visitor.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_CANCEL_VISITOR_H +#define TAO_CSD_TP_CANCEL_VISITOR_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" +#include "tao/PortableServer/Servant_Base.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.h" + + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + /** + * @class TP_Cancel_Visitor + * + * @brief Used to extract/cancel request(s) from the queue. + * + * This visitor is used to cancel certain requests in the queue + * as they are visited. + * + * Note that this is currently implemented to cancel *all* + * requests in the queue, or requests that are targeted for a specific + * servant. This could be extended in the future to perhaps + * cancel all requests that have the same operation name, or something + * else. + * + */ + class TAO_CSD_TP_Export TP_Cancel_Visitor : public TP_Queue_Visitor + { + public: + + /// Default Constructor - cancel *all* requests. + TP_Cancel_Visitor(); + + /// Constructor with provided servant - cancel requests that + /// target the supplied servant. + TP_Cancel_Visitor(PortableServer::Servant servant); + + /// Virtual Destructor. + virtual ~TP_Cancel_Visitor(); + + /// Returns true to continue visitation. Returns false to stop + /// visitation. Sets the remove_flag to true if the request should + /// be removed from the queue as a result of the visit. Leaves the + /// remove_flag alone otherwise. + virtual bool visit_request(TP_Request* request, bool& remove_flag); + + private: + + /// Left as nil if we are to cancel all requests, or set to a specific + /// servant if only requests targeting that servant should be cancelled. + PortableServer::ServantBase_var servant_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_DISPATCHABLE_VISITOR_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.inl new file mode 100644 index 00000000000..e1602e2dde3 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.inl @@ -0,0 +1,32 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Cancel_Visitor::TP_Cancel_Visitor() +{ +} + + +ACE_INLINE +TAO::CSD::TP_Cancel_Visitor::TP_Cancel_Visitor(PortableServer::Servant servant) + : servant_(servant) +{ + // This try-catch block is not really necessary for current implementation + // since the _add_ref does not throw exception, but we have to add it to + // satisfy the non-exception builds. If _add_ref really throws an exception + // then this constructor needs deal with the exception. + ACE_TRY_NEW_ENV + { + this->servant_->_add_ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHALL + { + } + ACE_ENDTRY; +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.cpp new file mode 100644 index 00000000000..0e4c596f682 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.cpp @@ -0,0 +1,60 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Collocated_Asynch_Request, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.inl" +#endif /* ! __ACE_INLINE__ */ + + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Collocated_Asynch_Request::~TP_Collocated_Asynch_Request() +{ +} + + +void +TAO::CSD::TP_Collocated_Asynch_Request::prepare_for_queue_i() +{ + this->do_clone(); +} + + +void +TAO::CSD::TP_Collocated_Asynch_Request::dispatch_i() +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + this->do_dispatch(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Eat these. We probably should log these, but since we have already + // unblocked the requesting thread there is no point in saving it or + // doing anything with it. + } +#if defined (TAO_HAS_EXCEPTIONS) + ACE_CATCHALL + { + // Eat these. We probably should log these, but since we have already + // unblocked the requesting thread there is no point in saving it or + // doing anything with it. + } +#endif + ACE_ENDTRY; +} + +void +TAO::CSD::TP_Collocated_Asynch_Request::cancel_i() +{ + this->do_cancel(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.h new file mode 100644 index 00000000000..6cfa149f73a --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.h @@ -0,0 +1,90 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Collocated_Asynch_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_COLLOCATED_ASYNCH_REQUEST_H +#define TAO_CSD_TP_COLLOCATED_ASYNCH_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Corba_Request.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Collocated_Asynch_Request; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Collocated_Asynch_Request> + TP_Collocated_Asynch_Request_Handle; + + /** + * @class TP_Collocated_Asynch_Request + * + * @brief Represents a "queue-able", collocated, asynchronous, + * CORBA request. + * + * This kind request is one-way collocated request with the default + * SYNC_SCOPE policy (SYNC_WITH_TRANSPORT) applied. It is cloned + * before enqueuing and the "enqueuing" thread never blocks. + */ + class TAO_CSD_TP_Export TP_Collocated_Asynch_Request + : public TP_Corba_Request + { + public: + + /// Constructor. + TP_Collocated_Asynch_Request + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state); + + /// Virtual Destructor. + virtual ~TP_Collocated_Asynch_Request(); + + + protected: + + /// Prepare this TP_Collocated_Asynch_Request object to be placed + /// into the request queue. This will cause the underlying + /// TAO_ServerRequest object to be cloned. + virtual void prepare_for_queue_i(); + + /// Dispatch the request to the servant. + virtual void dispatch_i(); + + /// Cancel the request. + virtual void cancel_i(); + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_COLLOCATED_ASYNCH_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.inl new file mode 100644 index 00000000000..d44f0bc56dd --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.inl @@ -0,0 +1,24 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Collocated_Asynch_Request::TP_Collocated_Asynch_Request + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state) + : TP_Corba_Request(object_id, + poa, + operation, + servant, + servant_state, + server_request) +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.cpp new file mode 100644 index 00000000000..c7d7d00fcde --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.cpp @@ -0,0 +1,56 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Collocated_Synch_Request, + "$Id$") + +#include "tao/ORB_Core.h" + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Collocated_Synch_Request::~TP_Collocated_Synch_Request() +{ +} + + +void +TAO::CSD::TP_Collocated_Synch_Request::dispatch_i() +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + this->do_dispatch(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // We need to save off a copy of the exception. + this->exception_ = ACE_ANY_EXCEPTION._tao_duplicate(); + } +#if defined (TAO_HAS_EXCEPTIONS) + ACE_CATCHALL + { + ACE_NEW (this->exception_ , + CORBA::UNKNOWN (CORBA::SystemException::_tao_minor_code + (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0), + CORBA::COMPLETED_MAYBE)); + } +#endif + ACE_ENDTRY; + + this->synch_helper_.dispatched(); +} + +void +TAO::CSD::TP_Collocated_Synch_Request::cancel_i() +{ + this->synch_helper_.cancelled(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.h new file mode 100644 index 00000000000..cad5fa52bd5 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.h @@ -0,0 +1,109 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Collocated_Synch_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_COLLOCATED_SYNCH_REQUEST_H +#define TAO_CSD_TP_COLLOCATED_SYNCH_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Corba_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Synch_Helper.h" +#include "tao/Exception.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Collocated_Synch_Request; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Collocated_Synch_Request> + TP_Collocated_Synch_Request_Handle; + + /** + * @class TP_Collocated_Synch_Request + * + * @brief Represents a "queue-able", synchronous, collocated, + * CORBA request. + * + * This kind request is the two-way or oneway(with SYNC_WITH_TARGET + * policy applied) collocated request. It is NOT cloned before + * enqueuing and the "enqueuing" thread will block until the request + * is dispatched/handled or cancelled. + */ + class TAO_CSD_TP_Export TP_Collocated_Synch_Request + : public TP_Corba_Request + { + public: + + /// Constructor. + TP_Collocated_Synch_Request + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state); + + /// Virtual Destructor. + virtual ~TP_Collocated_Synch_Request(); + + /// Wait until the request has been dispatched (and completed), or + /// until it has been cancelled. + /// Returns true if the request has been dispatched, and returns + /// false if the request has been cancelled. + bool wait(ACE_ENV_SINGLE_ARG_DECL); + + + protected: + + /// Note that we do not override our base class implementation of + /// prepare_for_queue_i() (which does nothing), because we don't + /// need to clone the Server Request object. + + /// Dispatch the request to the servant. + virtual void dispatch_i(); + + /// Cancel the request. + virtual void cancel_i(); + + + private: + + /// Helper used to block and unblock the thread that invokes our + /// wait() method. + TP_Synch_Helper synch_helper_; + + /// Set to NULL initially, and will only be set thereafter if an + /// exception is raised from the dispatch() call on the server_request_. + CORBA::Exception* exception_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_COLLOCATED_SYNCH_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.inl new file mode 100644 index 00000000000..c92dfe4719f --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.inl @@ -0,0 +1,59 @@ +// -*- C++ -*- +// +// $Id$ + +#include "ace/Auto_Ptr.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Collocated_Synch_Request::TP_Collocated_Synch_Request + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state) + : TP_Corba_Request(object_id, + poa, + operation, + servant, + servant_state, + server_request), + exception_(0) +{ +} + + +ACE_INLINE +bool +TAO::CSD::TP_Collocated_Synch_Request::wait(ACE_ENV_SINGLE_ARG_DECL) +{ + bool dispatched = this->synch_helper_.wait_while_pending(); + + if (dispatched) + { + // Check to see if the dispatching caused an exception to be raised. + if (this->exception_ != 0) + { + // An exception was raised during the actual dispatching to + // the servant. We need to raise the exception to our caller, + // which is the thread that made the collocated request in the + // first place. + CORBA::Exception* ex = this->exception_; + this->exception_ = 0; + +#if defined (TAO_HAS_EXCEPTIONS) + ACE_Auto_Basic_Ptr<CORBA::Exception> ex_holder(ex); + ex->_raise (); +#else + ACE_TRY_ENV.exception (ex); +#endif /* ACE_HAS_EXCEPTIONS */ + } + } + + return dispatched; + +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.cpp new file mode 100644 index 00000000000..70f43c863c7 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.cpp @@ -0,0 +1,72 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Collocated_Synch_With_Server_Request, + "$Id$") + +#include "tao/Exception.h" + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Collocated_Synch_With_Server_Request::~TP_Collocated_Synch_With_Server_Request() +{ +} + + +void +TAO::CSD::TP_Collocated_Synch_With_Server_Request::prepare_for_queue_i() +{ + // NOTE: We *NEED* clone the TAO_ServerRequest for a collocated, + // one-way SYNC_WITH_SERVER request. This is because the + // calling thread is signalled just *before* the request is + // dispatched. It's (very) possible that the calling thread + // will destroy the underlying TAO_ServerRequest object while + // the request is dispatching to servant. This is why we make + // a clone - so that we have our own copy that won't be destroyed + // while we are using it. + this->do_clone(); +} + + +void +TAO::CSD::TP_Collocated_Synch_With_Server_Request::dispatch_i() +{ + // This is done *before* we do_dispatch(). + this->synch_helper_.dispatched(); + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + this->do_dispatch(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Eat these. We probably should log these, but since we have already + // unblocked the requesting thread there is no point in saving it or + // doing anything with it. + } +#if defined (TAO_HAS_EXCEPTIONS) + ACE_CATCHALL + { + // Eat these. We probably should log these, but since we have already + // unblocked the requesting thread there is no point in saving it or + // doing anything with it. + } +#endif + ACE_ENDTRY; +} + +void +TAO::CSD::TP_Collocated_Synch_With_Server_Request::cancel_i() +{ + this->synch_helper_.cancelled(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.h new file mode 100644 index 00000000000..0b54d87ed06 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.h @@ -0,0 +1,110 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Collocated_Synch_With_Server_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_COLLOCATED_SYNCH_WITH_SERVER_REQUEST_H +#define TAO_CSD_TP_COLLOCATED_SYNCH_WITH_SERVER_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Corba_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Synch_Helper.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Collocated_Synch_With_Server_Request; + typedef TAO_Intrusive_Ref_Count_Handle + <TP_Collocated_Synch_With_Server_Request> + TP_Collocated_Synch_With_Server_Request_Handle; + + /** + * @class TP_Collocated_Synch_With_Server_Request + * + * @brief Represents a "queue-able", one-way, collocated, CORBA + * request with a "Synch Scope" policy of SYNC_WITH_SERVER. + * + * This kind of request is one-way request with the SYNC_WITH_SERVER + * policy applied. It is cloned before enqueuing and the "enqueuing" + * thread will block until it is signalled by the TP_Task thread that + * will happen just before the request is dispatched or the request + * is cancelled. + */ + class TAO_CSD_TP_Export TP_Collocated_Synch_With_Server_Request + : public TP_Corba_Request + { + public: + + /// Constructor. + TP_Collocated_Synch_With_Server_Request + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state); + + /// Virtual Destructor. + virtual ~TP_Collocated_Synch_With_Server_Request(); + + /// Wait until the request has been dispatched (but not completed), or + /// until it has been cancelled. Note that this will wait until just + /// *before* the request is dispatched by a worker thread. This is + /// different than the TP_Collocated_Synch_Request which waits until + /// just *after* the request is dispatched by a worker thread. + /// Returns true if the request has been dispatched, and returns + /// false if the request has been cancelled. + bool wait(ACE_ENV_SINGLE_ARG_DECL); + + + protected: + + /// Prepare this TP_Collocated_Synch_With_Server_Request object to be + /// placed into the request queue. This will cause the underlying + /// TAO_ServerRequest object to be cloned. + virtual void prepare_for_queue_i(); + + /// Dispatch the request to the servant. + virtual void dispatch_i(); + + /// Cancel the request. + virtual void cancel_i(); + + + private: + + /// Helper used to block and unblock the thread that invokes our + /// wait() method. + TP_Synch_Helper synch_helper_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_COLLOCATED_SYNCH_WITH_SERVER_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.inl new file mode 100644 index 00000000000..1ff0407d6fd --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.inl @@ -0,0 +1,32 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Collocated_Synch_With_Server_Request::TP_Collocated_Synch_With_Server_Request + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state) + : TP_Corba_Request(object_id, + poa, + operation, + servant, + servant_state, + server_request) +{ +} + + +ACE_INLINE +bool +TAO::CSD::TP_Collocated_Synch_With_Server_Request::wait(ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + return this->synch_helper_.wait_while_pending(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.cpp new file mode 100644 index 00000000000..ff2aa4709c9 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Corba_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Corba_Request, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Corba_Request.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Corba_Request::~TP_Corba_Request() +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.h new file mode 100644 index 00000000000..2e0a6c7996f --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.h @@ -0,0 +1,103 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Corba_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_CORBA_REQUEST_H +#define TAO_CSD_TP_CORBA_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Request.h" +#include "tao/CSD_Framework/CSD_FW_Server_Request_Wrapper.h" +#include "tao/PortableServer/PortableServer.h" +#include "tao/PortableServer/Servant_Base.h" +#include "ace/SString.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Corba_Request; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Corba_Request> + TP_Corba_Request_Handle; + + /** + * @class TP_Corba_Request + * + * @brief Base class for "queue-able" CORBA requests. + * + * TBD - Add description + * + */ + class TAO_CSD_TP_Export TP_Corba_Request : public TP_Request + { + public: + + /// Virtual Destructor. + virtual ~TP_Corba_Request(); + + + protected: + + /// Constructor. + TP_Corba_Request(const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state, + TAO_ServerRequest& server_request); + + /// Delegate to the FW_Server_Request_Wrapper clone() method. + void do_clone(); + + /// Delegate to the FW_Server_Request_Wrapper dispatch() method. + void do_dispatch(ACE_ENV_SINGLE_ARG_DECL); + + /// Delegate to the FW_Server_Request_Wrapper cancel() method. + void do_cancel(); + + + private: + + /// The ObjectId for the target servant. + PortableServer::ObjectId object_id_; + + /// The POA. + PortableServer::POA_var poa_; + + /// The name of the IDL operation. + ACE_CString operation_; + + /// The TAO_ServerRequest object wrapper. + FW_Server_Request_Wrapper server_request_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Corba_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_CORBA_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.inl new file mode 100644 index 00000000000..18d273c34a5 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Corba_Request.inl @@ -0,0 +1,45 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Corba_Request::TP_Corba_Request + (const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state, + TAO_ServerRequest& server_request) + : TP_Request(servant,servant_state), + object_id_(object_id), + operation_(operation), + server_request_(server_request) +{ + this->poa_ = PortableServer::POA::_duplicate(poa); +} + +ACE_INLINE +void +TAO::CSD::TP_Corba_Request::do_clone() +{ + this->server_request_.clone(); +} + +ACE_INLINE +void +TAO::CSD::TP_Corba_Request::do_dispatch(ACE_ENV_SINGLE_ARG_DECL) +{ + this->server_request_.dispatch(this->servant() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +ACE_INLINE +void +TAO::CSD::TP_Corba_Request::do_cancel() +{ + this->server_request_.cancel(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.cpp new file mode 100644 index 00000000000..6765f02eff6 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.cpp @@ -0,0 +1,52 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Custom_Asynch_Request, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Custom_Asynch_Request::~TP_Custom_Asynch_Request() +{ +} + + +void +TAO::CSD::TP_Custom_Asynch_Request::dispatch_i() +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + this->execute_op(); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Eat these. We probably should log these, but since we have already + // unblocked the requesting thread there is no point in saving it or + // doing anything with it. + } +#if defined (TAO_HAS_EXCEPTIONS) + ACE_CATCHALL + { + // Eat these. We probably should log these, but since we have already + // unblocked the requesting thread there is no point in saving it or + // doing anything with it. + } +#endif + ACE_ENDTRY; +} + +void +TAO::CSD::TP_Custom_Asynch_Request::cancel_i() +{ + this->cancel_op(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.h new file mode 100644 index 00000000000..aed6d8f88f5 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.h @@ -0,0 +1,79 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Custom_Asynch_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_CUSTOM_ASYNCH_REQUEST_H +#define TAO_CSD_TP_CUSTOM_ASYNCH_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Request.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Custom_Asynch_Request; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Custom_Asynch_Request> + TP_Custom_Asynch_Request_Handle; + + /** + * @class TP_Custom_Asynch_Request + * + * @brief Base class for "queue-able", Custom (non-CORBA), + * Synchronous requests. + * + * TBD - Add description + * + */ + class TAO_CSD_TP_Export TP_Custom_Asynch_Request + : public TP_Custom_Request + { + public: + + /// Constructor. + TP_Custom_Asynch_Request(TP_Custom_Request_Operation* op, + TP_Servant_State* servant_state); + + /// Virtual Destructor. + virtual ~TP_Custom_Asynch_Request(); + + + protected: + + /// Dispatch the request to the servant. + virtual void dispatch_i(); + + /// Cancel the request. + virtual void cancel_i(); + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_CUSTOM_ASYNCH_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.inl new file mode 100644 index 00000000000..4242ce41e5d --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.inl @@ -0,0 +1,15 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Custom_Asynch_Request::TP_Custom_Asynch_Request + (TP_Custom_Request_Operation* op, + TP_Servant_State* servant_state) + : TP_Custom_Request(op,servant_state) +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.cpp new file mode 100644 index 00000000000..fbe24fc0c53 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Custom_Request, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Custom_Request.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Custom_Request::~TP_Custom_Request() +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.h new file mode 100644 index 00000000000..c4c5bcbb0b4 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.h @@ -0,0 +1,81 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Custom_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_CUSTOM_REQUEST_H +#define TAO_CSD_TP_CUSTOM_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.h" +#include "tao/PortableServer/Servant_Base.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Custom_Request; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Custom_Request> + TP_Custom_Request_Handle; + + /** + * @class TP_Custom_Request + * + * @brief Base class for "queue-able" Custom (non-CORBA) requests. + * + * TBD - Add description + * + */ + class TAO_CSD_TP_Export TP_Custom_Request : public TP_Request + { + public: + + /// Virtual Destructor. + virtual ~TP_Custom_Request(); + + + protected: + + /// Constructor. + TP_Custom_Request(TP_Custom_Request_Operation* op, + TP_Servant_State* servant_state); + + void execute_op(); + void cancel_op(); + + + private: + + TP_Custom_Request_Operation_Handle op_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Custom_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_CUSTOM_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.inl new file mode 100644 index 00000000000..c63696ab628 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request.inl @@ -0,0 +1,48 @@ +// -*- C++ - +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Custom_Request::TP_Custom_Request + (TP_Custom_Request_Operation* op, + TP_Servant_State* servant_state) + : TP_Request(op->servant(),servant_state), + op_(op, false) +{ +} + + +ACE_INLINE +void +TAO::CSD::TP_Custom_Request::execute_op() +{ + this->op_->execute(); + + // Now drop the reference to the custom operation object. + // This is necessary so that custom operation objects can be created + // on the stack for synchronous custom requests. If we do not do this, + // then there is a race condition which could result in the stack-created + // custom operation object having a reference count of 2 when it falls + // out of scope (and destructs). Our op_ data member would be the one + // that held the other reference, and when our op_ data member destructs, + // it attempts to perform a _remove_ref() on the underlying operation + // object - which has already been destructed! Thus, we reset the op_ + // data member here to the 'nil' state - causing the _remove_ref() to + // be performed now. + this->op_ = 0; +} + + +ACE_INLINE +void +TAO::CSD::TP_Custom_Request::cancel_op() +{ + this->op_->cancel(); + + // See comments in the execute_op() method. + this->op_ = 0; +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.cpp new file mode 100644 index 00000000000..e2702b6c5a5 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Custom_Request_Operation, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Custom_Request_Operation::~TP_Custom_Request_Operation() +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.h new file mode 100644 index 00000000000..e3b85853f9f --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.h @@ -0,0 +1,99 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Custom_Request_Operation.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_CUSTOM_REQUEST_OPERATION_H +#define TAO_CSD_TP_CUSTOM_REQUEST_OPERATION_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/Intrusive_Ref_Count_Base_T.h" +#include "tao/Intrusive_Ref_Count_Handle_T.h" +#include "tao/PortableServer/Servant_Base.h" +#include "ace/Synch.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Custom_Request_Operation; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Custom_Request_Operation> + TP_Custom_Request_Operation_Handle; + + + /** + * @class TP_Custom_Request_Operation + * + * @brief Base class for all custom request operations. + * + * @note The caller that creates a new TP_Custom_Request_Operation + * object needs call _add_ref () on the servant before + * constructing it and the TP_Custom_Request_Operation object + * is responsible to decrement the reference count. + * + * TBD - Add description + */ + class TAO_CSD_TP_Export TP_Custom_Request_Operation + : public TAO_Intrusive_Ref_Count_Base<ACE_SYNCH_MUTEX> + { + public: + + /// Virtual Destructor. + virtual ~TP_Custom_Request_Operation(); + + /// Invoked by a worker thread to perform the operation. + void execute(); + + /// Invoked when the request has been cancelled. + void cancel(); + + /// Used by the TP_Strategy to obtain the target servant in order + /// to construct the custom request object. Returns the servant as + /// an "in" argument (the caller does not get a new 'copy'). This + /// is useful for chaining. + PortableServer::Servant servant(); + + + protected: + + /// Constructor. + TP_Custom_Request_Operation(PortableServer::Servant servant); + + virtual void execute_i() = 0; + virtual void cancel_i() = 0; + + + private: + + PortableServer::ServantBase_var servant_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_CUSTOM_REQUEST_OPERATION_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.inl new file mode 100644 index 00000000000..6a05c5df7ee --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Request_Operation.inl @@ -0,0 +1,51 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Custom_Request_Operation::TP_Custom_Request_Operation + (PortableServer::Servant servant) +: servant_ (servant) +{ + // This try-catch block is not really necessary for current implementation + // since the _add_ref does not throw exception, but we have to add it to + // satisfy the non-exception builds. If _add_ref really throws an exception + // then this constructor needs deal with the exception. + ACE_TRY_NEW_ENV + { + this->servant_->_add_ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHALL + { + } + ACE_ENDTRY; +} + + +ACE_INLINE +void +TAO::CSD::TP_Custom_Request_Operation::execute() +{ + this->execute_i(); +} + + +ACE_INLINE +void +TAO::CSD::TP_Custom_Request_Operation::cancel() +{ + this->cancel_i(); +} + + +ACE_INLINE +PortableServer::Servant +TAO::CSD::TP_Custom_Request_Operation::servant() +{ + return this->servant_.in(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.cpp new file mode 100644 index 00000000000..287e516d7b4 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.cpp @@ -0,0 +1,35 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Custom_Synch_Request, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Custom_Synch_Request::~TP_Custom_Synch_Request() +{ +} + + +void +TAO::CSD::TP_Custom_Synch_Request::dispatch_i() +{ + this->execute_op(); + this->synch_helper_.dispatched(); +} + + +void +TAO::CSD::TP_Custom_Synch_Request::cancel_i() +{ + this->cancel_op(); + this->synch_helper_.cancelled(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.h new file mode 100644 index 00000000000..02d66b20597 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.h @@ -0,0 +1,92 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Custom_Synch_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_CUSTOM_SYNCH_REQUEST_H +#define TAO_CSD_TP_CUSTOM_SYNCH_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Synch_Helper.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Custom_Synch_Request; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Custom_Synch_Request> + TP_Custom_Synch_Request_Handle; + + /** + * @class TP_Custom_Synch_Request + * + * @brief Base class for "queue-able", Custom (non-CORBA), + * Synchronous requests. + * + * TBD - Add description + * + */ + class TAO_CSD_TP_Export TP_Custom_Synch_Request : public TP_Custom_Request + { + public: + + /// Constructor. + TP_Custom_Synch_Request(TP_Custom_Request_Operation* op, + TP_Servant_State* servant_state); + + /// Virtual Destructor. + virtual ~TP_Custom_Synch_Request(); + + /// Wait until the request has been executed (and completes), or + /// until it has been cancelled. Returns true if the request has + /// been executed/completed, and returns false if the request has + /// been cancelled. + bool wait(); + + + protected: + + /// Dispatch the request to the servant. + virtual void dispatch_i(); + + /// Cancel the request. + virtual void cancel_i(); + + + private: + + /// Helper used to block and unblock the thread that invokes our + /// wait() method. + TP_Synch_Helper synch_helper_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_CUSTOM_SYNCH_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.inl new file mode 100644 index 00000000000..cec37afcdac --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.inl @@ -0,0 +1,23 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Custom_Synch_Request::TP_Custom_Synch_Request + (TP_Custom_Request_Operation* op, + TP_Servant_State* servant_state) + : TP_Custom_Request(op,servant_state) +{ +} + + +ACE_INLINE +bool +TAO::CSD::TP_Custom_Synch_Request::wait() +{ + return this->synch_helper_.wait_while_pending(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.cpp new file mode 100644 index 00000000000..aeb5454c252 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.cpp @@ -0,0 +1,53 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h" + +ACE_RCSID (CSD_TP, + Dispatchable_Visitor, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Dispatchable_Visitor::~TP_Dispatchable_Visitor() +{ +} + + +bool +TAO::CSD::TP_Dispatchable_Visitor::visit_request(TP_Request* request, + bool& remove_flag) +{ + // Ask the request object if the target servant is "ready" to accept + // a request being dispatched to it. + if (request->is_ready()) + { + // Ok. This request is a "dispatchable" request. It is what we were + // hoping to find. + + // Save a copy of the request in our handle data member. + request->_add_ref(); + this->request_ = request; + + // Make sure that the queue will extract the request from the queue + // upon our return. + remove_flag = true; + + // Mark the target servant as being "busy". + request->mark_as_busy(); + + // Stop the visitation by returning false. + return false; + } + + // The target servant object of the request isn't ready, so the request + // is not considered to be a "dispatchable" request. + + // Return true to visit the next request in the queue (if there is one). + return true; +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h new file mode 100644 index 00000000000..01aea59828a --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h @@ -0,0 +1,94 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Dispatchable_Visitor.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_DISPATCHABLE_VISITOR_H +#define TAO_CSD_TP_DISPATCHABLE_VISITOR_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.h" +#include "tao/CSD_ThreadPool/CSD_TP_Request.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + /** + * @class TP_Dispatchable_Visitor + * + * @brief Used to extract the first "dispatchable" request from the queue. + * + * An instance of this visitor class is used by one of the worker + * threads to locate the first "dispatchable" request in the queue. If + * such a request is visited, then this visitor will save a "copy" of + * the request, indicate that the request should be removed from the + * queue, and indicate that visitation should stop. + * + * An method is provided to retrieve a "copy" of the "dispatchable" + * request that was saved off during visitation. A nil reference + * (ie, a NULL pointer) will be returned if no dispatchable request + * was found. + * + */ + class TAO_CSD_TP_Export TP_Dispatchable_Visitor : public TP_Queue_Visitor + { + public: + + /// Default Constructor. + TP_Dispatchable_Visitor(); + + /// Virtual Destructor. + virtual ~TP_Dispatchable_Visitor(); + + /// Reset this visitor object in order to re-use it for another + /// visitation of the request queue. This sets the vistor's "result" + /// (the TP_Request* data member) to its default value (a nil handle). + void reset(); + + /// Returns true to continue visitation. Returns false to stop + /// visitation. Sets the remove_flag to true if the request should + /// be removed from the queue as a result of the visit. Leaves the + /// remove_flag alone otherwise. + virtual bool visit_request(TP_Request* request, bool& remove_flag); + + /// This returns a "copy" of the located request, or 0 if no request + /// was located. + TP_Request* request(); + + + private: + + /// A handle to the located request. + TP_Request_Handle request_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_DISPATCHABLE_VISITOR_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.inl new file mode 100644 index 00000000000..c2c16d34e26 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.inl @@ -0,0 +1,31 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Dispatchable_Visitor::TP_Dispatchable_Visitor() +{ +} + + +ACE_INLINE +void +TAO::CSD::TP_Dispatchable_Visitor::reset() +{ + // Set the handle to 0 to have it release any request it may currently + // be referencing. + this->request_ = 0; +} + + +ACE_INLINE +TAO::CSD::TP_Request* +TAO::CSD::TP_Dispatchable_Visitor::request() +{ + TP_Request_Handle handle(this->request_.in(), false); + return handle._retn(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Export.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Export.h new file mode 100644 index 00000000000..1eeb556e4f8 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Export.h @@ -0,0 +1,58 @@ + +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl -s TAO_CSD_TP +// ------------------------------ +#ifndef TAO_CSD_TP_EXPORT_H +#define TAO_CSD_TP_EXPORT_H + +#include "ace/config-all.h" + +#if defined (ACE_AS_STATIC_LIBS) && !defined (TAO_CSD_TP_HAS_DLL) +# define TAO_CSD_TP_HAS_DLL 0 +#endif /* ACE_AS_STATIC_LIBS && TAO_CSD_TP_HAS_DLL */ + +#if !defined (TAO_CSD_TP_HAS_DLL) +# define TAO_CSD_TP_HAS_DLL 1 +#endif /* ! TAO_CSD_TP_HAS_DLL */ + +#if defined (TAO_CSD_TP_HAS_DLL) && (TAO_CSD_TP_HAS_DLL == 1) +# if defined (TAO_CSD_TP_BUILD_DLL) +# define TAO_CSD_TP_Export ACE_Proper_Export_Flag +# define TAO_CSD_TP_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define TAO_CSD_TP_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* TAO_CSD_TP_BUILD_DLL */ +# define TAO_CSD_TP_Export ACE_Proper_Import_Flag +# define TAO_CSD_TP_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define TAO_CSD_TP_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* TAO_CSD_TP_BUILD_DLL */ +#else /* TAO_CSD_TP_HAS_DLL == 1 */ +# define TAO_CSD_TP_Export +# define TAO_CSD_TP_SINGLETON_DECLARATION(T) +# define TAO_CSD_TP_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* TAO_CSD_TP_HAS_DLL == 1 */ + +// Set TAO_CSD_TP_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (TAO_CSD_TP_NTRACE) +# if (ACE_NTRACE == 1) +# define TAO_CSD_TP_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define TAO_CSD_TP_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !TAO_CSD_TP_NTRACE */ + +#if (TAO_CSD_TP_NTRACE == 1) +# define TAO_CSD_TP_TRACE(X) +#else /* (TAO_CSD_TP_NTRACE == 1) */ +# if !defined (ACE_HAS_TRACE) +# define ACE_HAS_TRACE +# endif /* ACE_HAS_TRACE */ +# define TAO_CSD_TP_TRACE(X) ACE_TRACE_IMPL(X) +# include "ace/Trace.h" +#endif /* (TAO_CSD_TP_NTRACE == 1) */ + +#endif /* TAO_CSD_TP_EXPORT_H */ + +// End of auto generated file. diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.cpp new file mode 100644 index 00000000000..cb7b0ecf8e1 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.cpp @@ -0,0 +1,127 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Queue.h" +#include "tao/CSD_ThreadPool/CSD_TP_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Queue, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Queue.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +void +TAO::CSD::TP_Queue::put(TP_Request* request) +{ + // The request is passed in as an "in" argument, and we would like to + // hold on to a "copy" within the queue (the linked list). We will + // perform an _add_ref() on the request now to make the queue's "copy". + request->_add_ref(); + + if (this->tail_ == 0) + { + // The tail_ is a NULL pointer only when the queue is empty. + // Make the request be the only element in the queue. + this->head_ = this->tail_ = request; + + // Make sure the request's prev_ and next_ pointers are set to NULL. + request->prev_ = request->next_ = 0; + } + else + { + // There is at least one request already in the queue. "Append" the + // supplied request object to the end of the queue. + request->prev_ = this->tail_; + request->next_ = 0; + this->tail_->next_ = request; + this->tail_ = request; + } +} + + +void +TAO::CSD::TP_Queue::accept_visitor(TP_Queue_Visitor& visitor) +{ + TP_Request* cur = this->head_; + + while (cur != 0) + { + TP_Request* prev = cur->prev_; + TP_Request* next = cur->next_; + + // Pass the current request to the visitor. Also pass-in a reference + // to the remove_from_queue flag. The visitor may decide that it + // wants to keep the current request for itself, and desires that the + // request be (surgically) removed from the queue. The visitor also + // gets to decide, via its return value, whether or not visitation + // should continue (or cease to continue). + bool remove_from_queue = false; + + bool continue_visitation = visitor.visit_request(cur,remove_from_queue); + + if (remove_from_queue) + { + // Create a local handle to release the current request once + // the handle falls out of scope. We need to do this because the + // queue "owns" a "copy" of each request in the queue. + TP_Request_Handle handle = cur; + + if (this->head_ == cur) + { + // The current request is at the front (the head_) of the queue. + + // Move the head_ to the next request in the queue. + this->head_ = next; + + if (this->head_ == 0) + { + // Not only was the current request at the front of the + // queue - it was the *only* request in the queue. + // Update the tail_ pointer now that the queue is empty. + this->tail_ = 0; + } + else + { + // Set the (new) head_ request's prev_ pointer to be NULL. + this->head_->prev_ = 0; + } + } + else if (this->tail_ == cur) + { + // The current request is not at the front of the queue, + // but it is at the back of the queue. This implies that + // the queue currently contains at least two requests - + // the current request (cur), and the previous request (prev). + // The point is that we can now assume that the 'prev' pointer + // is never NULL in this case. + this->tail_ = prev; + this->tail_->next_ = 0; + } + else + { + // The current request is not at the front or at the back. + // This implies that there are at least three requests in + // the queue. We can assume that the 'next' and 'prev' + // pointers are never NULL in this case. + prev->next_ = next; + next->prev_ = prev; + } + } + + if (!continue_visitation) + { + // The visitor doesn't want to procede with any further visitation. + // Break out of the visitation loop now. + break; + } + + // Move on to the next request in the queue. + cur = next; + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.h new file mode 100644 index 00000000000..9912c0e613a --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.h @@ -0,0 +1,101 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Queue.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_QUEUE_H +#define TAO_CSD_TP_QUEUE_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/Versioned_Namespace.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Request; + class TP_Queue_Visitor; + + /** + * @class TP_Queue + * + * @brief Queue of servant requests that need to be dispatched. + * + * This is the queue of pending servant requests that is "owned" + * by a TP_Strategy object. When an ORB thread dispatches + * a servant request to the strategy object, it will create the + * appropriate (subclass of) TP_Request object to "wrap" + * the servant request in a "queue-friendly" wrapper. The ORB thread + * will then place the TP_Request object on to the queue. Note that + * this scenario pertains to what are being called "remote requests". + * There are other scenarios in which other types of requests can + * get added to this queue. + * + * The strategy object will employ a set of worker threads that are + * responsible for "servicing" the servant requests in the queue. + * + * Note: In the future, support will be added to allow the client + * application inject "custom" TP_Request objects into + * a TP_Strategy object, causing them to be placed in + * the queue. + */ + class TAO_CSD_TP_Export TP_Queue + { + public: + + /// Default Constructor. + TP_Queue(); + + /// Destructor. + ~TP_Queue(); + + /// Place a request at the end of the queue. + void put(TP_Request* request); + + /// Returns true if the queue is empty. Returns false otherwise. + bool is_empty() const; + + /// Visitors will visit each request in the queue, from front to back, + /// and have the ability to stop visiting at any time (ie, before + /// visiting every request). + void accept_visitor(TP_Queue_Visitor& visitor); + + + private: + + /// The request at the front of the queue. + TP_Request* head_; + + /// The request at the end of the queue. + TP_Request* tail_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Queue.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_QUEUE_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.inl new file mode 100644 index 00000000000..9e26d265f10 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue.inl @@ -0,0 +1,29 @@ +// -*- C++ -*- +// +// $Id$ + + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Queue::TP_Queue() + : head_(0), + tail_(0) +{ +} + + +ACE_INLINE +TAO::CSD::TP_Queue::~TP_Queue() +{ +} + + +ACE_INLINE +bool +TAO::CSD::TP_Queue::is_empty() const +{ + return (this->head_ == 0); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.cpp new file mode 100644 index 00000000000..3ede8818521 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Queue_Visitor, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Queue_Visitor::~TP_Queue_Visitor() +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.h new file mode 100644 index 00000000000..f86467ae64d --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.h @@ -0,0 +1,79 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Queue_Visitor.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_QUEUE_VISITOR_H +#define TAO_CSD_TP_QUEUE_VISITOR_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/Versioned_Namespace.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Request; + + + /** + * @class TP_Queue_Visitor + * + * @brief Base class for vistors of the elements in the TP_Queue. + * + * Provides a way to perform thread-safe iteration over the + * TP_Request objects contained within a TP_Queue object. + * + * This also provides a means to encapsulate each distinct algorithm + * within a distinct subclass of TP_Queue_Visitor. + * + */ + class TAO_CSD_TP_Export TP_Queue_Visitor + { + public: + + /// Virtual Destructor. + virtual ~TP_Queue_Visitor(); + + /// Returns true to continue visitation. Return false to stop + /// visitation. Sets the remove_flag to true if the request should + /// be removed from the queue as a result of the visit. Leaves the + /// remove_flag alone otherwise. + virtual bool visit_request(TP_Request* request, bool& remove_flag) = 0; + + + protected: + + /// Default Constructor. + TP_Queue_Visitor(); + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_QUEUE_VISITOR_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.inl new file mode 100644 index 00000000000..36bd9fc5608 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Queue_Visitor.inl @@ -0,0 +1,12 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Queue_Visitor::TP_Queue_Visitor() +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.cpp new file mode 100644 index 00000000000..45529af5802 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.cpp @@ -0,0 +1,60 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Remote_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Remote_Request, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Remote_Request.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Remote_Request::~TP_Remote_Request() +{ +} + + +void +TAO::CSD::TP_Remote_Request::prepare_for_queue_i() +{ + this->do_clone(); +} + + +void +TAO::CSD::TP_Remote_Request::dispatch_i() +{ + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + this->do_dispatch(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Eat these. We probably should log these, but since we have already + // unblocked the requesting thread there is no point in saving it or + // doing anything with it. + } +#if defined (TAO_HAS_EXCEPTIONS) + ACE_CATCHALL + { + // Eat these. We probably should log these, but since we have already + // unblocked the requesting thread there is no point in saving it or + // doing anything with it. + } +#endif + ACE_ENDTRY; +} + + +void +TAO::CSD::TP_Remote_Request::cancel_i() +{ + this->do_cancel(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.h new file mode 100644 index 00000000000..57303e81e91 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.h @@ -0,0 +1,102 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Remote_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_REMOTE_REQUEST_H +#define TAO_CSD_TP_REMOTE_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Corba_Request.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Remote_Request; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Remote_Request> + TP_Remote_Request_Handle; + + /** + * @class TP_Remote_Request + * + * @brief Represents a "queue-able", remote, CORBA request. + * Both syncronous and asynchronous remote CORBA requests + * are represented by the class. + * + * TBD - Go over the following comments and clean up. + * + * Since this class derives from the TP_Request class, it can be + * added to a TP_Queue (ie, it is a "queueable" request). It + * represents a servant request that has been made by a remote + * CORBA client (as opposed to a collocated CORBA client). The + * term "CORBA client" is being used here to distinguish CORBA + * servant requests (those made thru a CORBA object reference), and + * "Custom" servant requests that can be "dispatched" to the strategy + * directly by the client application code (ie, not thru a CORBA + * object reference). Thus, there are "CORBA clients" and + * "Direct clients". + * + * In summary, this class represents a servant request made when a + * remote client invokes a method on a CORBA object reference. + * + */ + class TAO_CSD_TP_Export TP_Remote_Request : public TP_Corba_Request + { + public: + + /// Constructor. + TP_Remote_Request(TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state); + + /// Virtual Destructor. + virtual ~TP_Remote_Request(); + + + protected: + + /// Prepare this TP_Remote_Request object to be placed into the + /// request queue. This will cause the underlying TAO_ServerRequest + /// object to be cloned. + virtual void prepare_for_queue_i(); + + /// Dispatch the request to the servant. + virtual void dispatch_i(); + + /// Cancel the request. + virtual void cancel_i(); + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Remote_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_REMOTE_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.inl new file mode 100644 index 00000000000..14a37aa44a9 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Remote_Request.inl @@ -0,0 +1,24 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Remote_Request::TP_Remote_Request + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant, + TP_Servant_State* servant_state) + : TP_Corba_Request(object_id, + poa, + operation, + servant, + servant_state, + server_request) +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Request.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Request.cpp new file mode 100644 index 00000000000..e1926f93e8b --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Request.cpp @@ -0,0 +1,27 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Request.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Request, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Request.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Request::~TP_Request() +{ +} + + +void +TAO::CSD::TP_Request::prepare_for_queue_i() +{ + // Default implementation is to do nothing. Subclasses can provide + // their own implementation if needed. +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Request.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Request.h new file mode 100644 index 00000000000..104d741d3b9 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Request.h @@ -0,0 +1,136 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Request.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_REQUEST_H +#define TAO_CSD_TP_REQUEST_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Servant_State.h" +#include "tao/PortableServer/Servant_Base.h" +#include "tao/Intrusive_Ref_Count_Base_T.h" +#include "tao/Intrusive_Ref_Count_Handle_T.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Request; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Request> TP_Request_Handle; + + class TP_Queue; + + + /** + * @class TP_Request + * + * @brief Base class for "queue-able" requests. + * + * This class serves as the abstract base class for all types of + * "servant requests" that can be inserted into a TP_Queue + * object. + */ + class TAO_CSD_TP_Export TP_Request + : public TAO_Intrusive_Ref_Count_Base<ACE_SYNCH_MUTEX> + { + public: + + /// Virtual Destructor. + virtual ~TP_Request(); + + /// Prepare the request to be placed into the request queue. + void prepare_for_queue(); + + /// Invoked to dispatch the request to the servant. + void dispatch(); + + /// Invoked to cancel the request. + void cancel(); + + /// Is the target servant ready to accept a request? + bool is_ready() const; + + /// Mark the target servant as being busy. + void mark_as_busy(); + + /// Mark the target servant as being ready (ie, not busy). + void mark_as_ready(); + + /// This method returns true if this request targets the supplied + /// servant object. + bool is_target(PortableServer::Servant servant); + + + protected: + + /// Constructor. + TP_Request(PortableServer::Servant servant, + TP_Servant_State* servant_state); + + /// Accessor for the servant. Does not return a new (ref counted) + /// reference! This is used for chaining. + PortableServer::Servant servant(); + + /// The subclass knows if it needs to do anything in preparation + /// of being placed into the request queue. The default implementation + /// does nothing, so only subclasses that have something to do + /// need to provide their own implementation. + virtual void prepare_for_queue_i(); + + /// The subclass knows how to carry out its own way of dispatching + /// the request to the servant. + virtual void dispatch_i() = 0; + + /// Ask the subclass to perform its duties to carry out the cancellation. + virtual void cancel_i() = 0; + + + private: + + /// The TP_Queue class is our friend since it needs access to + /// the prev_ and next_ (private) data members. + friend class TP_Queue; + + /// The previous TP_Request object (in the queue). + TP_Request* prev_; + + /// The next TP_Request object (in the queue). + TP_Request* next_; + + /// Reference to the servant object. + PortableServer::ServantBase_var servant_; + + /// Reference to the servant "state" object (contains the busy flag). + TP_Servant_State::HandleType servant_state_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Request.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_REQUEST_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Request.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Request.inl new file mode 100644 index 00000000000..341a449091e --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Request.inl @@ -0,0 +1,111 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Request::TP_Request(PortableServer::Servant servant, + TP_Servant_State* servant_state) + : prev_(0), + next_(0), + servant_ (servant), + servant_state_(servant_state, false) +{ + // This try-catch block is not really necessary for current implementation + // since the _add_ref does not throw exception, but we have to add it to + // satisfy the non-exception builds. If _add_ref really throws an exception + // then this constructor needs deal with the exception. + ACE_TRY_NEW_ENV + { + this->servant_->_add_ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHALL + { + } + ACE_ENDTRY; +} + + +ACE_INLINE +void +TAO::CSD::TP_Request::prepare_for_queue() +{ + this->prepare_for_queue_i(); +} + + +ACE_INLINE +PortableServer::Servant +TAO::CSD::TP_Request::servant() +{ + // Used for chaining so we do not return a new "copy". + return this->servant_.in(); +} + + +ACE_INLINE +bool +TAO::CSD::TP_Request::is_ready() const +{ + if (this->servant_state_.is_nil()) + { + // This means that the serialization of servants is off. + // We always answer true here to indicate that the servant is + // never busy. + return true; + } + + return !this->servant_state_->busy_flag(); +} + + +ACE_INLINE +void +TAO::CSD::TP_Request::mark_as_busy() +{ + if (!this->servant_state_.is_nil()) + { + this->servant_state_->busy_flag(true); + } +} + + +ACE_INLINE +void +TAO::CSD::TP_Request::mark_as_ready() +{ + if (!this->servant_state_.is_nil()) + { + this->servant_state_->busy_flag(false); + } +} + + +ACE_INLINE +bool +TAO::CSD::TP_Request::is_target(PortableServer::Servant servant) +{ + // Compare pointers. Return true only if these are the exact same object. + return (servant == this->servant_.in()); +} + + +ACE_INLINE +void +TAO::CSD::TP_Request::dispatch() +{ + this->dispatch_i(); + +} + + +ACE_INLINE +void +TAO::CSD::TP_Request::cancel() +{ + this->cancel_i(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.cpp new file mode 100644 index 00000000000..28fbc8da7ba --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Servant_State.h" + +ACE_RCSID (CSD_TP, + Servant_State, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Servant_State.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Servant_State::~TP_Servant_State() +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.h new file mode 100644 index 00000000000..dbfebedb64c --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.h @@ -0,0 +1,92 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Servant_State.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_SERVANT_STATE_H +#define TAO_CSD_TP_SERVANT_STATE_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/Intrusive_Ref_Count_Base_T.h" +#include "tao/Intrusive_Ref_Count_Handle_T.h" +#include "ace/Synch.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + /** + * @class TP_Servant_State + * + * @brief Maintains state information for a particular servant. + * + * This TP_Servant_State class is an intrusively reference-counted + * class. This allows it to be held in a "smart pointer" (aka, handle) + * object that will manage the reference-counting automagically. + * + * One TP_Servant_State object is created for each servant object for + * which a request is to be dispatched. The servant state objects are + * held (via smart pointers) in a TP_Servant_State_Map object. In turn, + * the TP_Servant_State_Map object is a data member of the TP_Stategy + * class. Each request placed on to the request queue will hold a + * reference (via a smart pointer) to the servant state object. + * + * Currently, the only "state" info held in this TP_Servant_State class + * is the servant's busy flag. + * + */ + class TAO_CSD_TP_Export TP_Servant_State + : public TAO_Intrusive_Ref_Count_Base<ACE_SYNCH_MUTEX> + { + public: + + /// Handle Type (aka, Smart Pointer Type). + typedef TAO_Intrusive_Ref_Count_Handle<TP_Servant_State> HandleType; + + /// Default Constructor. + TP_Servant_State(); + + /// Virtual Destructor. + virtual ~TP_Servant_State(); + + /// Accessor for the servant busy flag. + bool busy_flag() const; + + /// Mutator for the servant busy flag. + void busy_flag(bool new_value); + + private: + + /// The servant's current "busy" state (true == busy, false == not busy) + bool busy_flag_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Servant_State.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_SERVANT_STATE_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.inl new file mode 100644 index 00000000000..5ef996bcb6e --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State.inl @@ -0,0 +1,29 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Servant_State::TP_Servant_State() + : busy_flag_(false) +{ +} + + +ACE_INLINE +bool +TAO::CSD::TP_Servant_State::busy_flag() const +{ + return this->busy_flag_; +} + + +ACE_INLINE +void +TAO::CSD::TP_Servant_State::busy_flag(bool new_value) +{ + this->busy_flag_ = new_value; +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.cpp new file mode 100644 index 00000000000..846f3a6e723 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.cpp @@ -0,0 +1,11 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.h" + +ACE_RCSID (CSD_TP, + Servant_State_Map, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.inl" +#endif /* ! __ACE_INLINE__ */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.h new file mode 100644 index 00000000000..e6726098b72 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.h @@ -0,0 +1,95 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Servant_State_Map.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_SERVANT_STATE_MAP_H +#define TAO_CSD_TP_SERVANT_STATE_MAP_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_ThreadPool/CSD_TP_Servant_State.h" +#include "tao/PortableServer/PortableServer.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/Synch.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + /** + * @class TP_Servant_State_Map + * + * @brief Map of Servant_State objects - one per servant. + * + * A TP_Stategy object holds an instance of a TP_Servant_State_Map object + * as a (held-by-value) data member. The strategy uses this map to + * find or create the TP_Servant_State object for a particular servant + * object. + * + */ + class TAO_CSD_TP_Export TP_Servant_State_Map + { + public: + + /// Default Constructor. + TP_Servant_State_Map(); + + /// Destructor. + ~TP_Servant_State_Map(); + + /// Accessor for the servant busy flag. + TP_Servant_State* find(PortableServer::Servant servant + ACE_ENV_ARG_DECL); + + /// Insert the servant to map. + void insert(PortableServer::Servant servant + ACE_ENV_ARG_DECL); + + /// Remove the servant from map. + void remove(PortableServer::Servant servant + ACE_ENV_ARG_DECL); + + + private: + + /// Underlying Map Type - Hash-Based - + /// Key Type: void*, Value Type: TP_Servant_State::HandleType + typedef ACE_Hash_Map_Manager_Ex<void*, + TP_Servant_State::HandleType, + ACE_Hash<void*>, + ACE_Equal_To<void*>, + ACE_SYNCH_MUTEX> MapType; + + /// The underlying map of servant state objects. + MapType map_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_SERVANT_STATE_MAP_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.inl new file mode 100644 index 00000000000..1faeb7ad536 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.inl @@ -0,0 +1,70 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Servant_State_Map::TP_Servant_State_Map() +{ +} + + +ACE_INLINE +TAO::CSD::TP_Servant_State_Map::~TP_Servant_State_Map() +{ +} + + +ACE_INLINE +TAO::CSD::TP_Servant_State* +TAO::CSD::TP_Servant_State_Map::find(PortableServer::Servant servant + ACE_ENV_ARG_DECL) +{ + void* key = servant; + + TP_Servant_State::HandleType value; + + if (this->map_.find(key, value) != 0) + { + ACE_THROW_RETURN (PortableServer::POA::ServantNotActive (), 0); + } + + return value._retn(); +} + + +ACE_INLINE +void +TAO::CSD::TP_Servant_State_Map::insert(PortableServer::Servant servant + ACE_ENV_ARG_DECL) +{ + void* key = servant; + + TP_Servant_State::HandleType value = new TP_Servant_State (); + + int result = this->map_.bind(key, value); + + if (result == 1) + { + ACE_THROW (PortableServer::POA::ServantAlreadyActive ()); + } + + ACE_ASSERT (result == 0); +} + + +ACE_INLINE +void +TAO::CSD::TP_Servant_State_Map::remove(PortableServer::Servant servant + ACE_ENV_ARG_DECL) +{ + void* key = servant; + + if (this->map_.unbind(key) == -1) + { + ACE_THROW (PortableServer::POA::ServantNotActive ()); + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.cpp new file mode 100644 index 00000000000..1a9c86ec8da --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.cpp @@ -0,0 +1,289 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Strategy.h" +#include "tao/CSD_ThreadPool/CSD_TP_Remote_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.h" +#include "ace/Trace.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Strategy, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Strategy.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Strategy::~TP_Strategy() +{ +} + + + +TAO::CSD::TP_Strategy::CustomRequestOutcome +TAO::CSD::TP_Strategy::custom_synch_request(TP_Custom_Request_Operation* op + ACE_ENV_ARG_DECL) +{ + TP_Servant_State::HandleType servant_state = + this->get_servant_state(op->servant() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (REQUEST_REJECTED); + + TP_Custom_Synch_Request_Handle request = new + TP_Custom_Synch_Request(op, servant_state.in()); + + if (!this->task_.add_request(request.in())) + { + // The request was rejected by the task. + return REQUEST_REJECTED; + } + + // Now we wait until the request is handled (executed or cancelled). + return (request->wait()) ? REQUEST_EXECUTED : REQUEST_CANCELLED; +} + + +TAO::CSD::TP_Strategy::CustomRequestOutcome +TAO::CSD::TP_Strategy::custom_asynch_request(TP_Custom_Request_Operation* op + ACE_ENV_ARG_DECL) +{ + TP_Servant_State::HandleType servant_state = + this->get_servant_state(op->servant() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (REQUEST_REJECTED); + + TP_Custom_Asynch_Request_Handle request = new + TP_Custom_Asynch_Request(op, servant_state.in()); + + return (this->task_.add_request(request.in())) + ? REQUEST_DISPATCHED : REQUEST_REJECTED; +} + + +bool +TAO::CSD::TP_Strategy::poa_activated_event_i() +{ + // Activates the worker threads, and waits until all have been started. + return (this->task_.open(&(this->num_threads_)) == 0); +} + + +void +TAO::CSD::TP_Strategy::poa_deactivated_event_i() +{ + // Passing in a value of 1 means that we want to shutdown the task, which + // equates to causing all worker threads to shutdown. The worker threads + // themselves will also invoke the close() method, but the passed-in value + // will be 0. So, a 1 means "shutdown", and a 0 means "a single worker + // thread is going away". + this->task_.close(1); +} + + +TAO::CSD::Strategy_Base::DispatchResult +TAO::CSD::TP_Strategy::dispatch_remote_request_i + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant + ACE_ENV_ARG_DECL) +{ + TP_Servant_State::HandleType servant_state = + this->get_servant_state(servant + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (DISPATCH_REJECTED); + + // Now we can create the TP_Remote_Request object, and then add it to our + // task_'s "request queue". + // + // TBD-CSD: Need to use a Cached Allocator to "create" the + // TP_Remote_Request objects. For now, use the heap. + TP_Remote_Request_Handle request = + new TP_Remote_Request(server_request, + object_id, + poa, + operation, + servant, + servant_state.in()); + + // Hand the request object to our task so that it can add the request + // to its "request queue". + if (!this->task_.add_request(request.in())) + { + // Return the DISPATCH_REJECTED return code so that the caller (our + // base class' dispatch_request() method) knows that we did + // not handle the request, and that it should be rejected. + return TAO::CSD::Strategy_Base::DISPATCH_REJECTED; + } + + return TAO::CSD::Strategy_Base::DISPATCH_HANDLED; +} + + +TAO::CSD::Strategy_Base::DispatchResult +TAO::CSD::TP_Strategy::dispatch_collocated_request_i + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant + ACE_ENV_ARG_DECL) +{ + TP_Servant_State::HandleType servant_state = + this->get_servant_state(servant + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (DISPATCH_REJECTED); + + bool is_sync_with_server = server_request.sync_with_server(); + bool is_synchronous = server_request.response_expected(); + + TP_Collocated_Synch_Request_Handle synch_request; + TP_Collocated_Synch_With_Server_Request_Handle synch_with_server_request; + TP_Request_Handle request; + + // Create the request object using the appropriate concrete type. + if (is_sync_with_server) + { + synch_with_server_request = + new TP_Collocated_Synch_With_Server_Request + (server_request, + object_id, + poa, + operation, + servant, + servant_state.in()); + + // Give the request handle its own "copy". + synch_with_server_request->_add_ref(); + request = synch_with_server_request.in(); + } + else if (is_synchronous) + { + synch_request = new TP_Collocated_Synch_Request(server_request, + object_id, + poa, + operation, + servant, + servant_state.in()); + + // Give the request handle its own "copy". + synch_request->_add_ref(); + request = synch_request.in(); + } + else + { + // Just use the (base) request handle to hold the request object. + request = new TP_Collocated_Asynch_Request(server_request, + object_id, + poa, + operation, + servant, + servant_state.in()); + } + + // Hand the request object to our task so that it can add the request + // to its "request queue". + if (!this->task_.add_request(request.in())) + { + // Return the DISPATCH_REJECTED return code so that the caller (our + // base class' dispatch_request() method) knows that we did + // not handle the request, and that it should be rejected. + return DISPATCH_REJECTED; + } + + // We need to wait on the request object if the request type is a + // synchronous request. + if (!synch_request.is_nil()) + { + int srw = synch_request->wait(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (DISPATCH_REJECTED); + if (srw == false) + { + // Raise exception when request was cancelled. + ACE_THROW_RETURN(CORBA::NO_IMPLEMENT(), DISPATCH_REJECTED); + } + } + else if (!synch_with_server_request.is_nil()) + { + bool swsr = synch_with_server_request->wait(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (DISPATCH_REJECTED); + if (swsr == false) + { + // Raise exception when request was cancelled. + ACE_THROW_RETURN(CORBA::NO_IMPLEMENT(), DISPATCH_REJECTED); + } + } + + return DISPATCH_HANDLED; +} + + +void +TAO::CSD::TP_Strategy::servant_activated_event_i + (PortableServer::Servant servant, + const PortableServer::ObjectId& oid + ACE_ENV_ARG_DECL) +{ + ACE_UNUSED_ARG(oid); + + if (this->serialize_servants_) + { + // Add the servant to the servant state map. + this->servant_state_map_.insert(servant ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } +} + + +void +TAO::CSD::TP_Strategy::servant_deactivated_event_i + (PortableServer::Servant servant, + const PortableServer::ObjectId& oid + ACE_ENV_ARG_DECL) +{ + ACE_UNUSED_ARG(oid); + + // Cancel all requests stuck in the queue for the specified servant. + this->task_.cancel_servant(servant ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + if (this->serialize_servants_) + { + // Remove the servant from the servant state map. + this->servant_state_map_.remove(servant ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } +} + + +void +TAO::CSD::TP_Strategy::cancel_requests(PortableServer::Servant servant + ACE_ENV_ARG_DECL) +{ + // Cancel all requests stuck in the queue for the specified servant. + this->task_.cancel_servant(servant ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + + +TAO::CSD::TP_Servant_State::HandleType +TAO::CSD::TP_Strategy::get_servant_state(PortableServer::Servant servant + ACE_ENV_ARG_DECL) +{ + TP_Servant_State::HandleType servant_state; + + if (this->serialize_servants_) + { + servant_state = this->servant_state_map_.find(servant + ACE_ENV_ARG_PARAMETER); + } + + return servant_state; +} +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.h new file mode 100644 index 00000000000..2e069caeb6d --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.h @@ -0,0 +1,210 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Strategy.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_STRATEGY_H +#define TAO_CSD_TP_STRATEGY_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Task.h" +#include "tao/CSD_ThreadPool/CSD_TP_Servant_State_Map.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "tao/CSD_Framework/CSD_Strategy_Base.h" +#include "tao/Intrusive_Ref_Count_Handle_T.h" + + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + class TP_Strategy; + typedef TAO_Intrusive_Ref_Count_Handle<TP_Strategy> TP_Strategy_Handle; + + class TP_Custom_Request_Operation; + + /** + * @class TP_Strategy + * + * @brief A simple custom Thread-Pool servant dispatching strategy class. + * + * This class represents a concrete implementation of a "Custom + * Servant Dispatching Strategy". This implementation is being called + * the "Thread Pool Strategy" reference implementation. + * + * A custom servant dispatching strategy object can be applied to a + * POA object in order to carry out the servant dispatching duties + * for that POA. + * + */ + class TAO_CSD_TP_Export TP_Strategy + : public Strategy_Base + { + public: + + /// Constructor. + TP_Strategy(Thread_Counter num_threads = 1, + bool serialize_servants = true); + + /// Virtual Destructor. + virtual ~TP_Strategy(); + + /// Set the number of threads in the pool (must be > 0). + void set_num_threads(Thread_Counter num_threads); + + /// Turn on/off serialization of servants. + void set_servant_serialization(bool serialize_servants); + + /// Return codes for the custom dispatch_request() methods. + enum CustomRequestOutcome + { + /// The request was successfully put on the request queue. + REQUEST_DISPATCHED, + /// The request has been executed/completed by a worker thread. + REQUEST_EXECUTED, + /// The request was removed from the queue and cancelled. + REQUEST_CANCELLED, + /// The request queue rejected the request + REQUEST_REJECTED + }; + + /// Inject a synchronous, custom request into the request queue. + /// This will block the calling thread until the request is handled + /// (dispatched or cancelled) or rejected. + /// Will return REQUEST_EXECUTED, REQUEST_CANCELLED, or REQUEST_REJECTED. + CustomRequestOutcome custom_synch_request + (TP_Custom_Request_Operation* op + ACE_ENV_ARG_DECL); + + /// Inject an asynchronous, custom request into the request queue. + /// This will return control to the calling thread once the request + /// has been placed into the queue (or rejected). + /// Will return REQUEST_DISPATCHED or REQUEST_REJECTED. + CustomRequestOutcome custom_asynch_request + (TP_Custom_Request_Operation* op + ACE_ENV_ARG_DECL); + + /// Cancel all requests that are targeted for the provided servant. + /// This is requested on the user application level. + void cancel_requests(PortableServer::Servant servant + ACE_ENV_ARG_DECL); + + protected: + + /// Handle the dispatching of a remote request. + /// + /// This will cause a new "request" object to be created and pushed + /// on to a "request queue". The worker threads are responsible for + /// servicing the queue, and performing the actual dispatch logic. + virtual Strategy_Base::DispatchResult dispatch_remote_request_i + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant + ACE_ENV_ARG_DECL); + + /// Handle the dispatching of a collocated request. + /// + /// This will cause a new "request" object to be created and pushed + /// on to a "request queue". The worker threads are responsible for + /// servicing the queue, and performing the actual dispatch logic. + virtual Strategy_Base::DispatchResult dispatch_collocated_request_i + (TAO_ServerRequest& server_request, + const PortableServer::ObjectId& object_id, + PortableServer::POA_ptr poa, + const char* operation, + PortableServer::Servant servant + ACE_ENV_ARG_DECL); + + /// Event - The POA has been activated. + /// This will activate the worker thread(s). + /// Returns true if the worker threads were activated successfully. + /// Otherwise, returns false. + virtual bool poa_activated_event_i(); + + /// Event - The POA has been deactivated. + /// This will shutdown the worker thread(s). + virtual void poa_deactivated_event_i(); + + /// Event - A servant has been activated + virtual void servant_activated_event_i + (PortableServer::Servant servant, + const PortableServer::ObjectId& oid + ACE_ENV_ARG_DECL); + + /// Event - A servant has been deactivated + virtual void servant_deactivated_event_i + (PortableServer::Servant servant, + const PortableServer::ObjectId& oid + ACE_ENV_ARG_DECL); + + + private: + + /** + * Helper method that is responsible for looking up the servant + * state object in the servant state map *if* the "serialize + * servants" flag is set to true. In the case where the + * "serialize servants" flag is set to false, then a "nil" + * servant state handle object is returned. + * + * @param servant - input - a pointer to the servant object. + * + * @returns a handle to a servant state object. + * + * @throw PortableServer::POA::ServantNotActive if the servant + * state cannot be determined. + */ + TP_Servant_State::HandleType get_servant_state + (PortableServer::Servant servant + ACE_ENV_ARG_DECL); + + + /// This is the active object used by the worker threads. + /// The request queue is owned/managed by the task object. + /// The strategy object puts requests into the task's request + /// queue, and the worker threads service the queued requests + /// by performing the actual servant request dispatching logic. + TP_Task task_; + + /// The number of worker threads to use for the task. + Thread_Counter num_threads_; + + /// The "serialize servants" flag. + bool serialize_servants_; + + /// The map of servant state objects - only used when the + /// "serialize servants" flag is set to true. + TP_Servant_State_Map servant_state_map_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Strategy.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_STRATEGY_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.inl new file mode 100644 index 00000000000..312484f5c8c --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy.inl @@ -0,0 +1,35 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Strategy::TP_Strategy(Thread_Counter num_threads, + bool serialize_servants) + : num_threads_(num_threads), + serialize_servants_(serialize_servants) +{ + // Assumes that num_threads > 0. +} + + +ACE_INLINE +void +TAO::CSD::TP_Strategy::set_num_threads(Thread_Counter num_threads) +{ + // Simple Mutator. Assumes that num_threads > 0. + this->num_threads_ = num_threads; +} + + +ACE_INLINE +void +TAO::CSD::TP_Strategy::set_servant_serialization(bool serialize_servants) +{ + // Simple Mutator. + this->serialize_servants_ = serialize_servants; +} + + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy_Factory.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy_Factory.cpp new file mode 100644 index 00000000000..fd03281a261 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy_Factory.cpp @@ -0,0 +1,154 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Strategy_Factory.h" +#include "tao/CSD_ThreadPool/CSD_TP_Strategy.h" +#include "tao/CSD_ThreadPool/CSD_ThreadPool.h" +#include "tao/CSD_Framework/CSD_Strategy_Repository.h" +#include "tao/debug.h" +#include "ace/Dynamic_Service.h" +#include "ace/OS_NS_strings.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Strategy_Factory, + "$Id$") + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Strategy_Factory::TP_Strategy_Factory() +{ +} + + +TAO::CSD::TP_Strategy_Factory::~TP_Strategy_Factory() +{ +} + + +int +TAO::CSD::TP_Strategy_Factory::init (int argc, + ACE_TCHAR* argv[]) +{ + ACE_TRACE ("TAO::CSD::TP_Strategy_Factory::init"); + + static int initialized = 0; + + // Only allow initialization once. + if (initialized) + return 0; + + initialized = 1; + TAO_CSD_Strategy_Repository *repo = + ACE_Dynamic_Service<TAO_CSD_Strategy_Repository>::instance ("TAO_CSD_Strategy_Repository"); + + if (repo != 0) + repo->init(0,0); + + // Parse any service configurator parameters. + for (int curarg = 0; curarg < argc; curarg++) + if (ACE_OS::strcasecmp (argv[curarg], + ACE_TEXT("-CSDtp")) == 0) + { + ACE_CString poa_name; + unsigned long num_threads = 1; + bool serialize_servants = true; + + curarg++; + if (curarg < argc) + { + // Parse the parameter + ACE_CString arg ((const char *)argv[curarg]); + ACE_CString::size_type pos = arg.find (':'); + + if (pos == ACE_CString::npos) + { + poa_name = arg; + } + else + { + poa_name = arg.substr (0, pos); + + ACE_CString arg_remainder = + arg.substr (pos + 1, arg.length () - pos); + + ACE_CString num_thread_str; + + pos = arg_remainder.find (':'); + + if (pos == ACE_CString::npos) + { + num_thread_str = arg_remainder; + } + else + { + num_thread_str = arg_remainder.substr (0, pos); + + ACE_CString off_str = + arg_remainder.substr (pos + 1, arg.length () - pos); + + // Case-insensitive string comparison. + if (ACE_OS::strcasecmp (ACE_TEXT_CHAR_TO_TCHAR (off_str.c_str()), + ACE_TEXT("OFF")) == 0) + { + serialize_servants = false; + } + } + + num_threads = ACE_OS::strtoul (num_thread_str.c_str (), 0, 10); + + if (num_threads == 0) + { + // Minimum of 1 thread required. + num_threads = 1; + } + } + + // Create the ThreadPool strategy for each named poa. + TP_Strategy* strategy = 0; + ACE_NEW_RETURN (strategy, + TP_Strategy (num_threads, serialize_servants), + -1); + CSD_Framework::Strategy_var objref = strategy; + + TAO_CSD_Strategy_Repository *repo = + ACE_Dynamic_Service<TAO_CSD_Strategy_Repository>::instance + ("TAO_CSD_Strategy_Repository"); + + if (repo == 0) + { + TAO_CSD_ThreadPool::init (); + repo = ACE_Dynamic_Service<TAO_CSD_Strategy_Repository>::instance ( + "TAO_CSD_Strategy_Repository" + ); + } + + + repo->add_strategy (poa_name, strategy); + } + } + else + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT("CSD_ORB_Loader: Unknown option ") + ACE_TEXT("<%s>.\n"), + argv[curarg])); + } + } + + + return 0; +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +ACE_FACTORY_NAMESPACE_DEFINE(TAO_CSD_TP, + TAO_CSD_TP_Strategy_Factory, + TAO::CSD::TP_Strategy_Factory) + +ACE_STATIC_SVC_DEFINE(TAO_CSD_TP_Strategy_Factory, + ACE_TEXT("TAO_CSD_TP_Strategy_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME(TAO_CSD_TP_Strategy_Factory), + ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ, + 0) diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy_Factory.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy_Factory.h new file mode 100644 index 00000000000..abc3da5b005 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Strategy_Factory.h @@ -0,0 +1,65 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Strategy_Factory.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_STRATEGY_FACTORY_H +#define TAO_CSD_TP_STRATEGY_FACTORY_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" +#include "tao/Versioned_Namespace.h" +#include "ace/Service_Object.h" +#include "ace/Service_Config.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + /** + * @class TP_Strategy_Factory + * + * @brief An ACE_Service_Object capable of creating TP_Strategy objects. + * + * TBD - Explain in more detail. + * + */ + class TAO_CSD_TP_Export TP_Strategy_Factory : public ACE_Service_Object + { + public: + + /// Constructor. + TP_Strategy_Factory(); + + /// Virtual Destructor. + virtual ~TP_Strategy_Factory(); + + int init (int argc, ACE_TCHAR* argv[]); + }; + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +ACE_STATIC_SVC_DECLARE_EXPORT(TAO_CSD_TP, TAO_CSD_TP_Strategy_Factory) +ACE_FACTORY_DECLARE(TAO_CSD_TP, TAO_CSD_TP_Strategy_Factory) + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_STRATEGY_FACTORY_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.cpp new file mode 100644 index 00000000000..79a0a195097 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.cpp @@ -0,0 +1,11 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Synch_Helper.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Synch_Helper, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Synch_Helper.inl" +#endif /* ! __ACE_INLINE__ */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.h new file mode 100644 index 00000000000..2056f1ab032 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.h @@ -0,0 +1,110 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Synch_Helper.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_SYNCH_HELPER_H +#define TAO_CSD_TP_SYNCH_HELPER_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" +#include "tao/Condition.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Synch.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + + /** + * @class TP_Synch_Helper + * + * @brief Helper class for synchronous requests to block the requesting + * thread until the appropriate time (when it will be un-blocked). + * + * TBD - Description here + * + */ + class TAO_CSD_TP_Export TP_Synch_Helper + { + public: + + /// Constructor. Sets initial state to PENDING. + TP_Synch_Helper(); + + /// Destructor. + ~TP_Synch_Helper(); + + /// Returns true if the helper state is DISPATCHED, and false if + /// the helper state is CANCELLED. However, if the helper state + /// is PENDING, then this method will block the calling thread + /// until the state changes to something other than PENDING + /// (ie, DISPATCHED or CANCELLED). + bool wait_while_pending(); + + /// Change the state of this helper to DISPATCHED, which will cause + /// wait_while_pending() to unblock. + void dispatched(); + + /// Change the state of this helper to CANCELLED, which will cause + /// wait_while_pending() to unblock. + void cancelled(); + + + private: + + /// Enumeration Type for all possible states of this helper object. + enum HelperState + { + PENDING, + DISPATCHED, + CANCELLED + }; + + /// Thread lock type + typedef ACE_SYNCH_MUTEX LockType; + + /// Thread guard type + typedef ACE_Guard<LockType> GuardType; + + /// Thread condition type + typedef TAO_Condition<LockType> ConditionType; + + /// Lock used to protect the state and condition. + LockType lock_; + + /// Used to denote the state of the request dispatching. + HelperState state_; + + /// The condition used to block the calling thread until the + /// state is something other than the PENDING state. + ConditionType condition_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Synch_Helper.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_SYNCH_HELPER_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.inl new file mode 100644 index 00000000000..f4c5a1380c1 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Synch_Helper.inl @@ -0,0 +1,55 @@ +// -*- C++ -*- +// +// $Id$ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Synch_Helper::TP_Synch_Helper() + : state_(PENDING), + condition_(this->lock_) +{ +} + + +ACE_INLINE +TAO::CSD::TP_Synch_Helper::~TP_Synch_Helper() +{ +} + + +ACE_INLINE +bool +TAO::CSD::TP_Synch_Helper::wait_while_pending() +{ + GuardType guard(this->lock_); + + while (this->state_ == PENDING) + { + this->condition_.wait(); + } + + return (this->state_ == DISPATCHED); +} + + +ACE_INLINE +void +TAO::CSD::TP_Synch_Helper::dispatched() +{ + GuardType guard(this->lock_); + this->state_ = DISPATCHED; + this->condition_.signal(); +} + + +ACE_INLINE +void +TAO::CSD::TP_Synch_Helper::cancelled() +{ + GuardType guard(this->lock_); + this->state_ = CANCELLED; + this->condition_.signal(); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp b/TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp new file mode 100644 index 00000000000..a196ea9583d --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Task.cpp @@ -0,0 +1,312 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_TP_Task.h" +#include "tao/CSD_ThreadPool/CSD_TP_Request.h" +#include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h" +#include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h" + +ACE_RCSID (CSD_ThreadPool, + TP_Task, + "$Id$") + +#if !defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Task.inl" +#endif /* ! __ACE_INLINE__ */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO::CSD::TP_Task::~TP_Task() +{ +} + + +bool +TAO::CSD::TP_Task::add_request(TP_Request* request) +{ + GuardType guard(this->lock_); + + if (!this->accepting_requests_) + { + ACE_DEBUG((LM_DEBUG,"(%P|%t) TP_Task::add_request() - " + "not accepting requests\n")); + return false; + } + + // We have made the decision that the request is going to be placed upon + // the queue_. Inform the request that it is about to be placed into + // a request queue. Some requests may not need to do anything in + // preparation of being placed into a queue. Others, however, may need + // to perfom a "clone" operation on some underlying request data before + // the request can be properly placed into a queue. + request->prepare_for_queue(); + + this->queue_.put(request); + + this->work_available_.signal(); + + return true; +} + + +int +TAO::CSD::TP_Task::open(void* num_threads_ptr) +{ + Thread_Counter num = 1; + + if (num_threads_ptr != 0) + { + Thread_Counter* tmp = static_cast<Thread_Counter*> (num_threads_ptr); + + if (tmp == 0) + { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) TP_Task failed to open. " + "Invalid argument type passed to open().\n"), + -1); + } + + num = *tmp; + } + + // We can't activate 0 threads. Make sure this isn't the case. + if (num < 1) + { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) TP_Task failed to open. " + "num_threads (%u) is less-than 1.\n", + num), + -1); + } + + // Likewise, we can't activate too many. Make sure this isn't the case. + if (num > MAX_THREADPOOL_TASK_WORKER_THREADS) + { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) TP_Task failed to open. " + "num_threads (%u) is too large. Max is %d.\n", + num, MAX_THREADPOOL_TASK_WORKER_THREADS), + -1); + } + + // We need the lock acquired from here on out. + GuardType guard(this->lock_); + + // We can assume that we are in the proper state to handle this open() + // call as long as we haven't been open()'ed before. + if (this->opened_) + { + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) TP_Task failed to open. " + "Task has previously been open()'ed.\n"), + -1); + } + + // Activate this task object with 'num' worker threads. + if (this->activate(THR_NEW_LWP | THR_JOINABLE, num) != 0) + { + // Assumes that when activate returns non-zero return code that + // no threads were activated. + ACE_ERROR_RETURN((LM_ERROR, + "(%P|%t) TP_Task failed to activate " + "(%d) worker threads.\n", + num), + -1); + } + + // Now we have past the point where we can say we've been open()'ed before. + this->opened_ = true; + + // Now we wait until all of the threads have started. + while (this->num_threads_ != num) + { + this->active_workers_.wait(); + } + + // We can now accept requests (via our add_request() method). + this->accepting_requests_ = true; + + return 0; +} + + +int +TAO::CSD::TP_Task::svc() +{ + // Account for this current worker thread having started the + // execution of this svc() method. + { + GuardType guard(this->lock_); + // Put the thread id into a collection which is used to check whether + // the orb shutdown is called by one of the threads in the pool. + ACE_thread_t thr_id = ACE_OS::thr_self (); + if (this->activated_threads_.set(thr_id, this->num_threads_) == -1) + { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT("(%P|%t)TP_Task::svc: number of threads is out of range \n")), + 0); + } + ++this->num_threads_; + this->active_workers_.signal(); + } + + // This visitor object will be re-used over and over again as part of + // the "GetWork" logic below. + TP_Dispatchable_Visitor dispatchable_visitor; + + // Start the "GetWork-And-PerformWork" loop for the current worker thread. + while (1) + { + TP_Request_Handle request; + + // Do the "GetWork" step. + { + // Acquire the lock until just before we decide to "PerformWork". + GuardType guard(this->lock_); + + // Start the "GetWork" loop. + while (request.is_nil()) + { + if (this->shutdown_initiated_) + { + // This breaks us out of all loops with one fell swoop. + return 0; + } + + // There is no need to visit the queue if it is empty. + if (!this->queue_.is_empty()) + { + // Visit the requests in the queue in hopes of + // locating the first "dispatchable" (ie, not busy) request. + // If a dispatchable request is located, it is extracted + // from the queue and saved in a handle data member in the + // visitor object. + this->queue_.accept_visitor(dispatchable_visitor); + + // If a dispatchable request is located, it is extracted + // from the queue and saved in a handle data member in the + // visitor object. Let's get a "copy" (or a NULL pointer + // if the visitor didn't locate/extract one). + request = dispatchable_visitor.request(); + } + + // Either the queue is empty or we couldn't find any dispatchable + // requests in the queue at this time. + if (request.is_nil()) + { + // Let's wait until we hear about the possibility of + // work before we go look again. + this->work_available_.wait(); + } + } + + // We have dropped out of the "while (request.is_nil())" loop. + // We only get here is we located/extracted a dispatchable request + // from the queue. Note that the visitor will have already + // marked the target servant as now being busy (because of us). + // We can now safely release the lock. + } + + // Do the "PerformWork" step. We don't need the lock_ to do this. + request->dispatch(); + + // Now that the request has been dispatched, we need to mark the target + // servant as no longer being busy, and we need to signal any wait()'ing + // worker threads that there may be some dispatchable requests in the + // queue now for this not-busy servant. We need the lock_ to do this. + { + GuardType guard(this->lock_); + request->mark_as_ready(); + this->work_available_.signal(); + } + + // Reset the visitor since we use it over and over. This + // will cause the visitor to drop any reference to + // the dispatched request. + dispatchable_visitor.reset(); + + // Note that the request will be "released" here when the request + // handle falls out of scope and its destructor performs the + // _remove_ref() call on the underlying TP_Request object. + } + + // This will never get executed. + return 0; +} + + +int +TAO::CSD::TP_Task::close(u_long flag) +{ + GuardType guard(this->lock_); + + if (flag == 0) + { + // Worker thread is closing. + --this->num_threads_; + this->active_workers_.signal(); + } + else + { + // Strategy object is shutting down the task. + + // Do nothing if this task has never been open()'ed. + if (!this->opened_) + { + return 0; + } + + // Set the shutdown flag to true. + this->shutdown_initiated_ = true; + + // Stop accepting requests. + this->accepting_requests_ = false; + + // Signal all worker threads waiting on the work_available_ condition. + this->work_available_.broadcast(); + + size_t num_waiting_threads = 0; + + ACE_thread_t my_thr_id = ACE_OS::thr_self (); + + // Check whether the calling thread(calling orb shutdown) is one of the + // threads in the pool. If it is then it should not wait itself. + size_t size = this->activated_threads_.size (); + + for (size_t i = 0; i < size; i ++) + { + ACE_thread_t thr_id = 0; + if (activated_threads_.get (thr_id, i) == 0 && thr_id == my_thr_id) + { + num_waiting_threads = 1; + break; + } + } + + // Wait until all worker threads have shutdown. + while (this->num_threads_ != num_waiting_threads) + { + this->active_workers_.wait(); + } + + // Cancel all requests. + TP_Cancel_Visitor cancel_visitor; + this->queue_.accept_visitor(cancel_visitor); + } + + return 0; +} + + + +void +TAO::CSD::TP_Task::cancel_servant (PortableServer::Servant servant + ACE_ENV_ARG_DECL) +{ + GuardType guard(this->lock_); + + // Cancel the requests targeted for the provided servant. + TP_Cancel_Visitor cancel_visitor(servant); + this->queue_.accept_visitor(cancel_visitor); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Task.h b/TAO/tao/CSD_ThreadPool/CSD_TP_Task.h new file mode 100644 index 00000000000..65839aeb3b7 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Task.h @@ -0,0 +1,164 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_TP_Task.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_TP_TASK_H +#define TAO_CSD_TP_TASK_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Queue.h" +#include "tao/PortableServer/PortableServer.h" +#include "tao/Condition.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Task.h" +#include "ace/Synch.h" +#include "ace/Containers_T.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace TAO +{ + namespace CSD + { + /// Typedef for the number of threads. + typedef unsigned long Thread_Counter; + + /** + * @class TP_Task + * + * @brief Active Object managing a queue of request objects. + * + * There are two types of "users" of a TP_Task object: + * + * 1) The TP_Strategy object that "owns" this task object. + * 2) The worker threads that "run" this task object as an + * "active object". + * + * The TP_Strategy object that "owns" this task object dictates + * when the worker threads are activated and when they are shutdown. It + * also injects requests into this task's queue via calls to the + * add_request() method. It is also the TP_Strategy object that + * dictates the number of worker threads to be activated via a call to + * the set_num_threads() method. + * + * The active object pattern is implemented via the use of the + * the ACE_Task_Base base class, and each worker thread will + * invoke this task's svc() method, and when the svc() returns, the + * worker thread will invoke this task's close() method (with the + * flag argument equal to 0). + * + * @note I just wanted to document an idea... When the pool consists + * of only one worker thread, we could care less about checking + * if target servant objects are busy or not. The simple fact + * that only one thread will be dispatching all requests means + * that servant objects will never be busy when the thread + * tests to see if a request is "ready_for_dispatch()". I'm + * just wondering if this knowledge can be applied to the + * implementation such that the "pool with one worker thread" case + * performs more efficiently. This is STP vs SSTP. + * + */ + class TAO_CSD_TP_Export TP_Task : public ACE_Task_Base + { + public: + + /// Default Constructor. + TP_Task(); + + /// Virtual Destructor. + virtual ~TP_Task(); + + /// Put a request object on to the request queue. + /// Returns true if successful, false otherwise (it has been "rejected"). + bool add_request(TP_Request* request); + + /// Activate the worker threads + virtual int open(void* num_threads_ptr = 0); + + /// The "mainline" executed by each worker thread. + virtual int svc(); + + /// Multi-purpose: argument value is used to differentiate purpose. + /// + /// 0) Invoked by each worker thread after its invocation of the + /// svc() method has completed (ie, returned). + /// 1) Invoked by the strategy object to shutdown all worker threads. + virtual int close(u_long flag = 0); + + /// Cancel all requests that are targeted for the provided servant. + void cancel_servant (PortableServer::Servant servant + ACE_ENV_ARG_DECL); + + + private: + + typedef TAO_SYNCH_MUTEX LockType; + typedef ACE_Guard<LockType> GuardType; + typedef TAO_Condition<LockType> ConditionType; + + + /// Lock to protect the "state" (all of the data members) of this object. + LockType lock_; + + /// Condition used to signal worker threads that they may be able to + /// find a request in the queue_ that needs to be dispatched to a + /// servant that is currently "not busy". + /// This condition will be signal()'ed each time a new request is + /// added to the queue_, and also when a servant has become "not busy". + ConditionType work_available_; + + /// This condition will be signal()'ed each time the num_threads_ + /// data member has its value changed. This is used to keep the + /// close(1) invocation (ie, a shutdown request) blocked until all + /// of the worker threads have stopped running. + ConditionType active_workers_; + + /// Flag used to indicate when this task will (or will not) accept + /// requests via the the add_request() method. + bool accepting_requests_; + + /// Flag used to initiate a shutdown request to all worker threads. + bool shutdown_initiated_; + + /// Flag used to avoid multiple open() calls. + bool opened_; + + /// The number of currently active worker threads. + Thread_Counter num_threads_; + + /// The queue of pending servant requests (a.k.a. the "request queue"). + TP_Queue queue_; + + typedef ACE_Array <ACE_thread_t> Thread_Ids; + + /// The list of ids for the threads launched by this task. + Thread_Ids activated_threads_; + }; + + } +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +# include "tao/CSD_ThreadPool/CSD_TP_Task.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_TP_TASK_H */ diff --git a/TAO/tao/CSD_ThreadPool/CSD_TP_Task.inl b/TAO/tao/CSD_ThreadPool/CSD_TP_Task.inl new file mode 100644 index 00000000000..d9c203f45ce --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_TP_Task.inl @@ -0,0 +1,22 @@ +// -*- C++ -*- +// +// $Id$ + +namespace { enum { MAX_THREADPOOL_TASK_WORKER_THREADS = 50 }; } +namespace { const ACE_thread_t default_thread_id = 0; } + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +TAO::CSD::TP_Task::TP_Task() + : work_available_(this->lock_), + active_workers_(this->lock_), + accepting_requests_(false), + shutdown_initiated_(false), + opened_(false), + num_threads_(0), + activated_threads_ ((size_t)MAX_THREADPOOL_TASK_WORKER_THREADS, default_thread_id) +{ +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_ThreadPool.cpp b/TAO/tao/CSD_ThreadPool/CSD_ThreadPool.cpp new file mode 100644 index 00000000000..2ae022711f5 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_ThreadPool.cpp @@ -0,0 +1,23 @@ +// $Id$ + +#include "tao/CSD_ThreadPool/CSD_ThreadPool.h" +#include "tao/CSD_ThreadPool/CSD_TP_Strategy_Factory.h" +#include "tao/CSD_Framework/CSD_Framework_Loader.h" +#include "tao/debug.h" +#include "ace/Dynamic_Service.h" + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +int +TAO_CSD_ThreadPool::init (void) +{ + static int initialized = 0; + if (initialized == 1) + return 0; + initialized = 1; + + TAO_CSD_Framework_Loader::init(); + return ACE_Service_Config::process_directive (ace_svc_desc_TAO_CSD_TP_Strategy_Factory); +} + +TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CSD_ThreadPool/CSD_ThreadPool.h b/TAO/tao/CSD_ThreadPool/CSD_ThreadPool.h new file mode 100644 index 00000000000..bb1bd448284 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/CSD_ThreadPool.h @@ -0,0 +1,65 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file CSD_ThreadPool.h + * + * $Id$ + * + * @author Tim Bradley <bradley_t@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_CSD_THREADPOOL_H +#define TAO_CSD_THREADPOOL_H + +#include /**/ "ace/pre.h" + +#include "tao/CSD_ThreadPool/CSD_TP_Export.h" +#include "tao/Versioned_Namespace.h" +#include "ace/Service_Object.h" +#include "ace/Service_Config.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +/** + * @class TP_Strategy_Factory + * + * @brief An ACE_Service_Object capable of creating TP_Strategy objects. + * + * TBD - Explain in more detail. + * + */ +class TAO_CSD_TP_Export TAO_CSD_ThreadPool +{ + public: + /// Used to force the initialization of the ORB code. + static int init (void); +}; + +#if defined(ACE_HAS_BROKEN_STATIC_CONSTRUCTORS) + +typedef int (*TAO_CSD_Threadpool) (void); + +static TAO_CSD_Threadpool +TAO_Requires_CSD_Threadpool = + &TAO_CSD_ThreadPool::init; + +#else + +static int +TAO_Requires_CSD_Threadpool = + TAO_CSD_ThreadPool::init (); + +#endif /* ACE_HAS_BROKEN_STATIC_CONSTRUCTORS */ + +TAO_END_VERSIONED_NAMESPACE_DECL + +#include /**/ "ace/post.h" + +#endif /* TAO_CSD_THREADPOOL_H */ diff --git a/TAO/tao/CSD_ThreadPool/TAO_CSD_ThreadPool.pc.in b/TAO/tao/CSD_ThreadPool/TAO_CSD_ThreadPool.pc.in new file mode 100644 index 00000000000..7b2b894a6d7 --- /dev/null +++ b/TAO/tao/CSD_ThreadPool/TAO_CSD_ThreadPool.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: TAO_CSD_ThreadPool +Description: TAO CSD ThreadPool Library +Requires: TAO_CSD_Framework +Version: @VERSION@ +Libs: -L${libdir} -lTAO +Cflags: -I${includedir} |