diff options
author | jmoore <jmoore@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-07-17 05:48:07 +0000 |
---|---|---|
committer | jmoore <jmoore@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-07-17 05:48:07 +0000 |
commit | 5cfeca379c243824d2e6fd12a817cc79145916c1 (patch) | |
tree | ac78a26908835e080647cb0be87434b206612e8f | |
parent | dc31c769bcb7392312db86734ca139e901f6757d (diff) | |
download | ATCD-5cfeca379c243824d2e6fd12a817cc79145916c1.tar.gz |
deleting all files previously added to the branch after realizing they are woefully out of ACE compliance
31 files changed, 0 insertions, 5087 deletions
diff --git a/ACE/ace/Basic_P_Strategy.cpp b/ACE/ace/Basic_P_Strategy.cpp deleted file mode 100644 index 7e7fdce2301..00000000000 --- a/ACE/ace/Basic_P_Strategy.cpp +++ /dev/null @@ -1,6 +0,0 @@ -#include "ace/Basic_P_Strategy.h" - -#if !defined (__ACE_INLINE__) -//#include "ace/Basic_P_Strategy.inl" -#endif /* __ACE_INLINE__ */ - diff --git a/ACE/ace/Basic_P_Strategy.h b/ACE/ace/Basic_P_Strategy.h deleted file mode 100644 index 4686c755f77..00000000000 --- a/ACE/ace/Basic_P_Strategy.h +++ /dev/null @@ -1,97 +0,0 @@ -// -*- C++ -*- - -//============================================================================= -/** - * @file Basic_P_Strategy.h - * - * - * - * - * - * @author Paul Oberlin <pauloberlin@gmail.com> - */ -//============================================================================= - -#ifndef ACE_BASIC_P_STRATEGY_H -#define ACE_BASIC_P_STRATEGY_H - -#include /**/ "ace/pre.h" - -#include "ace/DA_Strategy_Base.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -ACE_BEGIN_VERSIONED_NAMESPACE_DECL - -template <typename AnnotationId> -class Basic_P_Strategy : public DA_Strategy_Base<AnnotationId> { - - //The annotations consist of an identifier and a resource cost value - -public: - Basic_P_Strategy(int maxThreads); - virtual ~Basic_P_Strategy(); - virtual int is_deadlock_potential(AnnotationId handle); - virtual void grant(AnnotationId handle); - virtual void release(AnnotationId upcall_handle); -private: - int t_r; -}; - -ACE_END_VERSIONED_NAMESPACE_DECL - -//#if defined (__ACE_INLINE__) -//#include "ace/Basic_P_Strategy.inl" -//#endif /* __ACE_INLINE__ */ - - -template <typename AnnotationId> -ACE_INLINE -Basic_P_Strategy<AnnotationId>::Basic_P_Strategy(int maxThreads) -:DA_Strategy_Base<AnnotationId>(maxThreads), - t_r(maxThreads) -{ -} - -template <typename AnnotationId> -ACE_INLINE -Basic_P_Strategy<AnnotationId>::~Basic_P_Strategy() -{ - -} - -template <typename AnnotationId> -ACE_INLINE -int Basic_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle) -{ - int annotation = get_annotation(handle); - if (annotation > t_r) - { - return annotation - t_r; - } - - return 0; -} - -template <typename AnnotationId> -ACE_INLINE -void Basic_P_Strategy<AnnotationId>::grant(AnnotationId handle) -{ - --t_r; -} - -template <typename AnnotationId> -ACE_INLINE -void Basic_P_Strategy<AnnotationId>::release(AnnotationId upcall_handle) -{ - ++t_r; -} - - - -#include /**/ "ace/post.h" - -#endif /* ACE_BASIC_P_STRATEGY_H */ - diff --git a/ACE/ace/Basic_P_Strategy.inl b/ACE/ace/Basic_P_Strategy.inl deleted file mode 100644 index 67290328c14..00000000000 --- a/ACE/ace/Basic_P_Strategy.inl +++ /dev/null @@ -1,37 +0,0 @@ - -template <typename AnnotationId> -ACE_INLINE -Basic_P_Strategy<AnnotationId>::Basic_P_Strategy(int maxThreads) -:DA_Strategy_Base<AnnotationId>(maxThreads), - t_r(maxThreads) -{ -} - -template <typename AnnotationId> -ACE_INLINE -Basic_P_Strategy<AnnotationId>::~Basic_P_Strategy() -{ - -} - -template <typename AnnotationId> -ACE_INLINE -bool Basic_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle) -{ - int annotation = get_annotation(handle); - return !(annotation < t_r); -} - -template <typename AnnotationId> -ACE_INLINE -void Basic_P_Strategy<AnnotationId>::grant(AnnotationId handle) -{ - --t_r; -} - -template <typename AnnotationId> -ACE_INLINE -void Basic_P_Strategy<AnnotationId>::release(AnnotationId upcall_handle) -{ - ++t_r; -} diff --git a/ACE/ace/DA_Strategy_Base.cpp b/ACE/ace/DA_Strategy_Base.cpp deleted file mode 100644 index d0e484a700e..00000000000 --- a/ACE/ace/DA_Strategy_Base.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#include "ace/DA_Strategy_Base.h" - -#if !defined (__ACE_INLINE__) -//#include "ace/DA_Strategy_Base.inl" -#endif /* __ACE_INLINE__ */
\ No newline at end of file diff --git a/ACE/ace/DA_Strategy_Base.h b/ACE/ace/DA_Strategy_Base.h deleted file mode 100644 index 761f33a52ce..00000000000 --- a/ACE/ace/DA_Strategy_Base.h +++ /dev/null @@ -1,181 +0,0 @@ -// -*- C++ -*- - -//============================================================================= -/** - * @file DA_Strategy_Base.h - * - * - * - * The Deadlock Avoidance Strategy Base (DA_Strategy_Base) class - * is an abstract base class for Strategies that implement deadlock - * avoidance algorithms. This class provides interfaces for passing - * annotations for call graph annotations, number of available threads, as well - * as methods to determine whether a call is safe to make. - * - * - * @author Paul Oberlin <pauloberlin@gmail.com> - */ -//============================================================================= - -#ifndef DA_STRATEGY_BASE_H -#define DA_STRATEGY_BASE_H - -#include /**/ "ace/pre.h" -#include "ace/Hash_Map_Manager.h" -#include "ace/Thread_Mutex.h" -#include "ace/Atomic_Op_T.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -class ACE_Event_Handler; - -template <typename AnnotationId> -class DA_Strategy_Base { - - public: - - //The annotations consist of an identifier and a resource cost value -typedef ACE_Hash_Map_Entry<ACE_Event_Handler *, int> HASH_EH_ENTRY; - - -typedef ACE_Hash_Map_Manager_Ex<AnnotationId, - int, - ACE_Hash<AnnotationId>, - ACE_Equal_To<AnnotationId>, - ACE_Thread_Mutex> HASH_ANNOTATIONS_MAP; - -typedef ACE_Hash_Map_Iterator_Ex<AnnotationId, - int, - ACE_Hash<AnnotationId>, - ACE_Equal_To<AnnotationId>, - ACE_Thread_Mutex> HASH_ANNOTATIONS_ITER; - -typedef ACE_Hash_Map_Const_Iterator_Ex<AnnotationId, - int, - ACE_Hash<AnnotationId>, - ACE_Equal_To<AnnotationId>, - ACE_Thread_Mutex> HASH_ANNOTATIONS_CONST_ITER; - -typedef ACE_Hash_Map_Reverse_Iterator_Ex<AnnotationId, - int, - ACE_Hash<AnnotationId>, - ACE_Equal_To<AnnotationId>, - ACE_Thread_Mutex> HASH_ANNOTATIONS_REVERSE_ITER; - -typedef HASH_ANNOTATIONS_MAP Annotations_Table; - - - DA_Strategy_Base(int maxThreads); - virtual ~DA_Strategy_Base(); - - virtual int is_deadlock_potential(AnnotationId handle)=0; - virtual void grant(AnnotationId handle)=0; - virtual void release(AnnotationId upcall_handle)=0; - int get_max_threads() { return num_avail_threads_.value();} - HASH_ANNOTATIONS_CONST_ITER get_annotations_iter() const; - virtual int get_annotation (AnnotationId handle) const; - virtual int add_annotation (AnnotationId handle, int annotation); - virtual int remove_annotation (AnnotationId handle); - virtual int set_annotations_table (const HASH_ANNOTATIONS_REVERSE_ITER& table); - -private: - HASH_ANNOTATIONS_MAP annotations_repo_; - ACE_RW_Thread_Mutex lock_; - ACE_Atomic_Op<ACE_Thread_Mutex, int> num_avail_threads_; - -}; - -//#if defined (__ACE_INLINE__) -//#include "ace/DA_Strategy_Base.inl" -template <typename AnnotationId> -ACE_INLINE -DA_Strategy_Base<AnnotationId>::DA_Strategy_Base (int maxThreads) - :num_avail_threads_ (maxThreads) -{ -} - -template <typename AnnotationId> -ACE_INLINE -DA_Strategy_Base<AnnotationId>::~DA_Strategy_Base() -{ -} - -template <typename AnnotationId> -ACE_INLINE int -DA_Strategy_Base<AnnotationId>::get_annotation (AnnotationId id) const -{ - int annotation; - if (annotations_repo_.find (id, annotation) == -1) - return -1; - else return annotation; -} - -template <typename AnnotationId> -ACE_INLINE int -DA_Strategy_Base<AnnotationId>::set_annotations_table ( - const HASH_ANNOTATIONS_REVERSE_ITER& table) -{ - HASH_ANNOTATIONS_REVERSE_ITER iter(table); - int rc=0; - - for (;!(iter.done()); iter++) - { - rc = annotations_repo_.bind((*iter).ext_id_, (*iter).int_id_); - if (rc != 0) break; - } - - return rc; -} - -template <typename AnnotationId> -ACE_INLINE int -DA_Strategy_Base<AnnotationId>::add_annotation (AnnotationId id, int annotation) -{ - int rc; - if (annotation > num_avail_threads_.value()) { - rc = -1; - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p.\n"), - ACE_TEXT ("DA_Strategy_Base annotation may not exceed number of threads"))); - } else { - rc = annotations_repo_.bind (id, annotation); - } - /* - ACE_DEBUG ((LM_DEBUG, "In add_annotation\n")); - HASH_ANNOTATIONS_CONST_ITER iter(annotations_repo_); - for (;!(iter.done()); iter++) - { - ACE_DEBUG ((LM_DEBUG, "%d-%d\n", (*iter).ext_id_, (*iter).int_id_)); - } - */ - return rc; -} - -template <typename AnnotationId> -ACE_INLINE ACE_Hash_Map_Const_Iterator_Ex<AnnotationId, - int, - ACE_Hash<AnnotationId>, - ACE_Equal_To<AnnotationId>, - ACE_Thread_Mutex> -DA_Strategy_Base<AnnotationId>::get_annotations_iter() const -{ - - return annotations_repo_.begin(); -} - -template <typename AnnotationId> -ACE_INLINE int -DA_Strategy_Base<AnnotationId>::remove_annotation (AnnotationId id) -{ - return annotations_repo_.unbind (id); -} - - -//#endif /* __ACE_INLINE__ */ - -#include /**/ "ace/post.h" - -#endif /* DA_STRATEGY_BASE_H */ - diff --git a/ACE/ace/DA_Strategy_Base.inl b/ACE/ace/DA_Strategy_Base.inl deleted file mode 100644 index be3e999e798..00000000000 --- a/ACE/ace/DA_Strategy_Base.inl +++ /dev/null @@ -1,83 +0,0 @@ -template <typename AnnotationId> -ACE_INLINE -DA_Strategy_Base<AnnotationId>::DA_Strategy_Base (int maxThreads) - :num_avail_threads_ (maxThreads) -{ -} - -template <typename AnnotationId> -ACE_INLINE -DA_Strategy_Base<AnnotationId>::~DA_Strategy_Base() -{ -} - -template <typename AnnotationId> -ACE_INLINE int -DA_Strategy_Base<AnnotationId>::get_annotation (AnnotationId id) const -{ - int annotation; - if (annotations_repo_.find (id, annotation) == -1) - return -1; - else return annotation; -} - -template <typename AnnotationId> -ACE_INLINE int -DA_Strategy_Base<AnnotationId>::set_annotations_table ( - const HASH_ANNOTATIONS_REVERSE_ITER& table) -{ - HASH_ANNOTATIONS_REVERSE_ITER iter(table); - int rc=0; - - for (;!(iter.done()); iter++) - { - rc = annotations_repo_.bind((*iter).ext_id_, (*iter).int_id_); - if (rc != 0) break; - } - - return rc; -} - -template <typename AnnotationId> -ACE_INLINE int -DA_Strategy_Base<AnnotationId>::add_annotation (AnnotationId id, int annotation) -{ - int rc; - if (annotation > num_avail_threads_.value()) { - rc = -1; - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p.\n"), - ACE_TEXT ("DA_Strategy_Base annotation may not exceed number of threads"))); - } else { - rc = annotations_repo_.bind (id, annotation); - } - /* - ACE_DEBUG ((LM_DEBUG, "In add_annotation\n")); - HASH_ANNOTATIONS_CONST_ITER iter(annotations_repo_); - for (;!(iter.done()); iter++) - { - ACE_DEBUG ((LM_DEBUG, "%d-%d\n", (*iter).ext_id_, (*iter).int_id_)); - } - */ - return rc; -} - -template <typename AnnotationId> -ACE_INLINE ACE_Hash_Map_Const_Iterator_Ex<AnnotationId, - int, - ACE_Hash<AnnotationId>, - ACE_Equal_To<AnnotationId>, - ACE_Thread_Mutex> -DA_Strategy_Base<AnnotationId>::get_annotations_iter() const -{ - - return annotations_repo_.begin(); -} - -template <typename AnnotationId> -ACE_INLINE int -DA_Strategy_Base<AnnotationId>::remove_annotation (AnnotationId id) -{ - return annotations_repo_.unbind (id); -} - diff --git a/ACE/ace/Live_P_Strategy.cpp b/ACE/ace/Live_P_Strategy.cpp deleted file mode 100644 index 9b14b27dbcb..00000000000 --- a/ACE/ace/Live_P_Strategy.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#include "ace/Live_P_Strategy.h" - -#if !defined (__ACE_INLINE__) -#include "ace/Live_P_Strategy.inl" -#endif /* __ACE_INLINE__ */
\ No newline at end of file diff --git a/ACE/ace/Live_P_Strategy.h b/ACE/ace/Live_P_Strategy.h deleted file mode 100644 index 729e3efc69a..00000000000 --- a/ACE/ace/Live_P_Strategy.h +++ /dev/null @@ -1,55 +0,0 @@ -// -*- C++ -*- - -//============================================================================= -/** - * @file Live_P_Strategy.h - * - * - * - * - * - * @author Paul Oberlin <pauloberlin@gmail.com> - */ -//============================================================================= - -#ifndef ACE_LIVE_P_STRATEGY_H -#define ACE_LIVE_P_STRATEGY_H - -#include /**/ "ace/pre.h" - -#include "ace/DA_Strategy_Base.h" -#include "ace/RB_Tree.h" -#include "ace/Mutex.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -//forward decl -class Live_P_Tree; - -template <typename AnnotationId> -class Live_P_Strategy : public DA_Strategy_Base<AnnotationId> { - - //The annotations consist of an identifier and a resource cost value - -public: - Live_P_Strategy(int maxThreads); - virtual ~Live_P_Strategy(); - virtual int is_deadlock_potential(AnnotationId handle); - virtual void grant(AnnotationId handle); - virtual void release(AnnotationId upcall_handle); -private: - Live_P_Tree* tree_pimpl_; - bool min_illegal_is_computed_; - int min_illegal_; - ACE_Mutex computation_mutex_; - -}; -#if defined (__ACE_INLINE__) -#include "ace/Live_P_Strategy.inl" -#endif /* __ACE_INLINE__ */ - -#include /**/ "ace/post.h" - -#endif /* ACE_LIVE_P_STRATEGY_H */ diff --git a/ACE/ace/Live_P_Strategy.inl b/ACE/ace/Live_P_Strategy.inl deleted file mode 100644 index f1dd3f7936a..00000000000 --- a/ACE/ace/Live_P_Strategy.inl +++ /dev/null @@ -1,246 +0,0 @@ -#include <climits> -#include "ace/RB_Tree.h" -/* - Much of this is credited to "Efficient Distrubuted Deadlock - Avoidance with Liveness Guarentees" by Sanchez, Sipma, and Manna, - EMSOFT 2006 -*/ - -struct AnnotationNode { - AnnotationNode() - :count(0), size(0), larger(0), larger_me(0), larger_left(INT_MAX), larger_right(INT_MAX) - { - } - int count; //number of processes with this annotation - int size; //total number of processes in subtree including this node - int larger; //minimum of larger_left, larger_me, and, larger_right - int larger_me; - int larger_left; - int larger_right; -}; - -namespace { - - int min(int a, int b) { - return (a < b)? a : b; - } - - int MIN_THREE(int a, int b, int c) { - return (a < b) ? min(a,c) : min(b,c); - } - -} - -class Live_P_Tree : public ACE_RB_Tree<int, AnnotationNode, ACE_Equal_To<int>, ACE_Thread_Mutex> { - -public: - Live_P_Tree(int maxThreads); - virtual ~Live_P_Tree(); - int bind(const int& ext_id); - int unbind (const int &ext_id); - int calc_max() const; -protected: - void RB_rotate_right(ACE_RB_Tree_Node<int, AnnotationNode> *x); - void RB_rotate_left(ACE_RB_Tree_Node<int, AnnotationNode> *x); -private: - void recalculate_augmentation(ACE_RB_Tree_Node<int, AnnotationNode>* nodePtr); - void recalculate_augmentation_up(ACE_RB_Tree_Node<int, AnnotationNode>* x); - int calc_max_i(ACE_RB_Tree_Node<int, AnnotationNode>* nodePtr, int extra) const; - int T_; -}; - -ACE_INLINE -Live_P_Tree::Live_P_Tree(int maxThreads) -:ACE_RB_Tree<int, AnnotationNode, ACE_Equal_To<int>, ACE_Thread_Mutex>(), - T_(maxThreads) { - -} - -ACE_INLINE -Live_P_Tree::~Live_P_Tree() { -} - -ACE_INLINE -int -Live_P_Tree::bind(const int& ext_id) -{ - ACE_RB_Tree_Node<int, AnnotationNode>* entry = 0; - int returnVal = -1; //return error unless we return - //something else from the parent unbind - RB_SearchResult result = LEFT; - entry = find_node (ext_id, result); - // If there is a matching node, don't add a new one, just mod the existing one - if (entry && result == EXACT) { - entry->item().count++; - } else { - returnVal = ACE_RB_Tree<int, - AnnotationNode, - ACE_Equal_To<int>, - ACE_Thread_Mutex>::bind(ext_id, - AnnotationNode(), - entry); - } - recalculate_augmentation_up(entry); - return returnVal; -} - -void -Live_P_Tree::RB_rotate_right (ACE_RB_Tree_Node<int, AnnotationNode> *x) -{ - ACE_RB_Tree<int, AnnotationNode, ACE_Equal_To<int>, ACE_Thread_Mutex>::RB_rotate_right(x); - recalculate_augmentation_up(x); - -} - -void -Live_P_Tree::RB_rotate_left (ACE_RB_Tree_Node<int, AnnotationNode> *x) -{ - ACE_RB_Tree<int, AnnotationNode, ACE_Equal_To<int>, ACE_Thread_Mutex>::RB_rotate_left(x); - recalculate_augmentation_up(x); -} - -ACE_INLINE -int -Live_P_Tree::unbind(const int& ext_id) -{ - ACE_RB_Tree_Node<int, AnnotationNode>* entry = 0; - RB_SearchResult result = LEFT; - int returnVal = -1; //return error unless we return - //something else from the parent unbind - entry = find_node (ext_id, result); - // If there is a matching node, don't add a new one, just mod the existing one - if (entry && result == EXACT) { - if (--(entry->item().count) == 0) { - entry = entry->parent(); - returnVal = ACE_RB_Tree<int, AnnotationNode, ACE_Equal_To<int>, ACE_Thread_Mutex>::unbind(ext_id); - } - } else { - //exception? probably bad if we try to unbind something not in the tree - } - if (entry) { - recalculate_augmentation_up(entry); - } - return returnVal; -} - - -ACE_INLINE void -Live_P_Tree::recalculate_augmentation(ACE_RB_Tree_Node<int, AnnotationNode>* nodePtr) { - - AnnotationNode placeholderNode; - AnnotationNode& node = nodePtr->item(); - AnnotationNode& left = nodePtr->left() ? placeholderNode : nodePtr->left()->item(); - AnnotationNode& right = nodePtr->right() ? placeholderNode : nodePtr->right()->item(); - - // (1) size - node.size = left.size + right.size + node.count; - - // (2) larger_me - node.larger_me = T_ - (node.count + right.size + nodePtr->key()); - - // (3) larger_right - node.larger_right = right.larger; - - // (4) larger_left - node.larger_left = left.larger - (right.size + node.count); - - //(5) larger - node.larger = MIN_THREE(node.larger_me, node.larger_left, node.larger_right); -} - -ACE_INLINE void -Live_P_Tree::recalculate_augmentation_up(ACE_RB_Tree_Node<int, AnnotationNode>* x) { - while (x) { - recalculate_augmentation(x); - x = x->parent(); - } -} - -ACE_INLINE int -Live_P_Tree::calc_max() const { -// //note: need to add get_root method to RB_Tree - return 0;//calc_max_i(get_root(), 0); -} - -ACE_INLINE int -Live_P_Tree::calc_max_i(ACE_RB_Tree_Node<int, AnnotationNode>* nodePtr, int extra) const { - AnnotationNode& n = nodePtr->item(); - - if ( n.larger_left - extra==0) { - return calc_max_i(nodePtr->left(), extra + nodePtr->right()->item().size + n.count); } - else if (n.larger_me - extra==0) { return (nodePtr->key()); } - else if (n.larger_right - extra==0) { return calc_max_i(nodePtr->right(), extra); } - else { return T_; } -} - -template <typename AnnotationId> -ACE_INLINE -Live_P_Strategy<AnnotationId>::Live_P_Strategy(int maxThreads) -:DA_Strategy_Base<AnnotationId>(maxThreads), - min_illegal_is_computed_(false), - min_illegal_(0) -{ -} - -template <typename AnnotationId> -ACE_INLINE -Live_P_Strategy<AnnotationId>::~Live_P_Strategy() -{ -} - - - -template <typename AnnotationId> -ACE_INLINE -int -Live_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle) -{ - int annotation = get_annotation(handle); - computation_mutex_.acquire(); - if (!min_illegal_is_computed_) - { - if (tree_pimpl_->current_size() > 1) - { - min_illegal_ = tree_pimpl_->calc_max(); - } - min_illegal_is_computed_ = true; - } - computation_mutex_.release(); - - if (annotation >= min_illegal_) - { - return annotation - min_illegal_ + 1; - } - - return 0; -} - -template <typename AnnotationId> -ACE_INLINE -void -Live_P_Strategy<AnnotationId>::grant(AnnotationId handle) -{ - int annotation = get_annotation(handle); - //since the state of the tree is involved in calculation - //of max, we must aquire the lock before changing the - //structure of the tree - computation_mutex_.acquire(); - tree_pimpl_->bind(annotation); - min_illegal_is_computed_ = false; - computation_mutex_.release(); -} - -template <typename AnnotationId> -ACE_INLINE -void -Live_P_Strategy<AnnotationId>::release(AnnotationId handle) -{ - //since the state of the tree is involved in calculation - //of max, we must aquire the lock before changing the - //structure of the tree - computation_mutex_.acquire(); - min_illegal_is_computed_ = false; - int annotation = get_annotation(handle); - tree_pimpl_->unbind(annotation); - computation_mutex_.release(); -} diff --git a/ACE/ace/PIP_Active_IO_Handler.cpp b/ACE/ace/PIP_Active_IO_Handler.cpp deleted file mode 100644 index 46bfc8fcee2..00000000000 --- a/ACE/ace/PIP_Active_IO_Handler.cpp +++ /dev/null @@ -1,99 +0,0 @@ -// $Id$ - -#include "ace/PIP_Active_IO_Handler.h" - - -#include <iostream> -/// Constructor -ACE_PIP_Active_IO_Handler::ACE_PIP_Active_IO_Handler() - : shutdown_(false) -{ - // acquire the shutdown lock so that when shutdown_svc is called, - // the caller cannot return until shutdown has been completed and - // lock relinquished - shutdown_lock_.acquire(); -} - -/// Closes all remote connections. -int ACE_PIP_Active_IO_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) -{ - int result(0); - switch(close_mask) - { - case ACE_Event_Handler::READ_MASK: - read_closed_ = true; - break; - case ACE_Event_Handler::WRITE_MASK: - write_closed_ = true; - break; - }; - - if (read_closed_ && write_closed_) - { - // Close our end of the connection - peer_.close_reader(); - peer_.close_writer(); - delete this; - return -1; - } - - return 0; -} - - -/// Enqueue a message to be sent -int ACE_PIP_Active_IO_Handler::put_message (ACE_PIP_Protocol_Message* message) -{ - outgoing_message_queue_.enqueue(message); -} - -int ACE_PIP_Active_IO_Handler::svc() -{ - int result(0); - ssize_t bytes_available(0); - char byte; - - // run until we're told to quit - while (!shutdown_) - { - // peek to see if incoming message available - bytes_available = peer_.recv(&byte, 1, MSG_PEEK); - if (bytes_available > 0) - { - handle_input(); - } - - // handle outgoing message - result = handle_output(); - if (result == -2) - { - // indicate to caller that the - // handler is no longer active - return -1; - } - - bytes_available = 0; - } - - - return 0; -} - -void ACE_PIP_Active_IO_Handler::shutdown_svc() -{ - shutdown_ = true; - shutdown_lock_.acquire(); - - handle_close(0, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK); - -} - -int ACE_PIP_Active_IO_Handler::open(void*) -{ - std::cout << "activate" << std::endl; - this->activate(); -} - - - - diff --git a/ACE/ace/PIP_Active_IO_Handler.h b/ACE/ace/PIP_Active_IO_Handler.h deleted file mode 100644 index b50b606b2e2..00000000000 --- a/ACE/ace/PIP_Active_IO_Handler.h +++ /dev/null @@ -1,54 +0,0 @@ - /** - * @file PIP_Active_IO_Handler.h - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - * This file contains the specification for a class - * that manages network I/O in a dedicated thread -*/ - - -#ifndef _PIP_ACTIVE_IO_HANDLER_H_ -#define _PIP_ACTIVE_IO_HANDLER_H_ - -#include "ace/PIP_IO_Handler.h" - -/** - * @class ACE_PIP_Active_IO_Handler - * - * @brief Performs network I/O in a dedicated thread - * - * @author John Moore <ljohn7@gmail.com> - */ -class ACE_Export ACE_PIP_Active_IO_Handler : - public ACE_PIP_IO_Handler -{ - public: - - /// Constructor - ACE_PIP_Active_IO_Handler (); - - /// Enqueue a message to be sent - virtual int put_message (ACE_PIP_Protocol_Message* message); - - /// Closes all remote connections. - virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); - - /// Performs message I/O - virtual int svc(); - - /// Shuts down the service. Result is handler deactivated and - /// deleted - void shutdown_svc(); - - virtual int open(void* = 0); - - private: - - bool shutdown_; - ACE_Mutex shutdown_lock_; -}; - -#endif /* _PIP_Active_IO_Handler_H_ */ diff --git a/ACE/ace/PIP_Connection_Manager.cpp b/ACE/ace/PIP_Connection_Manager.cpp deleted file mode 100644 index d92d2e27895..00000000000 --- a/ACE/ace/PIP_Connection_Manager.cpp +++ /dev/null @@ -1,242 +0,0 @@ - /** - * @file PIP_Connection_Manager.cpp - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - */ - - - - -#include <ace/INET_Addr.h> -#include <ace/PIP_Connection_Manager.h> - - -ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::connection_manager_ = 0; -ACE_Mutex ACE_PIP_Connection_Manager::instance_lock_; -bool ACE_PIP_Connection_Manager::delete_manager_ = false; - -/// Default Constructor -ACE_PIP_Connection_Manager::ACE_PIP_Connection_Manager() -{ - -} - -/// Destructor -ACE_PIP_Connection_Manager::~ACE_PIP_Connection_Manager() -{ - -} - -ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::instance() -{ - if (connection_manager_ == 0) - { - instance_lock_.acquire(); - - if (ACE_PIP_Connection_Manager::connection_manager_ == 0) - { - ACE_NEW_RETURN (ACE_PIP_Connection_Manager::connection_manager_, - ACE_PIP_Connection_Manager, - 0); - - delete_manager_ = true; - } - - instance_lock_.release(); - } - - return connection_manager_; -} - -int ACE_PIP_Connection_Manager::establish_connections(ACE_UINT32 source_site_id) -{ - int result(0); - - //establish connections - for (int i = 0; i < connection_definitions_->size(); ++i) - { - if ((*connection_definitions_)[i]->source_site_id == - source_site_id) - { - ACE_INET_Addr address; - address.set((*connection_definitions_)[i]->port, - (*connection_definitions_)[i]->address.c_str()); - - if ((*connection_definitions_)[i]->type == - Connection_Definition::ACTIVE) - { - ACE_PIP_Active_IO_Handler* handler = new ACE_PIP_Active_IO_Handler; - result = active_connector_.connect(handler, address); - if (result == -1) - { - return -1; - } - else - { - handler->init( - (*connection_definitions_)[i]->source_site_id, - (*connection_definitions_)[i]->destination_site_id, - (*connection_definitions_)[i]->priority); - - handlers_.push_back(handler); - } - } - else - { - ACE_PIP_Reactive_IO_Handler* handler = new ACE_PIP_Reactive_IO_Handler; - result = reactive_connector_.connect(handler, address); - if (result == -1) - { - std::cerr << "Unable to connect to " - << (*connection_definitions_)[i]->address << " " - << (*connection_definitions_)[i]->port - << std::endl; - - return -1; - } - else - { - handler->init( - (*connection_definitions_)[i]->source_site_id, - (*connection_definitions_)[i]->destination_site_id, - (*connection_definitions_)[i]->priority); - - handlers_.push_back(handler); - } - } - } - - } - - return result; -} - -int ACE_PIP_Connection_Manager::process_connection_file(char* file_name) -{ - // Expecting the file to contain one tuple per line - // where each is of form (source_id, dest_id, dest_address, dest_port, priority, type) - std::ifstream* my_stream = new std::ifstream; - - my_stream->open(file_name); - - if (my_stream->fail()) - { - std::cerr << "Failed to open connection file: " << file_name - << std::endl; - - return -1; - } - - std::string line; - std::string token; - int strlen; - int first_pos; - int second_pos; - Connection_Definition* current_definition(0); - - std::getline(*my_stream, line); - int num_entries = atoi(line.c_str()); - - connection_definitions_ = new ACE_Vector<Connection_Definition*>; - for (int i = 0; i < num_entries; ++i) - { - current_definition = new Connection_Definition; - std::getline(*my_stream, line); - strlen = line.length(); - first_pos = line.find("("); - if (first_pos > strlen) - { - delete current_definition; - return -1; - } - - second_pos = line.find(",", first_pos); - if (second_pos > strlen) - { - delete current_definition; - return -1; - } - - // source site ID - token.assign(line, first_pos + 1, second_pos - first_pos - 1); - current_definition->source_site_id = atoi(token.c_str()); - - first_pos = second_pos; - second_pos = line.find(",", first_pos + 1); - if (second_pos > strlen) - { - delete current_definition; - return -1; - } - - // destination site ID - token.assign(line, first_pos + 1, second_pos - first_pos - 1); - current_definition->destination_site_id = atoi(token.c_str()); - - first_pos = second_pos; - second_pos = line.find(",", first_pos + 1); - if (second_pos > strlen) - { - delete current_definition; - return -1; - } - - // IP address - current_definition->address.assign(line, first_pos + 1, second_pos - first_pos - 1); - - first_pos = second_pos; - second_pos = line.find(",", first_pos + 1); - if (second_pos > strlen) - { - delete current_definition; - return -1; - } - - // IP port - token.assign(line, first_pos + 1, second_pos - first_pos - 1); - current_definition->port = atoi(token.c_str()); - - first_pos = second_pos; - second_pos = line.find(",", first_pos + 1); - if (second_pos > strlen) - { - delete current_definition; - return -1; - } - - // Connection priority - token.assign(line, first_pos + 1, second_pos - first_pos - 1); - current_definition->priority = atoi(token.c_str()); - - first_pos = second_pos; - second_pos = line.find(")", first_pos + 1); - if (second_pos > strlen) - { - delete current_definition; - return -1; - } - - // Connection Type - token.assign(line, first_pos + 1, second_pos - first_pos - 1); - if (token == "ACTIVE") - { - current_definition->type = Connection_Definition::ACTIVE; - } - else - { - current_definition->type = Connection_Definition::REACTIVE; - } - - connection_definitions_->push_back(current_definition); - } - - return 0; -} - -const ACE_Vector<ACE_PIP_Connection_Manager::Connection_Definition*>* ACE_PIP_Connection_Manager::get_connections() const -{ - return connection_definitions_; -} diff --git a/ACE/ace/PIP_Connection_Manager.h b/ACE/ace/PIP_Connection_Manager.h deleted file mode 100644 index e4925ec4568..00000000000 --- a/ACE/ace/PIP_Connection_Manager.h +++ /dev/null @@ -1,79 +0,0 @@ - /** - * @file PIP_Connection_Manager.h - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - */ - -#ifndef _PIP_CONNECTION_MANAGER_H_ -#define _PIP_CONNECTION_MANAGER_H_ - -#include <ace/Connector.h> -#include <ace/PIP_Active_IO_Handler.h> -#include <ace/PIP_Reactive_IO_Handler.h> -#include <ace/Reactor.h> -#include <ace/SOCK_Connector.h> -#include <ace/Vector_T.h> - -#include <fstream> -#include <iostream> -#include <string> -#include <vector> - -class ACE_Export ACE_PIP_Connection_Manager -{ - public: - - /// Informationa associated with a connection - struct Connection_Definition - { - enum Handler_Type {ACTIVE, REACTIVE}; - - ACE_UINT32 source_site_id; - ACE_UINT32 destination_site_id; - std::string address; - u_short port; - ACE_UINT32 priority; - Handler_Type type; - }; - - /// Default Constructor - ACE_PIP_Connection_Manager(); - - /// Destructor - virtual ~ACE_PIP_Connection_Manager(); - - /// obtain the single instance of the manager - static ACE_PIP_Connection_Manager* instance(); - - /// Extract all connection information from a file - virtual int process_connection_file(char* filename); - - /// Establish all connection for which source_site_id is the source - virtual int establish_connections(ACE_UINT32 source_site_id); - - const ACE_Vector<Connection_Definition*>* get_connections() const; - - private: - - ACE_Vector<Connection_Definition*>* connection_definitions_; - - // The connector used to actively connect to a remote site - ACE_Connector< - ACE_PIP_Active_IO_Handler, - ACE_SOCK_Connector> active_connector_; - - ACE_Connector< - ACE_PIP_Reactive_IO_Handler, - ACE_SOCK_Connector> reactive_connector_; - - static ACE_PIP_Connection_Manager* connection_manager_; - static ACE_Mutex instance_lock_; - static bool delete_manager_; - - std::vector<ACE_PIP_IO_Handler*> handlers_; -}; - -#endif diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.cpp b/ACE/ace/PIP_DA_Strategy_Adapter.cpp deleted file mode 100644 index ecfd42a4747..00000000000 --- a/ACE/ace/PIP_DA_Strategy_Adapter.cpp +++ /dev/null @@ -1,4 +0,0 @@ -// $Id$ - -#include "PIP_DA_Strategy_Adapter.h" - diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.h b/ACE/ace/PIP_DA_Strategy_Adapter.h deleted file mode 100644 index a0899aedaa7..00000000000 --- a/ACE/ace/PIP_DA_Strategy_Adapter.h +++ /dev/null @@ -1,261 +0,0 @@ - /** - * @file PIP_DA_Strategy_Adapter.h - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - * This file contains the specification for a class - * that adapts a deadlock avoidance strategy to additionally - * support priority inheritance protocol annotations -*/ - - -#ifndef _PIP_DA_STRATEGY_ADAPTER_ -#define _PIP_DA_STRATEGY_ADAPTER_ - -#include "ace/DA_Strategy_Base.h" -#include "ace/Hash_Map_Manager.h" -#include "ace/Unbounded_Set.h" -#include "ace/Mutex.h" -#include "ace/Null_Mutex.h" - - -#include <iostream> - -/** - * @class ACE_PIP_DA_Strategy_Adapter - * @brief Extends deadlock avoidance strategies - * to support priority inheritance annotations - * - * Deadlock avoidance strategies associate a resource cost annotation - * with each handle. This class extends the strategies to support - * the association of annotations with each priority at which the - * handle can be dispatched, i.e. the priority at which the corresponding - * thread resource can dispatch the handle -*/ -template <typename Handle_Id, typename Lock> -class ACE_PIP_DA_Strategy_Adapter -{ - public: - - /// Constructor that takes the deadlock avoidance strategy that - /// the Strategy Adapter adapts. - ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base<ACE_UINT64>* DA_strategy); - ~ACE_PIP_DA_Strategy_Adapter(); - - /// Indicates whether allocating a thread to the handle - /// at the specified priority could potentially result in deadlock. - int is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority); - - /// Grant the handle a thread at the specified priority. - void grant(Handle_Id handle, ACE_UINT32 priority); - - /// Release the thread - void release(Handle_Id handle, ACE_UINT32 priority); - - /// Determine the number of threads being managed by - /// the DA_Strategy adapter. - int get_max_threads(); - - /// Add an annotation value for the handle / priority pair. - int add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation); - - /// Remove every annotation associated with this handle. - int remove_annotation (Handle_Id handle); - int remove_annotation (Handle_Id handle, ACE_UINT32 priority); - -private: - - /// Associates each message handler with an internally generated id - /// which can be used, along with a priority, to lookup an annotation. - typedef ACE_Hash_Map_Manager_Ex<Handle_Id, - ACE_UINT32, - ACE_Hash<Handle_Id>, - ACE_Equal_To<Handle_Id>, - ACE_Null_Mutex> HANDLE_ID_MAP; - - /// Associates each message handler with a set of potential priorities. - /// Message handler represented by internally generated id. - typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32, - ACE_Unbounded_Set<ACE_UINT32>*, - ACE_Hash<ACE_UINT32>, - ACE_Equal_To<ACE_UINT32>, - ACE_Null_Mutex> HANDLE_ID_PRIORITY_MAP; - - /// Determines an id that uniquely identifies a handler/priority pair. - ACE_UINT64 hash_handle_id_and_priority(ACE_UINT32 handle_id, - ACE_UINT32 priority) const; - - /// Generates an annotation ID given the actual handle and priority. - ACE_UINT64 get_annotation_id(Handle_Id handle, ACE_UINT32 priority); - - DA_Strategy_Base<ACE_UINT64>* DA_strategy_; - HANDLE_ID_MAP handle_ids_; - HANDLE_ID_PRIORITY_MAP id_to_priority_map_; - Lock lock_; - ACE_UINT32 next_id_; -}; - -template <typename Handle_Id, typename Lock> -ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base<ACE_UINT64>* DA_strategy) -: DA_strategy_(DA_strategy) -, next_id_(0) -{ -} - -template <typename Handle_Id, typename Lock> -ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::~ACE_PIP_DA_Strategy_Adapter() -{ - HANDLE_ID_PRIORITY_MAP::iterator it = id_to_priority_map_.begin(); - for (; it != id_to_priority_map_.end(); ++it) - { - delete it->item(); - } -} - -template <typename Handle_Id, typename Lock> -ACE_INLINE int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::get_max_threads() -{ - return DA_strategy_->get_max_threads(); -} - -template <typename Handle_Id, typename Lock> -ACE_INLINE ACE_UINT64 ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - hash_handle_id_and_priority(ACE_UINT32 handle_id, ACE_UINT32 priority) const -{ - ACE_UINT64 result = handle_id; - result = (result << 32) | priority; - return result; -} - -template <typename Handle_Id, typename Lock> -int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority) -{ - ACE_Guard<Lock> guard(lock_); - ACE_UINT64 annotation_id = get_annotation_id(handle, priority); - return DA_strategy_->is_deadlock_potential(annotation_id); -} - -template <typename Handle_Id, typename Lock> -void ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - grant(Handle_Id handle, ACE_UINT32 priority) -{ - ACE_Guard<Lock> guard(lock_); - ACE_UINT64 annotation_id = get_annotation_id(handle, priority); - return DA_strategy_->grant(annotation_id); -} - -template <typename Handle_Id, typename Lock> -void ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - release(Handle_Id handle, ACE_UINT32 priority) -{ - ACE_Guard<Lock> guard(lock_); - ACE_UINT64 annotation_id = get_annotation_id(handle, priority); - DA_strategy_->release(annotation_id); -} - -template <typename Handle_Id, typename Lock> -int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation) -{ - ACE_UINT32 internal_handle_id(0); - ACE_Unbounded_Set<ACE_UINT32>* priorities(0); - - ACE_Guard<Lock> guard(lock_); - if (handle_ids_.find(handle, internal_handle_id) == -1) - { - // This is the first time handle has been encountered, so generate an - // internal handle id. - internal_handle_id = next_id_++; - handle_ids_.bind(handle, internal_handle_id); - priorities = new ACE_Unbounded_Set<ACE_UINT32>; - id_to_priority_map_.bind(internal_handle_id, priorities); - } - else - { - id_to_priority_map_.find(internal_handle_id, priorities); - } - - priorities->insert(priority); - - return DA_strategy_->add_annotation( - hash_handle_id_and_priority(internal_handle_id, priority), annotation); -} - -template <typename Handle_Id, typename Lock> -int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - remove_annotation (Handle_Id handle) -{ - ACE_Guard<Lock> guard(lock_); - ACE_UINT32 internal_handle_id(0); - if (handle_ids_.unbind(handle, internal_handle_id) != -1) - { - ACE_Unbounded_Set<ACE_UINT32>* priorities(0); - if (id_to_priority_map_.unbind(internal_handle_id, priorities) != -1) - { - for (ACE_Unbounded_Set<ACE_UINT32>::ITERATOR it = priorities->begin(); - it != priorities->end(); - ++it) - { - DA_strategy_->remove_annotation( - get_annotation_id(internal_handle_id, *it)); - } - - delete priorities; - } - } - - return 0; -} - -template <typename Handle_Id, typename Lock> -int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - remove_annotation (Handle_Id handle, ACE_UINT32 priority) -{ - ACE_Guard<Lock> guard(lock_); - ACE_UINT32 internal_handle_id(0); - int result(0); - if (handle_ids_.find(handle, internal_handle_id) != -1) - { - ACE_Unbounded_Set<ACE_UINT32>* priorities(0); - if (id_to_priority_map_.find(internal_handle_id, priorities) != -1) - { - if (priorities->remove(priority) != -1) - { - result = DA_strategy_->remove_annotation( - get_annotation_id(internal_handle_id, priority)); - } - if (priorities->is_empty()) - { - // This was the last annotation for this handle, - // so remove the handle information - id_to_priority_map_.unbind(internal_handle_id, priorities); - delete priorities; - handle_ids_.unbind(handle, internal_handle_id); - } - } - } - - return result; -} - -template <typename Handle_Id, typename Lock> -ACE_UINT64 ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: - get_annotation_id(Handle_Id handle, ACE_UINT32 priority) -{ - ACE_UINT64 annotation_id(0); - ACE_UINT32 handle_id(0); - - if (handle_ids_.find(handle, handle_id) != -1) - { - annotation_id = hash_handle_id_and_priority(handle_id, priority); - } - - return annotation_id; -} - -#endif - diff --git a/ACE/ace/PIP_Dispatcher.cpp b/ACE/ace/PIP_Dispatcher.cpp deleted file mode 100644 index dbc0931edbf..00000000000 --- a/ACE/ace/PIP_Dispatcher.cpp +++ /dev/null @@ -1,505 +0,0 @@ -#include "ace/PIP_Dispatcher.h" -#include "ace/PIP_Invocation_Manager.h" -#include "ace/PIP_Messages.h" -#include "ace/Reactor.h" - -#include <iostream> - -ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::dispatcher_ = 0; -ACE_Mutex ACE_PIP_Dispatcher::instance_lock_; -bool ACE_PIP_Dispatcher::delete_dispatcher_ = false; -bool ACE_PIP_Dispatcher::shutdown_ = false; - -/// Constructor -ACE_PIP_Dispatcher::ACE_PIP_Dispatcher() - : current_highest_priority_(ACE_Event_Handler::LO_PRIORITY) - , current_lowest_priority_(ACE_Event_Handler::LO_PRIORITY) - , DA_strategy_adapter_(0) - , message_available_signal_(0) - , threads_available_signal_(0) - , waiting_for_message_(false) -{ -} - -/// Destructor -ACE_PIP_Dispatcher::~ACE_PIP_Dispatcher() -{ - ACE_PIP_Protocol_Message* message(0); - - // Destroy all messages that have yet to be dispatched - pending_messages_lock_.acquire(); - while (pending_messages_by_message_id_.current_size() != 0) - { - pending_messages_by_message_id_.unbind( - pending_messages_by_message_id_.begin()->key(), - message); - - if (message) - { - delete message; - message = 0; - } - } - pending_messages_lock_.release(); -} - - -ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::instance() -{ - if (ACE_PIP_Dispatcher::dispatcher_ == 0) - { - instance_lock_.acquire(); - - if (ACE_PIP_Dispatcher::dispatcher_ == 0) - { - ACE_NEW_RETURN (ACE_PIP_Dispatcher::dispatcher_, - ACE_PIP_Dispatcher, - 0); - - delete_dispatcher_ = true; - } - - instance_lock_.release(); - } - - return dispatcher_; -} - -/// Receive a message for eventual dispatching -void ACE_PIP_Dispatcher::process_message(ACE_PIP_Protocol_Message* message) -{ - switch (message->get_message_type()) - { - case ACE_PIP_Protocol_Message::ACCEL: - process_incoming_acceleration(message); - break; - - case ACE_PIP_Protocol_Message::REQUEST: - process_incoming_request(message); - break; - - case ACE_PIP_Protocol_Message::RESPONSE: - // Forward the response to the invocation manager - ACE_PIP_Invocation_Manager::instance()->process_inbound_response(message); - break; - - default: - std::cerr << "PIP_Dispatcher::process_message: Invalid Message type of " << message->get_message_type() << std::endl; - } -} - - -/// Signals the dispatcher to dispatch a new message if possible. -int ACE_PIP_Dispatcher::handle_output (ACE_HANDLE) -{ - ACE_PIP_Protocol_Message* message(0); - bool message_dispatched(false); - - while (!message_dispatched && !shutdown_) - { - // get the highest priority message - pending_messages_lock_.acquire(); - message = retrieve_highest_priority_pending_message(); - if (message) - { - ACE_PIP_Data_Message* data_message = - static_cast<ACE_PIP_Data_Message*>(message->get_next()); - - deadlock_avoidance_lock_.acquire(); - - /// If dispatching could potentially cause deadlock, try to accelerate all lower priority - /// messages and then wait for threads to become available - num_threads_needed_ = DA_strategy_adapter_->is_deadlock_potential( - data_message->get_destination_handler_ID(), - data_message->get_message_priority()); - - if (num_threads_needed_ > 0) - { - deadlock_avoidance_lock_.release(); - find_and_accelerate_lower_priority_message(data_message->get_message_priority()); - - // Wait for signal indicating enough threads exist to dispatch the message - threads_available_signal_.acquire(); - - // Before grabing the deadlock avoidance lock, check to make sure - // we haven't been told to shutdown. - if (shutdown_) - break; - - deadlock_avoidance_lock_.acquire(); - } - - // At this point, sufficient threads exist to dispatch the message - // without threat of deadlock, so grant a thread - DA_strategy_adapter_->grant(data_message->get_destination_handler_ID(), - data_message->get_message_priority()); - - deadlock_avoidance_lock_.release(); - - // Transfer the message to the "dispatched" list - dispatched_messages_lock_.acquire(); - Dispatched_Message_Data dispatch_record; - dispatch_record.id = message->get_message_id(); - dispatch_record.priority = data_message->get_message_priority(); - dispatched_messages_data_.insert(dispatch_record); - dispatched_messages_lock_.release(); - - //-------------TEST DATA------------------ - // store statistics to be printed later - Dispatch_Test_Data test_data; - test_data.id = message->get_message_id(); - test_data.priority = data_message->get_message_priority(); - test_data.num_pending = num_pending_messages_; - test_data.highest_priority = current_highest_priority_; - test_data.lowest_priority = current_lowest_priority_; - dispatch_records_.push_back(test_data); - - dispatched_ids_.push_back(message->get_message_id()); - - ++num_messages_dispatched_; - --num_pending_messages_; - pending_messages_lock_.release(); - //----------------------------------------- - - // Request another thread to be associated with dispatcher - ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK); - - message_dispatched = true; - - // Pass the message to the invocation manager for processing - ACE_PIP_Invocation_Manager::instance()->process_inbound_request(message); - - // All processing associated with the message has been completed - // so discard the record - dispatched_messages_lock_.acquire(); - dispatched_messages_data_.erase(dispatch_record); - dispatched_messages_lock_.release(); - - // Cleanup message information and release the thread resource - deadlock_avoidance_lock_.acquire(); - DA_strategy_adapter_->release(data_message->get_destination_handler_ID(), - data_message->get_message_priority()); - - if (num_threads_needed_ > 0) - { - --num_threads_needed_; - if (num_threads_needed_ == 0) - { - threads_available_signal_.release(); - } - } - - deadlock_avoidance_lock_.release(); - } - else - { - // There are no messages to dispatch, so wait for one to arrive - waiting_for_message_ = true; - pending_messages_lock_.release(); - message_available_signal_.acquire(); - - // Before dispatching a message, make sure we haven't been - // instructed to shutdown - if (shutdown_) - break; - } - } - - return 0; -} - - -/// Initializes dispatcher -void ACE_PIP_Dispatcher::init(ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter) -{ - DA_strategy_adapter_ = DA_strategy_adapter; - waiting_for_message_ = true; - ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK); -} - -/// store the message -void ACE_PIP_Dispatcher::process_incoming_request(ACE_PIP_Protocol_Message* message) -{ - // Store the message token 2 ways to enable efficient dispatching as well as - // efficient lookup for accelerations - pending_messages_lock_.acquire(); - - //-------TEST DATA------------------------ - ++num_messages_received_; - ++num_pending_messages_; - received_ids_.push_back(message->get_message_id()); - - //------------------------------------------ - ACE_UINT32 priority = - static_cast<ACE_PIP_Data_Message*>(message->get_next())->get_message_priority(); - - // update the priority upper and lower bounds. These values are stored to - // avoid checking the full range of priorities when dispatching messages - if (priority > current_highest_priority_) - { - current_highest_priority_ = priority; - } - else if (priority < current_lowest_priority_) - { - current_lowest_priority_ = priority; - } - - PRIORITY_MESSAGE_LIST_MAP::iterator - message_iter = pending_messages_by_priority_.find(priority); - - if (message_iter == pending_messages_by_priority_.end()) - { - // Create a new entry for this priority level - std::list<ACE_PIP_Protocol_Message*> new_priority_list; - new_priority_list.push_back(message); - pending_messages_by_priority_.insert( - make_pair(priority, new_priority_list)); - } - else - { - // Priority already exists, so add the message token to the list - message_iter->second.push_back(message); - } - - pending_messages_by_message_id_.bind(message->get_message_id(), message); - - if (waiting_for_message_) - { - waiting_for_message_ = false; - - // Signal waiting dispatcher thread to dispatch new message - message_available_signal_.release(); - } - - pending_messages_lock_.release(); - -} - -/// Find the highest priority message and return it -ACE_PIP_Protocol_Message* ACE_PIP_Dispatcher:: - retrieve_highest_priority_pending_message() -{ - ACE_PIP_Protocol_Message* message(0); - for (ACE_INT32 current_priority = (ACE_INT32)current_highest_priority_; - current_priority >= (ACE_INT32)current_lowest_priority_; - --current_priority) - { - PRIORITY_MESSAGE_LIST_MAP::iterator - pending_message_iter = pending_messages_by_priority_.find(current_priority); - - for (; pending_message_iter != pending_messages_by_priority_.end(); - ++pending_message_iter) - { - std::list<ACE_PIP_Protocol_Message*>::iterator next_message_iter = - pending_message_iter->second.begin(); - - if (next_message_iter != pending_message_iter->second.end()) - { - // The highest-priority message has been found. Grab the message - // and remove it from both containers - message = *next_message_iter; - pending_message_iter->second.pop_front(); - pending_messages_by_message_id_.unbind(message->get_message_id()); - break; - } - else - { - // There are no messages at this priority. Since the search begins at - // the highest priority, lower the highest priority until a message - // is found - if (current_highest_priority_ > current_lowest_priority_) - { - --current_highest_priority_; - } - } - } - - if (message) - { - break; - } - } - - return message; -} - -bool ACE_PIP_Dispatcher:: -find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority) -{ - bool found(false); - bool erased_this_pass(true); - - dispatched_messages_lock_.acquire(); - - while(erased_this_pass) - { - erased_this_pass = false; - - std::set<Dispatched_Message_Data>::iterator iter = dispatched_messages_data_.begin(); - - // Find all dispatched messages having priority lower than new_priority. For each - // send an acceleration message, and update the dispatch record - for (; iter != dispatched_messages_data_.end() && - num_threads_needed_ > 0; ++iter) - { - if (iter->priority < new_priority) - { - // A message has been found that has a lower priority, - // so the send an acceleration message - ACE_PIP_Accel_Message* accel_message = new ACE_PIP_Accel_Message; - accel_message->set_old_priority(iter->priority); - accel_message->set_new_priority(new_priority); - - ACE_PIP_Protocol_Message* protocol_message = new ACE_PIP_Protocol_Message; - protocol_message->set_message_type(ACE_PIP_Protocol_Message::ACCEL); - protocol_message->set_message_id(iter->id); - protocol_message->set_next(accel_message); - - Dispatched_Message_Data dispatch_record = *iter; - dispatched_messages_data_.erase(iter); - dispatch_record.priority = new_priority; - dispatched_messages_data_.insert(dispatch_record); - std::cout << "PIP_Dispatcher::find_and_accel : accelerating " << iter->id << std::endl; - ACE_PIP_Invocation_Manager::instance()->process_acceleration(protocol_message); - found = true; - erased_this_pass = true; - break; - } - } - } - - dispatched_messages_lock_.release(); - pending_messages_lock_.release(); -} - -void ACE_PIP_Dispatcher::shutdown() -{ - shutdown_ = true; - - // Pulse signals so waiting threads can quit - message_available_signal_.release(); - threads_available_signal_.release(); -} - -void ACE_PIP_Dispatcher::process_incoming_acceleration(ACE_PIP_Protocol_Message* message) -{ - bool updated_pending(false); - // Look for pending message. If the message is pending, update the priority, move it around in data structures, and quit - - ACE_PIP_Accel_Message* accel_message = - static_cast<ACE_PIP_Accel_Message*>(message->get_next()); - - pending_messages_lock_.acquire(); - ACE_Hash_Map_Entry<ACE_UINT64, ACE_PIP_Protocol_Message*>* entry(0); - if (pending_messages_by_message_id_.find(message->get_message_id(), entry) == 0) - { - ACE_PIP_Data_Message* data_message = - static_cast<ACE_PIP_Data_Message*>(entry->item()->get_next()); - - data_message->set_message_priority(accel_message->get_new_priority()); - - // move the message from one priority to the other - updated_pending = true; - - std::cout << "Dispatcher::Accelerated pending message" << std::endl; - } - pending_messages_lock_.release(); - - if (!updated_pending) - { - bool found(false); - ACE_Guard<ACE_Mutex> guard(dispatched_messages_lock_); - // Message is not pending, so must already be dispatche - std::set<Dispatched_Message_Data>::iterator iter = dispatched_messages_data_.begin(); - - // Find all dispatched messages having priority lower than new_priority. For each - // send an acceleration message, and update the dispatch record - for (; iter != dispatched_messages_data_.end(); ++iter) - { - if ((iter->id == message->get_message_id()) && - (iter->priority < accel_message->get_new_priority())) - { - std::cout << "Dispatcher::Accelerated dispatched message" << std::endl; - Dispatched_Message_Data dispatch_record = *iter; - dispatched_messages_data_.erase(iter); - dispatch_record.priority = accel_message->get_new_priority(); - dispatched_messages_data_.insert(dispatch_record); - ACE_PIP_Invocation_Manager::instance()->process_acceleration(message); - found = true; - break; - } - } - - if (!found) - { - for (std::vector<ACE_UINT64>::iterator it = received_ids_.begin(); - it != received_ids_.end(); ++it) - { - if (*it == message->get_message_id()) - { - std::cout << "MessageID: " << *it << " already came and left" << std::endl; - found = true; - break; - } - } - if (!found) - { - std::cout << "Accel for messageID: " << message->get_message_id() << - " beat message to the remote dispatcher" << std::endl; - } - } - - } -} - - - -void ACE_PIP_Dispatcher::print_results() -{ - std::cout << "----------------------DISPATCHER_RESULTS-------------" << std::endl; - std::cout << std::endl; - std::cout << "Num received: " << num_messages_received_ << std::endl; - std::cout << "Num dispatched: " << num_messages_dispatched_ << std::endl; - std::cout << std::endl; - - std::cout << "Received Ids: " << std::endl; - for (std::vector<ACE_UINT64>::iterator rec_id_iter = received_ids_.begin(); - rec_id_iter != received_ids_.end(); - ++rec_id_iter) - { - std::cout << *rec_id_iter << std::endl; - } - - std::cout << std::endl; - std::cout << "Dispatched Ids: " << std::endl; - for (std::vector<ACE_UINT64>::iterator disp_id_iter = dispatched_ids_.begin(); - disp_id_iter != dispatched_ids_.end(); - ++disp_id_iter) - - { - std::cout << *disp_id_iter << std::endl; - } - - std::cout << std::endl; - std::cout << "Dispatch Records: " << std::endl; - for (std::vector<ACE_PIP_Dispatcher::Dispatch_Test_Data>::iterator rec_iter = dispatch_records_.begin(); - rec_iter != dispatch_records_.end(); - ++rec_iter) - - { - std::cout << "Id: " << rec_iter->id << std::endl; - std::cout << "Priority: " << rec_iter->priority << std::endl; - std::cout << "Num Pending: " << rec_iter->num_pending << std::endl; - std::cout << "Highest Priority " << rec_iter->highest_priority << std::endl; - std::cout << "Lowest Priority " << rec_iter->lowest_priority << std::endl; - std::cout << std::endl; - } - - std::cout << std::endl; - std::cout << "Num received: " << num_messages_received_ << std::endl; - std::cout << "Num dispatched: " << num_messages_dispatched_ << std::endl; - std::cout << std::endl; - - - std::cout << "-----------------------------------------------------" << std::endl; -} diff --git a/ACE/ace/PIP_Dispatcher.h b/ACE/ace/PIP_Dispatcher.h deleted file mode 100644 index d93b2957ca6..00000000000 --- a/ACE/ace/PIP_Dispatcher.h +++ /dev/null @@ -1,188 +0,0 @@ - /** - * @file PIP_Dispatcher.h - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - * This file contains the specification for a class - * that dispatches priority inheritance protocol messages - * to the appropriate message handler. -*/ - - -#ifndef _PIP_DISPATCHER_H_ -#define _PIP_DISPATCHER_H_ - -// ACE definitions -#include "ace/Event_Handler.h" -#include "ace/Hash_Map_Manager.h" -#include "ace/PIP_DA_Strategy_Adapter.h" -#include "ace/PIP_Messages.h" -#include "ace/RW_Thread_Mutex.h" -#include "ace/Semaphore.h" -#include "ace/Singleton.h" - -// STL definitions -#include <list> -#include <map> -#include <set> -#include <vector> - -// Forward Declarations -class ACE_PIP_Protocol_Message; - -typedef std::map<ACE_UINT32, std::list<ACE_PIP_Protocol_Message*> > - PRIORITY_MESSAGE_LIST_MAP; - -// Associate each message with a message ID -typedef ACE_Hash_Map_Manager_Ex<ACE_UINT64, - ACE_PIP_Protocol_Message*, - ACE_Hash<ACE_UINT64>, - ACE_Equal_To<ACE_UINT64>, - ACE_Null_Mutex> ID_MESSAGE_MAP; - - -/** - * @class ACE_Dispatcher - * @brief Dispatches ACE_PIP_Priority_Messages in priority order - * message handlers. Additionally, notifies handlers when priority inversion is - * detected. - * - * The PIP_Message_Dispatcher implements the priority inheritance protocol. - * Upon receipt of messages, it determines the highest-priority message to - * be dispatched, and dispatches providing enough resources exist. If not enough exist, - * and a lower priority message has been dispatched, an acceleration message is sent - * to the corresponding handler to raise the priority of the message, thus - * mitigating the inversion. -*/ -class ACE_Export ACE_PIP_Dispatcher : public ACE_Event_Handler -{ - public: - - /// Constructor - ACE_PIP_Dispatcher(); - - /// Destructor - virtual ~ACE_PIP_Dispatcher(); - - /// obtain the single instance of the dispatcher - static ACE_PIP_Dispatcher* instance(); - - /// Receive a message for eventual dispatching - void process_message(ACE_PIP_Protocol_Message* message); - - /// Signals the dispatcher to dispatch a new message if possible. - virtual int handle_output (ACE_HANDLE); - - /// Initializes dispatcher - void init(ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter); - - /// Tell the dispatcher to stop dispatching and release all threads ASAP - void shutdown(); - - /// Accelerate the appropriate message - void process_incoming_acceleration(ACE_PIP_Protocol_Message* message); - - /// Print statistics - void print_results(); - - private: - - // Dispatched_Message_Data stores the ID and priority - // of a dispatched message - class Dispatched_Message_Data - { - public: - - bool operator<(const Dispatched_Message_Data& other) const - { - return (priority < other.priority); - } - - bool operator==(const Dispatched_Message_Data& other) const - { - return (id == other.id); - } - - bool operator!=(const Dispatched_Message_Data& other) const - { - return !(*this == other); - } - - ACE_UINT64 id; - ACE_UINT32 priority; - }; - - class Dispatch_Test_Data - { - public: - ACE_UINT64 id; - ACE_UINT64 priority; - ACE_UINT32 num_pending; - ACE_UINT32 highest_priority; - ACE_UINT32 lowest_priority; - }; - - /// store the message - void process_incoming_request(ACE_PIP_Protocol_Message* message); - - /// Find the highest priority message and return it - ACE_PIP_Protocol_Message* retrieve_highest_priority_pending_message(); - - bool find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority); - - - // Dispatched message data is stored to determine which messages are - // currently assigned to a thread. This is useful for finding messages - // whose priority needs to be accelerated in the case where an inversion - // is detected. - std::set<Dispatched_Message_Data> dispatched_messages_data_; - ACE_Mutex dispatched_messages_lock_; - - ACE_UINT32 current_highest_priority_; - ACE_UINT32 current_lowest_priority_; - - // Pending messages (those not dispatched) are stored in 2 ways for efficiency - // 1.) By message id - this is useful for managing priority accelerations - // because we can find the appropriate message in constant time - // 2.) By priority - this is useful for determining which message to dispatch next - // as messages are dispatched in priority order - PRIORITY_MESSAGE_LIST_MAP pending_messages_by_priority_; - ID_MESSAGE_MAP pending_messages_by_message_id_; - ACE_Mutex pending_messages_lock_; - - // Indicates the dispatcher has a thread waiting to - // dispatch a message - bool waiting_for_message_; - - // Number of threads that need to be returned in order to - // dispatch the current message - int num_threads_needed_; - - ACE_Semaphore message_available_signal_; - ACE_Semaphore threads_available_signal_; - - ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter_; - ACE_Mutex deadlock_avoidance_lock_; - - static ACE_PIP_Dispatcher* dispatcher_; - static ACE_Mutex instance_lock_; - static bool delete_dispatcher_; - static bool shutdown_; - - // Test variables - ACE_UINT32 num_pending_messages_; - ACE_UINT32 num_messages_received_; - ACE_UINT32 num_messages_dispatched_; - std::vector<ACE_UINT64> received_ids_; - std::vector<ACE_UINT64> dispatched_ids_; - std::vector<Dispatch_Test_Data> dispatch_records_; - -}; - -// Define a singleton class to make the dispatcher globally accessible -typedef ACE_Singleton<ACE_PIP_Dispatcher, ACE_Mutex> - ACE_PIP_Dispatcher_Singleton; - -#endif diff --git a/ACE/ace/PIP_IO_Handler.cpp b/ACE/ace/PIP_IO_Handler.cpp deleted file mode 100644 index f30713751a5..00000000000 --- a/ACE/ace/PIP_IO_Handler.cpp +++ /dev/null @@ -1,185 +0,0 @@ -// $Id$ - -#include "ace/Guard_T.h" -#include "ace/PIP_IO_Handler.h" -#include "ace/PIP_Invocation_Manager.h" -#include "ace/PIP_Dispatcher.h" - -/// Constructor -ACE_PIP_IO_Handler::ACE_PIP_IO_Handler() - : priority_set_(false) - , destination_site_id_(0) - , site_id_(0) - , handler_id_(0) - , millisecond_(0, 1000) -{ - // Temporarily assign the priority to be highest possible. - // The first message received by the handler will be the priority - this->priority(ACE_Event_Handler::HI_PRIORITY); -} - -/// Destructor -ACE_PIP_IO_Handler::~ACE_PIP_IO_Handler( ) -{ - // Tell the Invocation Manager to stop sending us messages - ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this); - - // Delete all outgoing messages - ACE_PIP_Protocol_Message* message(0); - while (!outgoing_message_queue_.is_empty()) - { - outgoing_message_queue_.dequeue_tail(message); - delete message; - } -} - -void ACE_PIP_IO_Handler::site_id(ACE_UINT32 site_id) -{ - site_id_ = site_id; -} - -ACE_UINT32 ACE_PIP_IO_Handler::site_id() const -{ - return site_id_; -} - -ACE_UINT32 ACE_PIP_IO_Handler::destination_site_id() const -{ - return destination_site_id_; -} - -void ACE_PIP_IO_Handler::handler_id(ACE_UINT32 handler_id) -{ - handler_id_ = handler_id; -} - -ACE_UINT32 ACE_PIP_IO_Handler::handler_id() const -{ - return handler_id_; -} - -/// Initialize the priority of the handler, and inform the other end -/// of the priority -void ACE_PIP_IO_Handler::init(ACE_UINT32 site_id, - ACE_UINT32 destination_site_id, - ACE_UINT32 priority) -{ - this->priority(priority); - site_id_ = site_id; - destination_site_id_ = destination_site_id; - - // Inform other end of this connections priority - peer_.send(&priority, sizeof(priority)); - - // Inform other end of this end's site id - peer_.send(&site_id, sizeof(site_id)); - priority_set_ = true; - - // Register to receive outgoing messages - ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this); -} - -void ACE_PIP_IO_Handler::extract_priority() -{ - ACE_UINT32 priority(0); - if (peer_.recv(&priority, sizeof(priority)) == sizeof(priority)) - { - this->priority(priority); - } - else - { - this->priority(ACE_Event_Handler::LO_PRIORITY); - } - - // Receive the other end's site id - if (peer_.recv(&destination_site_id_, sizeof(destination_site_id_)) != sizeof(destination_site_id_)) - { - destination_site_id_ = 0; - } - - priority_set_ = true; -} - -/// Handles read event on socket. -int ACE_PIP_IO_Handler::handle_input (ACE_HANDLE fd) -{ - int result(0); - int bytes_read(0); - - if (!priority_set_) - { - // incoming message is the priority of this connection - extract_priority(); - ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this); - } - else - { - // Read the next incoming message - ACE_PIP_Protocol_Message* message = new ACE_PIP_Protocol_Message; - bytes_read = message->deserialize(peer_); - if (bytes_read > 0) - { - if (message->get_message_type() == ACE_PIP_Protocol_Message::ACCEL) - { - std::cout << "Accel Message Received" << std::endl; - } - - ACE_PIP_Dispatcher::instance()->process_message(message); - } - else if (bytes_read < 0) - { - // The connection is broken, so handler should be deleted - delete message; - result = -1; - } - } - - return result; -} - - -/// Handles output event on socket -int ACE_PIP_IO_Handler::handle_output (ACE_HANDLE fd) -{ - int bytes_sent(0); - // determine if outgoing messages exist - ACE_PIP_Protocol_Message* message(0); - - write_closed_ = false; - big_lock_.acquire(); - if (outgoing_message_queue_.dequeue_tail(message) != -1) - { - if (message->get_message_type() == ACE_PIP_Protocol_Message::ACCEL) - { - std::cout << "Sending accel message" << std::endl; - } - bytes_sent = message->serialize(peer_); - delete message; - if (bytes_sent >= 0) - { - big_lock_.release(); - return 0; - } - else - { - write_closed_ = true; - big_lock_.release(); - // indicate the outgoing connection is closed - return -2; - } - } - else - { - // indicate that there was no message to output - - big_lock_.release(); - return -1; - } -} - -ACE_INET_Addr ACE_PIP_IO_Handler::get_remote_address() const -{ - ACE_INET_Addr addr; - peer_.get_remote_addr(addr); - return addr; -} diff --git a/ACE/ace/PIP_IO_Handler.h b/ACE/ace/PIP_IO_Handler.h deleted file mode 100644 index 90665097b5e..00000000000 --- a/ACE/ace/PIP_IO_Handler.h +++ /dev/null @@ -1,94 +0,0 @@ - /** - * @file PIP_IO_Handler.h - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - * This file contains the specification for a class - * that manages network I/O -*/ - - -#ifndef _PIP_IO_HANDLER_H_ -#define _PIP_IO_HANDLER_H_ - - -#include "ace/Message_Queue.h" -#include "ace/Mutex.h" -#include "ace/PIP_Messages.h" -#include "ace/Svc_Handler.h" -#include "ace/Thread_Mutex.h" - -// Typedefs -typedef ACE_Message_Queue_Ex<ACE_PIP_Protocol_Message, ACE_NULL_SYNCH> - PROTO_MESSAGE_QUEUE_TYPE; - -/** - * @class ACE_PIP_IO_Handler - * - * @brief Performs network I/O - * - * @author John Moore <ljohn7@gmail.com> - */ -class ACE_Export ACE_PIP_IO_Handler : - public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_MT_SYNCH> -{ - public: - - /// Constructor - ACE_PIP_IO_Handler (); - - /// Destructor - virtual ~ACE_PIP_IO_Handler(); - - /// Enqueue a message to be sent - virtual int put_message (ACE_PIP_Protocol_Message* message) = 0; - - /// Initialize the priority of the handler, and inform the other end - /// of the priority - virtual void init(ACE_UINT32 site_id, - ACE_UINT32 destination_site_id, - ACE_UINT32 priority); - - /// Handles read event on socket. - virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); - - /// Handles read event on socket. - virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE); - - /// Determine the id of the site at which the handler is located, - void site_id(ACE_UINT32 site_id); - ACE_UINT32 site_id() const; - - /// Determine the other end's site id - ACE_UINT32 destination_site_id() const; - - /// Determine the id that uniquely identifies this handler, - void handler_id(ACE_UINT32 handler_id); - ACE_UINT32 handler_id() const; - ACE_INET_Addr get_remote_address() const; - - protected: - - /// Reads priority from socket - void extract_priority(); - - // variables to track the state of the handler - bool read_closed_; - bool write_closed_; - bool priority_set_; - - ACE_UINT32 site_id_; - ACE_UINT32 handler_id_; - ACE_UINT32 destination_site_id_; - - const ACE_Time_Value millisecond_; - - PROTO_MESSAGE_QUEUE_TYPE outgoing_message_queue_; - ACE_Thread_Mutex big_lock_; -}; - -#endif /* _PIP_IO_Handler_H_ */ - - diff --git a/ACE/ace/PIP_Invocation_Manager.cpp b/ACE/ace/PIP_Invocation_Manager.cpp deleted file mode 100644 index ddc847b42da..00000000000 --- a/ACE/ace/PIP_Invocation_Manager.cpp +++ /dev/null @@ -1,358 +0,0 @@ - -#include "ace/PIP_Invocation_Manager.h" - -#include "ace/PIP_IO_Handler.h" - - -ACE_PIP_Invocation_Manager* ACE_PIP_Invocation_Manager::invocation_manager_ = 0; -ACE_Mutex ACE_PIP_Invocation_Manager::instance_lock_; -bool ACE_PIP_Invocation_Manager::delete_manager_ = false; -ACE_UINT64 ACE_PIP_Invocation_Manager::message_id_base_ = 0; -ACE_UINT32 ACE_PIP_Invocation_Manager::site_id_ = 0; - -/// Constructor -ACE_PIP_Invocation_Manager::ACE_PIP_Invocation_Manager() - : message_counter_(0) -{ -} - -/// Destructor -ACE_PIP_Invocation_Manager::~ACE_PIP_Invocation_Manager() -{ - -} - -/// Processes requests received from I/O handler -void ACE_PIP_Invocation_Manager::process_inbound_request(ACE_PIP_Protocol_Message* message) -{ - ACE_PIP_Data_Message* payload = - static_cast<ACE_PIP_Data_Message*>(message->release_next()); - - ACE_UINT32 handler_id = payload->get_destination_handler_ID(); - ACE_PIP_Message_Handler* handler(0); - - big_lock_.acquire(); - ACE_UINT64 message_id = message->get_message_id(); - if (object_id_handler_map_.find(handler_id, handler) == 0) - { - // look to see if there are any accelerations. If so, accelerate. - // Map the message ID to a list of outgoing messages - in_out_id_map_.bind(message_id, - std::list<ACE_UINT64>()); - - // Keep a record of the message and its priority so - // it can be accelerated if necessary - Invocation_Data invocation_data; - invocation_data.site_id = site_id_; - invocation_data.priority = payload->get_message_priority(); - invocation_data_map_.bind(message_id, invocation_data); - - big_lock_.release(); - - // Pass the message to the message handler, deleting - // the corresponding struct - handler->process_incoming_message(payload->release_block(), message_id); - delete payload; - delete message; - - // Once message processing has completed, - // clean-up any message residue - big_lock_.acquire(); - in_out_id_map_.unbind(message_id); - } - else - { - std::cerr << "Invocation_Manager::Unable to find message handler: " << handler_id << std::endl; - } - - big_lock_.release(); -} - -/// Processes request to be forwarded to another handler -void ACE_PIP_Invocation_Manager::process_outbound_request(ACE_Message_Block* message, - ACE_UINT64 token, - ACE_Future<ACE_Message_Block*>*& response_holder) -{ - // Create a protocol message from the data block - ACE_PIP_Protocol_Message* protocol_message = new ACE_PIP_Protocol_Message; - protocol_message->set_message_type(ACE_PIP_Protocol_Message::REQUEST); - protocol_message->process_message_payload(message); - - ACE_PIP_Data_Message* data_message = - static_cast<ACE_PIP_Data_Message*>(protocol_message->get_next()); - - // determine if the message expects a reply. If so, create a future for it. - if (data_message->get_reply_expected()) - { - // create and store future - response_holder = new ACE_Future<ACE_Message_Block*>; - } - - Invocation_Data remote_info; - remote_info.response_holder = response_holder; - remote_info.site_id = data_message->get_destination_site_ID(); - remote_info.priority = data_message->get_message_priority(); - - // Associate this message with token. This enables acceleration forwarding. Only - // Token == -1 indicates this is the start of a call chain - - big_lock_.acquire(); - ACE_UINT64 message_id = generate_message_id(); - invocation_data_map_.bind(message_id, remote_info); - protocol_message->set_message_id(message_id); - - if (token != 0) - { - ACE_Hash_Map_Entry<ACE_UINT64, std::list<ACE_UINT64> >* entry(0); - // map the originating message to the outgoing message - // so that accelerations can be forwarded appropriately - in_out_id_map_.find(token, entry); - if (entry) - { - entry->item().push_back(message_id); - } - } - - // use the priority and address to determine which I/O handler to send to - // pass the message to the I/O handler - PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); - if (site_to_handlers_map_.find(data_message->get_destination_site_ID(), - handler_map) == -1) - { - std::cerr << "Unable to locate priority->I/O handler mapping associated with site " - << data_message->get_destination_site_ID() - << std::endl; - exit(-1); - } - - ACE_PIP_IO_Handler* IO_handler(0); - if (handler_map->find(data_message->get_message_priority(), - IO_handler) == -1) - { - std::cerr << "Invocation_Manager::Unable to locate I/O handler w/ priority " - << data_message->get_message_priority() - << std::endl; - exit(-1); - } - - big_lock_.release(); - - if (IO_handler) - { - IO_handler->put_message(protocol_message); - } -} - -ACE_PIP_Invocation_Manager* ACE_PIP_Invocation_Manager::instance() -{ - if (ACE_PIP_Invocation_Manager::invocation_manager_ == 0) - { - instance_lock_.acquire(); - - if (ACE_PIP_Invocation_Manager::invocation_manager_ == 0) - { - ACE_NEW_RETURN (ACE_PIP_Invocation_Manager::invocation_manager_, - ACE_PIP_Invocation_Manager, - 0); - - delete_manager_ = true; - } - - instance_lock_.release(); - } - - return invocation_manager_; -} - -/// Process response received from a handler -void ACE_PIP_Invocation_Manager::process_inbound_response(ACE_PIP_Protocol_Message* message) -{ - Invocation_Data remote_info; - - ACE_Guard<ACE_Mutex> guard(big_lock_); - - // Remove the child ID - if (invocation_data_map_.unbind(message->get_message_id(), - remote_info) != -1) - { - // Pass the received response to the message handler - // via a Future - remote_info.response_holder->set(message->get_next()->get_block()); - remote_info.response_holder = 0; - } - else - { - std::cerr << "PIP_Invocation_Manager::process_inbound_response: failed to unbind message info" - << std::endl; - } -} - -/// Process response received from a handler -void ACE_PIP_Invocation_Manager::process_outbound_response(ACE_Message_Block* message, ACE_UINT64 token) -{ - // Parse the message - ACE_PIP_Protocol_Message* response = new ACE_PIP_Protocol_Message; - response->process_message_payload(message); - response->set_message_type(ACE_PIP_Protocol_Message::RESPONSE); - - // Lookup the appropriate IO handler, and pass down the message - ACE_PIP_Data_Message* data_message = - static_cast<ACE_PIP_Data_Message*>(response->get_next()); - - ACE_Guard<ACE_Mutex> guard(big_lock_); - - response->set_message_id(token); - PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); - - if (site_to_handlers_map_.find - (data_message->get_destination_site_ID(), - handler_map) != -1) - { - ACE_PIP_IO_Handler* handler(0); - if (handler_map->find(data_message->get_message_priority(), - handler) != -1) - { - handler->put_message(response); - } - else - { - std::cerr << "PIP_Invocation_Manager::process_outbound_response: cannot find I/O handler with " - << data_message->get_message_priority() << " priority" - << std::endl; - } - } - else - { - std::cerr << "PIP_Invocation_Manager::process_outbound_response: cannot find destination site " - << data_message->get_destination_site_ID() - << std::endl; - } -} - -/// Process request to accelerate the priority of a process -void ACE_PIP_Invocation_Manager::process_acceleration(ACE_PIP_Protocol_Message* message) -{ - ACE_PIP_Accel_Message* accel_message = - static_cast<ACE_PIP_Accel_Message*>(message->get_next()); - - ACE_Guard<ACE_Mutex> guard(big_lock_); - - // Update the stored priority of the original message. This will enable subsequent upcalls - // to adjust their priority appropriately - Invocation_Data invocation_data; - if (invocation_data_map_.unbind(message->get_message_id(), invocation_data) == 0) - { - if (invocation_data.priority < accel_message->get_new_priority()) - { - invocation_data.priority = accel_message->get_new_priority(); - } - - invocation_data_map_.bind(message->get_message_id(), invocation_data); - - // Generate acceleration messages for each outgoing invocation - // resulting from processing of incoming request. Do so - // only if their priority is lower than the accelerated priority - ACE_Hash_Map_Entry<ACE_UINT64, std::list<ACE_UINT64> >* child_entry(0); - if (in_out_id_map_.find(message->get_message_id(), child_entry) == 0) - { - std::list<ACE_UINT64>::iterator child_iter = child_entry->item().begin(); - for (; child_iter != child_entry->item().end(); ++child_iter) - { - if (invocation_data_map_.unbind(*child_iter, invocation_data) == 0) - { - if (invocation_data.priority < accel_message->get_new_priority()) - { - invocation_data.priority = accel_message->get_new_priority(); - invocation_data_map_.bind(*child_iter, invocation_data); - // Generate new message and send it to the appropriate site - PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); - if (site_to_handlers_map_.find(invocation_data.site_id, - handler_map) == 0) - { - ACE_PIP_IO_Handler* handler(0); - if (handler_map->find(ACE_Event_Handler::HI_PRIORITY, handler) == 0) - { - ACE_PIP_Accel_Message* accel_copy = accel_message->copy(); - ACE_PIP_Protocol_Message* proto_copy = message->copy(); - proto_copy->set_next(accel_copy); - handler->put_message(proto_copy); - } - else - { - std::cerr << "Invocation_Manager::process_accel : no hi_priority I/O handler registered for site " - << invocation_data.site_id << std::endl; - } - } - } - else - { - std::cout << "Child priority already updated" << std::endl; - std::cout << "Accel pri: " << accel_message->get_new_priority() << " " - << "child pri: " << invocation_data.priority << std::endl; - - } - - invocation_data_map_.bind(*child_iter, invocation_data); - } - } - } - } - - // delete the acceleration message here -} - -/// Register an IO handler that can send messages on invocation -/// manager's behalf -void ACE_PIP_Invocation_Manager::register_IO_handler(ACE_PIP_IO_Handler* handler) -{ - // Extract the priority and remote address of handler - ACE_UINT32 priority = handler->priority(); - ACE_Guard<ACE_Mutex> guard(big_lock_); - - // Map the destination site ID and priority to this handler - PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); - if (site_to_handlers_map_.find(handler->destination_site_id(), - handler_map) == -1) - { - handler_map = new PRIORITY_TO_IO_HANDLER_MAP; - if (site_to_handlers_map_.bind(handler->destination_site_id(), handler_map) == -1) - { - std::cerr << "PIP_Invocation_Manager::register_IO_Handler: unable to bind ip to (priority->handler) map" << std::endl; - } - } - - handler_map->bind(priority, handler); -} - -void ACE_PIP_Invocation_Manager::unregister_IO_handler(ACE_PIP_IO_Handler* handler) -{ - ACE_UINT32 priority = handler->priority(); - - ACE_Guard<ACE_Mutex> guard(big_lock_); - - // unbind the handler - PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); - if (site_to_handlers_map_.find(handler->destination_site_id(), - handler_map) != -1) - { - handler_map->unbind(priority, handler); - } -} - -void ACE_PIP_Invocation_Manager::register_message_handler(ACE_PIP_Message_Handler* handler) -{ - // extract the object id from the handler - // map the object id to the handler - ACE_Guard<ACE_Mutex> guard(big_lock_); - object_id_handler_map_.bind(handler->get_handler_id(), handler); -} - -ACE_UINT64 ACE_PIP_Invocation_Manager::generate_message_id() -{ - return (((ACE_UINT64)site_id_) << 32) + message_counter_++; -} - -void ACE_PIP_Invocation_Manager::set_site_id(ACE_UINT64 site_id) -{ - site_id_ = site_id; -} diff --git a/ACE/ace/PIP_Invocation_Manager.h b/ACE/ace/PIP_Invocation_Manager.h deleted file mode 100644 index 3fb84529844..00000000000 --- a/ACE/ace/PIP_Invocation_Manager.h +++ /dev/null @@ -1,150 +0,0 @@ - /** - * @file PIP_Invocation_Manager.h - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - * This file contains the specification for a class - * that tracks handler invocations at a particular site - */ - - -#ifndef _PIP_INVOCATION_MANAGER_H_ -#define _PIP_INVOCATION_MANAGER_H_ - -#include "ace/Containers_T.h" -#include "ace/Hash_Map_Manager.h" -#include "ace/Message_Block.h" -#include "ace/PIP_Messages.h" -#include "ace/PIP_Message_Handler.h" -#include "ace/Singleton.h" -#include "ace/Mutex.h" -#include "ace/Null_Mutex.h" -#include "ace/Future.h" - -#include <list> -class ACE_PIP_IO_Handler; - -struct Invocation_Data -{ - ACE_Future<ACE_Message_Block*>* response_holder; - ACE_UINT32 site_id; - ACE_UINT32 priority; -}; - -// Typedefs -typedef ACE_Hash_Map_Manager_Ex<ACE_UINT64, - std::list<ACE_UINT64>, - ACE_Hash<ACE_UINT64>, - ACE_Equal_To<ACE_UINT64>, - ACE_Null_Mutex> ID_TO_ID_LIST_MAP; - -typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32, - ACE_PIP_Message_Handler*, - ACE_Hash<ACE_UINT32>, - ACE_Equal_To<ACE_UINT32>, - ACE_Null_Mutex> ID_TO_HANDLER_MAP; - -typedef ACE_Hash_Map_Manager_Ex<ACE_UINT64, - Invocation_Data, - ACE_Hash<ACE_UINT64>, - ACE_Equal_To<ACE_UINT64>, - ACE_Null_Mutex> ID_TO_INVOCATION_RECORD_MAP; - -typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32, - ACE_PIP_IO_Handler*, - ACE_Hash<ACE_UINT32>, - ACE_Equal_To<ACE_UINT32>, - ACE_Null_Mutex> PRIORITY_TO_IO_HANDLER_MAP; - -typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32, - PRIORITY_TO_IO_HANDLER_MAP*, - ACE_Hash<ACE_UINT64>, - ACE_Equal_To<ACE_UINT64>, - ACE_Null_Mutex> SITE_TO_IO_HANDLERS_MAP; - -/** - * @class ACE_PIP_Invocation_Manager - * @brief -*/ -class ACE_Export ACE_PIP_Invocation_Manager -{ - public: - - /// Constructor - ACE_PIP_Invocation_Manager(); - - /// Destructor - ~ACE_PIP_Invocation_Manager(); - - /// Get the singleton instance of the Invocation Manager - static ACE_PIP_Invocation_Manager* instance(); - - /// Associated a site ID with the Invocation Manager - static void set_site_id(ACE_UINT64 site_id); - - /// Process request made on local handler - void process_inbound_request(ACE_PIP_Protocol_Message* message); - - /// Processes request to be forwarded to another handler - void process_outbound_request(ACE_Message_Block* message, - ACE_UINT64 token, - ACE_Future<ACE_Message_Block*>*& response_holder); - - /// Process response to message sent by local handler - void process_inbound_response(ACE_PIP_Protocol_Message* message); - - /// Process response sent to remote handler - void process_outbound_response(ACE_Message_Block* message, ACE_UINT64 token); - - /// Process request to accelerate the priority of a process - void process_acceleration(ACE_PIP_Protocol_Message* message); - - /// Register an IO handler that can send messages of a certain priority - /// for the Invocation Manager - void register_IO_handler(ACE_PIP_IO_Handler* handler); - - /// Un-register an IO handler - void unregister_IO_handler(ACE_PIP_IO_Handler* handler); - - /// Register user-level message handler - void register_message_handler(ACE_PIP_Message_Handler* handler); - - private: - - - ACE_UINT64 generate_message_id(); - - ACE_UINT64 message_counter_; - - static ACE_UINT64 message_id_base_; - - static ACE_UINT32 site_id_; - - static ACE_PIP_Invocation_Manager* invocation_manager_; - - static ACE_Mutex instance_lock_; - - ACE_Mutex big_lock_; - - static bool delete_manager_; - - // Mapping of incoming messages to corresponding outgoing messages. - // This is used to track invocations resulting from an incoming invocation - // in order to pass acceleration messages along the chain - ID_TO_ID_LIST_MAP in_out_id_map_; - - // Mapping of user-level handler ID to corresponding handler - ID_TO_HANDLER_MAP object_id_handler_map_; - - // Mapping of site IDs to corresponding I/O handler map - SITE_TO_IO_HANDLERS_MAP site_to_handlers_map_; - - // Maps message ID to data such as destination site - // and priority - ID_TO_INVOCATION_RECORD_MAP invocation_data_map_; -}; - - -#endif diff --git a/ACE/ace/PIP_Message_Handler.cpp b/ACE/ace/PIP_Message_Handler.cpp deleted file mode 100644 index caf7a8f1ff6..00000000000 --- a/ACE/ace/PIP_Message_Handler.cpp +++ /dev/null @@ -1,105 +0,0 @@ -#include "ace/PIP_Message_Handler.h" -#include "ace/PIP_Invocation_Manager.h" - -ACE_PIP_Message_Handler::ACE_PIP_Message_Handler() - : handler_id_(0) - , site_id_(0) -{ - -} - -ACE_PIP_Message_Handler::ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 handler_id) - : site_id_(site_id) - , handler_id_(handler_id) -{ -} - -ACE_PIP_Protocol_Message* ACE_PIP_Message_Handler::create_protocol_message(ACE_UINT64 message_id, - bool reply_expected, - ACE_UINT32 source_handler_id, - ACE_UINT32 source_site_id, - ACE_UINT32 destination_handler_id, - ACE_UINT32 destination_site_id, - ACE_UINT32 message_priority, - ACE_PIP_Protocol_Message::Message_Type type, - const std::string& data_payload) -{ - // setup the proto message header - ACE_PIP_Protocol_Message* proto_message = new ACE_PIP_Protocol_Message; - proto_message->set_message_id(message_id); - proto_message->set_message_type(type); - - ACE_PIP_Data_Message* data_message = new ACE_PIP_Data_Message; - data_message->set_reply_expected(reply_expected); - - data_message->set_source_handler_ID(source_handler_id); - data_message->set_source_site_ID(source_site_id); - data_message->set_destination_handler_ID(destination_handler_id); - data_message->set_destination_site_ID(destination_site_id); - - data_message->set_message_priority(message_priority); - - // Create data message header and body, then pass to protocol message to be parsed - // and unpacked - ACE_Message_Block* header = new ACE_Message_Block(sizeof(ACE_PIP_Data_Message)); - ACE_Message_Block* body = new ACE_Message_Block(data_payload.length() + 1); - - ACE_OS::memcpy(body->wr_ptr(), data_payload.c_str(), data_payload.length() + 1); - body->wr_ptr(data_payload.length() + 1); - - // attach the data body to the header - header->next(body); - - // pack the header values into the message block - // set the write ptr ahead so pack() will know to put it back where it should be - header->wr_ptr(sizeof(ACE_PIP_Data_Message)); - data_message->block_ = header; - data_message->pack(); - - proto_message->set_next(data_message); - return proto_message; -} - -void ACE_PIP_Message_Handler::send_request(ACE_Message_Block* message, - ACE_UINT64 message_id, - ACE_Message_Block*& response) -{ - ACE_Future<ACE_Message_Block*>* response_holder(0); - ACE_PIP_Invocation_Manager::instance()->process_outbound_request(message, message_id, response_holder); - if (response_holder) - { - if (response_holder->get(response) == -1) - { - std::cerr << "Error receiving response in ::send_request" << std::endl; - response = 0; - } - } -} - -void ACE_PIP_Message_Handler::send_response(ACE_Message_Block* message, - ACE_UINT64 message_id) -{ - ACE_PIP_Invocation_Manager::instance()->process_outbound_response(message, message_id); -} - - -ACE_UINT32 ACE_PIP_Message_Handler::get_handler_id() const -{ - return handler_id_; -} -void ACE_PIP_Message_Handler::set_handler_id(ACE_UINT32 id) -{ - handler_id_ = id; -} - -ACE_UINT32 ACE_PIP_Message_Handler::get_site_id() const -{ - return site_id_; -} - -void ACE_PIP_Message_Handler::set_site_id(ACE_UINT32 id) -{ - site_id_ = id; -} - - diff --git a/ACE/ace/PIP_Message_Handler.h b/ACE/ace/PIP_Message_Handler.h deleted file mode 100644 index 2dec7cb628e..00000000000 --- a/ACE/ace/PIP_Message_Handler.h +++ /dev/null @@ -1,67 +0,0 @@ -// -*- C++ -*- - -//============================================================================= -/** - * @file PIP_Message_Handler.h - * - * - * @author John Moore - */ -//============================================================================= - - -#ifndef _PIP_MESSAGE_HANDLER_H_ -#define _PIP_MESSAGE_HANDLER_H_ - -#include "ace/Message_Block.h" -#include "ace/PIP_Messages.h" -#include "ace/Event_Handler.h" - -class ACE_Export ACE_PIP_Message_Handler -{ - public: - - ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 message_id); - ACE_PIP_Message_Handler(); - virtual ~ACE_PIP_Message_Handler(){} - - virtual void process_incoming_message(ACE_Message_Block* message, - ACE_UINT64 message_id) = 0; - - ACE_UINT32 get_handler_id() const; - void set_handler_id(ACE_UINT32 id); - - ACE_UINT32 get_site_id() const; - void set_site_id(ACE_UINT32 id); - - - - protected: - - ACE_UINT32 handler_id_; - ACE_UINT32 site_id_; - ACE_INET_Addr my_address_; - - // Pass a message to a remote handler - virtual void send_request(ACE_Message_Block* message, - ACE_UINT64 message_id, - ACE_Message_Block*& response); - - // Pass a response message to a remote handler - virtual void send_response(ACE_Message_Block* message, - ACE_UINT64 message_id); - - ACE_PIP_Protocol_Message* create_protocol_message(ACE_UINT64 message_id, - bool reply_expected, - ACE_UINT32 source_handler_id, - ACE_UINT32 source_site_id, - ACE_UINT32 destination_handler_id, - ACE_UINT32 destination_site_id, - ACE_UINT32 message_priority, - ACE_PIP_Protocol_Message::Message_Type type, - const std::string& data_payload); - -}; - -#endif - diff --git a/ACE/ace/PIP_Messages.cpp b/ACE/ace/PIP_Messages.cpp deleted file mode 100644 index 67fb886a3ec..00000000000 --- a/ACE/ace/PIP_Messages.cpp +++ /dev/null @@ -1,607 +0,0 @@ -// $Id$ - -#include "ace/OS_NS_stdlib.h" -#include "ace/OS_NS_string.h" -#include "ace/PIP_Messages.h" - -#include <iostream> - -ACE_PIP_Message::ACE_PIP_Message() - : block_(0) - , dirty_(false) - , next_(0) -{} - -ACE_PIP_Message::~ACE_PIP_Message() -{ - if (next_) - { - delete next_; - } - - if (block_) - { - block_->release(); - } -} - -void ACE_PIP_Message::set_block(ACE_Message_Block* block) -{ - // Remove the other block if it exist. - if (block_) - { - block_->release(); - } - - block_ = block; - - // Extract the values from the block. - unpack(); -} - -ACE_PIP_Data_Message::ACE_PIP_Data_Message() - : message_priority_(-1) - , reply_expected_(false) - , source_handler_ID_(-1) - , source_site_ID_(-1) - , destination_handler_ID_(-1) - , destination_site_ID_(-1) -{ -} - -int ACE_PIP_Data_Message::serialize(ACE_SOCK_Stream& stream) -{ - int total_bytes_sent(0); - - // Only serialize if there is a block. If not, - // there's nothing we can do but fail since we don't - // have enough information to create a block and unpack it. - if (block_) - { - if (dirty_) - { - pack(); - } - - ACE_Message_Block* curr_block = block_; - int bytes_sent(0); - while(curr_block) - { - bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length()); - if (bytes_sent > 0) - { - total_bytes_sent += bytes_sent; - curr_block = curr_block->next(); - } - else - { - std::cout << "Data_Mess:serialize: - didn't send any bytes" << std::endl; - total_bytes_sent = -1; - break; - } - } - } - else - { - std::cout << "DataMessage::Serialize - there is no block" << std::endl; - total_bytes_sent = -1; - } - - return total_bytes_sent; -} - -void ACE_PIP_Data_Message::pack() -{ - char* write_ptr = block_->wr_ptr(); - char* read_ptr = block_->rd_ptr(); - - block_->reset(); - - // Pack reply expected into buffer. - ACE_OS::memcpy(block_->wr_ptr(), &reply_expected_, sizeof(reply_expected_)); - block_->wr_ptr(sizeof(reply_expected_)); - - // Pack the message priority into the buffer. - ACE_OS::memcpy(block_->wr_ptr(), &message_priority_, sizeof(message_priority_)); - block_->wr_ptr(sizeof(message_priority_)); - - // Pack the destination handler ID into the buffer - ACE_OS::memcpy(block_->wr_ptr(), &destination_handler_ID_, sizeof(destination_handler_ID_)); - block_->wr_ptr(sizeof(destination_handler_ID_)); - - // Pack the source handler ID into the buffer - ACE_OS::memcpy(block_->wr_ptr(), &source_handler_ID_, sizeof(source_handler_ID_)); - block_->wr_ptr(sizeof(source_handler_ID_)); - - // Pack the destination site ID into the buffer - ACE_OS::memcpy(block_->wr_ptr(), &destination_site_ID_, sizeof(destination_site_ID_)); - block_->wr_ptr(sizeof(destination_site_ID_)); - - // Pack the source site ID into the buffer - ACE_OS::memcpy(block_->wr_ptr(), &source_site_ID_, sizeof(source_site_ID_)); - block_->wr_ptr(sizeof(source_site_ID_)); - - // Reset the buffer pointers to where they were so that the message length remains - // accurate. - block_->rd_ptr(read_ptr); - block_->wr_ptr(write_ptr); -} - -void ACE_PIP_Data_Message::unpack() -{ - if (block_) - { - char* write_ptr = block_->wr_ptr(); - block_->reset(); - - // reply_expected_ - ACE_OS::memcpy(&reply_expected_, block_->rd_ptr(), sizeof(reply_expected_)); - block_->rd_ptr(sizeof(reply_expected_)); - - // message priority - ACE_OS::memcpy(&message_priority_, block_->rd_ptr(), sizeof(message_priority_)); - block_->rd_ptr(sizeof(message_priority_)); - - // destination handler ID - ACE_OS::memcpy(&destination_handler_ID_, block_->rd_ptr(), sizeof(destination_handler_ID_)); - block_->rd_ptr(sizeof(destination_handler_ID_)); - - // source handler ID - ACE_OS::memcpy(&source_handler_ID_, block_->rd_ptr(), sizeof(source_handler_ID_)); - block_->rd_ptr(sizeof(source_handler_ID_)); - - // destination site ID - ACE_OS::memcpy(&destination_site_ID_, block_->rd_ptr(), sizeof(destination_site_ID_)); - block_->rd_ptr(sizeof(destination_site_ID_)); - - // source site ID - ACE_OS::memcpy(&source_site_ID_, block_->rd_ptr(), sizeof(source_site_ID_)); - block_->rd_ptr(sizeof(source_site_ID_)); - - block_->reset(); - block_->wr_ptr(write_ptr); - } - - dirty_ = false; -} - -void ACE_PIP_Data_Message::print() const -{ - std::cout << "Priority: " << message_priority_ << std::endl - << "Reply?: " << reply_expected_ << std::endl - << "Dest_Handler_ID: " << destination_handler_ID_ << std::endl - << "Source_Handler_ID: " << source_handler_ID_ << std::endl - << "Dest_Site_ID: " << destination_site_ID_ << std::endl - << "Source_Site_ID: " << source_site_ID_ << std::endl - << "Payload: " << block_->next()->base() << std::endl; -} - -ACE_PIP_Accel_Message::ACE_PIP_Accel_Message() - : ACCEL_HEADER_LENGTH_(2*sizeof(ACE_UINT32)) - , new_priority_(0) - , old_priority_(0) -{ -} - -int ACE_PIP_Accel_Message::serialize(ACE_SOCK_Stream& stream) -{ - pack(); - - int bytes_sent = stream.send_n(block_->rd_ptr(), block_->length()); - if (bytes_sent <= 0) - { - std::cout << "Accel:serial: didn't send any bytes" << std::endl; - } - - return bytes_sent; -} - -void ACE_PIP_Accel_Message::pack() -{ - if (!block_) - { - block_ = new ACE_Message_Block(ACCEL_HEADER_LENGTH_); - dirty_ = true; - } - - if (dirty_) - { - - // Set the buffer pointers to the start of the buffer to - // ensure we're writing to the correct location - block_->reset(); - - // Pack the contents of the struct into the message block - ACE_OS::memcpy(block_->wr_ptr(), &old_priority_, sizeof(old_priority_)); - block_->wr_ptr(sizeof(old_priority_)); - - ACE_OS::memcpy(block_->wr_ptr(), &new_priority_, sizeof(new_priority_)); - block_->wr_ptr(sizeof(new_priority_)); - - dirty_ = false; - } -} - -void ACE_PIP_Accel_Message::unpack() -{ - if (block_) - { - char* write_ptr = block_->wr_ptr(); - block_->reset(); - - old_priority_ = (*block_->rd_ptr()); - block_->rd_ptr(sizeof(old_priority_)); - - new_priority_ = (*block_->rd_ptr()); - block_->rd_ptr(sizeof (new_priority_)); - - // Reset the read and write pointers to their original location - // in the block. - block_->reset(); - block_->wr_ptr(write_ptr); - } - - dirty_ = false; -} - -ACE_PIP_Accel_Message* ACE_PIP_Accel_Message::copy() -{ - ACE_PIP_Accel_Message* copy = new ACE_PIP_Accel_Message; - - copy->new_priority_ = new_priority_; - copy->old_priority_ = old_priority_; - copy->pack(); - - return copy; -} - -void ACE_PIP_Accel_Message::print() const -{ - std::cout << "DestAddr: " << destination_address_ << std::endl - << "OldPriority: " << old_priority_ << std::endl - << "NewPriority: " << new_priority_ << std::endl; - -} - -ACE_PIP_Protocol_Message::ACE_PIP_Protocol_Message() - : message_type_(NONE) - , num_payload_blocks_(0) - , message_id_(0) - , FIXED_HEADER_LENGTH_(sizeof(Message_Type) + - sizeof(message_id_) + - sizeof(num_payload_blocks_)) -{ -} - -int ACE_PIP_Protocol_Message::serialize(ACE_SOCK_Stream& stream) -{ - int total_bytes_sent(0); - - pack(); - - ACE_Message_Block* curr_block = block_; - int bytes_sent(0); - - // Write each of the message blocks associated with this - // header into the stream - while(curr_block) - { - bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length()); - if (bytes_sent > 0) - { - total_bytes_sent += bytes_sent; - curr_block = curr_block->next(); - } - else - { - total_bytes_sent = -1; - break; - } - } - if ((total_bytes_sent > 0) && next_) - { - int next_sent = next_->serialize(stream); - if (next_sent > 0) - { - total_bytes_sent += next_sent; - } - else - { - total_bytes_sent = -1; - } - } - else - { - total_bytes_sent = -1; - } - - return total_bytes_sent; -} - -int ACE_PIP_Protocol_Message::deserialize(ACE_SOCK_Stream& stream) -{ - int total_bytes_received(-1); - - ACE_Message_Block* header_block = new ACE_Message_Block(FIXED_HEADER_LENGTH_); - ACE_Message_Block* lengths_block(0); - ACE_Message_Block* curr_payload_block(0); - ACE_Message_Block* payload_blocks(0); - - // Read the fixed-length portion of the protocol header. - int bytes_received = stream.recv_n(header_block->wr_ptr(), FIXED_HEADER_LENGTH_); - if (bytes_received == FIXED_HEADER_LENGTH_) - { - total_bytes_received = bytes_received; - - // Determine number of data message blocks in the payload. - header_block->rd_ptr(FIXED_HEADER_LENGTH_ - sizeof(num_payload_blocks_)); - ACE_OS::memcpy(&num_payload_blocks_, header_block->rd_ptr(), - sizeof(num_payload_blocks_)); - - header_block->reset(); - header_block->wr_ptr(bytes_received); - - // Extract the length of each payload block. - if (num_payload_blocks_ > 0) - { - // Read the lengths of each block. - int bytes_to_read = num_payload_blocks_ * sizeof(ACE_UINT32); - lengths_block = new ACE_Message_Block(bytes_to_read); - bytes_received = stream.recv_n(lengths_block->wr_ptr(), bytes_to_read); - - if (bytes_received == bytes_to_read) - { - total_bytes_received += bytes_received; - lengths_block->wr_ptr(bytes_received); - - // The lengths of each block have been successfully written, so - // unpack them. - header_block->next(lengths_block); - set_block(header_block); - - curr_payload_block = new ACE_Message_Block(payload_block_lengths_[0]); - payload_blocks = curr_payload_block; - unsigned int i = 0; - for (; i < num_payload_blocks_ && bytes_received != -1; ++i) - { - // Read the block. - bytes_received = stream.recv_n(curr_payload_block->wr_ptr(), - payload_block_lengths_[i]); - if (bytes_received > 0) - { - total_bytes_received += bytes_received; - curr_payload_block->wr_ptr(bytes_received); - if (i < (num_payload_blocks_ - 1)) - { - curr_payload_block->next( - new ACE_Message_Block(payload_block_lengths_[i + 1])); - - curr_payload_block = curr_payload_block->next(); - } - else - { - curr_payload_block->next(0); - } - - } - else - { - total_bytes_received = -1; - std::cout << "deserialize: didn't read enough bytes" << std::endl; - break; - } - } - } - else - { - total_bytes_received = -1; - - std::cout << "Deserialize: didnt read enought bytes" << std::endl; - } - } - } - else - { - total_bytes_received = -1; - std::cout << "Deserialize:didn't receive enought bytes: got " << bytes_received << std::endl; - } - - if (total_bytes_received > 0) - { - if (message_type_ == ACCEL) - { - next_ = new ACE_PIP_Accel_Message; - } - else - { - next_ = new ACE_PIP_Data_Message; - } - - // Pass the payload blocks to the next message struct - // so it can unpack it. - next_->set_block(payload_blocks); - } - else if (block_) - { - // Something failed during reading, so cleanup any allocated memory. - block_->release(); - } - - return total_bytes_received; -} - -void ACE_PIP_Protocol_Message::set_next(ACE_PIP_Message* next) -{ - // Determine the number and length of payload blocks. - payload_block_lengths_.clear(); - num_payload_blocks_ = 0; - next->pack(); - ACE_Message_Block* curr_block = next->get_block(); - while (curr_block) - { - ++num_payload_blocks_; - payload_block_lengths_.push_back(curr_block->length()); - curr_block = curr_block->next(); - } - - next_ = next; - dirty_ = true; -} - -void ACE_PIP_Protocol_Message::process_message_payload(ACE_Message_Block* payload) -{ - payload_block_lengths_.clear(); - num_payload_blocks_ = 0; - - // Determine the length and number of payload blocks. - ACE_Message_Block* curr_block = payload; - while (curr_block) - { - ++num_payload_blocks_; - payload_block_lengths_.push_back(curr_block->length()); - curr_block = curr_block->next(); - } - - if (!next_) - { - if (message_type_ == ACCEL) - { - next_ = new ACE_PIP_Accel_Message; - } - else - { - next_ = new ACE_PIP_Data_Message; - } - } - - next_->set_block(payload); - dirty_ = true; -} - -void ACE_PIP_Protocol_Message::pack() -{ - int total_bytes_sent(0); - if (!block_) - { - // Create the message buffer for the protocol header. - block_ = new ACE_Message_Block(FIXED_HEADER_LENGTH_); - - // Create the message buffer for the list of payload block lengths. - block_->next(new ACE_Message_Block(num_payload_blocks_ * sizeof(ACE_UINT32))); - block_->next()->next(0); - dirty_ = true; - } - if (dirty_) - { - // Set the buffer pointers to the start of the buffer - // so that we write to the appropriate location. - block_->reset(); - - // pack the process Id. - ACE_OS::memcpy(block_->wr_ptr(), &message_id_, sizeof(message_id_)); - block_->wr_ptr(sizeof (message_id_)); - - // Pack the message type. - ACE_OS::memcpy(block_->wr_ptr(), &message_type_, sizeof(message_type_)); - block_->wr_ptr(sizeof(message_type_)); - - // Number of blocks in payload. - ACE_OS::memcpy(block_->wr_ptr(), &num_payload_blocks_, sizeof(num_payload_blocks_)); - block_->wr_ptr(sizeof(num_payload_blocks_)); - - ACE_Message_Block* next_block = block_->next(); - if (next_block) - { - next_block->reset(); - - // Write the block lengths into the message block. - for (unsigned int i = 0; i < num_payload_blocks_; ++i) - { - ACE_OS::memcpy(next_block->wr_ptr(), - &payload_block_lengths_[i], - sizeof(ACE_UINT32)); - - next_block->wr_ptr(sizeof(ACE_UINT32)); - } - } - - dirty_ = false; - } -} - -void ACE_PIP_Protocol_Message::unpack() -{ - if (block_) - { - char* write_ptr = block_->wr_ptr(); - // char* read_ptr = block_->rd_ptr(); - block_->reset(); - - // Extract the process ID. - ACE_OS::memcpy(&message_id_, block_->rd_ptr(), sizeof(message_id_)); - block_->rd_ptr(sizeof (message_id_)); - - // Extract the message type. - ACE_OS::memcpy(&message_type_, block_->rd_ptr(), sizeof(message_type_)); - block_->rd_ptr(sizeof(message_type_)); - - // Number of blocks in payload. - ACE_OS::memcpy(&num_payload_blocks_, block_->rd_ptr(), - sizeof(num_payload_blocks_)); - - block_->rd_ptr(sizeof(num_payload_blocks_)); - - // Reset buffer pointers to be where they were prior to unpacking. - block_->reset(); - block_->wr_ptr(write_ptr); - - // The next block holds the lengths of each payload block. - ACE_Message_Block* next_block = block_->next(); - if (next_block) - { - write_ptr = next_block->wr_ptr(); - next_block->reset(); - payload_block_lengths_.resize(num_payload_blocks_, 0); - ACE_UINT32 block_length(0); - - // Extract the lengths of each payload block, which will - // be used to recreate the structure of the original payload. - for (ACE_UINT32 i = 0; i < num_payload_blocks_; ++i) - { - ACE_OS::memcpy(&block_length, next_block->rd_ptr(), sizeof(block_length)); - next_block->rd_ptr(sizeof(block_length)); - payload_block_lengths_[i] = block_length; - } - - // Reset the buffer pointers to where they were prior to unpacking. - next_block->reset(); - next_block->wr_ptr(write_ptr); - } - } - - dirty_ = false; -} - -ACE_PIP_Protocol_Message* ACE_PIP_Protocol_Message::copy() -{ - ACE_PIP_Protocol_Message* message_copy = new ACE_PIP_Protocol_Message; - message_copy->message_type_ = message_type_; - message_copy->num_payload_blocks_ = num_payload_blocks_; - for (ACE_UINT32 block_index = 0; block_index < num_payload_blocks_; ++block_index) - { - message_copy->payload_block_lengths_[block_index] = payload_block_lengths_[block_index]; - } -} - -void ACE_PIP_Protocol_Message::print() const -{ - std::cout << "Type: " << message_type_ << std::endl - << "MessageID: " << message_id_ << std::endl - << "NumPayload: " << num_payload_blocks_ << std::endl; - - for (unsigned int i = 0; i < num_payload_blocks_; ++i) - { - std::cout << "BlockLength[" << i << "] = " << payload_block_lengths_[i] << std::endl; - } -} - diff --git a/ACE/ace/PIP_Messages.h b/ACE/ace/PIP_Messages.h deleted file mode 100644 index 683bedb1ff9..00000000000 --- a/ACE/ace/PIP_Messages.h +++ /dev/null @@ -1,446 +0,0 @@ - /** - * @file PIP_Messages - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - * This file contains the specification for a heirarchy of - * classes that represent the various messages used in the - * priority inheritance protocol -*/ - -#ifndef _PIP_MESSAGES_H_ -#define _PIP_MESSAGES_H_ - -#include "ace/Message_Block.h" -#include "ace/SOCK_Stream.h" -#include "ace/Vector_T.h" - -#include <iostream> - -/** - * @class ACE_PIP_Message - * @brief Base class for all messages used in - * the implementation of a distributed priority inheritance - * protocol. - * - * Base class for all messages used in the implementation of a distributed - * priority inheritance protocol. Provides an interface for message (de)serialization, - * message chaining, packing, unpacking, and payload ownership transfer - */ -class ACE_Export ACE_PIP_Message -{ - public: - - ACE_PIP_Message(); - virtual ~ACE_PIP_Message(); - - /// Send the contents of this message over the stream. - virtual int serialize(ACE_SOCK_Stream& stream) = 0; - - /// Get the next message struct. - virtual ACE_PIP_Message* get_next(); - - /// Set the next message struct. - virtual void set_next(ACE_PIP_Message* next); - - /// Returns the next message, making the caller - /// the new owner. - virtual ACE_PIP_Message* release_next(); - - /// Get the message block. - virtual ACE_Message_Block* get_block(); - - /// Set the message block and populate the message struct - /// with message contents. - virtual void set_block(ACE_Message_Block* block); - - /// Get the message block, making the caller the new owner. - virtual ACE_Message_Block* release_block(); - - /// Place the values in the message struct into the message block. - virtual void pack() = 0; - - /// Populate the message struct using values from the message block. - virtual void unpack() = 0; - - /// This is temporarily public to facilitate testing. - /// It should eventually be made private. - ACE_Message_Block* block_; - - /// Print the contents of this struct to stdout. - virtual void print() const = 0; - - - protected: - - // Indicates values in structure are newer than values in the - // message block. - bool dirty_; - - ACE_PIP_Message* next_; -}; - -/** - * @class ACE_PIP_Data_Message - * @brief Structure representing the fields of an application- - * level protocol message and associated header values - * - * Structure representing the fields of an appliation level - * protocol message and associated header values. Structure is that - * of several contiguous ACE_Message_Block's. The message is configurable - * to support any application-level protocol that contains at least the following - * data: source address, destination address, reply expectation, and priority - * -*/ -class ACE_Export ACE_PIP_Data_Message : public ACE_PIP_Message -{ - public: - - ACE_PIP_Data_Message(); - virtual ~ACE_PIP_Data_Message(){} - - /// Send the contents of this message over the stream. - virtual int serialize(ACE_SOCK_Stream& stream); - - /// Determine if a reply message is expected - bool get_reply_expected() const; - void set_reply_expected(bool expected); - - /// Determine the priority at which this message should be handled - ACE_UINT32 get_message_priority() const; - void set_message_priority(ACE_UINT32 priority); - - /// Determine the ID of the destination handler - ACE_UINT32 get_destination_handler_ID() const; - void set_destination_handler_ID(ACE_UINT32 ID); - - /// Determine the ID of the sending handler - ACE_UINT32 get_source_handler_ID() const; - void set_source_handler_ID(ACE_UINT32 ID); - - /// Determine the ID of the destination site - ACE_UINT32 get_destination_site_ID() const; - void set_destination_site_ID(ACE_UINT32 ID); - - /// Determine the ID of the sending site - ACE_UINT32 get_source_site_ID() const; - void set_source_site_ID(ACE_UINT32 ID); - - // Place the values from the struct into the message blocks. - virtual void pack(); - - // Extract the values from the message blocks into the structs. - virtual void unpack(); - - /// Print the contents of this struct to stdout. - virtual void print() const; - - private: - - ACE_UINT32 message_priority_; - bool reply_expected_; - ACE_UINT32 source_handler_ID_; - ACE_UINT32 destination_handler_ID_; - ACE_UINT32 source_site_ID_; - ACE_UINT32 destination_site_ID_; -}; - -/** - * @class ACE_PIP_Protocol_Message - * @brief Structure representing a message supported by the priority - * inheritance protocol - * -*/ - -class ACE_Export ACE_PIP_Accel_Message : public ACE_PIP_Message -{ - public: - - ACE_PIP_Accel_Message(); - virtual ~ACE_PIP_Accel_Message(){} - - /// Send the contents of this message over the stream. - virtual int serialize(ACE_SOCK_Stream& stream); - - ACE_UINT32 get_old_priority() const; - void set_old_priority(ACE_UINT32 priority); - - ACE_UINT32 get_new_priority() const; - void set_new_priority(ACE_UINT32 priority); - - /// Get the address of the application receiving the message. - ACE_UINT32 get_destination_address() const; - void set_destination_address(const ACE_UINT32& address); - - u_short get_destination_port() const; - void set_destination_port(u_short port); - - /// Place the values in the message struct into the message block. - virtual void pack(); - - /// Extract the values from the message block and store them in the struct. - virtual void unpack(); - - /// Print the contents of this struct to stdout. - virtual void print() const; - - /// Return a copy of the this message - ACE_PIP_Accel_Message* copy(); - - private: - - const ACE_UINT32 ACCEL_HEADER_LENGTH_; - ACE_UINT32 destination_address_; - u_short destination_port_; - ACE_UINT32 new_priority_; - ACE_UINT32 old_priority_; -}; - -/** - * @class ACE_PIP_Accel_Message - * @brief Structure representing an acceleration message - * used in the implementation of a priority inheritance protocol - * - * Structure representing an acceleration message used in the - * implementation of a priority inheritance protocol. Indicates the - * old and new priority of the targeted process, as well as the address - * of handler to which the associated message was sent. -*/ -class ACE_Export ACE_PIP_Protocol_Message : public ACE_PIP_Message -{ - public: - - enum Message_Type { NONE, ACCEL, DATA, REQUEST, RESPONSE }; - - ACE_PIP_Protocol_Message(); - virtual ~ACE_PIP_Protocol_Message(){} - - /// Send the contents of this message over the stream. - virtual int serialize(ACE_SOCK_Stream& stream); - - /// Receive the contents of this message from the stream. - virtual int deserialize(ACE_SOCK_Stream& stream); - - /// Set the next message in the chain. - virtual void set_next(ACE_PIP_Message* next); - - /// Determine the type of message this header has been tacked onto. - Message_Type get_message_type() const; - void set_message_type(Message_Type type); - - /// Determine which call chain this message is associated with. - ACE_UINT64 get_message_id() const; - void set_message_id(ACE_UINT64 id); - - /// Attach message block as payload of priority inheritance - /// protocol message. - void process_message_payload(ACE_Message_Block* payload); - - virtual void pack(); - virtual void unpack(); - - /// Print the contents of this struct to stdout. - virtual void print() const; - - /// Make a copy of the header of this message, i.e. without - /// data or accel payload - ACE_PIP_Protocol_Message* copy(); - - const int FIXED_HEADER_LENGTH_; - - private: - - Message_Type message_type_; - ACE_UINT32 num_payload_blocks_; - ACE_Vector<ACE_UINT32> payload_block_lengths_; - ACE_UINT64 message_id_; -}; - - -/************************************************** - * - * ACE_PIP_Message - Inline Methods - * - **************************************************/ -inline ACE_PIP_Message* ACE_PIP_Message::get_next() -{ - return next_; -} - -inline void ACE_PIP_Message::set_next(ACE_PIP_Message* message) -{ - next_ = message; -} - -inline ACE_PIP_Message* ACE_PIP_Message::release_next() -{ - ACE_PIP_Message* temp = next_; - next_ = 0; - return temp; -} - -inline ACE_Message_Block* ACE_PIP_Message::get_block() -{ - return block_; -} - -inline ACE_Message_Block* ACE_PIP_Message::release_block() -{ - ACE_Message_Block* temp_block = block_; - block_ = 0; - dirty_ = true; - return temp_block; -} - -/************************************************** - * - * ACE_PIP_Data_Message - Inline Methods - * - **************************************************/ - -inline bool ACE_PIP_Data_Message::get_reply_expected() const -{ - return reply_expected_; -} - -inline void ACE_PIP_Data_Message::set_reply_expected(bool expected) -{ - dirty_ = true; - reply_expected_ = expected; -} - -inline ACE_UINT32 ACE_PIP_Data_Message::get_message_priority() const -{ - return message_priority_; -} - -inline void ACE_PIP_Data_Message::set_message_priority(ACE_UINT32 priority) -{ - dirty_ = true; - message_priority_ = priority; -} - -inline ACE_UINT32 ACE_PIP_Data_Message::get_destination_handler_ID() const -{ - return destination_handler_ID_; -} - -inline void ACE_PIP_Data_Message::set_destination_handler_ID(ACE_UINT32 ID) -{ - destination_handler_ID_ = ID; - dirty_ = true; -} - -inline ACE_UINT32 ACE_PIP_Data_Message::get_source_handler_ID() const -{ - return source_handler_ID_; -} - -inline void ACE_PIP_Data_Message::set_source_handler_ID(ACE_UINT32 ID) -{ - source_handler_ID_ = ID; -} - -inline ACE_UINT32 ACE_PIP_Data_Message::get_source_site_ID() const -{ - return source_site_ID_; -} - -inline void ACE_PIP_Data_Message::set_source_site_ID(ACE_UINT32 ID) -{ - source_site_ID_ = ID; -} - -inline ACE_UINT32 ACE_PIP_Data_Message::get_destination_site_ID() const -{ - return destination_site_ID_; -} - -inline void ACE_PIP_Data_Message::set_destination_site_ID(ACE_UINT32 ID) -{ - destination_site_ID_ = ID; -} - -/************************************************** - * - * ACE_PIP_Accel_Message - Inline Methods - * - **************************************************/ - -inline ACE_UINT32 ACE_PIP_Accel_Message::get_old_priority() const -{ - return old_priority_; -} - -inline void ACE_PIP_Accel_Message::set_old_priority(ACE_UINT32 priority) -{ - dirty_ = true; - old_priority_ = priority; -} - -inline ACE_UINT32 ACE_PIP_Accel_Message::get_new_priority() const -{ - return new_priority_; -} - -inline void ACE_PIP_Accel_Message::set_new_priority(ACE_UINT32 priority) -{ - dirty_ = true; - new_priority_ = priority; -} - -inline ACE_UINT32 ACE_PIP_Accel_Message::get_destination_address() const -{ - return destination_address_; -} - -inline void ACE_PIP_Accel_Message::set_destination_address(const ACE_UINT32& address) -{ - dirty_ = true; - destination_address_ = address; -} - -inline u_short ACE_PIP_Accel_Message::get_destination_port() const -{ - return destination_port_; -} - -inline void ACE_PIP_Accel_Message::set_destination_port(u_short port) -{ - destination_port_ = port; -} - -/************************************************** - * - * ACE_PIP_Protocol_Message - Inline Methods - * - **************************************************/ - -inline void ACE_PIP_Protocol_Message:: - set_message_type(ACE_PIP_Protocol_Message::Message_Type type) -{ - message_type_ = type; - dirty_ = true; -} - -inline ACE_PIP_Protocol_Message::Message_Type ACE_PIP_Protocol_Message:: - get_message_type() const -{ - return message_type_; -} - -inline ACE_UINT64 ACE_PIP_Protocol_Message::get_message_id() const -{ - return message_id_; -} - -inline void ACE_PIP_Protocol_Message::set_message_id(ACE_UINT64 id) -{ - dirty_ = true; - message_id_ = id; -} - -#endif - diff --git a/ACE/ace/PIP_Reactive_IO_Handler.cpp b/ACE/ace/PIP_Reactive_IO_Handler.cpp deleted file mode 100644 index 925be857608..00000000000 --- a/ACE/ace/PIP_Reactive_IO_Handler.cpp +++ /dev/null @@ -1,64 +0,0 @@ -// $Id$ - -#include "ace/OS_NS_sys_time.h" -#include "ace/PIP_Reactive_IO_Handler.h" -#include "ace/PIP_Invocation_Manager.h" - -/// Constructor -ACE_PIP_Reactive_IO_Handler::ACE_PIP_Reactive_IO_Handler() -{ -} - -ACE_PIP_Reactive_IO_Handler::~ACE_PIP_Reactive_IO_Handler() -{ -} - -/// Closes all remote connections. -int ACE_PIP_Reactive_IO_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) -{ - int result(0); - switch(close_mask) - { - case ACE_Event_Handler::READ_MASK: - read_closed_ = true; - break; - case ACE_Event_Handler::WRITE_MASK: - write_closed_ = true; - break; - }; - - if (read_closed_ && write_closed_) - { - // Close our end of the connection - peer_.close_reader(); - peer_.close_writer(); - - // un-register with invocation manager so it doesn't - // try to use the handler for IO - ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this); - - delete this; - return -1; - } - - return 0; -} - - -/// Enqueue a message to be sent -int ACE_PIP_Reactive_IO_Handler::put_message (ACE_PIP_Protocol_Message* message) -{ - big_lock_.acquire(); - outgoing_message_queue_.enqueue_head(message); - big_lock_.release(); - - // Register so Reactor tells us to send the message - ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::WRITE_MASK); - ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK); - - return 0; -} - - - - diff --git a/ACE/ace/PIP_Reactive_IO_Handler.h b/ACE/ace/PIP_Reactive_IO_Handler.h deleted file mode 100644 index ae50ebf9b27..00000000000 --- a/ACE/ace/PIP_Reactive_IO_Handler.h +++ /dev/null @@ -1,54 +0,0 @@ - /** - * @file PIP_IO_Handler.cpp - * - * // $Id$ - * - * @author John Moore <ljohn7@gmail.com> - * - * This file contains the specification for a class - * that manages network I/O -*/ - - -#ifndef _PIP_REACTIVE_IO_HANDLER_H_ -#define _PIP_REACTIVE_IO_HANDLER_H_ - - -#include "ace/Message_Queue.h" -#include "ace/PIP_IO_Handler.h" -#include "ace/PIP_Messages.h" - -/** - * @class ACE_PIP_Reactive_IO_Handler - * - * @brief Performs reactive network I/O in - * the context of a distributed system - * employing the the priority inheritance - * protocol - * - * @author John Moore <ljohn7@gmail.com> - */ -class ACE_Export ACE_PIP_Reactive_IO_Handler : - public ACE_PIP_IO_Handler -{ - public: - - /// Constructor - ACE_PIP_Reactive_IO_Handler (); - ~ACE_PIP_Reactive_IO_Handler(); - - /// Closes all remote connections. - virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); - - /// Enqueue a message to be sent - virtual int put_message (ACE_PIP_Protocol_Message* message); - - private: - -}; - - - -#endif /* _PIP_Reactive_IO_Handler_H_ */ - - diff --git a/ACE/ace/ace.mpc b/ACE/ace/ace.mpc deleted file mode 100644 index 4a4ef0d66f3..00000000000 --- a/ACE/ace/ace.mpc +++ /dev/null @@ -1,493 +0,0 @@ -// -*- MPC -*- now wouldn't this be cool... -// $Id$ - -project(ACE) : acedefaults, install, other, codecs, token, svcconf, uuid, filecache, versioned_namespace, pkgconfig, support_ostream { - avoids = ace_for_tao - libout = $(ACE_ROOT)/lib - sharedname = ACE - dynamicflags = ACE_BUILD_DLL - - Source_Files(ACE_COMPONENTS) { - ACE.cpp - ACE_crc_ccitt.cpp - ACE_crc32.cpp - ace_wchar.cpp - Activation_Queue.cpp - Active_Map_Manager.cpp - Addr.cpp - Argv_Type_Converter.cpp - Assert.cpp - Asynch_IO.cpp - Asynch_IO_Impl.cpp - Asynch_Pseudo_Task.cpp - ATM_Acceptor.cpp - ATM_Addr.cpp - ATM_Connector.cpp - ATM_Params.cpp - ATM_QoS.cpp - ATM_Stream.cpp - Atomic_Op.cpp - Atomic_Op_Sparc.c - Auto_Event.cpp - Barrier.cpp - Base_Thread_Adapter.cpp - Based_Pointer_Repository.cpp - Basic_P_Strategy.cpp - Basic_Stats.cpp - Basic_Types.cpp - Capabilities.cpp - CDR_Base.cpp - CDR_Stream.cpp - CDR_Size.cpp - Cleanup.cpp - Codeset_IBM1047.cpp - Codeset_Registry.cpp - Codeset_Registry_db.cpp - Condition_Recursive_Thread_Mutex.cpp - Condition_Thread_Mutex.cpp - Configuration.cpp - Configuration_Import_Export.cpp - Connection_Recycling_Strategy.cpp - Containers.cpp - Copy_Disabled.cpp - Countdown_Time.cpp - Date_Time.cpp - DA_Strategy_Base.cpp - DEV.cpp - DEV_Addr.cpp - DEV_Connector.cpp - DEV_IO.cpp - DLL_Manager.cpp - Dev_Poll_Reactor.cpp - Dirent.cpp - Dirent_Selector.cpp - Dump.cpp - Dynamic.cpp - Event.cpp - Event_Handler.cpp - FIFO.cpp - FIFO_Recv.cpp - FIFO_Recv_Msg.cpp - FIFO_Send.cpp - FIFO_Send_Msg.cpp - FILE.cpp - FILE_Addr.cpp - FILE_Connector.cpp - FILE_IO.cpp - File_Lock.cpp - Flag_Manip.cpp - Framework_Component.cpp - Functor.cpp - Functor_String.cpp - Get_Opt.cpp - gethrtime.cpp - Handle_Ops.cpp - Handle_Set.cpp - Hashable.cpp - High_Res_Timer.cpp - ICMP_Socket.cpp - INET_Addr.cpp - Init_ACE.cpp - IO_SAP.cpp - IO_Cntl_Msg.cpp - IOStream.cpp - IPC_SAP.cpp - k_Efficient_P_Strategy.cpp - Lib_Find.cpp - Live_P_Strategy.cpp - Local_Memory_Pool.cpp - Lock.cpp - Log_Msg.cpp - Log_Msg_Backend.cpp - Log_Msg_Callback.cpp - Log_Msg_IPC.cpp - Log_Msg_NT_Event_Log.cpp - Log_Msg_UNIX_Syslog.cpp - Log_Record.cpp - Logging_Strategy.cpp - LSOCK.cpp - LSOCK_Acceptor.cpp - LSOCK_CODgram.cpp - LSOCK_Connector.cpp - LSOCK_Dgram.cpp - LSOCK_Stream.cpp - Malloc.cpp - Malloc_Allocator.cpp - Manual_Event.cpp - MEM_Acceptor.cpp - MEM_Addr.cpp - MEM_Connector.cpp - MEM_IO.cpp - Mem_Map.cpp - MEM_SAP.cpp - MEM_Stream.cpp - Message_Block.cpp - Message_Queue.cpp - Message_Queue_Vx.cpp - Method_Request.cpp - MMAP_Memory_Pool.cpp - Msg_WFMO_Reactor.cpp - MT_Priority_Reactor.cpp - Multihomed_INET_Addr.cpp - Mutex.cpp - Netlink_Addr.cpp - Notification_Strategy.cpp - Notification_Queue.cpp - Obchunk.cpp - Object_Manager.cpp - Object_Manager_Base.cpp - OS_Errno.cpp - OS_Log_Msg_Attributes.cpp - OS_main.cpp - OS_NS_arpa_inet.cpp - OS_NS_ctype.cpp - OS_NS_dirent.cpp - OS_NS_dlfcn.cpp - OS_NS_errno.cpp - OS_NS_fcntl.cpp - OS_NS_math.cpp - OS_NS_netdb.cpp - OS_NS_poll.cpp - OS_NS_pwd.cpp - OS_NS_regex.cpp - OS_NS_signal.cpp - OS_NS_stdio.cpp - OS_NS_stdlib.cpp - OS_NS_string.cpp - OS_NS_strings.cpp - OS_NS_stropts.cpp - OS_NS_sys_mman.cpp - OS_NS_sys_msg.cpp - OS_NS_sys_resource.cpp - OS_NS_sys_select.cpp - OS_NS_sys_sendfile.cpp - OS_NS_sys_shm.cpp - OS_NS_sys_socket.cpp - OS_NS_sys_stat.cpp - OS_NS_sys_time.cpp - OS_NS_sys_uio.cpp - OS_NS_sys_utsname.cpp - OS_NS_sys_wait.cpp - OS_NS_Thread.cpp - OS_NS_time.cpp - OS_NS_unistd.cpp - OS_NS_wchar.cpp - OS_QoS.cpp - OS_Thread_Adapter.cpp - OS_TLI.cpp - Pagefile_Memory_Pool.cpp - Parse_Node.cpp - PI_Malloc.cpp - Ping_Socket.cpp - Pipe.cpp - PIP_Active_IO_Handler.cpp - PIP_Connection_Manager.cpp - PIP_DA_Strategy_Adapter.cpp - PIP_Dispatcher.cpp - PIP_Invocation_Manager.cpp - PIP_IO_Handler.cpp - PIP_Messages.cpp - PIP_Message_Handler.cpp - PIP_Reactive_IO_Handler.cpp - POSIX_Asynch_IO.cpp - POSIX_CB_Proactor.cpp - POSIX_Proactor.cpp - Priority_Reactor.cpp - Proactor.cpp - Proactor_Impl.cpp - Process.cpp - Process_Manager.cpp - Process_Mutex.cpp - Process_Semaphore.cpp - Profile_Timer.cpp - Reactor.cpp - Reactor_Impl.cpp - Reactor_Notification_Strategy.cpp - Reactor_Timer_Interface.cpp - Read_Buffer.cpp - Recursive_Thread_Mutex.cpp - Recyclable.cpp - Refcountable.cpp - Registry.cpp - Rtems_init.c - RW_Mutex.cpp - RW_Process_Mutex.cpp - RW_Thread_Mutex.cpp - Sample_History.cpp - Sbrk_Memory_Pool.cpp - Sched_Params.cpp - Select_Reactor_Base.cpp - Semaphore.cpp - Shared_Memory.cpp - Shared_Memory_MM.cpp - Shared_Memory_Pool.cpp - Shared_Memory_SV.cpp - Sig_Adapter.cpp - Sig_Handler.cpp - Signal.cpp - SOCK.cpp - SOCK_Acceptor.cpp - SOCK_CODgram.cpp - Sock_Connect.cpp - SOCK_Connector.cpp - SOCK_Dgram.cpp - SOCK_Dgram_Bcast.cpp - SOCK_Dgram_Mcast.cpp - SOCK_IO.cpp - SOCK_Netlink.cpp - SOCK_SEQPACK_Acceptor.cpp - SOCK_SEQPACK_Association.cpp - SOCK_SEQPACK_Connector.cpp - SOCK_Stream.cpp - SPIPE.cpp - SPIPE_Acceptor.cpp - SPIPE_Addr.cpp - SPIPE_Connector.cpp - SPIPE_Stream.cpp - SString.cpp - Stats.cpp - String_Base_Const.cpp - SUN_Proactor.cpp - SV_Message.cpp - SV_Message_Queue.cpp - SV_Semaphore_Complex.cpp - SV_Semaphore_Simple.cpp - SV_Shared_Memory.cpp - Synch_Options.cpp - System_Time.cpp - Task.cpp - Thread.cpp - Thread_Adapter.cpp - Thread_Control.cpp - Thread_Exit.cpp - Thread_Hook.cpp - Thread_Manager.cpp - Thread_Mutex.cpp - Thread_Semaphore.cpp - Throughput_Stats.cpp - Time_Value.cpp - Timeprobe.cpp - Timer_Hash.cpp - Timer_Heap.cpp - Timer_List.cpp - Timer_Queue.cpp - Timer_Wheel.cpp - TLI.cpp - TLI_Acceptor.cpp - TLI_Connector.cpp - TLI_Stream.cpp - Token.cpp - TP_Reactor.cpp - Trace.cpp - TSS_Adapter.cpp - TTY_IO.cpp - UNIX_Addr.cpp - UPIPE_Acceptor.cpp - UPIPE_Connector.cpp - UPIPE_Stream.cpp - WFMO_Reactor.cpp - WIN32_Asynch_IO.cpp - WIN32_Proactor.cpp - XTI_ATM_Mcast.cpp - } - - Template_Files { - Acceptor.cpp - Active_Map_Manager_T.cpp - ARGV.cpp - Arg_Shifter.cpp - Array_Base.cpp - Array_Map.cpp - Asynch_Acceptor.cpp - Asynch_Connector.cpp - Atomic_Op_T.cpp - Auto_Functor.cpp - Auto_IncDec_T.cpp - Auto_Ptr.cpp - Based_Pointer_T.cpp - Cache_Map_Manager_T.cpp - Cached_Connect_Strategy_T.cpp - Caching_Strategies_T.cpp - Caching_Utility_T.cpp - Cleanup_Strategies_T.cpp - Condition_T.cpp - Connector.cpp - Containers_T.cpp - Dump_T.cpp - Dynamic_Service.cpp - Env_Value_T.cpp - Event_Handler_T.cpp - Framework_Component_T.cpp - Free_List.cpp - Functor_T.cpp - Future.cpp - Future_Set.cpp - Guard_T.cpp - Hash_Cache_Map_Manager_T.cpp - Hash_Map_Manager_T.cpp - Hash_Multi_Map_Manager_T.cpp - Hash_Map_With_Allocator_T.cpp - IOStream_T.cpp - Intrusive_List.cpp - Intrusive_List_Node.cpp - LOCK_SOCK_Acceptor.cpp - Local_Name_Space_T.cpp - Lock_Adapter_T.cpp - Malloc_T.cpp - Managed_Object.cpp - Map_Manager.cpp - Map_T.cpp - Message_Block_T.cpp - Message_Queue_T.cpp - Module.cpp - Node.cpp - Obstack_T.cpp - Pair_T.cpp - RB_Tree.cpp - Reactor_Token_T.cpp - Refcounted_Auto_Ptr.cpp - Reverse_Lock_T.cpp - Select_Reactor_T.cpp - Singleton.cpp - Strategies_T.cpp - Stream.cpp - Stream_Modules.cpp - String_Base.cpp - Svc_Handler.cpp - Synch_T.cpp - TSS_T.cpp - Task_Ex_T.cpp - Task_T.cpp - Test_and_Set.cpp - Timeprobe_T.cpp - Timer_Hash_T.cpp - Timer_Heap_T.cpp - Timer_List_T.cpp - Timer_Queue_Adapters.cpp - Timer_Queue_T.cpp - Timer_Wheel_T.cpp - Typed_SV_Message.cpp - Typed_SV_Message_Queue.cpp - Unbounded_Queue.cpp - Unbounded_Set.cpp - Vector_T.cpp - } - - Inline_Files { - Bound_Ptr.inl - Condition_T.inl - Guard_T.inl - Handle_Gobbler.inl - Lock_Adapter_T.inl - Refcounted_Auto_Ptr.inl - Reverse_Lock_T.inl - TSS_T.inl - ace_wchar.inl - OS.inl - } - - Header_Files { - ACE_export.h - Array.h - Basic_P_Strategy.h - Bound_Ptr.h - CORBA_macros.h - Condition_T.h - DA_Strategy_Base.h - Default_Constants.h - Exception_Macros.h - Global_Macros.h - Guard_T.h - Hash_Map_Manager.h - Handle_Gobbler.h - If_Then_Else.h - IO_Cntl_Msg.h - k_Efficient_P_Strategy.h - Lock_Adapter_T.h - Log_Priority.h - Malloc_Base.h - Method_Object.h - Memory_Pool.h - Min_Max.h - MT_Priority_Reactor.h - Netlink_Addr.h - Null_Barrier.h - Null_Condition.h - Null_Mutex.h - Null_Semaphore.h - Numeric_Limits.h - OS.h - OS_Dirent.h - OS_Memory.h - OS_NS_macros.h - OS_String.h - OS_Thread_Adapter.h - Object_Manager_Base.h - Pair.h - PIP_Active_IO_Handler.h - PIP_Connection_Manager.h - PIP_DA_Strategy_Adapter.h - PIP_Dispatcher.h - PIP_Invocation_Manager.h - PIP_IO_Handler.h - PIP_Messages.h - PIP_Message_Handler.h - PIP_Reactive_IO_Handler.h - Proactor_Impl.h - Reactor_Impl.h - Reactor_Timer_Interface.h - Refcounted_Auto_Ptr.h - Reverse_Lock_T.h - SOCK_Netlink.h - SStringfwd.h - Static_Object_Lock.h - Strategies.h - String_Base_Const.h - Svc_Conf.h - Svc_Conf_Tokens.h - Synch.h - Synch_Traits.h - TSS_T.h - Timer_Queuefwd.h - Truncate.h - UPIPE_Addr.h - Value_Ptr.h - Version.h - Versioned_Namespace.h - ace_wchar.h - checked_iterator.h - config-WinCE.h - config-all.h - config-borland-common.h - config-lite.h - config-macros.h - config-minimal.h - config-win32-borland.h - config-win32-common.h - config-win32-ghs.h - config-win32-msvc-7.h - config-win32-msvc-8.h - config-win32-msvc.h - config-win32.h - config.h - iosfwd.h - os_include - os_include/arpa - os_include/net - os_include/netinet - os_include/sys - post.h - pre.h - streams.h - svc_export.h - } - - Documentation_Files { - README - ../VERSION - } - - Pkgconfig_Files { - ACE.pc.in - } -} diff --git a/ACE/ace/k_Efficient_P_Strategy.cpp b/ACE/ace/k_Efficient_P_Strategy.cpp deleted file mode 100644 index f7459611efb..00000000000 --- a/ACE/ace/k_Efficient_P_Strategy.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#include "ace/k_Efficient_P_Strategy.h" - -#if !defined (__ACE_INLINE__) -//#include "ace/k_Efficient_P_Strategy.inl" -#endif /* __ACE_INLINE__ */
\ No newline at end of file diff --git a/ACE/ace/k_Efficient_P_Strategy.h b/ACE/ace/k_Efficient_P_Strategy.h deleted file mode 100644 index 846c234323d..00000000000 --- a/ACE/ace/k_Efficient_P_Strategy.h +++ /dev/null @@ -1,188 +0,0 @@ -// -*- C++ -*- - -//============================================================================= -/** - * @file k_Efficient_P_Strategy.h - * - * - * - * - * - * @author Paul Oberlin <pauloberlin@gmail.com> - */ -//============================================================================= - -#ifndef ACE_K_EFFICIENT_P_STRATEGY_H -#define ACE_K_EFFICIENT_P_STRATEGY_H - -#include /**/ "ace/pre.h" - -#include "ace/DA_Strategy_Base.h" -#include "ace/Mutex.h" -#include <vector> - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -template <typename AnnotationId> -class k_Efficient_P_Strategy : public DA_Strategy_Base<AnnotationId> { - - //The annotations consist of an identifier and a resource cost value - -public: - //note: k must be less than maxThreads - k_Efficient_P_Strategy(int maxThreads, int k); - virtual ~k_Efficient_P_Strategy(); - virtual int is_deadlock_potential(AnnotationId handle); - virtual void grant(AnnotationId handle); - virtual void release(AnnotationId upcall_handle); -private: - int compute_min_illegal(); - int get_min_illegal(); - int min_illegal_; - ACE_Mutex computation_mutex_; - int k_; - bool min_illegal_is_computed_ ; - std::vector<int> a; - std::vector<int> A; -}; -//#if defined (__ACE_INLINE__) -//#include "ace/k_Efficient_P_Strategy.inl" -//#endif /* __ACE_INLINE__ */ - -ACE_BEGIN_VERSIONED_NAMESPACE_DECL - -template <typename AnnotationId> -ACE_INLINE -k_Efficient_P_Strategy<AnnotationId>::k_Efficient_P_Strategy(int maxThreads, int k) -:DA_Strategy_Base<AnnotationId>(maxThreads), - k_(k) - { - a.resize(k_ + 1); - A.resize(k_ + 1); - for (int i=0; i<k_; ++i) { - a[i] = 0; - A[i] = 0; - } - min_illegal_ = maxThreads; - min_illegal_is_computed_ = true; -} - -template <typename AnnotationId> -ACE_INLINE -k_Efficient_P_Strategy<AnnotationId>::~k_Efficient_P_Strategy() -{ - -} - -template <typename AnnotationId> -ACE_INLINE -int k_Efficient_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle) -{ - int annotation = DA_Strategy_Base<AnnotationId>::get_annotation(handle); - - int min_illegal = get_min_illegal(); - if (annotation >= min_illegal) - { - return annotation - min_illegal + 1; - } - - return 0; -} - -template <typename AnnotationId> -ACE_INLINE -int -k_Efficient_P_Strategy<AnnotationId>::compute_min_illegal() -{ - int T = this->get_max_threads(); - for (int i=0; i<k_; ++i) { - if (!(A[i] < (T - i))) { - return i; - } - } - if (A[k_]>0) { - return (T - A[k_]); - } - return T; -} - -template <typename AnnotationId> -ACE_INLINE -int -k_Efficient_P_Strategy<AnnotationId>::get_min_illegal() -{ - computation_mutex_.acquire(); - if (!min_illegal_is_computed_) { - min_illegal_ = compute_min_illegal(); - min_illegal_is_computed_ = true; - } - computation_mutex_.release(); - return min_illegal_; -} - -template <typename AnnotationId> -ACE_INLINE -void k_Efficient_P_Strategy<AnnotationId>::grant(AnnotationId handle) -{ - int annotation = get_annotation(handle); - computation_mutex_.acquire(); - if (annotation < k_) - { - a[annotation] ++; - for (int i=0; i<=annotation; ++i) - { - A[i]++; - } - } - else - { - a[k_] ++; - for (int i=0; i<=k_ ; ++i) - { - A[i]++; - } - } - min_illegal_is_computed_ = false; - computation_mutex_.release(); -} - -template <typename AnnotationId> -ACE_INLINE -void k_Efficient_P_Strategy<AnnotationId>::release(AnnotationId handle) -{ - int annotation = get_annotation(handle); - computation_mutex_.acquire(); -/* if (annotation < k ) { - assert(a[annotation]>0); - } else { - assert(a[k] >0); - } -*/ - if (annotation < k_) - { - a[annotation] --; - for (int i=0; i<=annotation; ++i) - { - A[i]--; - } - } - else - { - a[k_] --; - for (int i=0; i<=k_ ; ++i) - { - A[i]--; - } - } - min_illegal_is_computed_ = false; - computation_mutex_.release(); -} - - -ACE_END_VERSIONED_NAMESPACE_DECL - -#include /**/ "ace/post.h" - -#endif /* ACE_BASIC_P_STRATEGY_H */ diff --git a/ACE/ace/k_Efficient_P_Strategy.inl b/ACE/ace/k_Efficient_P_Strategy.inl deleted file mode 100644 index b1fe2c17a18..00000000000 --- a/ACE/ace/k_Efficient_P_Strategy.inl +++ /dev/null @@ -1,124 +0,0 @@ -ACE_BEGIN_VERSIONED_NAMESPACE_DECL - -template <typename AnnotationId> -ACE_INLINE -k_Efficient_P_Strategy<AnnotationId>::k_Efficient_P_Strategy(int maxThreads, int k) -:DA_Strategy_Base<AnnotationId>(maxThreads), - k_(k) - { - a.resize(k_ + 1); - A.resize(k_ + 1); - for (int i=0; i<k_; ++i) { - a[i] = 0; - A[i] = 0; - } - min_illegal_ = maxThreads; - min_illegal_is_computed_ = true; -} - -template <typename AnnotationId> -ACE_INLINE -k_Efficient_P_Strategy<AnnotationId>::~k_Efficient_P_Strategy() -{ - -} - -template <typename AnnotationId> -ACE_INLINE -bool k_Efficient_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle) -{ - int annotation = DA_Strategy_Base<AnnotationId>::get_annotation(handle); - return (annotation >= get_min_illegal()); -} - -template <typename AnnotationId> -ACE_INLINE -int -k_Efficient_P_Strategy<AnnotationId>::compute_min_illegal() -{ - int T = this->get_max_threads(); - for (int i=0; i<k_; ++i) { - if (!(A[i] < (T - i))) { - return i; - } - } - if (A[k_]>0) { - return (T - A[k_]); - } - return T; -} - -template <typename AnnotationId> -ACE_INLINE -int -k_Efficient_P_Strategy<AnnotationId>::get_min_illegal() -{ - computation_mutex_.acquire(); - if (!min_illegal_is_computed_) { - min_illegal_ = compute_min_illegal(); - min_illegal_is_computed_ = true; - } - computation_mutex_.release(); - return min_illegal_; -} - -template <typename AnnotationId> -ACE_INLINE -void k_Efficient_P_Strategy<AnnotationId>::grant(AnnotationId handle) -{ - int annotation = get_annotation(handle); - computation_mutex_.acquire(); - if (annotation < k_) - { - a[annotation] ++; - for (int i=0; i<=annotation; ++i) - { - A[i]++; - } - } - else - { - a[k_] ++; - for (int i=0; i<=k_ ; ++i) - { - A[i]++; - } - } - min_illegal_is_computed_ = false; - computation_mutex_.release(); -} - -template <typename AnnotationId> -ACE_INLINE -void k_Efficient_P_Strategy<AnnotationId>::release(AnnotationId handle) -{ - int annotation = get_annotation(handle); - computation_mutex_.acquire(); -/* if (annotation < k ) { - assert(a[annotation]>0); - } else { - assert(a[k] >0); - } -*/ - if (annotation < k_) - { - a[annotation] --; - for (int i=0; i<=annotation; ++i) - { - A[i]--; - } - } - else - { - a[k_] --; - for (int i=0; i<=k_ ; ++i) - { - A[i]--; - } - } - min_illegal_is_computed_ = false; - computation_mutex_.release(); -} - - -ACE_END_VERSIONED_NAMESPACE_DECL
\ No newline at end of file |