diff options
author | jmoore <jmoore@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-07-17 03:38:43 +0000 |
---|---|---|
committer | jmoore <jmoore@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2008-07-17 03:38:43 +0000 |
commit | 764103e1aaa91fc7cede07beaa312730e170f502 (patch) | |
tree | c575da2bbb55d1d02269d9fe75f608aa2214252e | |
parent | 1d845c80795563c69fe52c086ca9ea526e5f6891 (diff) | |
download | ATCD-764103e1aaa91fc7cede07beaa312730e170f502.tar.gz |
Initial commit of rough priority inheritance protocol implementation. Files still need to be made consistent with ACE coding standards and stl references need to be removed
-rw-r--r-- | ACE/ace/PIP_Active_IO_Handler.cpp | 99 | ||||
-rw-r--r-- | ACE/ace/PIP_Connection_Manager.cpp | 242 | ||||
-rw-r--r-- | ACE/ace/PIP_Connection_Manager.h | 79 | ||||
-rw-r--r-- | ACE/ace/PIP_DA_Strategy_Adapter.cpp | 4 | ||||
-rw-r--r-- | ACE/ace/PIP_DA_Strategy_Adapter.h | 261 | ||||
-rw-r--r-- | ACE/ace/PIP_Dispatcher.cpp | 505 | ||||
-rw-r--r-- | ACE/ace/PIP_Dispatcher.h | 188 | ||||
-rw-r--r-- | ACE/ace/PIP_IO_Handler.cpp | 185 | ||||
-rw-r--r-- | ACE/ace/PIP_IO_Handler.h | 94 | ||||
-rw-r--r-- | ACE/ace/PIP_Message_Handler.cpp | 105 | ||||
-rw-r--r-- | ACE/ace/PIP_Message_Handler.h | 67 | ||||
-rw-r--r-- | ACE/ace/PIP_Messages.cpp | 607 | ||||
-rw-r--r-- | ACE/ace/PIP_Messages.h | 446 | ||||
-rw-r--r-- | ACE/ace/PIP_Reactive_IO_Handler.cpp | 64 | ||||
-rw-r--r-- | ACE/ace/PIP_Reactive_IO_Handler.h | 54 |
15 files changed, 3000 insertions, 0 deletions
diff --git a/ACE/ace/PIP_Active_IO_Handler.cpp b/ACE/ace/PIP_Active_IO_Handler.cpp new file mode 100644 index 00000000000..46bfc8fcee2 --- /dev/null +++ b/ACE/ace/PIP_Active_IO_Handler.cpp @@ -0,0 +1,99 @@ +// $Id$ + +#include "ace/PIP_Active_IO_Handler.h" + + +#include <iostream> +/// Constructor +ACE_PIP_Active_IO_Handler::ACE_PIP_Active_IO_Handler() + : shutdown_(false) +{ + // acquire the shutdown lock so that when shutdown_svc is called, + // the caller cannot return until shutdown has been completed and + // lock relinquished + shutdown_lock_.acquire(); +} + +/// Closes all remote connections. +int ACE_PIP_Active_IO_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) +{ + int result(0); + switch(close_mask) + { + case ACE_Event_Handler::READ_MASK: + read_closed_ = true; + break; + case ACE_Event_Handler::WRITE_MASK: + write_closed_ = true; + break; + }; + + if (read_closed_ && write_closed_) + { + // Close our end of the connection + peer_.close_reader(); + peer_.close_writer(); + delete this; + return -1; + } + + return 0; +} + + +/// Enqueue a message to be sent +int ACE_PIP_Active_IO_Handler::put_message (ACE_PIP_Protocol_Message* message) +{ + outgoing_message_queue_.enqueue(message); +} + +int ACE_PIP_Active_IO_Handler::svc() +{ + int result(0); + ssize_t bytes_available(0); + char byte; + + // run until we're told to quit + while (!shutdown_) + { + // peek to see if incoming message available + bytes_available = peer_.recv(&byte, 1, MSG_PEEK); + if (bytes_available > 0) + { + handle_input(); + } + + // handle outgoing message + result = handle_output(); + if (result == -2) + { + // indicate to caller that the + // handler is no longer active + return -1; + } + + bytes_available = 0; + } + + + return 0; +} + +void ACE_PIP_Active_IO_Handler::shutdown_svc() +{ + shutdown_ = true; + shutdown_lock_.acquire(); + + handle_close(0, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK); + +} + +int ACE_PIP_Active_IO_Handler::open(void*) +{ + std::cout << "activate" << std::endl; + this->activate(); +} + + + + diff --git a/ACE/ace/PIP_Connection_Manager.cpp b/ACE/ace/PIP_Connection_Manager.cpp new file mode 100644 index 00000000000..d92d2e27895 --- /dev/null +++ b/ACE/ace/PIP_Connection_Manager.cpp @@ -0,0 +1,242 @@ + /** + * @file PIP_Connection_Manager.cpp + * + * // $Id$ + * + * @author John Moore <ljohn7@gmail.com> + * + */ + + + + +#include <ace/INET_Addr.h> +#include <ace/PIP_Connection_Manager.h> + + +ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::connection_manager_ = 0; +ACE_Mutex ACE_PIP_Connection_Manager::instance_lock_; +bool ACE_PIP_Connection_Manager::delete_manager_ = false; + +/// Default Constructor +ACE_PIP_Connection_Manager::ACE_PIP_Connection_Manager() +{ + +} + +/// Destructor +ACE_PIP_Connection_Manager::~ACE_PIP_Connection_Manager() +{ + +} + +ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::instance() +{ + if (connection_manager_ == 0) + { + instance_lock_.acquire(); + + if (ACE_PIP_Connection_Manager::connection_manager_ == 0) + { + ACE_NEW_RETURN (ACE_PIP_Connection_Manager::connection_manager_, + ACE_PIP_Connection_Manager, + 0); + + delete_manager_ = true; + } + + instance_lock_.release(); + } + + return connection_manager_; +} + +int ACE_PIP_Connection_Manager::establish_connections(ACE_UINT32 source_site_id) +{ + int result(0); + + //establish connections + for (int i = 0; i < connection_definitions_->size(); ++i) + { + if ((*connection_definitions_)[i]->source_site_id == + source_site_id) + { + ACE_INET_Addr address; + address.set((*connection_definitions_)[i]->port, + (*connection_definitions_)[i]->address.c_str()); + + if ((*connection_definitions_)[i]->type == + Connection_Definition::ACTIVE) + { + ACE_PIP_Active_IO_Handler* handler = new ACE_PIP_Active_IO_Handler; + result = active_connector_.connect(handler, address); + if (result == -1) + { + return -1; + } + else + { + handler->init( + (*connection_definitions_)[i]->source_site_id, + (*connection_definitions_)[i]->destination_site_id, + (*connection_definitions_)[i]->priority); + + handlers_.push_back(handler); + } + } + else + { + ACE_PIP_Reactive_IO_Handler* handler = new ACE_PIP_Reactive_IO_Handler; + result = reactive_connector_.connect(handler, address); + if (result == -1) + { + std::cerr << "Unable to connect to " + << (*connection_definitions_)[i]->address << " " + << (*connection_definitions_)[i]->port + << std::endl; + + return -1; + } + else + { + handler->init( + (*connection_definitions_)[i]->source_site_id, + (*connection_definitions_)[i]->destination_site_id, + (*connection_definitions_)[i]->priority); + + handlers_.push_back(handler); + } + } + } + + } + + return result; +} + +int ACE_PIP_Connection_Manager::process_connection_file(char* file_name) +{ + // Expecting the file to contain one tuple per line + // where each is of form (source_id, dest_id, dest_address, dest_port, priority, type) + std::ifstream* my_stream = new std::ifstream; + + my_stream->open(file_name); + + if (my_stream->fail()) + { + std::cerr << "Failed to open connection file: " << file_name + << std::endl; + + return -1; + } + + std::string line; + std::string token; + int strlen; + int first_pos; + int second_pos; + Connection_Definition* current_definition(0); + + std::getline(*my_stream, line); + int num_entries = atoi(line.c_str()); + + connection_definitions_ = new ACE_Vector<Connection_Definition*>; + for (int i = 0; i < num_entries; ++i) + { + current_definition = new Connection_Definition; + std::getline(*my_stream, line); + strlen = line.length(); + first_pos = line.find("("); + if (first_pos > strlen) + { + delete current_definition; + return -1; + } + + second_pos = line.find(",", first_pos); + if (second_pos > strlen) + { + delete current_definition; + return -1; + } + + // source site ID + token.assign(line, first_pos + 1, second_pos - first_pos - 1); + current_definition->source_site_id = atoi(token.c_str()); + + first_pos = second_pos; + second_pos = line.find(",", first_pos + 1); + if (second_pos > strlen) + { + delete current_definition; + return -1; + } + + // destination site ID + token.assign(line, first_pos + 1, second_pos - first_pos - 1); + current_definition->destination_site_id = atoi(token.c_str()); + + first_pos = second_pos; + second_pos = line.find(",", first_pos + 1); + if (second_pos > strlen) + { + delete current_definition; + return -1; + } + + // IP address + current_definition->address.assign(line, first_pos + 1, second_pos - first_pos - 1); + + first_pos = second_pos; + second_pos = line.find(",", first_pos + 1); + if (second_pos > strlen) + { + delete current_definition; + return -1; + } + + // IP port + token.assign(line, first_pos + 1, second_pos - first_pos - 1); + current_definition->port = atoi(token.c_str()); + + first_pos = second_pos; + second_pos = line.find(",", first_pos + 1); + if (second_pos > strlen) + { + delete current_definition; + return -1; + } + + // Connection priority + token.assign(line, first_pos + 1, second_pos - first_pos - 1); + current_definition->priority = atoi(token.c_str()); + + first_pos = second_pos; + second_pos = line.find(")", first_pos + 1); + if (second_pos > strlen) + { + delete current_definition; + return -1; + } + + // Connection Type + token.assign(line, first_pos + 1, second_pos - first_pos - 1); + if (token == "ACTIVE") + { + current_definition->type = Connection_Definition::ACTIVE; + } + else + { + current_definition->type = Connection_Definition::REACTIVE; + } + + connection_definitions_->push_back(current_definition); + } + + return 0; +} + +const ACE_Vector<ACE_PIP_Connection_Manager::Connection_Definition*>* ACE_PIP_Connection_Manager::get_connections() const +{ + return connection_definitions_; +} diff --git a/ACE/ace/PIP_Connection_Manager.h b/ACE/ace/PIP_Connection_Manager.h new file mode 100644 index 00000000000..e4925ec4568 --- /dev/null +++ b/ACE/ace/PIP_Connection_Manager.h @@ -0,0 +1,79 @@ + /** + * @file PIP_Connection_Manager.h + * + * // $Id$ + * + * @author John Moore <ljohn7@gmail.com> + * + */ + +#ifndef _PIP_CONNECTION_MANAGER_H_ +#define _PIP_CONNECTION_MANAGER_H_ + +#include <ace/Connector.h> +#include <ace/PIP_Active_IO_Handler.h> +#include <ace/PIP_Reactive_IO_Handler.h> +#include <ace/Reactor.h> +#include <ace/SOCK_Connector.h> +#include <ace/Vector_T.h> + +#include <fstream> +#include <iostream> +#include <string> +#include <vector> + +class ACE_Export ACE_PIP_Connection_Manager +{ + public: + + /// Informationa associated with a connection + struct Connection_Definition + { + enum Handler_Type {ACTIVE, REACTIVE}; + + ACE_UINT32 source_site_id; + ACE_UINT32 destination_site_id; + std::string address; + u_short port; + ACE_UINT32 priority; + Handler_Type type; + }; + + /// Default Constructor + ACE_PIP_Connection_Manager(); + + /// Destructor + virtual ~ACE_PIP_Connection_Manager(); + + /// obtain the single instance of the manager + static ACE_PIP_Connection_Manager* instance(); + + /// Extract all connection information from a file + virtual int process_connection_file(char* filename); + + /// Establish all connection for which source_site_id is the source + virtual int establish_connections(ACE_UINT32 source_site_id); + + const ACE_Vector<Connection_Definition*>* get_connections() const; + + private: + + ACE_Vector<Connection_Definition*>* connection_definitions_; + + // The connector used to actively connect to a remote site + ACE_Connector< + ACE_PIP_Active_IO_Handler, + ACE_SOCK_Connector> active_connector_; + + ACE_Connector< + ACE_PIP_Reactive_IO_Handler, + ACE_SOCK_Connector> reactive_connector_; + + static ACE_PIP_Connection_Manager* connection_manager_; + static ACE_Mutex instance_lock_; + static bool delete_manager_; + + std::vector<ACE_PIP_IO_Handler*> handlers_; +}; + +#endif diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.cpp b/ACE/ace/PIP_DA_Strategy_Adapter.cpp new file mode 100644 index 00000000000..ecfd42a4747 --- /dev/null +++ b/ACE/ace/PIP_DA_Strategy_Adapter.cpp @@ -0,0 +1,4 @@ +// $Id$ + +#include "PIP_DA_Strategy_Adapter.h" + diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.h b/ACE/ace/PIP_DA_Strategy_Adapter.h new file mode 100644 index 00000000000..a0899aedaa7 --- /dev/null +++ b/ACE/ace/PIP_DA_Strategy_Adapter.h @@ -0,0 +1,261 @@ + /** + * @file PIP_DA_Strategy_Adapter.h + * + * // $Id$ + * + * @author John Moore <ljohn7@gmail.com> + * + * This file contains the specification for a class + * that adapts a deadlock avoidance strategy to additionally + * support priority inheritance protocol annotations +*/ + + +#ifndef _PIP_DA_STRATEGY_ADAPTER_ +#define _PIP_DA_STRATEGY_ADAPTER_ + +#include "ace/DA_Strategy_Base.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/Unbounded_Set.h" +#include "ace/Mutex.h" +#include "ace/Null_Mutex.h" + + +#include <iostream> + +/** + * @class ACE_PIP_DA_Strategy_Adapter + * @brief Extends deadlock avoidance strategies + * to support priority inheritance annotations + * + * Deadlock avoidance strategies associate a resource cost annotation + * with each handle. This class extends the strategies to support + * the association of annotations with each priority at which the + * handle can be dispatched, i.e. the priority at which the corresponding + * thread resource can dispatch the handle +*/ +template <typename Handle_Id, typename Lock> +class ACE_PIP_DA_Strategy_Adapter +{ + public: + + /// Constructor that takes the deadlock avoidance strategy that + /// the Strategy Adapter adapts. + ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base<ACE_UINT64>* DA_strategy); + ~ACE_PIP_DA_Strategy_Adapter(); + + /// Indicates whether allocating a thread to the handle + /// at the specified priority could potentially result in deadlock. + int is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority); + + /// Grant the handle a thread at the specified priority. + void grant(Handle_Id handle, ACE_UINT32 priority); + + /// Release the thread + void release(Handle_Id handle, ACE_UINT32 priority); + + /// Determine the number of threads being managed by + /// the DA_Strategy adapter. + int get_max_threads(); + + /// Add an annotation value for the handle / priority pair. + int add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation); + + /// Remove every annotation associated with this handle. + int remove_annotation (Handle_Id handle); + int remove_annotation (Handle_Id handle, ACE_UINT32 priority); + +private: + + /// Associates each message handler with an internally generated id + /// which can be used, along with a priority, to lookup an annotation. + typedef ACE_Hash_Map_Manager_Ex<Handle_Id, + ACE_UINT32, + ACE_Hash<Handle_Id>, + ACE_Equal_To<Handle_Id>, + ACE_Null_Mutex> HANDLE_ID_MAP; + + /// Associates each message handler with a set of potential priorities. + /// Message handler represented by internally generated id. + typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32, + ACE_Unbounded_Set<ACE_UINT32>*, + ACE_Hash<ACE_UINT32>, + ACE_Equal_To<ACE_UINT32>, + ACE_Null_Mutex> HANDLE_ID_PRIORITY_MAP; + + /// Determines an id that uniquely identifies a handler/priority pair. + ACE_UINT64 hash_handle_id_and_priority(ACE_UINT32 handle_id, + ACE_UINT32 priority) const; + + /// Generates an annotation ID given the actual handle and priority. + ACE_UINT64 get_annotation_id(Handle_Id handle, ACE_UINT32 priority); + + DA_Strategy_Base<ACE_UINT64>* DA_strategy_; + HANDLE_ID_MAP handle_ids_; + HANDLE_ID_PRIORITY_MAP id_to_priority_map_; + Lock lock_; + ACE_UINT32 next_id_; +}; + +template <typename Handle_Id, typename Lock> +ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base<ACE_UINT64>* DA_strategy) +: DA_strategy_(DA_strategy) +, next_id_(0) +{ +} + +template <typename Handle_Id, typename Lock> +ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::~ACE_PIP_DA_Strategy_Adapter() +{ + HANDLE_ID_PRIORITY_MAP::iterator it = id_to_priority_map_.begin(); + for (; it != id_to_priority_map_.end(); ++it) + { + delete it->item(); + } +} + +template <typename Handle_Id, typename Lock> +ACE_INLINE int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::get_max_threads() +{ + return DA_strategy_->get_max_threads(); +} + +template <typename Handle_Id, typename Lock> +ACE_INLINE ACE_UINT64 ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + hash_handle_id_and_priority(ACE_UINT32 handle_id, ACE_UINT32 priority) const +{ + ACE_UINT64 result = handle_id; + result = (result << 32) | priority; + return result; +} + +template <typename Handle_Id, typename Lock> +int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority) +{ + ACE_Guard<Lock> guard(lock_); + ACE_UINT64 annotation_id = get_annotation_id(handle, priority); + return DA_strategy_->is_deadlock_potential(annotation_id); +} + +template <typename Handle_Id, typename Lock> +void ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + grant(Handle_Id handle, ACE_UINT32 priority) +{ + ACE_Guard<Lock> guard(lock_); + ACE_UINT64 annotation_id = get_annotation_id(handle, priority); + return DA_strategy_->grant(annotation_id); +} + +template <typename Handle_Id, typename Lock> +void ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + release(Handle_Id handle, ACE_UINT32 priority) +{ + ACE_Guard<Lock> guard(lock_); + ACE_UINT64 annotation_id = get_annotation_id(handle, priority); + DA_strategy_->release(annotation_id); +} + +template <typename Handle_Id, typename Lock> +int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation) +{ + ACE_UINT32 internal_handle_id(0); + ACE_Unbounded_Set<ACE_UINT32>* priorities(0); + + ACE_Guard<Lock> guard(lock_); + if (handle_ids_.find(handle, internal_handle_id) == -1) + { + // This is the first time handle has been encountered, so generate an + // internal handle id. + internal_handle_id = next_id_++; + handle_ids_.bind(handle, internal_handle_id); + priorities = new ACE_Unbounded_Set<ACE_UINT32>; + id_to_priority_map_.bind(internal_handle_id, priorities); + } + else + { + id_to_priority_map_.find(internal_handle_id, priorities); + } + + priorities->insert(priority); + + return DA_strategy_->add_annotation( + hash_handle_id_and_priority(internal_handle_id, priority), annotation); +} + +template <typename Handle_Id, typename Lock> +int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + remove_annotation (Handle_Id handle) +{ + ACE_Guard<Lock> guard(lock_); + ACE_UINT32 internal_handle_id(0); + if (handle_ids_.unbind(handle, internal_handle_id) != -1) + { + ACE_Unbounded_Set<ACE_UINT32>* priorities(0); + if (id_to_priority_map_.unbind(internal_handle_id, priorities) != -1) + { + for (ACE_Unbounded_Set<ACE_UINT32>::ITERATOR it = priorities->begin(); + it != priorities->end(); + ++it) + { + DA_strategy_->remove_annotation( + get_annotation_id(internal_handle_id, *it)); + } + + delete priorities; + } + } + + return 0; +} + +template <typename Handle_Id, typename Lock> +int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + remove_annotation (Handle_Id handle, ACE_UINT32 priority) +{ + ACE_Guard<Lock> guard(lock_); + ACE_UINT32 internal_handle_id(0); + int result(0); + if (handle_ids_.find(handle, internal_handle_id) != -1) + { + ACE_Unbounded_Set<ACE_UINT32>* priorities(0); + if (id_to_priority_map_.find(internal_handle_id, priorities) != -1) + { + if (priorities->remove(priority) != -1) + { + result = DA_strategy_->remove_annotation( + get_annotation_id(internal_handle_id, priority)); + } + if (priorities->is_empty()) + { + // This was the last annotation for this handle, + // so remove the handle information + id_to_priority_map_.unbind(internal_handle_id, priorities); + delete priorities; + handle_ids_.unbind(handle, internal_handle_id); + } + } + } + + return result; +} + +template <typename Handle_Id, typename Lock> +ACE_UINT64 ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>:: + get_annotation_id(Handle_Id handle, ACE_UINT32 priority) +{ + ACE_UINT64 annotation_id(0); + ACE_UINT32 handle_id(0); + + if (handle_ids_.find(handle, handle_id) != -1) + { + annotation_id = hash_handle_id_and_priority(handle_id, priority); + } + + return annotation_id; +} + +#endif + diff --git a/ACE/ace/PIP_Dispatcher.cpp b/ACE/ace/PIP_Dispatcher.cpp new file mode 100644 index 00000000000..dbc0931edbf --- /dev/null +++ b/ACE/ace/PIP_Dispatcher.cpp @@ -0,0 +1,505 @@ +#include "ace/PIP_Dispatcher.h" +#include "ace/PIP_Invocation_Manager.h" +#include "ace/PIP_Messages.h" +#include "ace/Reactor.h" + +#include <iostream> + +ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::dispatcher_ = 0; +ACE_Mutex ACE_PIP_Dispatcher::instance_lock_; +bool ACE_PIP_Dispatcher::delete_dispatcher_ = false; +bool ACE_PIP_Dispatcher::shutdown_ = false; + +/// Constructor +ACE_PIP_Dispatcher::ACE_PIP_Dispatcher() + : current_highest_priority_(ACE_Event_Handler::LO_PRIORITY) + , current_lowest_priority_(ACE_Event_Handler::LO_PRIORITY) + , DA_strategy_adapter_(0) + , message_available_signal_(0) + , threads_available_signal_(0) + , waiting_for_message_(false) +{ +} + +/// Destructor +ACE_PIP_Dispatcher::~ACE_PIP_Dispatcher() +{ + ACE_PIP_Protocol_Message* message(0); + + // Destroy all messages that have yet to be dispatched + pending_messages_lock_.acquire(); + while (pending_messages_by_message_id_.current_size() != 0) + { + pending_messages_by_message_id_.unbind( + pending_messages_by_message_id_.begin()->key(), + message); + + if (message) + { + delete message; + message = 0; + } + } + pending_messages_lock_.release(); +} + + +ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::instance() +{ + if (ACE_PIP_Dispatcher::dispatcher_ == 0) + { + instance_lock_.acquire(); + + if (ACE_PIP_Dispatcher::dispatcher_ == 0) + { + ACE_NEW_RETURN (ACE_PIP_Dispatcher::dispatcher_, + ACE_PIP_Dispatcher, + 0); + + delete_dispatcher_ = true; + } + + instance_lock_.release(); + } + + return dispatcher_; +} + +/// Receive a message for eventual dispatching +void ACE_PIP_Dispatcher::process_message(ACE_PIP_Protocol_Message* message) +{ + switch (message->get_message_type()) + { + case ACE_PIP_Protocol_Message::ACCEL: + process_incoming_acceleration(message); + break; + + case ACE_PIP_Protocol_Message::REQUEST: + process_incoming_request(message); + break; + + case ACE_PIP_Protocol_Message::RESPONSE: + // Forward the response to the invocation manager + ACE_PIP_Invocation_Manager::instance()->process_inbound_response(message); + break; + + default: + std::cerr << "PIP_Dispatcher::process_message: Invalid Message type of " << message->get_message_type() << std::endl; + } +} + + +/// Signals the dispatcher to dispatch a new message if possible. +int ACE_PIP_Dispatcher::handle_output (ACE_HANDLE) +{ + ACE_PIP_Protocol_Message* message(0); + bool message_dispatched(false); + + while (!message_dispatched && !shutdown_) + { + // get the highest priority message + pending_messages_lock_.acquire(); + message = retrieve_highest_priority_pending_message(); + if (message) + { + ACE_PIP_Data_Message* data_message = + static_cast<ACE_PIP_Data_Message*>(message->get_next()); + + deadlock_avoidance_lock_.acquire(); + + /// If dispatching could potentially cause deadlock, try to accelerate all lower priority + /// messages and then wait for threads to become available + num_threads_needed_ = DA_strategy_adapter_->is_deadlock_potential( + data_message->get_destination_handler_ID(), + data_message->get_message_priority()); + + if (num_threads_needed_ > 0) + { + deadlock_avoidance_lock_.release(); + find_and_accelerate_lower_priority_message(data_message->get_message_priority()); + + // Wait for signal indicating enough threads exist to dispatch the message + threads_available_signal_.acquire(); + + // Before grabing the deadlock avoidance lock, check to make sure + // we haven't been told to shutdown. + if (shutdown_) + break; + + deadlock_avoidance_lock_.acquire(); + } + + // At this point, sufficient threads exist to dispatch the message + // without threat of deadlock, so grant a thread + DA_strategy_adapter_->grant(data_message->get_destination_handler_ID(), + data_message->get_message_priority()); + + deadlock_avoidance_lock_.release(); + + // Transfer the message to the "dispatched" list + dispatched_messages_lock_.acquire(); + Dispatched_Message_Data dispatch_record; + dispatch_record.id = message->get_message_id(); + dispatch_record.priority = data_message->get_message_priority(); + dispatched_messages_data_.insert(dispatch_record); + dispatched_messages_lock_.release(); + + //-------------TEST DATA------------------ + // store statistics to be printed later + Dispatch_Test_Data test_data; + test_data.id = message->get_message_id(); + test_data.priority = data_message->get_message_priority(); + test_data.num_pending = num_pending_messages_; + test_data.highest_priority = current_highest_priority_; + test_data.lowest_priority = current_lowest_priority_; + dispatch_records_.push_back(test_data); + + dispatched_ids_.push_back(message->get_message_id()); + + ++num_messages_dispatched_; + --num_pending_messages_; + pending_messages_lock_.release(); + //----------------------------------------- + + // Request another thread to be associated with dispatcher + ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK); + + message_dispatched = true; + + // Pass the message to the invocation manager for processing + ACE_PIP_Invocation_Manager::instance()->process_inbound_request(message); + + // All processing associated with the message has been completed + // so discard the record + dispatched_messages_lock_.acquire(); + dispatched_messages_data_.erase(dispatch_record); + dispatched_messages_lock_.release(); + + // Cleanup message information and release the thread resource + deadlock_avoidance_lock_.acquire(); + DA_strategy_adapter_->release(data_message->get_destination_handler_ID(), + data_message->get_message_priority()); + + if (num_threads_needed_ > 0) + { + --num_threads_needed_; + if (num_threads_needed_ == 0) + { + threads_available_signal_.release(); + } + } + + deadlock_avoidance_lock_.release(); + } + else + { + // There are no messages to dispatch, so wait for one to arrive + waiting_for_message_ = true; + pending_messages_lock_.release(); + message_available_signal_.acquire(); + + // Before dispatching a message, make sure we haven't been + // instructed to shutdown + if (shutdown_) + break; + } + } + + return 0; +} + + +/// Initializes dispatcher +void ACE_PIP_Dispatcher::init(ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter) +{ + DA_strategy_adapter_ = DA_strategy_adapter; + waiting_for_message_ = true; + ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK); +} + +/// store the message +void ACE_PIP_Dispatcher::process_incoming_request(ACE_PIP_Protocol_Message* message) +{ + // Store the message token 2 ways to enable efficient dispatching as well as + // efficient lookup for accelerations + pending_messages_lock_.acquire(); + + //-------TEST DATA------------------------ + ++num_messages_received_; + ++num_pending_messages_; + received_ids_.push_back(message->get_message_id()); + + //------------------------------------------ + ACE_UINT32 priority = + static_cast<ACE_PIP_Data_Message*>(message->get_next())->get_message_priority(); + + // update the priority upper and lower bounds. These values are stored to + // avoid checking the full range of priorities when dispatching messages + if (priority > current_highest_priority_) + { + current_highest_priority_ = priority; + } + else if (priority < current_lowest_priority_) + { + current_lowest_priority_ = priority; + } + + PRIORITY_MESSAGE_LIST_MAP::iterator + message_iter = pending_messages_by_priority_.find(priority); + + if (message_iter == pending_messages_by_priority_.end()) + { + // Create a new entry for this priority level + std::list<ACE_PIP_Protocol_Message*> new_priority_list; + new_priority_list.push_back(message); + pending_messages_by_priority_.insert( + make_pair(priority, new_priority_list)); + } + else + { + // Priority already exists, so add the message token to the list + message_iter->second.push_back(message); + } + + pending_messages_by_message_id_.bind(message->get_message_id(), message); + + if (waiting_for_message_) + { + waiting_for_message_ = false; + + // Signal waiting dispatcher thread to dispatch new message + message_available_signal_.release(); + } + + pending_messages_lock_.release(); + +} + +/// Find the highest priority message and return it +ACE_PIP_Protocol_Message* ACE_PIP_Dispatcher:: + retrieve_highest_priority_pending_message() +{ + ACE_PIP_Protocol_Message* message(0); + for (ACE_INT32 current_priority = (ACE_INT32)current_highest_priority_; + current_priority >= (ACE_INT32)current_lowest_priority_; + --current_priority) + { + PRIORITY_MESSAGE_LIST_MAP::iterator + pending_message_iter = pending_messages_by_priority_.find(current_priority); + + for (; pending_message_iter != pending_messages_by_priority_.end(); + ++pending_message_iter) + { + std::list<ACE_PIP_Protocol_Message*>::iterator next_message_iter = + pending_message_iter->second.begin(); + + if (next_message_iter != pending_message_iter->second.end()) + { + // The highest-priority message has been found. Grab the message + // and remove it from both containers + message = *next_message_iter; + pending_message_iter->second.pop_front(); + pending_messages_by_message_id_.unbind(message->get_message_id()); + break; + } + else + { + // There are no messages at this priority. Since the search begins at + // the highest priority, lower the highest priority until a message + // is found + if (current_highest_priority_ > current_lowest_priority_) + { + --current_highest_priority_; + } + } + } + + if (message) + { + break; + } + } + + return message; +} + +bool ACE_PIP_Dispatcher:: +find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority) +{ + bool found(false); + bool erased_this_pass(true); + + dispatched_messages_lock_.acquire(); + + while(erased_this_pass) + { + erased_this_pass = false; + + std::set<Dispatched_Message_Data>::iterator iter = dispatched_messages_data_.begin(); + + // Find all dispatched messages having priority lower than new_priority. For each + // send an acceleration message, and update the dispatch record + for (; iter != dispatched_messages_data_.end() && + num_threads_needed_ > 0; ++iter) + { + if (iter->priority < new_priority) + { + // A message has been found that has a lower priority, + // so the send an acceleration message + ACE_PIP_Accel_Message* accel_message = new ACE_PIP_Accel_Message; + accel_message->set_old_priority(iter->priority); + accel_message->set_new_priority(new_priority); + + ACE_PIP_Protocol_Message* protocol_message = new ACE_PIP_Protocol_Message; + protocol_message->set_message_type(ACE_PIP_Protocol_Message::ACCEL); + protocol_message->set_message_id(iter->id); + protocol_message->set_next(accel_message); + + Dispatched_Message_Data dispatch_record = *iter; + dispatched_messages_data_.erase(iter); + dispatch_record.priority = new_priority; + dispatched_messages_data_.insert(dispatch_record); + std::cout << "PIP_Dispatcher::find_and_accel : accelerating " << iter->id << std::endl; + ACE_PIP_Invocation_Manager::instance()->process_acceleration(protocol_message); + found = true; + erased_this_pass = true; + break; + } + } + } + + dispatched_messages_lock_.release(); + pending_messages_lock_.release(); +} + +void ACE_PIP_Dispatcher::shutdown() +{ + shutdown_ = true; + + // Pulse signals so waiting threads can quit + message_available_signal_.release(); + threads_available_signal_.release(); +} + +void ACE_PIP_Dispatcher::process_incoming_acceleration(ACE_PIP_Protocol_Message* message) +{ + bool updated_pending(false); + // Look for pending message. If the message is pending, update the priority, move it around in data structures, and quit + + ACE_PIP_Accel_Message* accel_message = + static_cast<ACE_PIP_Accel_Message*>(message->get_next()); + + pending_messages_lock_.acquire(); + ACE_Hash_Map_Entry<ACE_UINT64, ACE_PIP_Protocol_Message*>* entry(0); + if (pending_messages_by_message_id_.find(message->get_message_id(), entry) == 0) + { + ACE_PIP_Data_Message* data_message = + static_cast<ACE_PIP_Data_Message*>(entry->item()->get_next()); + + data_message->set_message_priority(accel_message->get_new_priority()); + + // move the message from one priority to the other + updated_pending = true; + + std::cout << "Dispatcher::Accelerated pending message" << std::endl; + } + pending_messages_lock_.release(); + + if (!updated_pending) + { + bool found(false); + ACE_Guard<ACE_Mutex> guard(dispatched_messages_lock_); + // Message is not pending, so must already be dispatche + std::set<Dispatched_Message_Data>::iterator iter = dispatched_messages_data_.begin(); + + // Find all dispatched messages having priority lower than new_priority. For each + // send an acceleration message, and update the dispatch record + for (; iter != dispatched_messages_data_.end(); ++iter) + { + if ((iter->id == message->get_message_id()) && + (iter->priority < accel_message->get_new_priority())) + { + std::cout << "Dispatcher::Accelerated dispatched message" << std::endl; + Dispatched_Message_Data dispatch_record = *iter; + dispatched_messages_data_.erase(iter); + dispatch_record.priority = accel_message->get_new_priority(); + dispatched_messages_data_.insert(dispatch_record); + ACE_PIP_Invocation_Manager::instance()->process_acceleration(message); + found = true; + break; + } + } + + if (!found) + { + for (std::vector<ACE_UINT64>::iterator it = received_ids_.begin(); + it != received_ids_.end(); ++it) + { + if (*it == message->get_message_id()) + { + std::cout << "MessageID: " << *it << " already came and left" << std::endl; + found = true; + break; + } + } + if (!found) + { + std::cout << "Accel for messageID: " << message->get_message_id() << + " beat message to the remote dispatcher" << std::endl; + } + } + + } +} + + + +void ACE_PIP_Dispatcher::print_results() +{ + std::cout << "----------------------DISPATCHER_RESULTS-------------" << std::endl; + std::cout << std::endl; + std::cout << "Num received: " << num_messages_received_ << std::endl; + std::cout << "Num dispatched: " << num_messages_dispatched_ << std::endl; + std::cout << std::endl; + + std::cout << "Received Ids: " << std::endl; + for (std::vector<ACE_UINT64>::iterator rec_id_iter = received_ids_.begin(); + rec_id_iter != received_ids_.end(); + ++rec_id_iter) + { + std::cout << *rec_id_iter << std::endl; + } + + std::cout << std::endl; + std::cout << "Dispatched Ids: " << std::endl; + for (std::vector<ACE_UINT64>::iterator disp_id_iter = dispatched_ids_.begin(); + disp_id_iter != dispatched_ids_.end(); + ++disp_id_iter) + + { + std::cout << *disp_id_iter << std::endl; + } + + std::cout << std::endl; + std::cout << "Dispatch Records: " << std::endl; + for (std::vector<ACE_PIP_Dispatcher::Dispatch_Test_Data>::iterator rec_iter = dispatch_records_.begin(); + rec_iter != dispatch_records_.end(); + ++rec_iter) + + { + std::cout << "Id: " << rec_iter->id << std::endl; + std::cout << "Priority: " << rec_iter->priority << std::endl; + std::cout << "Num Pending: " << rec_iter->num_pending << std::endl; + std::cout << "Highest Priority " << rec_iter->highest_priority << std::endl; + std::cout << "Lowest Priority " << rec_iter->lowest_priority << std::endl; + std::cout << std::endl; + } + + std::cout << std::endl; + std::cout << "Num received: " << num_messages_received_ << std::endl; + std::cout << "Num dispatched: " << num_messages_dispatched_ << std::endl; + std::cout << std::endl; + + + std::cout << "-----------------------------------------------------" << std::endl; +} diff --git a/ACE/ace/PIP_Dispatcher.h b/ACE/ace/PIP_Dispatcher.h new file mode 100644 index 00000000000..d93b2957ca6 --- /dev/null +++ b/ACE/ace/PIP_Dispatcher.h @@ -0,0 +1,188 @@ + /** + * @file PIP_Dispatcher.h + * + * // $Id$ + * + * @author John Moore <ljohn7@gmail.com> + * + * This file contains the specification for a class + * that dispatches priority inheritance protocol messages + * to the appropriate message handler. +*/ + + +#ifndef _PIP_DISPATCHER_H_ +#define _PIP_DISPATCHER_H_ + +// ACE definitions +#include "ace/Event_Handler.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/PIP_DA_Strategy_Adapter.h" +#include "ace/PIP_Messages.h" +#include "ace/RW_Thread_Mutex.h" +#include "ace/Semaphore.h" +#include "ace/Singleton.h" + +// STL definitions +#include <list> +#include <map> +#include <set> +#include <vector> + +// Forward Declarations +class ACE_PIP_Protocol_Message; + +typedef std::map<ACE_UINT32, std::list<ACE_PIP_Protocol_Message*> > + PRIORITY_MESSAGE_LIST_MAP; + +// Associate each message with a message ID +typedef ACE_Hash_Map_Manager_Ex<ACE_UINT64, + ACE_PIP_Protocol_Message*, + ACE_Hash<ACE_UINT64>, + ACE_Equal_To<ACE_UINT64>, + ACE_Null_Mutex> ID_MESSAGE_MAP; + + +/** + * @class ACE_Dispatcher + * @brief Dispatches ACE_PIP_Priority_Messages in priority order + * message handlers. Additionally, notifies handlers when priority inversion is + * detected. + * + * The PIP_Message_Dispatcher implements the priority inheritance protocol. + * Upon receipt of messages, it determines the highest-priority message to + * be dispatched, and dispatches providing enough resources exist. If not enough exist, + * and a lower priority message has been dispatched, an acceleration message is sent + * to the corresponding handler to raise the priority of the message, thus + * mitigating the inversion. +*/ +class ACE_Export ACE_PIP_Dispatcher : public ACE_Event_Handler +{ + public: + + /// Constructor + ACE_PIP_Dispatcher(); + + /// Destructor + virtual ~ACE_PIP_Dispatcher(); + + /// obtain the single instance of the dispatcher + static ACE_PIP_Dispatcher* instance(); + + /// Receive a message for eventual dispatching + void process_message(ACE_PIP_Protocol_Message* message); + + /// Signals the dispatcher to dispatch a new message if possible. + virtual int handle_output (ACE_HANDLE); + + /// Initializes dispatcher + void init(ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter); + + /// Tell the dispatcher to stop dispatching and release all threads ASAP + void shutdown(); + + /// Accelerate the appropriate message + void process_incoming_acceleration(ACE_PIP_Protocol_Message* message); + + /// Print statistics + void print_results(); + + private: + + // Dispatched_Message_Data stores the ID and priority + // of a dispatched message + class Dispatched_Message_Data + { + public: + + bool operator<(const Dispatched_Message_Data& other) const + { + return (priority < other.priority); + } + + bool operator==(const Dispatched_Message_Data& other) const + { + return (id == other.id); + } + + bool operator!=(const Dispatched_Message_Data& other) const + { + return !(*this == other); + } + + ACE_UINT64 id; + ACE_UINT32 priority; + }; + + class Dispatch_Test_Data + { + public: + ACE_UINT64 id; + ACE_UINT64 priority; + ACE_UINT32 num_pending; + ACE_UINT32 highest_priority; + ACE_UINT32 lowest_priority; + }; + + /// store the message + void process_incoming_request(ACE_PIP_Protocol_Message* message); + + /// Find the highest priority message and return it + ACE_PIP_Protocol_Message* retrieve_highest_priority_pending_message(); + + bool find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority); + + + // Dispatched message data is stored to determine which messages are + // currently assigned to a thread. This is useful for finding messages + // whose priority needs to be accelerated in the case where an inversion + // is detected. + std::set<Dispatched_Message_Data> dispatched_messages_data_; + ACE_Mutex dispatched_messages_lock_; + + ACE_UINT32 current_highest_priority_; + ACE_UINT32 current_lowest_priority_; + + // Pending messages (those not dispatched) are stored in 2 ways for efficiency + // 1.) By message id - this is useful for managing priority accelerations + // because we can find the appropriate message in constant time + // 2.) By priority - this is useful for determining which message to dispatch next + // as messages are dispatched in priority order + PRIORITY_MESSAGE_LIST_MAP pending_messages_by_priority_; + ID_MESSAGE_MAP pending_messages_by_message_id_; + ACE_Mutex pending_messages_lock_; + + // Indicates the dispatcher has a thread waiting to + // dispatch a message + bool waiting_for_message_; + + // Number of threads that need to be returned in order to + // dispatch the current message + int num_threads_needed_; + + ACE_Semaphore message_available_signal_; + ACE_Semaphore threads_available_signal_; + + ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter_; + ACE_Mutex deadlock_avoidance_lock_; + + static ACE_PIP_Dispatcher* dispatcher_; + static ACE_Mutex instance_lock_; + static bool delete_dispatcher_; + static bool shutdown_; + + // Test variables + ACE_UINT32 num_pending_messages_; + ACE_UINT32 num_messages_received_; + ACE_UINT32 num_messages_dispatched_; + std::vector<ACE_UINT64> received_ids_; + std::vector<ACE_UINT64> dispatched_ids_; + std::vector<Dispatch_Test_Data> dispatch_records_; + +}; + +// Define a singleton class to make the dispatcher globally accessible +typedef ACE_Singleton<ACE_PIP_Dispatcher, ACE_Mutex> + ACE_PIP_Dispatcher_Singleton; + +#endif diff --git a/ACE/ace/PIP_IO_Handler.cpp b/ACE/ace/PIP_IO_Handler.cpp new file mode 100644 index 00000000000..f30713751a5 --- /dev/null +++ b/ACE/ace/PIP_IO_Handler.cpp @@ -0,0 +1,185 @@ +// $Id$ + +#include "ace/Guard_T.h" +#include "ace/PIP_IO_Handler.h" +#include "ace/PIP_Invocation_Manager.h" +#include "ace/PIP_Dispatcher.h" + +/// Constructor +ACE_PIP_IO_Handler::ACE_PIP_IO_Handler() + : priority_set_(false) + , destination_site_id_(0) + , site_id_(0) + , handler_id_(0) + , millisecond_(0, 1000) +{ + // Temporarily assign the priority to be highest possible. + // The first message received by the handler will be the priority + this->priority(ACE_Event_Handler::HI_PRIORITY); +} + +/// Destructor +ACE_PIP_IO_Handler::~ACE_PIP_IO_Handler( ) +{ + // Tell the Invocation Manager to stop sending us messages + ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this); + + // Delete all outgoing messages + ACE_PIP_Protocol_Message* message(0); + while (!outgoing_message_queue_.is_empty()) + { + outgoing_message_queue_.dequeue_tail(message); + delete message; + } +} + +void ACE_PIP_IO_Handler::site_id(ACE_UINT32 site_id) +{ + site_id_ = site_id; +} + +ACE_UINT32 ACE_PIP_IO_Handler::site_id() const +{ + return site_id_; +} + +ACE_UINT32 ACE_PIP_IO_Handler::destination_site_id() const +{ + return destination_site_id_; +} + +void ACE_PIP_IO_Handler::handler_id(ACE_UINT32 handler_id) +{ + handler_id_ = handler_id; +} + +ACE_UINT32 ACE_PIP_IO_Handler::handler_id() const +{ + return handler_id_; +} + +/// Initialize the priority of the handler, and inform the other end +/// of the priority +void ACE_PIP_IO_Handler::init(ACE_UINT32 site_id, + ACE_UINT32 destination_site_id, + ACE_UINT32 priority) +{ + this->priority(priority); + site_id_ = site_id; + destination_site_id_ = destination_site_id; + + // Inform other end of this connections priority + peer_.send(&priority, sizeof(priority)); + + // Inform other end of this end's site id + peer_.send(&site_id, sizeof(site_id)); + priority_set_ = true; + + // Register to receive outgoing messages + ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this); +} + +void ACE_PIP_IO_Handler::extract_priority() +{ + ACE_UINT32 priority(0); + if (peer_.recv(&priority, sizeof(priority)) == sizeof(priority)) + { + this->priority(priority); + } + else + { + this->priority(ACE_Event_Handler::LO_PRIORITY); + } + + // Receive the other end's site id + if (peer_.recv(&destination_site_id_, sizeof(destination_site_id_)) != sizeof(destination_site_id_)) + { + destination_site_id_ = 0; + } + + priority_set_ = true; +} + +/// Handles read event on socket. +int ACE_PIP_IO_Handler::handle_input (ACE_HANDLE fd) +{ + int result(0); + int bytes_read(0); + + if (!priority_set_) + { + // incoming message is the priority of this connection + extract_priority(); + ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this); + } + else + { + // Read the next incoming message + ACE_PIP_Protocol_Message* message = new ACE_PIP_Protocol_Message; + bytes_read = message->deserialize(peer_); + if (bytes_read > 0) + { + if (message->get_message_type() == ACE_PIP_Protocol_Message::ACCEL) + { + std::cout << "Accel Message Received" << std::endl; + } + + ACE_PIP_Dispatcher::instance()->process_message(message); + } + else if (bytes_read < 0) + { + // The connection is broken, so handler should be deleted + delete message; + result = -1; + } + } + + return result; +} + + +/// Handles output event on socket +int ACE_PIP_IO_Handler::handle_output (ACE_HANDLE fd) +{ + int bytes_sent(0); + // determine if outgoing messages exist + ACE_PIP_Protocol_Message* message(0); + + write_closed_ = false; + big_lock_.acquire(); + if (outgoing_message_queue_.dequeue_tail(message) != -1) + { + if (message->get_message_type() == ACE_PIP_Protocol_Message::ACCEL) + { + std::cout << "Sending accel message" << std::endl; + } + bytes_sent = message->serialize(peer_); + delete message; + if (bytes_sent >= 0) + { + big_lock_.release(); + return 0; + } + else + { + write_closed_ = true; + big_lock_.release(); + // indicate the outgoing connection is closed + return -2; + } + } + else + { + // indicate that there was no message to output + + big_lock_.release(); + return -1; + } +} + +ACE_INET_Addr ACE_PIP_IO_Handler::get_remote_address() const +{ + ACE_INET_Addr addr; + peer_.get_remote_addr(addr); + return addr; +} diff --git a/ACE/ace/PIP_IO_Handler.h b/ACE/ace/PIP_IO_Handler.h new file mode 100644 index 00000000000..90665097b5e --- /dev/null +++ b/ACE/ace/PIP_IO_Handler.h @@ -0,0 +1,94 @@ + /** + * @file PIP_IO_Handler.h + * + * // $Id$ + * + * @author John Moore <ljohn7@gmail.com> + * + * This file contains the specification for a class + * that manages network I/O +*/ + + +#ifndef _PIP_IO_HANDLER_H_ +#define _PIP_IO_HANDLER_H_ + + +#include "ace/Message_Queue.h" +#include "ace/Mutex.h" +#include "ace/PIP_Messages.h" +#include "ace/Svc_Handler.h" +#include "ace/Thread_Mutex.h" + +// Typedefs +typedef ACE_Message_Queue_Ex<ACE_PIP_Protocol_Message, ACE_NULL_SYNCH> + PROTO_MESSAGE_QUEUE_TYPE; + +/** + * @class ACE_PIP_IO_Handler + * + * @brief Performs network I/O + * + * @author John Moore <ljohn7@gmail.com> + */ +class ACE_Export ACE_PIP_IO_Handler : + public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_MT_SYNCH> +{ + public: + + /// Constructor + ACE_PIP_IO_Handler (); + + /// Destructor + virtual ~ACE_PIP_IO_Handler(); + + /// Enqueue a message to be sent + virtual int put_message (ACE_PIP_Protocol_Message* message) = 0; + + /// Initialize the priority of the handler, and inform the other end + /// of the priority + virtual void init(ACE_UINT32 site_id, + ACE_UINT32 destination_site_id, + ACE_UINT32 priority); + + /// Handles read event on socket. + virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); + + /// Handles read event on socket. + virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE); + + /// Determine the id of the site at which the handler is located, + void site_id(ACE_UINT32 site_id); + ACE_UINT32 site_id() const; + + /// Determine the other end's site id + ACE_UINT32 destination_site_id() const; + + /// Determine the id that uniquely identifies this handler, + void handler_id(ACE_UINT32 handler_id); + ACE_UINT32 handler_id() const; + ACE_INET_Addr get_remote_address() const; + + protected: + + /// Reads priority from socket + void extract_priority(); + + // variables to track the state of the handler + bool read_closed_; + bool write_closed_; + bool priority_set_; + + ACE_UINT32 site_id_; + ACE_UINT32 handler_id_; + ACE_UINT32 destination_site_id_; + + const ACE_Time_Value millisecond_; + + PROTO_MESSAGE_QUEUE_TYPE outgoing_message_queue_; + ACE_Thread_Mutex big_lock_; +}; + +#endif /* _PIP_IO_Handler_H_ */ + + diff --git a/ACE/ace/PIP_Message_Handler.cpp b/ACE/ace/PIP_Message_Handler.cpp new file mode 100644 index 00000000000..caf7a8f1ff6 --- /dev/null +++ b/ACE/ace/PIP_Message_Handler.cpp @@ -0,0 +1,105 @@ +#include "ace/PIP_Message_Handler.h" +#include "ace/PIP_Invocation_Manager.h" + +ACE_PIP_Message_Handler::ACE_PIP_Message_Handler() + : handler_id_(0) + , site_id_(0) +{ + +} + +ACE_PIP_Message_Handler::ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 handler_id) + : site_id_(site_id) + , handler_id_(handler_id) +{ +} + +ACE_PIP_Protocol_Message* ACE_PIP_Message_Handler::create_protocol_message(ACE_UINT64 message_id, + bool reply_expected, + ACE_UINT32 source_handler_id, + ACE_UINT32 source_site_id, + ACE_UINT32 destination_handler_id, + ACE_UINT32 destination_site_id, + ACE_UINT32 message_priority, + ACE_PIP_Protocol_Message::Message_Type type, + const std::string& data_payload) +{ + // setup the proto message header + ACE_PIP_Protocol_Message* proto_message = new ACE_PIP_Protocol_Message; + proto_message->set_message_id(message_id); + proto_message->set_message_type(type); + + ACE_PIP_Data_Message* data_message = new ACE_PIP_Data_Message; + data_message->set_reply_expected(reply_expected); + + data_message->set_source_handler_ID(source_handler_id); + data_message->set_source_site_ID(source_site_id); + data_message->set_destination_handler_ID(destination_handler_id); + data_message->set_destination_site_ID(destination_site_id); + + data_message->set_message_priority(message_priority); + + // Create data message header and body, then pass to protocol message to be parsed + // and unpacked + ACE_Message_Block* header = new ACE_Message_Block(sizeof(ACE_PIP_Data_Message)); + ACE_Message_Block* body = new ACE_Message_Block(data_payload.length() + 1); + + ACE_OS::memcpy(body->wr_ptr(), data_payload.c_str(), data_payload.length() + 1); + body->wr_ptr(data_payload.length() + 1); + + // attach the data body to the header + header->next(body); + + // pack the header values into the message block + // set the write ptr ahead so pack() will know to put it back where it should be + header->wr_ptr(sizeof(ACE_PIP_Data_Message)); + data_message->block_ = header; + data_message->pack(); + + proto_message->set_next(data_message); + return proto_message; +} + +void ACE_PIP_Message_Handler::send_request(ACE_Message_Block* message, + ACE_UINT64 message_id, + ACE_Message_Block*& response) +{ + ACE_Future<ACE_Message_Block*>* response_holder(0); + ACE_PIP_Invocation_Manager::instance()->process_outbound_request(message, message_id, response_holder); + if (response_holder) + { + if (response_holder->get(response) == -1) + { + std::cerr << "Error receiving response in ::send_request" << std::endl; + response = 0; + } + } +} + +void ACE_PIP_Message_Handler::send_response(ACE_Message_Block* message, + ACE_UINT64 message_id) +{ + ACE_PIP_Invocation_Manager::instance()->process_outbound_response(message, message_id); +} + + +ACE_UINT32 ACE_PIP_Message_Handler::get_handler_id() const +{ + return handler_id_; +} +void ACE_PIP_Message_Handler::set_handler_id(ACE_UINT32 id) +{ + handler_id_ = id; +} + +ACE_UINT32 ACE_PIP_Message_Handler::get_site_id() const +{ + return site_id_; +} + +void ACE_PIP_Message_Handler::set_site_id(ACE_UINT32 id) +{ + site_id_ = id; +} + + diff --git a/ACE/ace/PIP_Message_Handler.h b/ACE/ace/PIP_Message_Handler.h new file mode 100644 index 00000000000..2dec7cb628e --- /dev/null +++ b/ACE/ace/PIP_Message_Handler.h @@ -0,0 +1,67 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file PIP_Message_Handler.h + * + * + * @author John Moore + */ +//============================================================================= + + +#ifndef _PIP_MESSAGE_HANDLER_H_ +#define _PIP_MESSAGE_HANDLER_H_ + +#include "ace/Message_Block.h" +#include "ace/PIP_Messages.h" +#include "ace/Event_Handler.h" + +class ACE_Export ACE_PIP_Message_Handler +{ + public: + + ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 message_id); + ACE_PIP_Message_Handler(); + virtual ~ACE_PIP_Message_Handler(){} + + virtual void process_incoming_message(ACE_Message_Block* message, + ACE_UINT64 message_id) = 0; + + ACE_UINT32 get_handler_id() const; + void set_handler_id(ACE_UINT32 id); + + ACE_UINT32 get_site_id() const; + void set_site_id(ACE_UINT32 id); + + + + protected: + + ACE_UINT32 handler_id_; + ACE_UINT32 site_id_; + ACE_INET_Addr my_address_; + + // Pass a message to a remote handler + virtual void send_request(ACE_Message_Block* message, + ACE_UINT64 message_id, + ACE_Message_Block*& response); + + // Pass a response message to a remote handler + virtual void send_response(ACE_Message_Block* message, + ACE_UINT64 message_id); + + ACE_PIP_Protocol_Message* create_protocol_message(ACE_UINT64 message_id, + bool reply_expected, + ACE_UINT32 source_handler_id, + ACE_UINT32 source_site_id, + ACE_UINT32 destination_handler_id, + ACE_UINT32 destination_site_id, + ACE_UINT32 message_priority, + ACE_PIP_Protocol_Message::Message_Type type, + const std::string& data_payload); + +}; + +#endif + diff --git a/ACE/ace/PIP_Messages.cpp b/ACE/ace/PIP_Messages.cpp new file mode 100644 index 00000000000..67fb886a3ec --- /dev/null +++ b/ACE/ace/PIP_Messages.cpp @@ -0,0 +1,607 @@ +// $Id$ + +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_string.h" +#include "ace/PIP_Messages.h" + +#include <iostream> + +ACE_PIP_Message::ACE_PIP_Message() + : block_(0) + , dirty_(false) + , next_(0) +{} + +ACE_PIP_Message::~ACE_PIP_Message() +{ + if (next_) + { + delete next_; + } + + if (block_) + { + block_->release(); + } +} + +void ACE_PIP_Message::set_block(ACE_Message_Block* block) +{ + // Remove the other block if it exist. + if (block_) + { + block_->release(); + } + + block_ = block; + + // Extract the values from the block. + unpack(); +} + +ACE_PIP_Data_Message::ACE_PIP_Data_Message() + : message_priority_(-1) + , reply_expected_(false) + , source_handler_ID_(-1) + , source_site_ID_(-1) + , destination_handler_ID_(-1) + , destination_site_ID_(-1) +{ +} + +int ACE_PIP_Data_Message::serialize(ACE_SOCK_Stream& stream) +{ + int total_bytes_sent(0); + + // Only serialize if there is a block. If not, + // there's nothing we can do but fail since we don't + // have enough information to create a block and unpack it. + if (block_) + { + if (dirty_) + { + pack(); + } + + ACE_Message_Block* curr_block = block_; + int bytes_sent(0); + while(curr_block) + { + bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length()); + if (bytes_sent > 0) + { + total_bytes_sent += bytes_sent; + curr_block = curr_block->next(); + } + else + { + std::cout << "Data_Mess:serialize: - didn't send any bytes" << std::endl; + total_bytes_sent = -1; + break; + } + } + } + else + { + std::cout << "DataMessage::Serialize - there is no block" << std::endl; + total_bytes_sent = -1; + } + + return total_bytes_sent; +} + +void ACE_PIP_Data_Message::pack() +{ + char* write_ptr = block_->wr_ptr(); + char* read_ptr = block_->rd_ptr(); + + block_->reset(); + + // Pack reply expected into buffer. + ACE_OS::memcpy(block_->wr_ptr(), &reply_expected_, sizeof(reply_expected_)); + block_->wr_ptr(sizeof(reply_expected_)); + + // Pack the message priority into the buffer. + ACE_OS::memcpy(block_->wr_ptr(), &message_priority_, sizeof(message_priority_)); + block_->wr_ptr(sizeof(message_priority_)); + + // Pack the destination handler ID into the buffer + ACE_OS::memcpy(block_->wr_ptr(), &destination_handler_ID_, sizeof(destination_handler_ID_)); + block_->wr_ptr(sizeof(destination_handler_ID_)); + + // Pack the source handler ID into the buffer + ACE_OS::memcpy(block_->wr_ptr(), &source_handler_ID_, sizeof(source_handler_ID_)); + block_->wr_ptr(sizeof(source_handler_ID_)); + + // Pack the destination site ID into the buffer + ACE_OS::memcpy(block_->wr_ptr(), &destination_site_ID_, sizeof(destination_site_ID_)); + block_->wr_ptr(sizeof(destination_site_ID_)); + + // Pack the source site ID into the buffer + ACE_OS::memcpy(block_->wr_ptr(), &source_site_ID_, sizeof(source_site_ID_)); + block_->wr_ptr(sizeof(source_site_ID_)); + + // Reset the buffer pointers to where they were so that the message length remains + // accurate. + block_->rd_ptr(read_ptr); + block_->wr_ptr(write_ptr); +} + +void ACE_PIP_Data_Message::unpack() +{ + if (block_) + { + char* write_ptr = block_->wr_ptr(); + block_->reset(); + + // reply_expected_ + ACE_OS::memcpy(&reply_expected_, block_->rd_ptr(), sizeof(reply_expected_)); + block_->rd_ptr(sizeof(reply_expected_)); + + // message priority + ACE_OS::memcpy(&message_priority_, block_->rd_ptr(), sizeof(message_priority_)); + block_->rd_ptr(sizeof(message_priority_)); + + // destination handler ID + ACE_OS::memcpy(&destination_handler_ID_, block_->rd_ptr(), sizeof(destination_handler_ID_)); + block_->rd_ptr(sizeof(destination_handler_ID_)); + + // source handler ID + ACE_OS::memcpy(&source_handler_ID_, block_->rd_ptr(), sizeof(source_handler_ID_)); + block_->rd_ptr(sizeof(source_handler_ID_)); + + // destination site ID + ACE_OS::memcpy(&destination_site_ID_, block_->rd_ptr(), sizeof(destination_site_ID_)); + block_->rd_ptr(sizeof(destination_site_ID_)); + + // source site ID + ACE_OS::memcpy(&source_site_ID_, block_->rd_ptr(), sizeof(source_site_ID_)); + block_->rd_ptr(sizeof(source_site_ID_)); + + block_->reset(); + block_->wr_ptr(write_ptr); + } + + dirty_ = false; +} + +void ACE_PIP_Data_Message::print() const +{ + std::cout << "Priority: " << message_priority_ << std::endl + << "Reply?: " << reply_expected_ << std::endl + << "Dest_Handler_ID: " << destination_handler_ID_ << std::endl + << "Source_Handler_ID: " << source_handler_ID_ << std::endl + << "Dest_Site_ID: " << destination_site_ID_ << std::endl + << "Source_Site_ID: " << source_site_ID_ << std::endl + << "Payload: " << block_->next()->base() << std::endl; +} + +ACE_PIP_Accel_Message::ACE_PIP_Accel_Message() + : ACCEL_HEADER_LENGTH_(2*sizeof(ACE_UINT32)) + , new_priority_(0) + , old_priority_(0) +{ +} + +int ACE_PIP_Accel_Message::serialize(ACE_SOCK_Stream& stream) +{ + pack(); + + int bytes_sent = stream.send_n(block_->rd_ptr(), block_->length()); + if (bytes_sent <= 0) + { + std::cout << "Accel:serial: didn't send any bytes" << std::endl; + } + + return bytes_sent; +} + +void ACE_PIP_Accel_Message::pack() +{ + if (!block_) + { + block_ = new ACE_Message_Block(ACCEL_HEADER_LENGTH_); + dirty_ = true; + } + + if (dirty_) + { + + // Set the buffer pointers to the start of the buffer to + // ensure we're writing to the correct location + block_->reset(); + + // Pack the contents of the struct into the message block + ACE_OS::memcpy(block_->wr_ptr(), &old_priority_, sizeof(old_priority_)); + block_->wr_ptr(sizeof(old_priority_)); + + ACE_OS::memcpy(block_->wr_ptr(), &new_priority_, sizeof(new_priority_)); + block_->wr_ptr(sizeof(new_priority_)); + + dirty_ = false; + } +} + +void ACE_PIP_Accel_Message::unpack() +{ + if (block_) + { + char* write_ptr = block_->wr_ptr(); + block_->reset(); + + old_priority_ = (*block_->rd_ptr()); + block_->rd_ptr(sizeof(old_priority_)); + + new_priority_ = (*block_->rd_ptr()); + block_->rd_ptr(sizeof (new_priority_)); + + // Reset the read and write pointers to their original location + // in the block. + block_->reset(); + block_->wr_ptr(write_ptr); + } + + dirty_ = false; +} + +ACE_PIP_Accel_Message* ACE_PIP_Accel_Message::copy() +{ + ACE_PIP_Accel_Message* copy = new ACE_PIP_Accel_Message; + + copy->new_priority_ = new_priority_; + copy->old_priority_ = old_priority_; + copy->pack(); + + return copy; +} + +void ACE_PIP_Accel_Message::print() const +{ + std::cout << "DestAddr: " << destination_address_ << std::endl + << "OldPriority: " << old_priority_ << std::endl + << "NewPriority: " << new_priority_ << std::endl; + +} + +ACE_PIP_Protocol_Message::ACE_PIP_Protocol_Message() + : message_type_(NONE) + , num_payload_blocks_(0) + , message_id_(0) + , FIXED_HEADER_LENGTH_(sizeof(Message_Type) + + sizeof(message_id_) + + sizeof(num_payload_blocks_)) +{ +} + +int ACE_PIP_Protocol_Message::serialize(ACE_SOCK_Stream& stream) +{ + int total_bytes_sent(0); + + pack(); + + ACE_Message_Block* curr_block = block_; + int bytes_sent(0); + + // Write each of the message blocks associated with this + // header into the stream + while(curr_block) + { + bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length()); + if (bytes_sent > 0) + { + total_bytes_sent += bytes_sent; + curr_block = curr_block->next(); + } + else + { + total_bytes_sent = -1; + break; + } + } + if ((total_bytes_sent > 0) && next_) + { + int next_sent = next_->serialize(stream); + if (next_sent > 0) + { + total_bytes_sent += next_sent; + } + else + { + total_bytes_sent = -1; + } + } + else + { + total_bytes_sent = -1; + } + + return total_bytes_sent; +} + +int ACE_PIP_Protocol_Message::deserialize(ACE_SOCK_Stream& stream) +{ + int total_bytes_received(-1); + + ACE_Message_Block* header_block = new ACE_Message_Block(FIXED_HEADER_LENGTH_); + ACE_Message_Block* lengths_block(0); + ACE_Message_Block* curr_payload_block(0); + ACE_Message_Block* payload_blocks(0); + + // Read the fixed-length portion of the protocol header. + int bytes_received = stream.recv_n(header_block->wr_ptr(), FIXED_HEADER_LENGTH_); + if (bytes_received == FIXED_HEADER_LENGTH_) + { + total_bytes_received = bytes_received; + + // Determine number of data message blocks in the payload. + header_block->rd_ptr(FIXED_HEADER_LENGTH_ - sizeof(num_payload_blocks_)); + ACE_OS::memcpy(&num_payload_blocks_, header_block->rd_ptr(), + sizeof(num_payload_blocks_)); + + header_block->reset(); + header_block->wr_ptr(bytes_received); + + // Extract the length of each payload block. + if (num_payload_blocks_ > 0) + { + // Read the lengths of each block. + int bytes_to_read = num_payload_blocks_ * sizeof(ACE_UINT32); + lengths_block = new ACE_Message_Block(bytes_to_read); + bytes_received = stream.recv_n(lengths_block->wr_ptr(), bytes_to_read); + + if (bytes_received == bytes_to_read) + { + total_bytes_received += bytes_received; + lengths_block->wr_ptr(bytes_received); + + // The lengths of each block have been successfully written, so + // unpack them. + header_block->next(lengths_block); + set_block(header_block); + + curr_payload_block = new ACE_Message_Block(payload_block_lengths_[0]); + payload_blocks = curr_payload_block; + unsigned int i = 0; + for (; i < num_payload_blocks_ && bytes_received != -1; ++i) + { + // Read the block. + bytes_received = stream.recv_n(curr_payload_block->wr_ptr(), + payload_block_lengths_[i]); + if (bytes_received > 0) + { + total_bytes_received += bytes_received; + curr_payload_block->wr_ptr(bytes_received); + if (i < (num_payload_blocks_ - 1)) + { + curr_payload_block->next( + new ACE_Message_Block(payload_block_lengths_[i + 1])); + + curr_payload_block = curr_payload_block->next(); + } + else + { + curr_payload_block->next(0); + } + + } + else + { + total_bytes_received = -1; + std::cout << "deserialize: didn't read enough bytes" << std::endl; + break; + } + } + } + else + { + total_bytes_received = -1; + + std::cout << "Deserialize: didnt read enought bytes" << std::endl; + } + } + } + else + { + total_bytes_received = -1; + std::cout << "Deserialize:didn't receive enought bytes: got " << bytes_received << std::endl; + } + + if (total_bytes_received > 0) + { + if (message_type_ == ACCEL) + { + next_ = new ACE_PIP_Accel_Message; + } + else + { + next_ = new ACE_PIP_Data_Message; + } + + // Pass the payload blocks to the next message struct + // so it can unpack it. + next_->set_block(payload_blocks); + } + else if (block_) + { + // Something failed during reading, so cleanup any allocated memory. + block_->release(); + } + + return total_bytes_received; +} + +void ACE_PIP_Protocol_Message::set_next(ACE_PIP_Message* next) +{ + // Determine the number and length of payload blocks. + payload_block_lengths_.clear(); + num_payload_blocks_ = 0; + next->pack(); + ACE_Message_Block* curr_block = next->get_block(); + while (curr_block) + { + ++num_payload_blocks_; + payload_block_lengths_.push_back(curr_block->length()); + curr_block = curr_block->next(); + } + + next_ = next; + dirty_ = true; +} + +void ACE_PIP_Protocol_Message::process_message_payload(ACE_Message_Block* payload) +{ + payload_block_lengths_.clear(); + num_payload_blocks_ = 0; + + // Determine the length and number of payload blocks. + ACE_Message_Block* curr_block = payload; + while (curr_block) + { + ++num_payload_blocks_; + payload_block_lengths_.push_back(curr_block->length()); + curr_block = curr_block->next(); + } + + if (!next_) + { + if (message_type_ == ACCEL) + { + next_ = new ACE_PIP_Accel_Message; + } + else + { + next_ = new ACE_PIP_Data_Message; + } + } + + next_->set_block(payload); + dirty_ = true; +} + +void ACE_PIP_Protocol_Message::pack() +{ + int total_bytes_sent(0); + if (!block_) + { + // Create the message buffer for the protocol header. + block_ = new ACE_Message_Block(FIXED_HEADER_LENGTH_); + + // Create the message buffer for the list of payload block lengths. + block_->next(new ACE_Message_Block(num_payload_blocks_ * sizeof(ACE_UINT32))); + block_->next()->next(0); + dirty_ = true; + } + if (dirty_) + { + // Set the buffer pointers to the start of the buffer + // so that we write to the appropriate location. + block_->reset(); + + // pack the process Id. + ACE_OS::memcpy(block_->wr_ptr(), &message_id_, sizeof(message_id_)); + block_->wr_ptr(sizeof (message_id_)); + + // Pack the message type. + ACE_OS::memcpy(block_->wr_ptr(), &message_type_, sizeof(message_type_)); + block_->wr_ptr(sizeof(message_type_)); + + // Number of blocks in payload. + ACE_OS::memcpy(block_->wr_ptr(), &num_payload_blocks_, sizeof(num_payload_blocks_)); + block_->wr_ptr(sizeof(num_payload_blocks_)); + + ACE_Message_Block* next_block = block_->next(); + if (next_block) + { + next_block->reset(); + + // Write the block lengths into the message block. + for (unsigned int i = 0; i < num_payload_blocks_; ++i) + { + ACE_OS::memcpy(next_block->wr_ptr(), + &payload_block_lengths_[i], + sizeof(ACE_UINT32)); + + next_block->wr_ptr(sizeof(ACE_UINT32)); + } + } + + dirty_ = false; + } +} + +void ACE_PIP_Protocol_Message::unpack() +{ + if (block_) + { + char* write_ptr = block_->wr_ptr(); + // char* read_ptr = block_->rd_ptr(); + block_->reset(); + + // Extract the process ID. + ACE_OS::memcpy(&message_id_, block_->rd_ptr(), sizeof(message_id_)); + block_->rd_ptr(sizeof (message_id_)); + + // Extract the message type. + ACE_OS::memcpy(&message_type_, block_->rd_ptr(), sizeof(message_type_)); + block_->rd_ptr(sizeof(message_type_)); + + // Number of blocks in payload. + ACE_OS::memcpy(&num_payload_blocks_, block_->rd_ptr(), + sizeof(num_payload_blocks_)); + + block_->rd_ptr(sizeof(num_payload_blocks_)); + + // Reset buffer pointers to be where they were prior to unpacking. + block_->reset(); + block_->wr_ptr(write_ptr); + + // The next block holds the lengths of each payload block. + ACE_Message_Block* next_block = block_->next(); + if (next_block) + { + write_ptr = next_block->wr_ptr(); + next_block->reset(); + payload_block_lengths_.resize(num_payload_blocks_, 0); + ACE_UINT32 block_length(0); + + // Extract the lengths of each payload block, which will + // be used to recreate the structure of the original payload. + for (ACE_UINT32 i = 0; i < num_payload_blocks_; ++i) + { + ACE_OS::memcpy(&block_length, next_block->rd_ptr(), sizeof(block_length)); + next_block->rd_ptr(sizeof(block_length)); + payload_block_lengths_[i] = block_length; + } + + // Reset the buffer pointers to where they were prior to unpacking. + next_block->reset(); + next_block->wr_ptr(write_ptr); + } + } + + dirty_ = false; +} + +ACE_PIP_Protocol_Message* ACE_PIP_Protocol_Message::copy() +{ + ACE_PIP_Protocol_Message* message_copy = new ACE_PIP_Protocol_Message; + message_copy->message_type_ = message_type_; + message_copy->num_payload_blocks_ = num_payload_blocks_; + for (ACE_UINT32 block_index = 0; block_index < num_payload_blocks_; ++block_index) + { + message_copy->payload_block_lengths_[block_index] = payload_block_lengths_[block_index]; + } +} + +void ACE_PIP_Protocol_Message::print() const +{ + std::cout << "Type: " << message_type_ << std::endl + << "MessageID: " << message_id_ << std::endl + << "NumPayload: " << num_payload_blocks_ << std::endl; + + for (unsigned int i = 0; i < num_payload_blocks_; ++i) + { + std::cout << "BlockLength[" << i << "] = " << payload_block_lengths_[i] << std::endl; + } +} + diff --git a/ACE/ace/PIP_Messages.h b/ACE/ace/PIP_Messages.h new file mode 100644 index 00000000000..683bedb1ff9 --- /dev/null +++ b/ACE/ace/PIP_Messages.h @@ -0,0 +1,446 @@ + /** + * @file PIP_Messages + * + * // $Id$ + * + * @author John Moore <ljohn7@gmail.com> + * + * This file contains the specification for a heirarchy of + * classes that represent the various messages used in the + * priority inheritance protocol +*/ + +#ifndef _PIP_MESSAGES_H_ +#define _PIP_MESSAGES_H_ + +#include "ace/Message_Block.h" +#include "ace/SOCK_Stream.h" +#include "ace/Vector_T.h" + +#include <iostream> + +/** + * @class ACE_PIP_Message + * @brief Base class for all messages used in + * the implementation of a distributed priority inheritance + * protocol. + * + * Base class for all messages used in the implementation of a distributed + * priority inheritance protocol. Provides an interface for message (de)serialization, + * message chaining, packing, unpacking, and payload ownership transfer + */ +class ACE_Export ACE_PIP_Message +{ + public: + + ACE_PIP_Message(); + virtual ~ACE_PIP_Message(); + + /// Send the contents of this message over the stream. + virtual int serialize(ACE_SOCK_Stream& stream) = 0; + + /// Get the next message struct. + virtual ACE_PIP_Message* get_next(); + + /// Set the next message struct. + virtual void set_next(ACE_PIP_Message* next); + + /// Returns the next message, making the caller + /// the new owner. + virtual ACE_PIP_Message* release_next(); + + /// Get the message block. + virtual ACE_Message_Block* get_block(); + + /// Set the message block and populate the message struct + /// with message contents. + virtual void set_block(ACE_Message_Block* block); + + /// Get the message block, making the caller the new owner. + virtual ACE_Message_Block* release_block(); + + /// Place the values in the message struct into the message block. + virtual void pack() = 0; + + /// Populate the message struct using values from the message block. + virtual void unpack() = 0; + + /// This is temporarily public to facilitate testing. + /// It should eventually be made private. + ACE_Message_Block* block_; + + /// Print the contents of this struct to stdout. + virtual void print() const = 0; + + + protected: + + // Indicates values in structure are newer than values in the + // message block. + bool dirty_; + + ACE_PIP_Message* next_; +}; + +/** + * @class ACE_PIP_Data_Message + * @brief Structure representing the fields of an application- + * level protocol message and associated header values + * + * Structure representing the fields of an appliation level + * protocol message and associated header values. Structure is that + * of several contiguous ACE_Message_Block's. The message is configurable + * to support any application-level protocol that contains at least the following + * data: source address, destination address, reply expectation, and priority + * +*/ +class ACE_Export ACE_PIP_Data_Message : public ACE_PIP_Message +{ + public: + + ACE_PIP_Data_Message(); + virtual ~ACE_PIP_Data_Message(){} + + /// Send the contents of this message over the stream. + virtual int serialize(ACE_SOCK_Stream& stream); + + /// Determine if a reply message is expected + bool get_reply_expected() const; + void set_reply_expected(bool expected); + + /// Determine the priority at which this message should be handled + ACE_UINT32 get_message_priority() const; + void set_message_priority(ACE_UINT32 priority); + + /// Determine the ID of the destination handler + ACE_UINT32 get_destination_handler_ID() const; + void set_destination_handler_ID(ACE_UINT32 ID); + + /// Determine the ID of the sending handler + ACE_UINT32 get_source_handler_ID() const; + void set_source_handler_ID(ACE_UINT32 ID); + + /// Determine the ID of the destination site + ACE_UINT32 get_destination_site_ID() const; + void set_destination_site_ID(ACE_UINT32 ID); + + /// Determine the ID of the sending site + ACE_UINT32 get_source_site_ID() const; + void set_source_site_ID(ACE_UINT32 ID); + + // Place the values from the struct into the message blocks. + virtual void pack(); + + // Extract the values from the message blocks into the structs. + virtual void unpack(); + + /// Print the contents of this struct to stdout. + virtual void print() const; + + private: + + ACE_UINT32 message_priority_; + bool reply_expected_; + ACE_UINT32 source_handler_ID_; + ACE_UINT32 destination_handler_ID_; + ACE_UINT32 source_site_ID_; + ACE_UINT32 destination_site_ID_; +}; + +/** + * @class ACE_PIP_Protocol_Message + * @brief Structure representing a message supported by the priority + * inheritance protocol + * +*/ + +class ACE_Export ACE_PIP_Accel_Message : public ACE_PIP_Message +{ + public: + + ACE_PIP_Accel_Message(); + virtual ~ACE_PIP_Accel_Message(){} + + /// Send the contents of this message over the stream. + virtual int serialize(ACE_SOCK_Stream& stream); + + ACE_UINT32 get_old_priority() const; + void set_old_priority(ACE_UINT32 priority); + + ACE_UINT32 get_new_priority() const; + void set_new_priority(ACE_UINT32 priority); + + /// Get the address of the application receiving the message. + ACE_UINT32 get_destination_address() const; + void set_destination_address(const ACE_UINT32& address); + + u_short get_destination_port() const; + void set_destination_port(u_short port); + + /// Place the values in the message struct into the message block. + virtual void pack(); + + /// Extract the values from the message block and store them in the struct. + virtual void unpack(); + + /// Print the contents of this struct to stdout. + virtual void print() const; + + /// Return a copy of the this message + ACE_PIP_Accel_Message* copy(); + + private: + + const ACE_UINT32 ACCEL_HEADER_LENGTH_; + ACE_UINT32 destination_address_; + u_short destination_port_; + ACE_UINT32 new_priority_; + ACE_UINT32 old_priority_; +}; + +/** + * @class ACE_PIP_Accel_Message + * @brief Structure representing an acceleration message + * used in the implementation of a priority inheritance protocol + * + * Structure representing an acceleration message used in the + * implementation of a priority inheritance protocol. Indicates the + * old and new priority of the targeted process, as well as the address + * of handler to which the associated message was sent. +*/ +class ACE_Export ACE_PIP_Protocol_Message : public ACE_PIP_Message +{ + public: + + enum Message_Type { NONE, ACCEL, DATA, REQUEST, RESPONSE }; + + ACE_PIP_Protocol_Message(); + virtual ~ACE_PIP_Protocol_Message(){} + + /// Send the contents of this message over the stream. + virtual int serialize(ACE_SOCK_Stream& stream); + + /// Receive the contents of this message from the stream. + virtual int deserialize(ACE_SOCK_Stream& stream); + + /// Set the next message in the chain. + virtual void set_next(ACE_PIP_Message* next); + + /// Determine the type of message this header has been tacked onto. + Message_Type get_message_type() const; + void set_message_type(Message_Type type); + + /// Determine which call chain this message is associated with. + ACE_UINT64 get_message_id() const; + void set_message_id(ACE_UINT64 id); + + /// Attach message block as payload of priority inheritance + /// protocol message. + void process_message_payload(ACE_Message_Block* payload); + + virtual void pack(); + virtual void unpack(); + + /// Print the contents of this struct to stdout. + virtual void print() const; + + /// Make a copy of the header of this message, i.e. without + /// data or accel payload + ACE_PIP_Protocol_Message* copy(); + + const int FIXED_HEADER_LENGTH_; + + private: + + Message_Type message_type_; + ACE_UINT32 num_payload_blocks_; + ACE_Vector<ACE_UINT32> payload_block_lengths_; + ACE_UINT64 message_id_; +}; + + +/************************************************** + * + * ACE_PIP_Message - Inline Methods + * + **************************************************/ +inline ACE_PIP_Message* ACE_PIP_Message::get_next() +{ + return next_; +} + +inline void ACE_PIP_Message::set_next(ACE_PIP_Message* message) +{ + next_ = message; +} + +inline ACE_PIP_Message* ACE_PIP_Message::release_next() +{ + ACE_PIP_Message* temp = next_; + next_ = 0; + return temp; +} + +inline ACE_Message_Block* ACE_PIP_Message::get_block() +{ + return block_; +} + +inline ACE_Message_Block* ACE_PIP_Message::release_block() +{ + ACE_Message_Block* temp_block = block_; + block_ = 0; + dirty_ = true; + return temp_block; +} + +/************************************************** + * + * ACE_PIP_Data_Message - Inline Methods + * + **************************************************/ + +inline bool ACE_PIP_Data_Message::get_reply_expected() const +{ + return reply_expected_; +} + +inline void ACE_PIP_Data_Message::set_reply_expected(bool expected) +{ + dirty_ = true; + reply_expected_ = expected; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::get_message_priority() const +{ + return message_priority_; +} + +inline void ACE_PIP_Data_Message::set_message_priority(ACE_UINT32 priority) +{ + dirty_ = true; + message_priority_ = priority; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::get_destination_handler_ID() const +{ + return destination_handler_ID_; +} + +inline void ACE_PIP_Data_Message::set_destination_handler_ID(ACE_UINT32 ID) +{ + destination_handler_ID_ = ID; + dirty_ = true; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::get_source_handler_ID() const +{ + return source_handler_ID_; +} + +inline void ACE_PIP_Data_Message::set_source_handler_ID(ACE_UINT32 ID) +{ + source_handler_ID_ = ID; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::get_source_site_ID() const +{ + return source_site_ID_; +} + +inline void ACE_PIP_Data_Message::set_source_site_ID(ACE_UINT32 ID) +{ + source_site_ID_ = ID; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::get_destination_site_ID() const +{ + return destination_site_ID_; +} + +inline void ACE_PIP_Data_Message::set_destination_site_ID(ACE_UINT32 ID) +{ + destination_site_ID_ = ID; +} + +/************************************************** + * + * ACE_PIP_Accel_Message - Inline Methods + * + **************************************************/ + +inline ACE_UINT32 ACE_PIP_Accel_Message::get_old_priority() const +{ + return old_priority_; +} + +inline void ACE_PIP_Accel_Message::set_old_priority(ACE_UINT32 priority) +{ + dirty_ = true; + old_priority_ = priority; +} + +inline ACE_UINT32 ACE_PIP_Accel_Message::get_new_priority() const +{ + return new_priority_; +} + +inline void ACE_PIP_Accel_Message::set_new_priority(ACE_UINT32 priority) +{ + dirty_ = true; + new_priority_ = priority; +} + +inline ACE_UINT32 ACE_PIP_Accel_Message::get_destination_address() const +{ + return destination_address_; +} + +inline void ACE_PIP_Accel_Message::set_destination_address(const ACE_UINT32& address) +{ + dirty_ = true; + destination_address_ = address; +} + +inline u_short ACE_PIP_Accel_Message::get_destination_port() const +{ + return destination_port_; +} + +inline void ACE_PIP_Accel_Message::set_destination_port(u_short port) +{ + destination_port_ = port; +} + +/************************************************** + * + * ACE_PIP_Protocol_Message - Inline Methods + * + **************************************************/ + +inline void ACE_PIP_Protocol_Message:: + set_message_type(ACE_PIP_Protocol_Message::Message_Type type) +{ + message_type_ = type; + dirty_ = true; +} + +inline ACE_PIP_Protocol_Message::Message_Type ACE_PIP_Protocol_Message:: + get_message_type() const +{ + return message_type_; +} + +inline ACE_UINT64 ACE_PIP_Protocol_Message::get_message_id() const +{ + return message_id_; +} + +inline void ACE_PIP_Protocol_Message::set_message_id(ACE_UINT64 id) +{ + dirty_ = true; + message_id_ = id; +} + +#endif + diff --git a/ACE/ace/PIP_Reactive_IO_Handler.cpp b/ACE/ace/PIP_Reactive_IO_Handler.cpp new file mode 100644 index 00000000000..925be857608 --- /dev/null +++ b/ACE/ace/PIP_Reactive_IO_Handler.cpp @@ -0,0 +1,64 @@ +// $Id$ + +#include "ace/OS_NS_sys_time.h" +#include "ace/PIP_Reactive_IO_Handler.h" +#include "ace/PIP_Invocation_Manager.h" + +/// Constructor +ACE_PIP_Reactive_IO_Handler::ACE_PIP_Reactive_IO_Handler() +{ +} + +ACE_PIP_Reactive_IO_Handler::~ACE_PIP_Reactive_IO_Handler() +{ +} + +/// Closes all remote connections. +int ACE_PIP_Reactive_IO_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) +{ + int result(0); + switch(close_mask) + { + case ACE_Event_Handler::READ_MASK: + read_closed_ = true; + break; + case ACE_Event_Handler::WRITE_MASK: + write_closed_ = true; + break; + }; + + if (read_closed_ && write_closed_) + { + // Close our end of the connection + peer_.close_reader(); + peer_.close_writer(); + + // un-register with invocation manager so it doesn't + // try to use the handler for IO + ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this); + + delete this; + return -1; + } + + return 0; +} + + +/// Enqueue a message to be sent +int ACE_PIP_Reactive_IO_Handler::put_message (ACE_PIP_Protocol_Message* message) +{ + big_lock_.acquire(); + outgoing_message_queue_.enqueue_head(message); + big_lock_.release(); + + // Register so Reactor tells us to send the message + ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::WRITE_MASK); + ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK); + + return 0; +} + + + + diff --git a/ACE/ace/PIP_Reactive_IO_Handler.h b/ACE/ace/PIP_Reactive_IO_Handler.h new file mode 100644 index 00000000000..ae50ebf9b27 --- /dev/null +++ b/ACE/ace/PIP_Reactive_IO_Handler.h @@ -0,0 +1,54 @@ + /** + * @file PIP_IO_Handler.cpp + * + * // $Id$ + * + * @author John Moore <ljohn7@gmail.com> + * + * This file contains the specification for a class + * that manages network I/O +*/ + + +#ifndef _PIP_REACTIVE_IO_HANDLER_H_ +#define _PIP_REACTIVE_IO_HANDLER_H_ + + +#include "ace/Message_Queue.h" +#include "ace/PIP_IO_Handler.h" +#include "ace/PIP_Messages.h" + +/** + * @class ACE_PIP_Reactive_IO_Handler + * + * @brief Performs reactive network I/O in + * the context of a distributed system + * employing the the priority inheritance + * protocol + * + * @author John Moore <ljohn7@gmail.com> + */ +class ACE_Export ACE_PIP_Reactive_IO_Handler : + public ACE_PIP_IO_Handler +{ + public: + + /// Constructor + ACE_PIP_Reactive_IO_Handler (); + ~ACE_PIP_Reactive_IO_Handler(); + + /// Closes all remote connections. + virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); + + /// Enqueue a message to be sent + virtual int put_message (ACE_PIP_Protocol_Message* message); + + private: + +}; + + + +#endif /* _PIP_Reactive_IO_Handler_H_ */ + + |