summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjmoore <jmoore@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2008-07-17 03:38:43 +0000
committerjmoore <jmoore@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2008-07-17 03:38:43 +0000
commit764103e1aaa91fc7cede07beaa312730e170f502 (patch)
treec575da2bbb55d1d02269d9fe75f608aa2214252e
parent1d845c80795563c69fe52c086ca9ea526e5f6891 (diff)
downloadATCD-764103e1aaa91fc7cede07beaa312730e170f502.tar.gz
Initial commit of rough priority inheritance protocol implementation. Files still need to be made consistent with ACE coding standards and stl references need to be removed
-rw-r--r--ACE/ace/PIP_Active_IO_Handler.cpp99
-rw-r--r--ACE/ace/PIP_Connection_Manager.cpp242
-rw-r--r--ACE/ace/PIP_Connection_Manager.h79
-rw-r--r--ACE/ace/PIP_DA_Strategy_Adapter.cpp4
-rw-r--r--ACE/ace/PIP_DA_Strategy_Adapter.h261
-rw-r--r--ACE/ace/PIP_Dispatcher.cpp505
-rw-r--r--ACE/ace/PIP_Dispatcher.h188
-rw-r--r--ACE/ace/PIP_IO_Handler.cpp185
-rw-r--r--ACE/ace/PIP_IO_Handler.h94
-rw-r--r--ACE/ace/PIP_Message_Handler.cpp105
-rw-r--r--ACE/ace/PIP_Message_Handler.h67
-rw-r--r--ACE/ace/PIP_Messages.cpp607
-rw-r--r--ACE/ace/PIP_Messages.h446
-rw-r--r--ACE/ace/PIP_Reactive_IO_Handler.cpp64
-rw-r--r--ACE/ace/PIP_Reactive_IO_Handler.h54
15 files changed, 3000 insertions, 0 deletions
diff --git a/ACE/ace/PIP_Active_IO_Handler.cpp b/ACE/ace/PIP_Active_IO_Handler.cpp
new file mode 100644
index 00000000000..46bfc8fcee2
--- /dev/null
+++ b/ACE/ace/PIP_Active_IO_Handler.cpp
@@ -0,0 +1,99 @@
+// $Id$
+
+#include "ace/PIP_Active_IO_Handler.h"
+
+
+#include <iostream>
+/// Constructor
+ACE_PIP_Active_IO_Handler::ACE_PIP_Active_IO_Handler()
+ : shutdown_(false)
+{
+ // acquire the shutdown lock so that when shutdown_svc is called,
+ // the caller cannot return until shutdown has been completed and
+ // lock relinquished
+ shutdown_lock_.acquire();
+}
+
+/// Closes all remote connections.
+int ACE_PIP_Active_IO_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
+{
+ int result(0);
+ switch(close_mask)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ read_closed_ = true;
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ write_closed_ = true;
+ break;
+ };
+
+ if (read_closed_ && write_closed_)
+ {
+ // Close our end of the connection
+ peer_.close_reader();
+ peer_.close_writer();
+ delete this;
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/// Enqueue a message to be sent
+int ACE_PIP_Active_IO_Handler::put_message (ACE_PIP_Protocol_Message* message)
+{
+ outgoing_message_queue_.enqueue(message);
+}
+
+int ACE_PIP_Active_IO_Handler::svc()
+{
+ int result(0);
+ ssize_t bytes_available(0);
+ char byte;
+
+ // run until we're told to quit
+ while (!shutdown_)
+ {
+ // peek to see if incoming message available
+ bytes_available = peer_.recv(&byte, 1, MSG_PEEK);
+ if (bytes_available > 0)
+ {
+ handle_input();
+ }
+
+ // handle outgoing message
+ result = handle_output();
+ if (result == -2)
+ {
+ // indicate to caller that the
+ // handler is no longer active
+ return -1;
+ }
+
+ bytes_available = 0;
+ }
+
+
+ return 0;
+}
+
+void ACE_PIP_Active_IO_Handler::shutdown_svc()
+{
+ shutdown_ = true;
+ shutdown_lock_.acquire();
+
+ handle_close(0, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK);
+
+}
+
+int ACE_PIP_Active_IO_Handler::open(void*)
+{
+ std::cout << "activate" << std::endl;
+ this->activate();
+}
+
+
+
+
diff --git a/ACE/ace/PIP_Connection_Manager.cpp b/ACE/ace/PIP_Connection_Manager.cpp
new file mode 100644
index 00000000000..d92d2e27895
--- /dev/null
+++ b/ACE/ace/PIP_Connection_Manager.cpp
@@ -0,0 +1,242 @@
+ /**
+ * @file PIP_Connection_Manager.cpp
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ */
+
+
+
+
+#include <ace/INET_Addr.h>
+#include <ace/PIP_Connection_Manager.h>
+
+
+ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::connection_manager_ = 0;
+ACE_Mutex ACE_PIP_Connection_Manager::instance_lock_;
+bool ACE_PIP_Connection_Manager::delete_manager_ = false;
+
+/// Default Constructor
+ACE_PIP_Connection_Manager::ACE_PIP_Connection_Manager()
+{
+
+}
+
+/// Destructor
+ACE_PIP_Connection_Manager::~ACE_PIP_Connection_Manager()
+{
+
+}
+
+ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::instance()
+{
+ if (connection_manager_ == 0)
+ {
+ instance_lock_.acquire();
+
+ if (ACE_PIP_Connection_Manager::connection_manager_ == 0)
+ {
+ ACE_NEW_RETURN (ACE_PIP_Connection_Manager::connection_manager_,
+ ACE_PIP_Connection_Manager,
+ 0);
+
+ delete_manager_ = true;
+ }
+
+ instance_lock_.release();
+ }
+
+ return connection_manager_;
+}
+
+int ACE_PIP_Connection_Manager::establish_connections(ACE_UINT32 source_site_id)
+{
+ int result(0);
+
+ //establish connections
+ for (int i = 0; i < connection_definitions_->size(); ++i)
+ {
+ if ((*connection_definitions_)[i]->source_site_id ==
+ source_site_id)
+ {
+ ACE_INET_Addr address;
+ address.set((*connection_definitions_)[i]->port,
+ (*connection_definitions_)[i]->address.c_str());
+
+ if ((*connection_definitions_)[i]->type ==
+ Connection_Definition::ACTIVE)
+ {
+ ACE_PIP_Active_IO_Handler* handler = new ACE_PIP_Active_IO_Handler;
+ result = active_connector_.connect(handler, address);
+ if (result == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ handler->init(
+ (*connection_definitions_)[i]->source_site_id,
+ (*connection_definitions_)[i]->destination_site_id,
+ (*connection_definitions_)[i]->priority);
+
+ handlers_.push_back(handler);
+ }
+ }
+ else
+ {
+ ACE_PIP_Reactive_IO_Handler* handler = new ACE_PIP_Reactive_IO_Handler;
+ result = reactive_connector_.connect(handler, address);
+ if (result == -1)
+ {
+ std::cerr << "Unable to connect to "
+ << (*connection_definitions_)[i]->address << " "
+ << (*connection_definitions_)[i]->port
+ << std::endl;
+
+ return -1;
+ }
+ else
+ {
+ handler->init(
+ (*connection_definitions_)[i]->source_site_id,
+ (*connection_definitions_)[i]->destination_site_id,
+ (*connection_definitions_)[i]->priority);
+
+ handlers_.push_back(handler);
+ }
+ }
+ }
+
+ }
+
+ return result;
+}
+
+int ACE_PIP_Connection_Manager::process_connection_file(char* file_name)
+{
+ // Expecting the file to contain one tuple per line
+ // where each is of form (source_id, dest_id, dest_address, dest_port, priority, type)
+ std::ifstream* my_stream = new std::ifstream;
+
+ my_stream->open(file_name);
+
+ if (my_stream->fail())
+ {
+ std::cerr << "Failed to open connection file: " << file_name
+ << std::endl;
+
+ return -1;
+ }
+
+ std::string line;
+ std::string token;
+ int strlen;
+ int first_pos;
+ int second_pos;
+ Connection_Definition* current_definition(0);
+
+ std::getline(*my_stream, line);
+ int num_entries = atoi(line.c_str());
+
+ connection_definitions_ = new ACE_Vector<Connection_Definition*>;
+ for (int i = 0; i < num_entries; ++i)
+ {
+ current_definition = new Connection_Definition;
+ std::getline(*my_stream, line);
+ strlen = line.length();
+ first_pos = line.find("(");
+ if (first_pos > strlen)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ second_pos = line.find(",", first_pos);
+ if (second_pos > strlen)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // source site ID
+ token.assign(line, first_pos + 1, second_pos - first_pos - 1);
+ current_definition->source_site_id = atoi(token.c_str());
+
+ first_pos = second_pos;
+ second_pos = line.find(",", first_pos + 1);
+ if (second_pos > strlen)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // destination site ID
+ token.assign(line, first_pos + 1, second_pos - first_pos - 1);
+ current_definition->destination_site_id = atoi(token.c_str());
+
+ first_pos = second_pos;
+ second_pos = line.find(",", first_pos + 1);
+ if (second_pos > strlen)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // IP address
+ current_definition->address.assign(line, first_pos + 1, second_pos - first_pos - 1);
+
+ first_pos = second_pos;
+ second_pos = line.find(",", first_pos + 1);
+ if (second_pos > strlen)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // IP port
+ token.assign(line, first_pos + 1, second_pos - first_pos - 1);
+ current_definition->port = atoi(token.c_str());
+
+ first_pos = second_pos;
+ second_pos = line.find(",", first_pos + 1);
+ if (second_pos > strlen)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // Connection priority
+ token.assign(line, first_pos + 1, second_pos - first_pos - 1);
+ current_definition->priority = atoi(token.c_str());
+
+ first_pos = second_pos;
+ second_pos = line.find(")", first_pos + 1);
+ if (second_pos > strlen)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // Connection Type
+ token.assign(line, first_pos + 1, second_pos - first_pos - 1);
+ if (token == "ACTIVE")
+ {
+ current_definition->type = Connection_Definition::ACTIVE;
+ }
+ else
+ {
+ current_definition->type = Connection_Definition::REACTIVE;
+ }
+
+ connection_definitions_->push_back(current_definition);
+ }
+
+ return 0;
+}
+
+const ACE_Vector<ACE_PIP_Connection_Manager::Connection_Definition*>* ACE_PIP_Connection_Manager::get_connections() const
+{
+ return connection_definitions_;
+}
diff --git a/ACE/ace/PIP_Connection_Manager.h b/ACE/ace/PIP_Connection_Manager.h
new file mode 100644
index 00000000000..e4925ec4568
--- /dev/null
+++ b/ACE/ace/PIP_Connection_Manager.h
@@ -0,0 +1,79 @@
+ /**
+ * @file PIP_Connection_Manager.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ */
+
+#ifndef _PIP_CONNECTION_MANAGER_H_
+#define _PIP_CONNECTION_MANAGER_H_
+
+#include <ace/Connector.h>
+#include <ace/PIP_Active_IO_Handler.h>
+#include <ace/PIP_Reactive_IO_Handler.h>
+#include <ace/Reactor.h>
+#include <ace/SOCK_Connector.h>
+#include <ace/Vector_T.h>
+
+#include <fstream>
+#include <iostream>
+#include <string>
+#include <vector>
+
+class ACE_Export ACE_PIP_Connection_Manager
+{
+ public:
+
+ /// Informationa associated with a connection
+ struct Connection_Definition
+ {
+ enum Handler_Type {ACTIVE, REACTIVE};
+
+ ACE_UINT32 source_site_id;
+ ACE_UINT32 destination_site_id;
+ std::string address;
+ u_short port;
+ ACE_UINT32 priority;
+ Handler_Type type;
+ };
+
+ /// Default Constructor
+ ACE_PIP_Connection_Manager();
+
+ /// Destructor
+ virtual ~ACE_PIP_Connection_Manager();
+
+ /// obtain the single instance of the manager
+ static ACE_PIP_Connection_Manager* instance();
+
+ /// Extract all connection information from a file
+ virtual int process_connection_file(char* filename);
+
+ /// Establish all connection for which source_site_id is the source
+ virtual int establish_connections(ACE_UINT32 source_site_id);
+
+ const ACE_Vector<Connection_Definition*>* get_connections() const;
+
+ private:
+
+ ACE_Vector<Connection_Definition*>* connection_definitions_;
+
+ // The connector used to actively connect to a remote site
+ ACE_Connector<
+ ACE_PIP_Active_IO_Handler,
+ ACE_SOCK_Connector> active_connector_;
+
+ ACE_Connector<
+ ACE_PIP_Reactive_IO_Handler,
+ ACE_SOCK_Connector> reactive_connector_;
+
+ static ACE_PIP_Connection_Manager* connection_manager_;
+ static ACE_Mutex instance_lock_;
+ static bool delete_manager_;
+
+ std::vector<ACE_PIP_IO_Handler*> handlers_;
+};
+
+#endif
diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.cpp b/ACE/ace/PIP_DA_Strategy_Adapter.cpp
new file mode 100644
index 00000000000..ecfd42a4747
--- /dev/null
+++ b/ACE/ace/PIP_DA_Strategy_Adapter.cpp
@@ -0,0 +1,4 @@
+// $Id$
+
+#include "PIP_DA_Strategy_Adapter.h"
+
diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.h b/ACE/ace/PIP_DA_Strategy_Adapter.h
new file mode 100644
index 00000000000..a0899aedaa7
--- /dev/null
+++ b/ACE/ace/PIP_DA_Strategy_Adapter.h
@@ -0,0 +1,261 @@
+ /**
+ * @file PIP_DA_Strategy_Adapter.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that adapts a deadlock avoidance strategy to additionally
+ * support priority inheritance protocol annotations
+*/
+
+
+#ifndef _PIP_DA_STRATEGY_ADAPTER_
+#define _PIP_DA_STRATEGY_ADAPTER_
+
+#include "ace/DA_Strategy_Base.h"
+#include "ace/Hash_Map_Manager.h"
+#include "ace/Unbounded_Set.h"
+#include "ace/Mutex.h"
+#include "ace/Null_Mutex.h"
+
+
+#include <iostream>
+
+/**
+ * @class ACE_PIP_DA_Strategy_Adapter
+ * @brief Extends deadlock avoidance strategies
+ * to support priority inheritance annotations
+ *
+ * Deadlock avoidance strategies associate a resource cost annotation
+ * with each handle. This class extends the strategies to support
+ * the association of annotations with each priority at which the
+ * handle can be dispatched, i.e. the priority at which the corresponding
+ * thread resource can dispatch the handle
+*/
+template <typename Handle_Id, typename Lock>
+class ACE_PIP_DA_Strategy_Adapter
+{
+ public:
+
+ /// Constructor that takes the deadlock avoidance strategy that
+ /// the Strategy Adapter adapts.
+ ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base<ACE_UINT64>* DA_strategy);
+ ~ACE_PIP_DA_Strategy_Adapter();
+
+ /// Indicates whether allocating a thread to the handle
+ /// at the specified priority could potentially result in deadlock.
+ int is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority);
+
+ /// Grant the handle a thread at the specified priority.
+ void grant(Handle_Id handle, ACE_UINT32 priority);
+
+ /// Release the thread
+ void release(Handle_Id handle, ACE_UINT32 priority);
+
+ /// Determine the number of threads being managed by
+ /// the DA_Strategy adapter.
+ int get_max_threads();
+
+ /// Add an annotation value for the handle / priority pair.
+ int add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation);
+
+ /// Remove every annotation associated with this handle.
+ int remove_annotation (Handle_Id handle);
+ int remove_annotation (Handle_Id handle, ACE_UINT32 priority);
+
+private:
+
+ /// Associates each message handler with an internally generated id
+ /// which can be used, along with a priority, to lookup an annotation.
+ typedef ACE_Hash_Map_Manager_Ex<Handle_Id,
+ ACE_UINT32,
+ ACE_Hash<Handle_Id>,
+ ACE_Equal_To<Handle_Id>,
+ ACE_Null_Mutex> HANDLE_ID_MAP;
+
+ /// Associates each message handler with a set of potential priorities.
+ /// Message handler represented by internally generated id.
+ typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32,
+ ACE_Unbounded_Set<ACE_UINT32>*,
+ ACE_Hash<ACE_UINT32>,
+ ACE_Equal_To<ACE_UINT32>,
+ ACE_Null_Mutex> HANDLE_ID_PRIORITY_MAP;
+
+ /// Determines an id that uniquely identifies a handler/priority pair.
+ ACE_UINT64 hash_handle_id_and_priority(ACE_UINT32 handle_id,
+ ACE_UINT32 priority) const;
+
+ /// Generates an annotation ID given the actual handle and priority.
+ ACE_UINT64 get_annotation_id(Handle_Id handle, ACE_UINT32 priority);
+
+ DA_Strategy_Base<ACE_UINT64>* DA_strategy_;
+ HANDLE_ID_MAP handle_ids_;
+ HANDLE_ID_PRIORITY_MAP id_to_priority_map_;
+ Lock lock_;
+ ACE_UINT32 next_id_;
+};
+
+template <typename Handle_Id, typename Lock>
+ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base<ACE_UINT64>* DA_strategy)
+: DA_strategy_(DA_strategy)
+, next_id_(0)
+{
+}
+
+template <typename Handle_Id, typename Lock>
+ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::~ACE_PIP_DA_Strategy_Adapter()
+{
+ HANDLE_ID_PRIORITY_MAP::iterator it = id_to_priority_map_.begin();
+ for (; it != id_to_priority_map_.end(); ++it)
+ {
+ delete it->item();
+ }
+}
+
+template <typename Handle_Id, typename Lock>
+ACE_INLINE int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::get_max_threads()
+{
+ return DA_strategy_->get_max_threads();
+}
+
+template <typename Handle_Id, typename Lock>
+ACE_INLINE ACE_UINT64 ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ hash_handle_id_and_priority(ACE_UINT32 handle_id, ACE_UINT32 priority) const
+{
+ ACE_UINT64 result = handle_id;
+ result = (result << 32) | priority;
+ return result;
+}
+
+template <typename Handle_Id, typename Lock>
+int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_Guard<Lock> guard(lock_);
+ ACE_UINT64 annotation_id = get_annotation_id(handle, priority);
+ return DA_strategy_->is_deadlock_potential(annotation_id);
+}
+
+template <typename Handle_Id, typename Lock>
+void ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ grant(Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_Guard<Lock> guard(lock_);
+ ACE_UINT64 annotation_id = get_annotation_id(handle, priority);
+ return DA_strategy_->grant(annotation_id);
+}
+
+template <typename Handle_Id, typename Lock>
+void ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ release(Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_Guard<Lock> guard(lock_);
+ ACE_UINT64 annotation_id = get_annotation_id(handle, priority);
+ DA_strategy_->release(annotation_id);
+}
+
+template <typename Handle_Id, typename Lock>
+int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation)
+{
+ ACE_UINT32 internal_handle_id(0);
+ ACE_Unbounded_Set<ACE_UINT32>* priorities(0);
+
+ ACE_Guard<Lock> guard(lock_);
+ if (handle_ids_.find(handle, internal_handle_id) == -1)
+ {
+ // This is the first time handle has been encountered, so generate an
+ // internal handle id.
+ internal_handle_id = next_id_++;
+ handle_ids_.bind(handle, internal_handle_id);
+ priorities = new ACE_Unbounded_Set<ACE_UINT32>;
+ id_to_priority_map_.bind(internal_handle_id, priorities);
+ }
+ else
+ {
+ id_to_priority_map_.find(internal_handle_id, priorities);
+ }
+
+ priorities->insert(priority);
+
+ return DA_strategy_->add_annotation(
+ hash_handle_id_and_priority(internal_handle_id, priority), annotation);
+}
+
+template <typename Handle_Id, typename Lock>
+int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ remove_annotation (Handle_Id handle)
+{
+ ACE_Guard<Lock> guard(lock_);
+ ACE_UINT32 internal_handle_id(0);
+ if (handle_ids_.unbind(handle, internal_handle_id) != -1)
+ {
+ ACE_Unbounded_Set<ACE_UINT32>* priorities(0);
+ if (id_to_priority_map_.unbind(internal_handle_id, priorities) != -1)
+ {
+ for (ACE_Unbounded_Set<ACE_UINT32>::ITERATOR it = priorities->begin();
+ it != priorities->end();
+ ++it)
+ {
+ DA_strategy_->remove_annotation(
+ get_annotation_id(internal_handle_id, *it));
+ }
+
+ delete priorities;
+ }
+ }
+
+ return 0;
+}
+
+template <typename Handle_Id, typename Lock>
+int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ remove_annotation (Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_Guard<Lock> guard(lock_);
+ ACE_UINT32 internal_handle_id(0);
+ int result(0);
+ if (handle_ids_.find(handle, internal_handle_id) != -1)
+ {
+ ACE_Unbounded_Set<ACE_UINT32>* priorities(0);
+ if (id_to_priority_map_.find(internal_handle_id, priorities) != -1)
+ {
+ if (priorities->remove(priority) != -1)
+ {
+ result = DA_strategy_->remove_annotation(
+ get_annotation_id(internal_handle_id, priority));
+ }
+ if (priorities->is_empty())
+ {
+ // This was the last annotation for this handle,
+ // so remove the handle information
+ id_to_priority_map_.unbind(internal_handle_id, priorities);
+ delete priorities;
+ handle_ids_.unbind(handle, internal_handle_id);
+ }
+ }
+ }
+
+ return result;
+}
+
+template <typename Handle_Id, typename Lock>
+ACE_UINT64 ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ get_annotation_id(Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_UINT64 annotation_id(0);
+ ACE_UINT32 handle_id(0);
+
+ if (handle_ids_.find(handle, handle_id) != -1)
+ {
+ annotation_id = hash_handle_id_and_priority(handle_id, priority);
+ }
+
+ return annotation_id;
+}
+
+#endif
+
diff --git a/ACE/ace/PIP_Dispatcher.cpp b/ACE/ace/PIP_Dispatcher.cpp
new file mode 100644
index 00000000000..dbc0931edbf
--- /dev/null
+++ b/ACE/ace/PIP_Dispatcher.cpp
@@ -0,0 +1,505 @@
+#include "ace/PIP_Dispatcher.h"
+#include "ace/PIP_Invocation_Manager.h"
+#include "ace/PIP_Messages.h"
+#include "ace/Reactor.h"
+
+#include <iostream>
+
+ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::dispatcher_ = 0;
+ACE_Mutex ACE_PIP_Dispatcher::instance_lock_;
+bool ACE_PIP_Dispatcher::delete_dispatcher_ = false;
+bool ACE_PIP_Dispatcher::shutdown_ = false;
+
+/// Constructor
+ACE_PIP_Dispatcher::ACE_PIP_Dispatcher()
+ : current_highest_priority_(ACE_Event_Handler::LO_PRIORITY)
+ , current_lowest_priority_(ACE_Event_Handler::LO_PRIORITY)
+ , DA_strategy_adapter_(0)
+ , message_available_signal_(0)
+ , threads_available_signal_(0)
+ , waiting_for_message_(false)
+{
+}
+
+/// Destructor
+ACE_PIP_Dispatcher::~ACE_PIP_Dispatcher()
+{
+ ACE_PIP_Protocol_Message* message(0);
+
+ // Destroy all messages that have yet to be dispatched
+ pending_messages_lock_.acquire();
+ while (pending_messages_by_message_id_.current_size() != 0)
+ {
+ pending_messages_by_message_id_.unbind(
+ pending_messages_by_message_id_.begin()->key(),
+ message);
+
+ if (message)
+ {
+ delete message;
+ message = 0;
+ }
+ }
+ pending_messages_lock_.release();
+}
+
+
+ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::instance()
+{
+ if (ACE_PIP_Dispatcher::dispatcher_ == 0)
+ {
+ instance_lock_.acquire();
+
+ if (ACE_PIP_Dispatcher::dispatcher_ == 0)
+ {
+ ACE_NEW_RETURN (ACE_PIP_Dispatcher::dispatcher_,
+ ACE_PIP_Dispatcher,
+ 0);
+
+ delete_dispatcher_ = true;
+ }
+
+ instance_lock_.release();
+ }
+
+ return dispatcher_;
+}
+
+/// Receive a message for eventual dispatching
+void ACE_PIP_Dispatcher::process_message(ACE_PIP_Protocol_Message* message)
+{
+ switch (message->get_message_type())
+ {
+ case ACE_PIP_Protocol_Message::ACCEL:
+ process_incoming_acceleration(message);
+ break;
+
+ case ACE_PIP_Protocol_Message::REQUEST:
+ process_incoming_request(message);
+ break;
+
+ case ACE_PIP_Protocol_Message::RESPONSE:
+ // Forward the response to the invocation manager
+ ACE_PIP_Invocation_Manager::instance()->process_inbound_response(message);
+ break;
+
+ default:
+ std::cerr << "PIP_Dispatcher::process_message: Invalid Message type of " << message->get_message_type() << std::endl;
+ }
+}
+
+
+/// Signals the dispatcher to dispatch a new message if possible.
+int ACE_PIP_Dispatcher::handle_output (ACE_HANDLE)
+{
+ ACE_PIP_Protocol_Message* message(0);
+ bool message_dispatched(false);
+
+ while (!message_dispatched && !shutdown_)
+ {
+ // get the highest priority message
+ pending_messages_lock_.acquire();
+ message = retrieve_highest_priority_pending_message();
+ if (message)
+ {
+ ACE_PIP_Data_Message* data_message =
+ static_cast<ACE_PIP_Data_Message*>(message->get_next());
+
+ deadlock_avoidance_lock_.acquire();
+
+ /// If dispatching could potentially cause deadlock, try to accelerate all lower priority
+ /// messages and then wait for threads to become available
+ num_threads_needed_ = DA_strategy_adapter_->is_deadlock_potential(
+ data_message->get_destination_handler_ID(),
+ data_message->get_message_priority());
+
+ if (num_threads_needed_ > 0)
+ {
+ deadlock_avoidance_lock_.release();
+ find_and_accelerate_lower_priority_message(data_message->get_message_priority());
+
+ // Wait for signal indicating enough threads exist to dispatch the message
+ threads_available_signal_.acquire();
+
+ // Before grabing the deadlock avoidance lock, check to make sure
+ // we haven't been told to shutdown.
+ if (shutdown_)
+ break;
+
+ deadlock_avoidance_lock_.acquire();
+ }
+
+ // At this point, sufficient threads exist to dispatch the message
+ // without threat of deadlock, so grant a thread
+ DA_strategy_adapter_->grant(data_message->get_destination_handler_ID(),
+ data_message->get_message_priority());
+
+ deadlock_avoidance_lock_.release();
+
+ // Transfer the message to the "dispatched" list
+ dispatched_messages_lock_.acquire();
+ Dispatched_Message_Data dispatch_record;
+ dispatch_record.id = message->get_message_id();
+ dispatch_record.priority = data_message->get_message_priority();
+ dispatched_messages_data_.insert(dispatch_record);
+ dispatched_messages_lock_.release();
+
+ //-------------TEST DATA------------------
+ // store statistics to be printed later
+ Dispatch_Test_Data test_data;
+ test_data.id = message->get_message_id();
+ test_data.priority = data_message->get_message_priority();
+ test_data.num_pending = num_pending_messages_;
+ test_data.highest_priority = current_highest_priority_;
+ test_data.lowest_priority = current_lowest_priority_;
+ dispatch_records_.push_back(test_data);
+
+ dispatched_ids_.push_back(message->get_message_id());
+
+ ++num_messages_dispatched_;
+ --num_pending_messages_;
+ pending_messages_lock_.release();
+ //-----------------------------------------
+
+ // Request another thread to be associated with dispatcher
+ ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK);
+
+ message_dispatched = true;
+
+ // Pass the message to the invocation manager for processing
+ ACE_PIP_Invocation_Manager::instance()->process_inbound_request(message);
+
+ // All processing associated with the message has been completed
+ // so discard the record
+ dispatched_messages_lock_.acquire();
+ dispatched_messages_data_.erase(dispatch_record);
+ dispatched_messages_lock_.release();
+
+ // Cleanup message information and release the thread resource
+ deadlock_avoidance_lock_.acquire();
+ DA_strategy_adapter_->release(data_message->get_destination_handler_ID(),
+ data_message->get_message_priority());
+
+ if (num_threads_needed_ > 0)
+ {
+ --num_threads_needed_;
+ if (num_threads_needed_ == 0)
+ {
+ threads_available_signal_.release();
+ }
+ }
+
+ deadlock_avoidance_lock_.release();
+ }
+ else
+ {
+ // There are no messages to dispatch, so wait for one to arrive
+ waiting_for_message_ = true;
+ pending_messages_lock_.release();
+ message_available_signal_.acquire();
+
+ // Before dispatching a message, make sure we haven't been
+ // instructed to shutdown
+ if (shutdown_)
+ break;
+ }
+ }
+
+ return 0;
+}
+
+
+/// Initializes dispatcher
+void ACE_PIP_Dispatcher::init(ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter)
+{
+ DA_strategy_adapter_ = DA_strategy_adapter;
+ waiting_for_message_ = true;
+ ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK);
+}
+
+/// store the message
+void ACE_PIP_Dispatcher::process_incoming_request(ACE_PIP_Protocol_Message* message)
+{
+ // Store the message token 2 ways to enable efficient dispatching as well as
+ // efficient lookup for accelerations
+ pending_messages_lock_.acquire();
+
+ //-------TEST DATA------------------------
+ ++num_messages_received_;
+ ++num_pending_messages_;
+ received_ids_.push_back(message->get_message_id());
+
+ //------------------------------------------
+ ACE_UINT32 priority =
+ static_cast<ACE_PIP_Data_Message*>(message->get_next())->get_message_priority();
+
+ // update the priority upper and lower bounds. These values are stored to
+ // avoid checking the full range of priorities when dispatching messages
+ if (priority > current_highest_priority_)
+ {
+ current_highest_priority_ = priority;
+ }
+ else if (priority < current_lowest_priority_)
+ {
+ current_lowest_priority_ = priority;
+ }
+
+ PRIORITY_MESSAGE_LIST_MAP::iterator
+ message_iter = pending_messages_by_priority_.find(priority);
+
+ if (message_iter == pending_messages_by_priority_.end())
+ {
+ // Create a new entry for this priority level
+ std::list<ACE_PIP_Protocol_Message*> new_priority_list;
+ new_priority_list.push_back(message);
+ pending_messages_by_priority_.insert(
+ make_pair(priority, new_priority_list));
+ }
+ else
+ {
+ // Priority already exists, so add the message token to the list
+ message_iter->second.push_back(message);
+ }
+
+ pending_messages_by_message_id_.bind(message->get_message_id(), message);
+
+ if (waiting_for_message_)
+ {
+ waiting_for_message_ = false;
+
+ // Signal waiting dispatcher thread to dispatch new message
+ message_available_signal_.release();
+ }
+
+ pending_messages_lock_.release();
+
+}
+
+/// Find the highest priority message and return it
+ACE_PIP_Protocol_Message* ACE_PIP_Dispatcher::
+ retrieve_highest_priority_pending_message()
+{
+ ACE_PIP_Protocol_Message* message(0);
+ for (ACE_INT32 current_priority = (ACE_INT32)current_highest_priority_;
+ current_priority >= (ACE_INT32)current_lowest_priority_;
+ --current_priority)
+ {
+ PRIORITY_MESSAGE_LIST_MAP::iterator
+ pending_message_iter = pending_messages_by_priority_.find(current_priority);
+
+ for (; pending_message_iter != pending_messages_by_priority_.end();
+ ++pending_message_iter)
+ {
+ std::list<ACE_PIP_Protocol_Message*>::iterator next_message_iter =
+ pending_message_iter->second.begin();
+
+ if (next_message_iter != pending_message_iter->second.end())
+ {
+ // The highest-priority message has been found. Grab the message
+ // and remove it from both containers
+ message = *next_message_iter;
+ pending_message_iter->second.pop_front();
+ pending_messages_by_message_id_.unbind(message->get_message_id());
+ break;
+ }
+ else
+ {
+ // There are no messages at this priority. Since the search begins at
+ // the highest priority, lower the highest priority until a message
+ // is found
+ if (current_highest_priority_ > current_lowest_priority_)
+ {
+ --current_highest_priority_;
+ }
+ }
+ }
+
+ if (message)
+ {
+ break;
+ }
+ }
+
+ return message;
+}
+
+bool ACE_PIP_Dispatcher::
+find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority)
+{
+ bool found(false);
+ bool erased_this_pass(true);
+
+ dispatched_messages_lock_.acquire();
+
+ while(erased_this_pass)
+ {
+ erased_this_pass = false;
+
+ std::set<Dispatched_Message_Data>::iterator iter = dispatched_messages_data_.begin();
+
+ // Find all dispatched messages having priority lower than new_priority. For each
+ // send an acceleration message, and update the dispatch record
+ for (; iter != dispatched_messages_data_.end() &&
+ num_threads_needed_ > 0; ++iter)
+ {
+ if (iter->priority < new_priority)
+ {
+ // A message has been found that has a lower priority,
+ // so the send an acceleration message
+ ACE_PIP_Accel_Message* accel_message = new ACE_PIP_Accel_Message;
+ accel_message->set_old_priority(iter->priority);
+ accel_message->set_new_priority(new_priority);
+
+ ACE_PIP_Protocol_Message* protocol_message = new ACE_PIP_Protocol_Message;
+ protocol_message->set_message_type(ACE_PIP_Protocol_Message::ACCEL);
+ protocol_message->set_message_id(iter->id);
+ protocol_message->set_next(accel_message);
+
+ Dispatched_Message_Data dispatch_record = *iter;
+ dispatched_messages_data_.erase(iter);
+ dispatch_record.priority = new_priority;
+ dispatched_messages_data_.insert(dispatch_record);
+ std::cout << "PIP_Dispatcher::find_and_accel : accelerating " << iter->id << std::endl;
+ ACE_PIP_Invocation_Manager::instance()->process_acceleration(protocol_message);
+ found = true;
+ erased_this_pass = true;
+ break;
+ }
+ }
+ }
+
+ dispatched_messages_lock_.release();
+ pending_messages_lock_.release();
+}
+
+void ACE_PIP_Dispatcher::shutdown()
+{
+ shutdown_ = true;
+
+ // Pulse signals so waiting threads can quit
+ message_available_signal_.release();
+ threads_available_signal_.release();
+}
+
+void ACE_PIP_Dispatcher::process_incoming_acceleration(ACE_PIP_Protocol_Message* message)
+{
+ bool updated_pending(false);
+ // Look for pending message. If the message is pending, update the priority, move it around in data structures, and quit
+
+ ACE_PIP_Accel_Message* accel_message =
+ static_cast<ACE_PIP_Accel_Message*>(message->get_next());
+
+ pending_messages_lock_.acquire();
+ ACE_Hash_Map_Entry<ACE_UINT64, ACE_PIP_Protocol_Message*>* entry(0);
+ if (pending_messages_by_message_id_.find(message->get_message_id(), entry) == 0)
+ {
+ ACE_PIP_Data_Message* data_message =
+ static_cast<ACE_PIP_Data_Message*>(entry->item()->get_next());
+
+ data_message->set_message_priority(accel_message->get_new_priority());
+
+ // move the message from one priority to the other
+ updated_pending = true;
+
+ std::cout << "Dispatcher::Accelerated pending message" << std::endl;
+ }
+ pending_messages_lock_.release();
+
+ if (!updated_pending)
+ {
+ bool found(false);
+ ACE_Guard<ACE_Mutex> guard(dispatched_messages_lock_);
+ // Message is not pending, so must already be dispatche
+ std::set<Dispatched_Message_Data>::iterator iter = dispatched_messages_data_.begin();
+
+ // Find all dispatched messages having priority lower than new_priority. For each
+ // send an acceleration message, and update the dispatch record
+ for (; iter != dispatched_messages_data_.end(); ++iter)
+ {
+ if ((iter->id == message->get_message_id()) &&
+ (iter->priority < accel_message->get_new_priority()))
+ {
+ std::cout << "Dispatcher::Accelerated dispatched message" << std::endl;
+ Dispatched_Message_Data dispatch_record = *iter;
+ dispatched_messages_data_.erase(iter);
+ dispatch_record.priority = accel_message->get_new_priority();
+ dispatched_messages_data_.insert(dispatch_record);
+ ACE_PIP_Invocation_Manager::instance()->process_acceleration(message);
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ for (std::vector<ACE_UINT64>::iterator it = received_ids_.begin();
+ it != received_ids_.end(); ++it)
+ {
+ if (*it == message->get_message_id())
+ {
+ std::cout << "MessageID: " << *it << " already came and left" << std::endl;
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ {
+ std::cout << "Accel for messageID: " << message->get_message_id() <<
+ " beat message to the remote dispatcher" << std::endl;
+ }
+ }
+
+ }
+}
+
+
+
+void ACE_PIP_Dispatcher::print_results()
+{
+ std::cout << "----------------------DISPATCHER_RESULTS-------------" << std::endl;
+ std::cout << std::endl;
+ std::cout << "Num received: " << num_messages_received_ << std::endl;
+ std::cout << "Num dispatched: " << num_messages_dispatched_ << std::endl;
+ std::cout << std::endl;
+
+ std::cout << "Received Ids: " << std::endl;
+ for (std::vector<ACE_UINT64>::iterator rec_id_iter = received_ids_.begin();
+ rec_id_iter != received_ids_.end();
+ ++rec_id_iter)
+ {
+ std::cout << *rec_id_iter << std::endl;
+ }
+
+ std::cout << std::endl;
+ std::cout << "Dispatched Ids: " << std::endl;
+ for (std::vector<ACE_UINT64>::iterator disp_id_iter = dispatched_ids_.begin();
+ disp_id_iter != dispatched_ids_.end();
+ ++disp_id_iter)
+
+ {
+ std::cout << *disp_id_iter << std::endl;
+ }
+
+ std::cout << std::endl;
+ std::cout << "Dispatch Records: " << std::endl;
+ for (std::vector<ACE_PIP_Dispatcher::Dispatch_Test_Data>::iterator rec_iter = dispatch_records_.begin();
+ rec_iter != dispatch_records_.end();
+ ++rec_iter)
+
+ {
+ std::cout << "Id: " << rec_iter->id << std::endl;
+ std::cout << "Priority: " << rec_iter->priority << std::endl;
+ std::cout << "Num Pending: " << rec_iter->num_pending << std::endl;
+ std::cout << "Highest Priority " << rec_iter->highest_priority << std::endl;
+ std::cout << "Lowest Priority " << rec_iter->lowest_priority << std::endl;
+ std::cout << std::endl;
+ }
+
+ std::cout << std::endl;
+ std::cout << "Num received: " << num_messages_received_ << std::endl;
+ std::cout << "Num dispatched: " << num_messages_dispatched_ << std::endl;
+ std::cout << std::endl;
+
+
+ std::cout << "-----------------------------------------------------" << std::endl;
+}
diff --git a/ACE/ace/PIP_Dispatcher.h b/ACE/ace/PIP_Dispatcher.h
new file mode 100644
index 00000000000..d93b2957ca6
--- /dev/null
+++ b/ACE/ace/PIP_Dispatcher.h
@@ -0,0 +1,188 @@
+ /**
+ * @file PIP_Dispatcher.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that dispatches priority inheritance protocol messages
+ * to the appropriate message handler.
+*/
+
+
+#ifndef _PIP_DISPATCHER_H_
+#define _PIP_DISPATCHER_H_
+
+// ACE definitions
+#include "ace/Event_Handler.h"
+#include "ace/Hash_Map_Manager.h"
+#include "ace/PIP_DA_Strategy_Adapter.h"
+#include "ace/PIP_Messages.h"
+#include "ace/RW_Thread_Mutex.h"
+#include "ace/Semaphore.h"
+#include "ace/Singleton.h"
+
+// STL definitions
+#include <list>
+#include <map>
+#include <set>
+#include <vector>
+
+// Forward Declarations
+class ACE_PIP_Protocol_Message;
+
+typedef std::map<ACE_UINT32, std::list<ACE_PIP_Protocol_Message*> >
+ PRIORITY_MESSAGE_LIST_MAP;
+
+// Associate each message with a message ID
+typedef ACE_Hash_Map_Manager_Ex<ACE_UINT64,
+ ACE_PIP_Protocol_Message*,
+ ACE_Hash<ACE_UINT64>,
+ ACE_Equal_To<ACE_UINT64>,
+ ACE_Null_Mutex> ID_MESSAGE_MAP;
+
+
+/**
+ * @class ACE_Dispatcher
+ * @brief Dispatches ACE_PIP_Priority_Messages in priority order
+ * message handlers. Additionally, notifies handlers when priority inversion is
+ * detected.
+ *
+ * The PIP_Message_Dispatcher implements the priority inheritance protocol.
+ * Upon receipt of messages, it determines the highest-priority message to
+ * be dispatched, and dispatches providing enough resources exist. If not enough exist,
+ * and a lower priority message has been dispatched, an acceleration message is sent
+ * to the corresponding handler to raise the priority of the message, thus
+ * mitigating the inversion.
+*/
+class ACE_Export ACE_PIP_Dispatcher : public ACE_Event_Handler
+{
+ public:
+
+ /// Constructor
+ ACE_PIP_Dispatcher();
+
+ /// Destructor
+ virtual ~ACE_PIP_Dispatcher();
+
+ /// obtain the single instance of the dispatcher
+ static ACE_PIP_Dispatcher* instance();
+
+ /// Receive a message for eventual dispatching
+ void process_message(ACE_PIP_Protocol_Message* message);
+
+ /// Signals the dispatcher to dispatch a new message if possible.
+ virtual int handle_output (ACE_HANDLE);
+
+ /// Initializes dispatcher
+ void init(ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter);
+
+ /// Tell the dispatcher to stop dispatching and release all threads ASAP
+ void shutdown();
+
+ /// Accelerate the appropriate message
+ void process_incoming_acceleration(ACE_PIP_Protocol_Message* message);
+
+ /// Print statistics
+ void print_results();
+
+ private:
+
+ // Dispatched_Message_Data stores the ID and priority
+ // of a dispatched message
+ class Dispatched_Message_Data
+ {
+ public:
+
+ bool operator<(const Dispatched_Message_Data& other) const
+ {
+ return (priority < other.priority);
+ }
+
+ bool operator==(const Dispatched_Message_Data& other) const
+ {
+ return (id == other.id);
+ }
+
+ bool operator!=(const Dispatched_Message_Data& other) const
+ {
+ return !(*this == other);
+ }
+
+ ACE_UINT64 id;
+ ACE_UINT32 priority;
+ };
+
+ class Dispatch_Test_Data
+ {
+ public:
+ ACE_UINT64 id;
+ ACE_UINT64 priority;
+ ACE_UINT32 num_pending;
+ ACE_UINT32 highest_priority;
+ ACE_UINT32 lowest_priority;
+ };
+
+ /// store the message
+ void process_incoming_request(ACE_PIP_Protocol_Message* message);
+
+ /// Find the highest priority message and return it
+ ACE_PIP_Protocol_Message* retrieve_highest_priority_pending_message();
+
+ bool find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority);
+
+
+ // Dispatched message data is stored to determine which messages are
+ // currently assigned to a thread. This is useful for finding messages
+ // whose priority needs to be accelerated in the case where an inversion
+ // is detected.
+ std::set<Dispatched_Message_Data> dispatched_messages_data_;
+ ACE_Mutex dispatched_messages_lock_;
+
+ ACE_UINT32 current_highest_priority_;
+ ACE_UINT32 current_lowest_priority_;
+
+ // Pending messages (those not dispatched) are stored in 2 ways for efficiency
+ // 1.) By message id - this is useful for managing priority accelerations
+ // because we can find the appropriate message in constant time
+ // 2.) By priority - this is useful for determining which message to dispatch next
+ // as messages are dispatched in priority order
+ PRIORITY_MESSAGE_LIST_MAP pending_messages_by_priority_;
+ ID_MESSAGE_MAP pending_messages_by_message_id_;
+ ACE_Mutex pending_messages_lock_;
+
+ // Indicates the dispatcher has a thread waiting to
+ // dispatch a message
+ bool waiting_for_message_;
+
+ // Number of threads that need to be returned in order to
+ // dispatch the current message
+ int num_threads_needed_;
+
+ ACE_Semaphore message_available_signal_;
+ ACE_Semaphore threads_available_signal_;
+
+ ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter_;
+ ACE_Mutex deadlock_avoidance_lock_;
+
+ static ACE_PIP_Dispatcher* dispatcher_;
+ static ACE_Mutex instance_lock_;
+ static bool delete_dispatcher_;
+ static bool shutdown_;
+
+ // Test variables
+ ACE_UINT32 num_pending_messages_;
+ ACE_UINT32 num_messages_received_;
+ ACE_UINT32 num_messages_dispatched_;
+ std::vector<ACE_UINT64> received_ids_;
+ std::vector<ACE_UINT64> dispatched_ids_;
+ std::vector<Dispatch_Test_Data> dispatch_records_;
+
+};
+
+// Define a singleton class to make the dispatcher globally accessible
+typedef ACE_Singleton<ACE_PIP_Dispatcher, ACE_Mutex>
+ ACE_PIP_Dispatcher_Singleton;
+
+#endif
diff --git a/ACE/ace/PIP_IO_Handler.cpp b/ACE/ace/PIP_IO_Handler.cpp
new file mode 100644
index 00000000000..f30713751a5
--- /dev/null
+++ b/ACE/ace/PIP_IO_Handler.cpp
@@ -0,0 +1,185 @@
+// $Id$
+
+#include "ace/Guard_T.h"
+#include "ace/PIP_IO_Handler.h"
+#include "ace/PIP_Invocation_Manager.h"
+#include "ace/PIP_Dispatcher.h"
+
+/// Constructor
+ACE_PIP_IO_Handler::ACE_PIP_IO_Handler()
+ : priority_set_(false)
+ , destination_site_id_(0)
+ , site_id_(0)
+ , handler_id_(0)
+ , millisecond_(0, 1000)
+{
+ // Temporarily assign the priority to be highest possible.
+ // The first message received by the handler will be the priority
+ this->priority(ACE_Event_Handler::HI_PRIORITY);
+}
+
+/// Destructor
+ACE_PIP_IO_Handler::~ACE_PIP_IO_Handler( )
+{
+ // Tell the Invocation Manager to stop sending us messages
+ ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this);
+
+ // Delete all outgoing messages
+ ACE_PIP_Protocol_Message* message(0);
+ while (!outgoing_message_queue_.is_empty())
+ {
+ outgoing_message_queue_.dequeue_tail(message);
+ delete message;
+ }
+}
+
+void ACE_PIP_IO_Handler::site_id(ACE_UINT32 site_id)
+{
+ site_id_ = site_id;
+}
+
+ACE_UINT32 ACE_PIP_IO_Handler::site_id() const
+{
+ return site_id_;
+}
+
+ACE_UINT32 ACE_PIP_IO_Handler::destination_site_id() const
+{
+ return destination_site_id_;
+}
+
+void ACE_PIP_IO_Handler::handler_id(ACE_UINT32 handler_id)
+{
+ handler_id_ = handler_id;
+}
+
+ACE_UINT32 ACE_PIP_IO_Handler::handler_id() const
+{
+ return handler_id_;
+}
+
+/// Initialize the priority of the handler, and inform the other end
+/// of the priority
+void ACE_PIP_IO_Handler::init(ACE_UINT32 site_id,
+ ACE_UINT32 destination_site_id,
+ ACE_UINT32 priority)
+{
+ this->priority(priority);
+ site_id_ = site_id;
+ destination_site_id_ = destination_site_id;
+
+ // Inform other end of this connections priority
+ peer_.send(&priority, sizeof(priority));
+
+ // Inform other end of this end's site id
+ peer_.send(&site_id, sizeof(site_id));
+ priority_set_ = true;
+
+ // Register to receive outgoing messages
+ ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this);
+}
+
+void ACE_PIP_IO_Handler::extract_priority()
+{
+ ACE_UINT32 priority(0);
+ if (peer_.recv(&priority, sizeof(priority)) == sizeof(priority))
+ {
+ this->priority(priority);
+ }
+ else
+ {
+ this->priority(ACE_Event_Handler::LO_PRIORITY);
+ }
+
+ // Receive the other end's site id
+ if (peer_.recv(&destination_site_id_, sizeof(destination_site_id_)) != sizeof(destination_site_id_))
+ {
+ destination_site_id_ = 0;
+ }
+
+ priority_set_ = true;
+}
+
+/// Handles read event on socket.
+int ACE_PIP_IO_Handler::handle_input (ACE_HANDLE fd)
+{
+ int result(0);
+ int bytes_read(0);
+
+ if (!priority_set_)
+ {
+ // incoming message is the priority of this connection
+ extract_priority();
+ ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this);
+ }
+ else
+ {
+ // Read the next incoming message
+ ACE_PIP_Protocol_Message* message = new ACE_PIP_Protocol_Message;
+ bytes_read = message->deserialize(peer_);
+ if (bytes_read > 0)
+ {
+ if (message->get_message_type() == ACE_PIP_Protocol_Message::ACCEL)
+ {
+ std::cout << "Accel Message Received" << std::endl;
+ }
+
+ ACE_PIP_Dispatcher::instance()->process_message(message);
+ }
+ else if (bytes_read < 0)
+ {
+ // The connection is broken, so handler should be deleted
+ delete message;
+ result = -1;
+ }
+ }
+
+ return result;
+}
+
+
+/// Handles output event on socket
+int ACE_PIP_IO_Handler::handle_output (ACE_HANDLE fd)
+{
+ int bytes_sent(0);
+ // determine if outgoing messages exist
+ ACE_PIP_Protocol_Message* message(0);
+
+ write_closed_ = false;
+ big_lock_.acquire();
+ if (outgoing_message_queue_.dequeue_tail(message) != -1)
+ {
+ if (message->get_message_type() == ACE_PIP_Protocol_Message::ACCEL)
+ {
+ std::cout << "Sending accel message" << std::endl;
+ }
+ bytes_sent = message->serialize(peer_);
+ delete message;
+ if (bytes_sent >= 0)
+ {
+ big_lock_.release();
+ return 0;
+ }
+ else
+ {
+ write_closed_ = true;
+ big_lock_.release();
+ // indicate the outgoing connection is closed
+ return -2;
+ }
+ }
+ else
+ {
+ // indicate that there was no message to output
+
+ big_lock_.release();
+ return -1;
+ }
+}
+
+ACE_INET_Addr ACE_PIP_IO_Handler::get_remote_address() const
+{
+ ACE_INET_Addr addr;
+ peer_.get_remote_addr(addr);
+ return addr;
+}
diff --git a/ACE/ace/PIP_IO_Handler.h b/ACE/ace/PIP_IO_Handler.h
new file mode 100644
index 00000000000..90665097b5e
--- /dev/null
+++ b/ACE/ace/PIP_IO_Handler.h
@@ -0,0 +1,94 @@
+ /**
+ * @file PIP_IO_Handler.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that manages network I/O
+*/
+
+
+#ifndef _PIP_IO_HANDLER_H_
+#define _PIP_IO_HANDLER_H_
+
+
+#include "ace/Message_Queue.h"
+#include "ace/Mutex.h"
+#include "ace/PIP_Messages.h"
+#include "ace/Svc_Handler.h"
+#include "ace/Thread_Mutex.h"
+
+// Typedefs
+typedef ACE_Message_Queue_Ex<ACE_PIP_Protocol_Message, ACE_NULL_SYNCH>
+ PROTO_MESSAGE_QUEUE_TYPE;
+
+/**
+ * @class ACE_PIP_IO_Handler
+ *
+ * @brief Performs network I/O
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ */
+class ACE_Export ACE_PIP_IO_Handler :
+ public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_MT_SYNCH>
+{
+ public:
+
+ /// Constructor
+ ACE_PIP_IO_Handler ();
+
+ /// Destructor
+ virtual ~ACE_PIP_IO_Handler();
+
+ /// Enqueue a message to be sent
+ virtual int put_message (ACE_PIP_Protocol_Message* message) = 0;
+
+ /// Initialize the priority of the handler, and inform the other end
+ /// of the priority
+ virtual void init(ACE_UINT32 site_id,
+ ACE_UINT32 destination_site_id,
+ ACE_UINT32 priority);
+
+ /// Handles read event on socket.
+ virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+
+ /// Handles read event on socket.
+ virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+
+ /// Determine the id of the site at which the handler is located,
+ void site_id(ACE_UINT32 site_id);
+ ACE_UINT32 site_id() const;
+
+ /// Determine the other end's site id
+ ACE_UINT32 destination_site_id() const;
+
+ /// Determine the id that uniquely identifies this handler,
+ void handler_id(ACE_UINT32 handler_id);
+ ACE_UINT32 handler_id() const;
+ ACE_INET_Addr get_remote_address() const;
+
+ protected:
+
+ /// Reads priority from socket
+ void extract_priority();
+
+ // variables to track the state of the handler
+ bool read_closed_;
+ bool write_closed_;
+ bool priority_set_;
+
+ ACE_UINT32 site_id_;
+ ACE_UINT32 handler_id_;
+ ACE_UINT32 destination_site_id_;
+
+ const ACE_Time_Value millisecond_;
+
+ PROTO_MESSAGE_QUEUE_TYPE outgoing_message_queue_;
+ ACE_Thread_Mutex big_lock_;
+};
+
+#endif /* _PIP_IO_Handler_H_ */
+
+
diff --git a/ACE/ace/PIP_Message_Handler.cpp b/ACE/ace/PIP_Message_Handler.cpp
new file mode 100644
index 00000000000..caf7a8f1ff6
--- /dev/null
+++ b/ACE/ace/PIP_Message_Handler.cpp
@@ -0,0 +1,105 @@
+#include "ace/PIP_Message_Handler.h"
+#include "ace/PIP_Invocation_Manager.h"
+
+ACE_PIP_Message_Handler::ACE_PIP_Message_Handler()
+ : handler_id_(0)
+ , site_id_(0)
+{
+
+}
+
+ACE_PIP_Message_Handler::ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 handler_id)
+ : site_id_(site_id)
+ , handler_id_(handler_id)
+{
+}
+
+ACE_PIP_Protocol_Message* ACE_PIP_Message_Handler::create_protocol_message(ACE_UINT64 message_id,
+ bool reply_expected,
+ ACE_UINT32 source_handler_id,
+ ACE_UINT32 source_site_id,
+ ACE_UINT32 destination_handler_id,
+ ACE_UINT32 destination_site_id,
+ ACE_UINT32 message_priority,
+ ACE_PIP_Protocol_Message::Message_Type type,
+ const std::string& data_payload)
+{
+ // setup the proto message header
+ ACE_PIP_Protocol_Message* proto_message = new ACE_PIP_Protocol_Message;
+ proto_message->set_message_id(message_id);
+ proto_message->set_message_type(type);
+
+ ACE_PIP_Data_Message* data_message = new ACE_PIP_Data_Message;
+ data_message->set_reply_expected(reply_expected);
+
+ data_message->set_source_handler_ID(source_handler_id);
+ data_message->set_source_site_ID(source_site_id);
+ data_message->set_destination_handler_ID(destination_handler_id);
+ data_message->set_destination_site_ID(destination_site_id);
+
+ data_message->set_message_priority(message_priority);
+
+ // Create data message header and body, then pass to protocol message to be parsed
+ // and unpacked
+ ACE_Message_Block* header = new ACE_Message_Block(sizeof(ACE_PIP_Data_Message));
+ ACE_Message_Block* body = new ACE_Message_Block(data_payload.length() + 1);
+
+ ACE_OS::memcpy(body->wr_ptr(), data_payload.c_str(), data_payload.length() + 1);
+ body->wr_ptr(data_payload.length() + 1);
+
+ // attach the data body to the header
+ header->next(body);
+
+ // pack the header values into the message block
+ // set the write ptr ahead so pack() will know to put it back where it should be
+ header->wr_ptr(sizeof(ACE_PIP_Data_Message));
+ data_message->block_ = header;
+ data_message->pack();
+
+ proto_message->set_next(data_message);
+ return proto_message;
+}
+
+void ACE_PIP_Message_Handler::send_request(ACE_Message_Block* message,
+ ACE_UINT64 message_id,
+ ACE_Message_Block*& response)
+{
+ ACE_Future<ACE_Message_Block*>* response_holder(0);
+ ACE_PIP_Invocation_Manager::instance()->process_outbound_request(message, message_id, response_holder);
+ if (response_holder)
+ {
+ if (response_holder->get(response) == -1)
+ {
+ std::cerr << "Error receiving response in ::send_request" << std::endl;
+ response = 0;
+ }
+ }
+}
+
+void ACE_PIP_Message_Handler::send_response(ACE_Message_Block* message,
+ ACE_UINT64 message_id)
+{
+ ACE_PIP_Invocation_Manager::instance()->process_outbound_response(message, message_id);
+}
+
+
+ACE_UINT32 ACE_PIP_Message_Handler::get_handler_id() const
+{
+ return handler_id_;
+}
+void ACE_PIP_Message_Handler::set_handler_id(ACE_UINT32 id)
+{
+ handler_id_ = id;
+}
+
+ACE_UINT32 ACE_PIP_Message_Handler::get_site_id() const
+{
+ return site_id_;
+}
+
+void ACE_PIP_Message_Handler::set_site_id(ACE_UINT32 id)
+{
+ site_id_ = id;
+}
+
+
diff --git a/ACE/ace/PIP_Message_Handler.h b/ACE/ace/PIP_Message_Handler.h
new file mode 100644
index 00000000000..2dec7cb628e
--- /dev/null
+++ b/ACE/ace/PIP_Message_Handler.h
@@ -0,0 +1,67 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file PIP_Message_Handler.h
+ *
+ *
+ * @author John Moore
+ */
+//=============================================================================
+
+
+#ifndef _PIP_MESSAGE_HANDLER_H_
+#define _PIP_MESSAGE_HANDLER_H_
+
+#include "ace/Message_Block.h"
+#include "ace/PIP_Messages.h"
+#include "ace/Event_Handler.h"
+
+class ACE_Export ACE_PIP_Message_Handler
+{
+ public:
+
+ ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 message_id);
+ ACE_PIP_Message_Handler();
+ virtual ~ACE_PIP_Message_Handler(){}
+
+ virtual void process_incoming_message(ACE_Message_Block* message,
+ ACE_UINT64 message_id) = 0;
+
+ ACE_UINT32 get_handler_id() const;
+ void set_handler_id(ACE_UINT32 id);
+
+ ACE_UINT32 get_site_id() const;
+ void set_site_id(ACE_UINT32 id);
+
+
+
+ protected:
+
+ ACE_UINT32 handler_id_;
+ ACE_UINT32 site_id_;
+ ACE_INET_Addr my_address_;
+
+ // Pass a message to a remote handler
+ virtual void send_request(ACE_Message_Block* message,
+ ACE_UINT64 message_id,
+ ACE_Message_Block*& response);
+
+ // Pass a response message to a remote handler
+ virtual void send_response(ACE_Message_Block* message,
+ ACE_UINT64 message_id);
+
+ ACE_PIP_Protocol_Message* create_protocol_message(ACE_UINT64 message_id,
+ bool reply_expected,
+ ACE_UINT32 source_handler_id,
+ ACE_UINT32 source_site_id,
+ ACE_UINT32 destination_handler_id,
+ ACE_UINT32 destination_site_id,
+ ACE_UINT32 message_priority,
+ ACE_PIP_Protocol_Message::Message_Type type,
+ const std::string& data_payload);
+
+};
+
+#endif
+
diff --git a/ACE/ace/PIP_Messages.cpp b/ACE/ace/PIP_Messages.cpp
new file mode 100644
index 00000000000..67fb886a3ec
--- /dev/null
+++ b/ACE/ace/PIP_Messages.cpp
@@ -0,0 +1,607 @@
+// $Id$
+
+#include "ace/OS_NS_stdlib.h"
+#include "ace/OS_NS_string.h"
+#include "ace/PIP_Messages.h"
+
+#include <iostream>
+
+ACE_PIP_Message::ACE_PIP_Message()
+ : block_(0)
+ , dirty_(false)
+ , next_(0)
+{}
+
+ACE_PIP_Message::~ACE_PIP_Message()
+{
+ if (next_)
+ {
+ delete next_;
+ }
+
+ if (block_)
+ {
+ block_->release();
+ }
+}
+
+void ACE_PIP_Message::set_block(ACE_Message_Block* block)
+{
+ // Remove the other block if it exist.
+ if (block_)
+ {
+ block_->release();
+ }
+
+ block_ = block;
+
+ // Extract the values from the block.
+ unpack();
+}
+
+ACE_PIP_Data_Message::ACE_PIP_Data_Message()
+ : message_priority_(-1)
+ , reply_expected_(false)
+ , source_handler_ID_(-1)
+ , source_site_ID_(-1)
+ , destination_handler_ID_(-1)
+ , destination_site_ID_(-1)
+{
+}
+
+int ACE_PIP_Data_Message::serialize(ACE_SOCK_Stream& stream)
+{
+ int total_bytes_sent(0);
+
+ // Only serialize if there is a block. If not,
+ // there's nothing we can do but fail since we don't
+ // have enough information to create a block and unpack it.
+ if (block_)
+ {
+ if (dirty_)
+ {
+ pack();
+ }
+
+ ACE_Message_Block* curr_block = block_;
+ int bytes_sent(0);
+ while(curr_block)
+ {
+ bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length());
+ if (bytes_sent > 0)
+ {
+ total_bytes_sent += bytes_sent;
+ curr_block = curr_block->next();
+ }
+ else
+ {
+ std::cout << "Data_Mess:serialize: - didn't send any bytes" << std::endl;
+ total_bytes_sent = -1;
+ break;
+ }
+ }
+ }
+ else
+ {
+ std::cout << "DataMessage::Serialize - there is no block" << std::endl;
+ total_bytes_sent = -1;
+ }
+
+ return total_bytes_sent;
+}
+
+void ACE_PIP_Data_Message::pack()
+{
+ char* write_ptr = block_->wr_ptr();
+ char* read_ptr = block_->rd_ptr();
+
+ block_->reset();
+
+ // Pack reply expected into buffer.
+ ACE_OS::memcpy(block_->wr_ptr(), &reply_expected_, sizeof(reply_expected_));
+ block_->wr_ptr(sizeof(reply_expected_));
+
+ // Pack the message priority into the buffer.
+ ACE_OS::memcpy(block_->wr_ptr(), &message_priority_, sizeof(message_priority_));
+ block_->wr_ptr(sizeof(message_priority_));
+
+ // Pack the destination handler ID into the buffer
+ ACE_OS::memcpy(block_->wr_ptr(), &destination_handler_ID_, sizeof(destination_handler_ID_));
+ block_->wr_ptr(sizeof(destination_handler_ID_));
+
+ // Pack the source handler ID into the buffer
+ ACE_OS::memcpy(block_->wr_ptr(), &source_handler_ID_, sizeof(source_handler_ID_));
+ block_->wr_ptr(sizeof(source_handler_ID_));
+
+ // Pack the destination site ID into the buffer
+ ACE_OS::memcpy(block_->wr_ptr(), &destination_site_ID_, sizeof(destination_site_ID_));
+ block_->wr_ptr(sizeof(destination_site_ID_));
+
+ // Pack the source site ID into the buffer
+ ACE_OS::memcpy(block_->wr_ptr(), &source_site_ID_, sizeof(source_site_ID_));
+ block_->wr_ptr(sizeof(source_site_ID_));
+
+ // Reset the buffer pointers to where they were so that the message length remains
+ // accurate.
+ block_->rd_ptr(read_ptr);
+ block_->wr_ptr(write_ptr);
+}
+
+void ACE_PIP_Data_Message::unpack()
+{
+ if (block_)
+ {
+ char* write_ptr = block_->wr_ptr();
+ block_->reset();
+
+ // reply_expected_
+ ACE_OS::memcpy(&reply_expected_, block_->rd_ptr(), sizeof(reply_expected_));
+ block_->rd_ptr(sizeof(reply_expected_));
+
+ // message priority
+ ACE_OS::memcpy(&message_priority_, block_->rd_ptr(), sizeof(message_priority_));
+ block_->rd_ptr(sizeof(message_priority_));
+
+ // destination handler ID
+ ACE_OS::memcpy(&destination_handler_ID_, block_->rd_ptr(), sizeof(destination_handler_ID_));
+ block_->rd_ptr(sizeof(destination_handler_ID_));
+
+ // source handler ID
+ ACE_OS::memcpy(&source_handler_ID_, block_->rd_ptr(), sizeof(source_handler_ID_));
+ block_->rd_ptr(sizeof(source_handler_ID_));
+
+ // destination site ID
+ ACE_OS::memcpy(&destination_site_ID_, block_->rd_ptr(), sizeof(destination_site_ID_));
+ block_->rd_ptr(sizeof(destination_site_ID_));
+
+ // source site ID
+ ACE_OS::memcpy(&source_site_ID_, block_->rd_ptr(), sizeof(source_site_ID_));
+ block_->rd_ptr(sizeof(source_site_ID_));
+
+ block_->reset();
+ block_->wr_ptr(write_ptr);
+ }
+
+ dirty_ = false;
+}
+
+void ACE_PIP_Data_Message::print() const
+{
+ std::cout << "Priority: " << message_priority_ << std::endl
+ << "Reply?: " << reply_expected_ << std::endl
+ << "Dest_Handler_ID: " << destination_handler_ID_ << std::endl
+ << "Source_Handler_ID: " << source_handler_ID_ << std::endl
+ << "Dest_Site_ID: " << destination_site_ID_ << std::endl
+ << "Source_Site_ID: " << source_site_ID_ << std::endl
+ << "Payload: " << block_->next()->base() << std::endl;
+}
+
+ACE_PIP_Accel_Message::ACE_PIP_Accel_Message()
+ : ACCEL_HEADER_LENGTH_(2*sizeof(ACE_UINT32))
+ , new_priority_(0)
+ , old_priority_(0)
+{
+}
+
+int ACE_PIP_Accel_Message::serialize(ACE_SOCK_Stream& stream)
+{
+ pack();
+
+ int bytes_sent = stream.send_n(block_->rd_ptr(), block_->length());
+ if (bytes_sent <= 0)
+ {
+ std::cout << "Accel:serial: didn't send any bytes" << std::endl;
+ }
+
+ return bytes_sent;
+}
+
+void ACE_PIP_Accel_Message::pack()
+{
+ if (!block_)
+ {
+ block_ = new ACE_Message_Block(ACCEL_HEADER_LENGTH_);
+ dirty_ = true;
+ }
+
+ if (dirty_)
+ {
+
+ // Set the buffer pointers to the start of the buffer to
+ // ensure we're writing to the correct location
+ block_->reset();
+
+ // Pack the contents of the struct into the message block
+ ACE_OS::memcpy(block_->wr_ptr(), &old_priority_, sizeof(old_priority_));
+ block_->wr_ptr(sizeof(old_priority_));
+
+ ACE_OS::memcpy(block_->wr_ptr(), &new_priority_, sizeof(new_priority_));
+ block_->wr_ptr(sizeof(new_priority_));
+
+ dirty_ = false;
+ }
+}
+
+void ACE_PIP_Accel_Message::unpack()
+{
+ if (block_)
+ {
+ char* write_ptr = block_->wr_ptr();
+ block_->reset();
+
+ old_priority_ = (*block_->rd_ptr());
+ block_->rd_ptr(sizeof(old_priority_));
+
+ new_priority_ = (*block_->rd_ptr());
+ block_->rd_ptr(sizeof (new_priority_));
+
+ // Reset the read and write pointers to their original location
+ // in the block.
+ block_->reset();
+ block_->wr_ptr(write_ptr);
+ }
+
+ dirty_ = false;
+}
+
+ACE_PIP_Accel_Message* ACE_PIP_Accel_Message::copy()
+{
+ ACE_PIP_Accel_Message* copy = new ACE_PIP_Accel_Message;
+
+ copy->new_priority_ = new_priority_;
+ copy->old_priority_ = old_priority_;
+ copy->pack();
+
+ return copy;
+}
+
+void ACE_PIP_Accel_Message::print() const
+{
+ std::cout << "DestAddr: " << destination_address_ << std::endl
+ << "OldPriority: " << old_priority_ << std::endl
+ << "NewPriority: " << new_priority_ << std::endl;
+
+}
+
+ACE_PIP_Protocol_Message::ACE_PIP_Protocol_Message()
+ : message_type_(NONE)
+ , num_payload_blocks_(0)
+ , message_id_(0)
+ , FIXED_HEADER_LENGTH_(sizeof(Message_Type) +
+ sizeof(message_id_) +
+ sizeof(num_payload_blocks_))
+{
+}
+
+int ACE_PIP_Protocol_Message::serialize(ACE_SOCK_Stream& stream)
+{
+ int total_bytes_sent(0);
+
+ pack();
+
+ ACE_Message_Block* curr_block = block_;
+ int bytes_sent(0);
+
+ // Write each of the message blocks associated with this
+ // header into the stream
+ while(curr_block)
+ {
+ bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length());
+ if (bytes_sent > 0)
+ {
+ total_bytes_sent += bytes_sent;
+ curr_block = curr_block->next();
+ }
+ else
+ {
+ total_bytes_sent = -1;
+ break;
+ }
+ }
+ if ((total_bytes_sent > 0) && next_)
+ {
+ int next_sent = next_->serialize(stream);
+ if (next_sent > 0)
+ {
+ total_bytes_sent += next_sent;
+ }
+ else
+ {
+ total_bytes_sent = -1;
+ }
+ }
+ else
+ {
+ total_bytes_sent = -1;
+ }
+
+ return total_bytes_sent;
+}
+
+int ACE_PIP_Protocol_Message::deserialize(ACE_SOCK_Stream& stream)
+{
+ int total_bytes_received(-1);
+
+ ACE_Message_Block* header_block = new ACE_Message_Block(FIXED_HEADER_LENGTH_);
+ ACE_Message_Block* lengths_block(0);
+ ACE_Message_Block* curr_payload_block(0);
+ ACE_Message_Block* payload_blocks(0);
+
+ // Read the fixed-length portion of the protocol header.
+ int bytes_received = stream.recv_n(header_block->wr_ptr(), FIXED_HEADER_LENGTH_);
+ if (bytes_received == FIXED_HEADER_LENGTH_)
+ {
+ total_bytes_received = bytes_received;
+
+ // Determine number of data message blocks in the payload.
+ header_block->rd_ptr(FIXED_HEADER_LENGTH_ - sizeof(num_payload_blocks_));
+ ACE_OS::memcpy(&num_payload_blocks_, header_block->rd_ptr(),
+ sizeof(num_payload_blocks_));
+
+ header_block->reset();
+ header_block->wr_ptr(bytes_received);
+
+ // Extract the length of each payload block.
+ if (num_payload_blocks_ > 0)
+ {
+ // Read the lengths of each block.
+ int bytes_to_read = num_payload_blocks_ * sizeof(ACE_UINT32);
+ lengths_block = new ACE_Message_Block(bytes_to_read);
+ bytes_received = stream.recv_n(lengths_block->wr_ptr(), bytes_to_read);
+
+ if (bytes_received == bytes_to_read)
+ {
+ total_bytes_received += bytes_received;
+ lengths_block->wr_ptr(bytes_received);
+
+ // The lengths of each block have been successfully written, so
+ // unpack them.
+ header_block->next(lengths_block);
+ set_block(header_block);
+
+ curr_payload_block = new ACE_Message_Block(payload_block_lengths_[0]);
+ payload_blocks = curr_payload_block;
+ unsigned int i = 0;
+ for (; i < num_payload_blocks_ && bytes_received != -1; ++i)
+ {
+ // Read the block.
+ bytes_received = stream.recv_n(curr_payload_block->wr_ptr(),
+ payload_block_lengths_[i]);
+ if (bytes_received > 0)
+ {
+ total_bytes_received += bytes_received;
+ curr_payload_block->wr_ptr(bytes_received);
+ if (i < (num_payload_blocks_ - 1))
+ {
+ curr_payload_block->next(
+ new ACE_Message_Block(payload_block_lengths_[i + 1]));
+
+ curr_payload_block = curr_payload_block->next();
+ }
+ else
+ {
+ curr_payload_block->next(0);
+ }
+
+ }
+ else
+ {
+ total_bytes_received = -1;
+ std::cout << "deserialize: didn't read enough bytes" << std::endl;
+ break;
+ }
+ }
+ }
+ else
+ {
+ total_bytes_received = -1;
+
+ std::cout << "Deserialize: didnt read enought bytes" << std::endl;
+ }
+ }
+ }
+ else
+ {
+ total_bytes_received = -1;
+ std::cout << "Deserialize:didn't receive enought bytes: got " << bytes_received << std::endl;
+ }
+
+ if (total_bytes_received > 0)
+ {
+ if (message_type_ == ACCEL)
+ {
+ next_ = new ACE_PIP_Accel_Message;
+ }
+ else
+ {
+ next_ = new ACE_PIP_Data_Message;
+ }
+
+ // Pass the payload blocks to the next message struct
+ // so it can unpack it.
+ next_->set_block(payload_blocks);
+ }
+ else if (block_)
+ {
+ // Something failed during reading, so cleanup any allocated memory.
+ block_->release();
+ }
+
+ return total_bytes_received;
+}
+
+void ACE_PIP_Protocol_Message::set_next(ACE_PIP_Message* next)
+{
+ // Determine the number and length of payload blocks.
+ payload_block_lengths_.clear();
+ num_payload_blocks_ = 0;
+ next->pack();
+ ACE_Message_Block* curr_block = next->get_block();
+ while (curr_block)
+ {
+ ++num_payload_blocks_;
+ payload_block_lengths_.push_back(curr_block->length());
+ curr_block = curr_block->next();
+ }
+
+ next_ = next;
+ dirty_ = true;
+}
+
+void ACE_PIP_Protocol_Message::process_message_payload(ACE_Message_Block* payload)
+{
+ payload_block_lengths_.clear();
+ num_payload_blocks_ = 0;
+
+ // Determine the length and number of payload blocks.
+ ACE_Message_Block* curr_block = payload;
+ while (curr_block)
+ {
+ ++num_payload_blocks_;
+ payload_block_lengths_.push_back(curr_block->length());
+ curr_block = curr_block->next();
+ }
+
+ if (!next_)
+ {
+ if (message_type_ == ACCEL)
+ {
+ next_ = new ACE_PIP_Accel_Message;
+ }
+ else
+ {
+ next_ = new ACE_PIP_Data_Message;
+ }
+ }
+
+ next_->set_block(payload);
+ dirty_ = true;
+}
+
+void ACE_PIP_Protocol_Message::pack()
+{
+ int total_bytes_sent(0);
+ if (!block_)
+ {
+ // Create the message buffer for the protocol header.
+ block_ = new ACE_Message_Block(FIXED_HEADER_LENGTH_);
+
+ // Create the message buffer for the list of payload block lengths.
+ block_->next(new ACE_Message_Block(num_payload_blocks_ * sizeof(ACE_UINT32)));
+ block_->next()->next(0);
+ dirty_ = true;
+ }
+ if (dirty_)
+ {
+ // Set the buffer pointers to the start of the buffer
+ // so that we write to the appropriate location.
+ block_->reset();
+
+ // pack the process Id.
+ ACE_OS::memcpy(block_->wr_ptr(), &message_id_, sizeof(message_id_));
+ block_->wr_ptr(sizeof (message_id_));
+
+ // Pack the message type.
+ ACE_OS::memcpy(block_->wr_ptr(), &message_type_, sizeof(message_type_));
+ block_->wr_ptr(sizeof(message_type_));
+
+ // Number of blocks in payload.
+ ACE_OS::memcpy(block_->wr_ptr(), &num_payload_blocks_, sizeof(num_payload_blocks_));
+ block_->wr_ptr(sizeof(num_payload_blocks_));
+
+ ACE_Message_Block* next_block = block_->next();
+ if (next_block)
+ {
+ next_block->reset();
+
+ // Write the block lengths into the message block.
+ for (unsigned int i = 0; i < num_payload_blocks_; ++i)
+ {
+ ACE_OS::memcpy(next_block->wr_ptr(),
+ &payload_block_lengths_[i],
+ sizeof(ACE_UINT32));
+
+ next_block->wr_ptr(sizeof(ACE_UINT32));
+ }
+ }
+
+ dirty_ = false;
+ }
+}
+
+void ACE_PIP_Protocol_Message::unpack()
+{
+ if (block_)
+ {
+ char* write_ptr = block_->wr_ptr();
+ // char* read_ptr = block_->rd_ptr();
+ block_->reset();
+
+ // Extract the process ID.
+ ACE_OS::memcpy(&message_id_, block_->rd_ptr(), sizeof(message_id_));
+ block_->rd_ptr(sizeof (message_id_));
+
+ // Extract the message type.
+ ACE_OS::memcpy(&message_type_, block_->rd_ptr(), sizeof(message_type_));
+ block_->rd_ptr(sizeof(message_type_));
+
+ // Number of blocks in payload.
+ ACE_OS::memcpy(&num_payload_blocks_, block_->rd_ptr(),
+ sizeof(num_payload_blocks_));
+
+ block_->rd_ptr(sizeof(num_payload_blocks_));
+
+ // Reset buffer pointers to be where they were prior to unpacking.
+ block_->reset();
+ block_->wr_ptr(write_ptr);
+
+ // The next block holds the lengths of each payload block.
+ ACE_Message_Block* next_block = block_->next();
+ if (next_block)
+ {
+ write_ptr = next_block->wr_ptr();
+ next_block->reset();
+ payload_block_lengths_.resize(num_payload_blocks_, 0);
+ ACE_UINT32 block_length(0);
+
+ // Extract the lengths of each payload block, which will
+ // be used to recreate the structure of the original payload.
+ for (ACE_UINT32 i = 0; i < num_payload_blocks_; ++i)
+ {
+ ACE_OS::memcpy(&block_length, next_block->rd_ptr(), sizeof(block_length));
+ next_block->rd_ptr(sizeof(block_length));
+ payload_block_lengths_[i] = block_length;
+ }
+
+ // Reset the buffer pointers to where they were prior to unpacking.
+ next_block->reset();
+ next_block->wr_ptr(write_ptr);
+ }
+ }
+
+ dirty_ = false;
+}
+
+ACE_PIP_Protocol_Message* ACE_PIP_Protocol_Message::copy()
+{
+ ACE_PIP_Protocol_Message* message_copy = new ACE_PIP_Protocol_Message;
+ message_copy->message_type_ = message_type_;
+ message_copy->num_payload_blocks_ = num_payload_blocks_;
+ for (ACE_UINT32 block_index = 0; block_index < num_payload_blocks_; ++block_index)
+ {
+ message_copy->payload_block_lengths_[block_index] = payload_block_lengths_[block_index];
+ }
+}
+
+void ACE_PIP_Protocol_Message::print() const
+{
+ std::cout << "Type: " << message_type_ << std::endl
+ << "MessageID: " << message_id_ << std::endl
+ << "NumPayload: " << num_payload_blocks_ << std::endl;
+
+ for (unsigned int i = 0; i < num_payload_blocks_; ++i)
+ {
+ std::cout << "BlockLength[" << i << "] = " << payload_block_lengths_[i] << std::endl;
+ }
+}
+
diff --git a/ACE/ace/PIP_Messages.h b/ACE/ace/PIP_Messages.h
new file mode 100644
index 00000000000..683bedb1ff9
--- /dev/null
+++ b/ACE/ace/PIP_Messages.h
@@ -0,0 +1,446 @@
+ /**
+ * @file PIP_Messages
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a heirarchy of
+ * classes that represent the various messages used in the
+ * priority inheritance protocol
+*/
+
+#ifndef _PIP_MESSAGES_H_
+#define _PIP_MESSAGES_H_
+
+#include "ace/Message_Block.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Vector_T.h"
+
+#include <iostream>
+
+/**
+ * @class ACE_PIP_Message
+ * @brief Base class for all messages used in
+ * the implementation of a distributed priority inheritance
+ * protocol.
+ *
+ * Base class for all messages used in the implementation of a distributed
+ * priority inheritance protocol. Provides an interface for message (de)serialization,
+ * message chaining, packing, unpacking, and payload ownership transfer
+ */
+class ACE_Export ACE_PIP_Message
+{
+ public:
+
+ ACE_PIP_Message();
+ virtual ~ACE_PIP_Message();
+
+ /// Send the contents of this message over the stream.
+ virtual int serialize(ACE_SOCK_Stream& stream) = 0;
+
+ /// Get the next message struct.
+ virtual ACE_PIP_Message* get_next();
+
+ /// Set the next message struct.
+ virtual void set_next(ACE_PIP_Message* next);
+
+ /// Returns the next message, making the caller
+ /// the new owner.
+ virtual ACE_PIP_Message* release_next();
+
+ /// Get the message block.
+ virtual ACE_Message_Block* get_block();
+
+ /// Set the message block and populate the message struct
+ /// with message contents.
+ virtual void set_block(ACE_Message_Block* block);
+
+ /// Get the message block, making the caller the new owner.
+ virtual ACE_Message_Block* release_block();
+
+ /// Place the values in the message struct into the message block.
+ virtual void pack() = 0;
+
+ /// Populate the message struct using values from the message block.
+ virtual void unpack() = 0;
+
+ /// This is temporarily public to facilitate testing.
+ /// It should eventually be made private.
+ ACE_Message_Block* block_;
+
+ /// Print the contents of this struct to stdout.
+ virtual void print() const = 0;
+
+
+ protected:
+
+ // Indicates values in structure are newer than values in the
+ // message block.
+ bool dirty_;
+
+ ACE_PIP_Message* next_;
+};
+
+/**
+ * @class ACE_PIP_Data_Message
+ * @brief Structure representing the fields of an application-
+ * level protocol message and associated header values
+ *
+ * Structure representing the fields of an appliation level
+ * protocol message and associated header values. Structure is that
+ * of several contiguous ACE_Message_Block's. The message is configurable
+ * to support any application-level protocol that contains at least the following
+ * data: source address, destination address, reply expectation, and priority
+ *
+*/
+class ACE_Export ACE_PIP_Data_Message : public ACE_PIP_Message
+{
+ public:
+
+ ACE_PIP_Data_Message();
+ virtual ~ACE_PIP_Data_Message(){}
+
+ /// Send the contents of this message over the stream.
+ virtual int serialize(ACE_SOCK_Stream& stream);
+
+ /// Determine if a reply message is expected
+ bool get_reply_expected() const;
+ void set_reply_expected(bool expected);
+
+ /// Determine the priority at which this message should be handled
+ ACE_UINT32 get_message_priority() const;
+ void set_message_priority(ACE_UINT32 priority);
+
+ /// Determine the ID of the destination handler
+ ACE_UINT32 get_destination_handler_ID() const;
+ void set_destination_handler_ID(ACE_UINT32 ID);
+
+ /// Determine the ID of the sending handler
+ ACE_UINT32 get_source_handler_ID() const;
+ void set_source_handler_ID(ACE_UINT32 ID);
+
+ /// Determine the ID of the destination site
+ ACE_UINT32 get_destination_site_ID() const;
+ void set_destination_site_ID(ACE_UINT32 ID);
+
+ /// Determine the ID of the sending site
+ ACE_UINT32 get_source_site_ID() const;
+ void set_source_site_ID(ACE_UINT32 ID);
+
+ // Place the values from the struct into the message blocks.
+ virtual void pack();
+
+ // Extract the values from the message blocks into the structs.
+ virtual void unpack();
+
+ /// Print the contents of this struct to stdout.
+ virtual void print() const;
+
+ private:
+
+ ACE_UINT32 message_priority_;
+ bool reply_expected_;
+ ACE_UINT32 source_handler_ID_;
+ ACE_UINT32 destination_handler_ID_;
+ ACE_UINT32 source_site_ID_;
+ ACE_UINT32 destination_site_ID_;
+};
+
+/**
+ * @class ACE_PIP_Protocol_Message
+ * @brief Structure representing a message supported by the priority
+ * inheritance protocol
+ *
+*/
+
+class ACE_Export ACE_PIP_Accel_Message : public ACE_PIP_Message
+{
+ public:
+
+ ACE_PIP_Accel_Message();
+ virtual ~ACE_PIP_Accel_Message(){}
+
+ /// Send the contents of this message over the stream.
+ virtual int serialize(ACE_SOCK_Stream& stream);
+
+ ACE_UINT32 get_old_priority() const;
+ void set_old_priority(ACE_UINT32 priority);
+
+ ACE_UINT32 get_new_priority() const;
+ void set_new_priority(ACE_UINT32 priority);
+
+ /// Get the address of the application receiving the message.
+ ACE_UINT32 get_destination_address() const;
+ void set_destination_address(const ACE_UINT32& address);
+
+ u_short get_destination_port() const;
+ void set_destination_port(u_short port);
+
+ /// Place the values in the message struct into the message block.
+ virtual void pack();
+
+ /// Extract the values from the message block and store them in the struct.
+ virtual void unpack();
+
+ /// Print the contents of this struct to stdout.
+ virtual void print() const;
+
+ /// Return a copy of the this message
+ ACE_PIP_Accel_Message* copy();
+
+ private:
+
+ const ACE_UINT32 ACCEL_HEADER_LENGTH_;
+ ACE_UINT32 destination_address_;
+ u_short destination_port_;
+ ACE_UINT32 new_priority_;
+ ACE_UINT32 old_priority_;
+};
+
+/**
+ * @class ACE_PIP_Accel_Message
+ * @brief Structure representing an acceleration message
+ * used in the implementation of a priority inheritance protocol
+ *
+ * Structure representing an acceleration message used in the
+ * implementation of a priority inheritance protocol. Indicates the
+ * old and new priority of the targeted process, as well as the address
+ * of handler to which the associated message was sent.
+*/
+class ACE_Export ACE_PIP_Protocol_Message : public ACE_PIP_Message
+{
+ public:
+
+ enum Message_Type { NONE, ACCEL, DATA, REQUEST, RESPONSE };
+
+ ACE_PIP_Protocol_Message();
+ virtual ~ACE_PIP_Protocol_Message(){}
+
+ /// Send the contents of this message over the stream.
+ virtual int serialize(ACE_SOCK_Stream& stream);
+
+ /// Receive the contents of this message from the stream.
+ virtual int deserialize(ACE_SOCK_Stream& stream);
+
+ /// Set the next message in the chain.
+ virtual void set_next(ACE_PIP_Message* next);
+
+ /// Determine the type of message this header has been tacked onto.
+ Message_Type get_message_type() const;
+ void set_message_type(Message_Type type);
+
+ /// Determine which call chain this message is associated with.
+ ACE_UINT64 get_message_id() const;
+ void set_message_id(ACE_UINT64 id);
+
+ /// Attach message block as payload of priority inheritance
+ /// protocol message.
+ void process_message_payload(ACE_Message_Block* payload);
+
+ virtual void pack();
+ virtual void unpack();
+
+ /// Print the contents of this struct to stdout.
+ virtual void print() const;
+
+ /// Make a copy of the header of this message, i.e. without
+ /// data or accel payload
+ ACE_PIP_Protocol_Message* copy();
+
+ const int FIXED_HEADER_LENGTH_;
+
+ private:
+
+ Message_Type message_type_;
+ ACE_UINT32 num_payload_blocks_;
+ ACE_Vector<ACE_UINT32> payload_block_lengths_;
+ ACE_UINT64 message_id_;
+};
+
+
+/**************************************************
+ *
+ * ACE_PIP_Message - Inline Methods
+ *
+ **************************************************/
+inline ACE_PIP_Message* ACE_PIP_Message::get_next()
+{
+ return next_;
+}
+
+inline void ACE_PIP_Message::set_next(ACE_PIP_Message* message)
+{
+ next_ = message;
+}
+
+inline ACE_PIP_Message* ACE_PIP_Message::release_next()
+{
+ ACE_PIP_Message* temp = next_;
+ next_ = 0;
+ return temp;
+}
+
+inline ACE_Message_Block* ACE_PIP_Message::get_block()
+{
+ return block_;
+}
+
+inline ACE_Message_Block* ACE_PIP_Message::release_block()
+{
+ ACE_Message_Block* temp_block = block_;
+ block_ = 0;
+ dirty_ = true;
+ return temp_block;
+}
+
+/**************************************************
+ *
+ * ACE_PIP_Data_Message - Inline Methods
+ *
+ **************************************************/
+
+inline bool ACE_PIP_Data_Message::get_reply_expected() const
+{
+ return reply_expected_;
+}
+
+inline void ACE_PIP_Data_Message::set_reply_expected(bool expected)
+{
+ dirty_ = true;
+ reply_expected_ = expected;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::get_message_priority() const
+{
+ return message_priority_;
+}
+
+inline void ACE_PIP_Data_Message::set_message_priority(ACE_UINT32 priority)
+{
+ dirty_ = true;
+ message_priority_ = priority;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::get_destination_handler_ID() const
+{
+ return destination_handler_ID_;
+}
+
+inline void ACE_PIP_Data_Message::set_destination_handler_ID(ACE_UINT32 ID)
+{
+ destination_handler_ID_ = ID;
+ dirty_ = true;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::get_source_handler_ID() const
+{
+ return source_handler_ID_;
+}
+
+inline void ACE_PIP_Data_Message::set_source_handler_ID(ACE_UINT32 ID)
+{
+ source_handler_ID_ = ID;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::get_source_site_ID() const
+{
+ return source_site_ID_;
+}
+
+inline void ACE_PIP_Data_Message::set_source_site_ID(ACE_UINT32 ID)
+{
+ source_site_ID_ = ID;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::get_destination_site_ID() const
+{
+ return destination_site_ID_;
+}
+
+inline void ACE_PIP_Data_Message::set_destination_site_ID(ACE_UINT32 ID)
+{
+ destination_site_ID_ = ID;
+}
+
+/**************************************************
+ *
+ * ACE_PIP_Accel_Message - Inline Methods
+ *
+ **************************************************/
+
+inline ACE_UINT32 ACE_PIP_Accel_Message::get_old_priority() const
+{
+ return old_priority_;
+}
+
+inline void ACE_PIP_Accel_Message::set_old_priority(ACE_UINT32 priority)
+{
+ dirty_ = true;
+ old_priority_ = priority;
+}
+
+inline ACE_UINT32 ACE_PIP_Accel_Message::get_new_priority() const
+{
+ return new_priority_;
+}
+
+inline void ACE_PIP_Accel_Message::set_new_priority(ACE_UINT32 priority)
+{
+ dirty_ = true;
+ new_priority_ = priority;
+}
+
+inline ACE_UINT32 ACE_PIP_Accel_Message::get_destination_address() const
+{
+ return destination_address_;
+}
+
+inline void ACE_PIP_Accel_Message::set_destination_address(const ACE_UINT32& address)
+{
+ dirty_ = true;
+ destination_address_ = address;
+}
+
+inline u_short ACE_PIP_Accel_Message::get_destination_port() const
+{
+ return destination_port_;
+}
+
+inline void ACE_PIP_Accel_Message::set_destination_port(u_short port)
+{
+ destination_port_ = port;
+}
+
+/**************************************************
+ *
+ * ACE_PIP_Protocol_Message - Inline Methods
+ *
+ **************************************************/
+
+inline void ACE_PIP_Protocol_Message::
+ set_message_type(ACE_PIP_Protocol_Message::Message_Type type)
+{
+ message_type_ = type;
+ dirty_ = true;
+}
+
+inline ACE_PIP_Protocol_Message::Message_Type ACE_PIP_Protocol_Message::
+ get_message_type() const
+{
+ return message_type_;
+}
+
+inline ACE_UINT64 ACE_PIP_Protocol_Message::get_message_id() const
+{
+ return message_id_;
+}
+
+inline void ACE_PIP_Protocol_Message::set_message_id(ACE_UINT64 id)
+{
+ dirty_ = true;
+ message_id_ = id;
+}
+
+#endif
+
diff --git a/ACE/ace/PIP_Reactive_IO_Handler.cpp b/ACE/ace/PIP_Reactive_IO_Handler.cpp
new file mode 100644
index 00000000000..925be857608
--- /dev/null
+++ b/ACE/ace/PIP_Reactive_IO_Handler.cpp
@@ -0,0 +1,64 @@
+// $Id$
+
+#include "ace/OS_NS_sys_time.h"
+#include "ace/PIP_Reactive_IO_Handler.h"
+#include "ace/PIP_Invocation_Manager.h"
+
+/// Constructor
+ACE_PIP_Reactive_IO_Handler::ACE_PIP_Reactive_IO_Handler()
+{
+}
+
+ACE_PIP_Reactive_IO_Handler::~ACE_PIP_Reactive_IO_Handler()
+{
+}
+
+/// Closes all remote connections.
+int ACE_PIP_Reactive_IO_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
+{
+ int result(0);
+ switch(close_mask)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ read_closed_ = true;
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ write_closed_ = true;
+ break;
+ };
+
+ if (read_closed_ && write_closed_)
+ {
+ // Close our end of the connection
+ peer_.close_reader();
+ peer_.close_writer();
+
+ // un-register with invocation manager so it doesn't
+ // try to use the handler for IO
+ ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this);
+
+ delete this;
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/// Enqueue a message to be sent
+int ACE_PIP_Reactive_IO_Handler::put_message (ACE_PIP_Protocol_Message* message)
+{
+ big_lock_.acquire();
+ outgoing_message_queue_.enqueue_head(message);
+ big_lock_.release();
+
+ // Register so Reactor tells us to send the message
+ ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::WRITE_MASK);
+ ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);
+
+ return 0;
+}
+
+
+
+
diff --git a/ACE/ace/PIP_Reactive_IO_Handler.h b/ACE/ace/PIP_Reactive_IO_Handler.h
new file mode 100644
index 00000000000..ae50ebf9b27
--- /dev/null
+++ b/ACE/ace/PIP_Reactive_IO_Handler.h
@@ -0,0 +1,54 @@
+ /**
+ * @file PIP_IO_Handler.cpp
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that manages network I/O
+*/
+
+
+#ifndef _PIP_REACTIVE_IO_HANDLER_H_
+#define _PIP_REACTIVE_IO_HANDLER_H_
+
+
+#include "ace/Message_Queue.h"
+#include "ace/PIP_IO_Handler.h"
+#include "ace/PIP_Messages.h"
+
+/**
+ * @class ACE_PIP_Reactive_IO_Handler
+ *
+ * @brief Performs reactive network I/O in
+ * the context of a distributed system
+ * employing the the priority inheritance
+ * protocol
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ */
+class ACE_Export ACE_PIP_Reactive_IO_Handler :
+ public ACE_PIP_IO_Handler
+{
+ public:
+
+ /// Constructor
+ ACE_PIP_Reactive_IO_Handler ();
+ ~ACE_PIP_Reactive_IO_Handler();
+
+ /// Closes all remote connections.
+ virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
+
+ /// Enqueue a message to be sent
+ virtual int put_message (ACE_PIP_Protocol_Message* message);
+
+ private:
+
+};
+
+
+
+#endif /* _PIP_Reactive_IO_Handler_H_ */
+
+