summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjmoore <jmoore@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2008-07-25 03:00:01 +0000
committerjmoore <jmoore@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2008-07-25 03:00:01 +0000
commitec9c7fc2857ffd49059789828843c555d9c428ca (patch)
tree24f9c4ed5b8826f4d1c152b492978482803aa7ee
parent61c9e259def180a489922620437cf5e2693b3b42 (diff)
downloadATCD-ec9c7fc2857ffd49059789828843c555d9c428ca.tar.gz
initial Priority Inheritance Protocol commit
-rw-r--r--ACE/ace/MT_Priority_Reactor.cpp611
-rw-r--r--ACE/ace/MT_Priority_Reactor.h252
-rw-r--r--ACE/ace/PIP_Active_IO_Handler.cpp109
-rw-r--r--ACE/ace/PIP_Active_IO_Handler.h55
-rw-r--r--ACE/ace/PIP_Connection_Manager.cpp207
-rw-r--r--ACE/ace/PIP_Connection_Manager.h81
-rw-r--r--ACE/ace/PIP_DA_Strategy_Adapter.cpp4
-rw-r--r--ACE/ace/PIP_DA_Strategy_Adapter.h258
-rw-r--r--ACE/ace/PIP_Dispatcher.cpp468
-rw-r--r--ACE/ace/PIP_Dispatcher.h188
-rw-r--r--ACE/ace/PIP_IO_Handler.cpp151
-rw-r--r--ACE/ace/PIP_IO_Handler.h103
-rw-r--r--ACE/ace/PIP_IO_Handler.inl36
-rw-r--r--ACE/ace/PIP_Invocation_Manager.cpp316
-rw-r--r--ACE/ace/PIP_Invocation_Manager.h150
-rw-r--r--ACE/ace/PIP_Message_Handler.cpp38
-rw-r--r--ACE/ace/PIP_Message_Handler.h82
-rw-r--r--ACE/ace/PIP_Message_Handler.inl22
-rw-r--r--ACE/ace/PIP_Messages.cpp577
-rw-r--r--ACE/ace/PIP_Messages.h255
-rw-r--r--ACE/ace/PIP_Messages.inl186
-rw-r--r--ACE/ace/PIP_Reactive_IO_Handler.cpp64
-rw-r--r--ACE/ace/PIP_Reactive_IO_Handler.h54
23 files changed, 4267 insertions, 0 deletions
diff --git a/ACE/ace/MT_Priority_Reactor.cpp b/ACE/ace/MT_Priority_Reactor.cpp
new file mode 100644
index 00000000000..582b59ad25b
--- /dev/null
+++ b/ACE/ace/MT_Priority_Reactor.cpp
@@ -0,0 +1,611 @@
+// $Id$
+
+#include "ace/MT_Priority_Reactor.h"
+#include "ace/Thread.h"
+#include "ace/Timer_Queue.h"
+#include "ace/Sig_Handler.h"
+#include "ace/Log_Msg.h"
+#include "ace/OS_NS_sys_time.h"
+
+#include <iostream> // for debugging
+
+#if !defined (__ACE_INLINE__)
+#include "ace/MT_Priority_Reactor.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID (ace,
+ MT_Priority_Reactor,
+ "$Id$")
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+ACE_ALLOC_HOOK_DEFINE (ACE_MT_Priority_Reactor)
+
+ACE_MT_Priority_Reactor::ACE_MT_Priority_Reactor (ACE_Sig_Handler *sh,
+ ACE_Timer_Queue *tq,
+ int mask_signals,
+ int s_queue)
+: ACE_TP_Reactor (sh, tq, mask_signals, s_queue)
+ , _bucket(0)
+ , _bucketize_socket_events(false)
+ , _current_priority(ACE_Event_Handler::HI_PRIORITY)
+ , _max_priority(0)
+ , _min_priority(0)
+ , _num_socket_events(0)
+ , _tuple_allocator(0)
+{
+ ACE_TRACE ("ACE_MT_Priority_Reactor::ACE_MT_Priority_Reactor");
+ this->init_bucket();
+}
+
+ACE_MT_Priority_Reactor::ACE_MT_Priority_Reactor (size_t max_number_of_handles,
+ int restart,
+ ACE_Sig_Handler *sh,
+ ACE_Timer_Queue *tq,
+ int mask_signals,
+ int s_queue)
+ : ACE_TP_Reactor (max_number_of_handles, restart, sh, tq, mask_signals, s_queue)
+ , _bucket(0)
+ , _current_priority(ACE_Event_Handler::HI_PRIORITY)
+ , _max_priority(0)
+ , _min_priority(0)
+ , _num_socket_events(0)
+ , _tuple_allocator(0)
+{
+ ACE_TRACE ("ACE_MT_Priority_Reactor::ACE_MT_Priority_Reactor");
+ this->init_bucket();
+}
+
+ACE_MT_Priority_Reactor::~ACE_MT_Priority_Reactor()
+{
+ ACE_TRACE ("ACE_MT_Priority_Reactor::~ACE_MT_Priority_Reactor");
+ for (int index = 0; index < npriorities; ++index)
+ {
+ delete _bucket[index];
+ }
+
+ delete [] _bucket;
+ delete _tuple_allocator;
+}
+
+int
+ACE_MT_Priority_Reactor::handle_events (ACE_Time_Value &max_wait_time)
+{
+ return this->handle_events (&max_wait_time);
+}
+
+int
+ACE_MT_Priority_Reactor::handle_events (ACE_Time_Value *max_wait_time)
+{
+ ACE_TRACE ("ACE_MT_Priority_Reactor::handle_events");
+
+ // Stash the current time -- the destructor of this object will
+ // automatically compute how much time elapsed since this method was
+ // called.
+ ACE_Countdown_Time countdown (max_wait_time);
+
+
+ // Instantiate the token guard which will try grabbing the token for
+ // this thread.
+ ACE_TP_Token_Guard guard (this->token_);
+
+ int const result = guard.acquire_read_token (max_wait_time);
+
+ // If the guard is NOT the owner just return the retval
+ if (!guard.is_owner ())
+ return result;
+
+ // After getting the lock just just for deactivation.
+ if (this->deactivated_)
+ return -1;
+
+ // Update the countdown to reflect time waiting for the token.
+ countdown.update ();
+
+ return this->dispatch_handler (max_wait_time,
+ guard);
+}
+
+int
+ACE_MT_Priority_Reactor::dispatch_handler (ACE_Time_Value *max_wait_time,
+ ACE_TP_Token_Guard &guard)
+{
+ int num_events =
+ this->get_an_event_for_dispatching (max_wait_time);
+
+ int initial_event_count = num_events;
+ int result = 0;
+
+ result = this->handle_timer (num_events,
+ guard);
+
+ // If we've dispatched a timer event handler, return the thread
+ if (result > 0)
+ return result;
+
+ // Else just go ahead fall through for further handling.
+ if (num_events > 0)
+ {
+ // Next dispatch the notification handlers (if there are any to
+ // dispatch). These are required to handle multiple-threads
+ // that are trying to update the <Reactor>.
+ result = this->handle_notify (_num_socket_events,
+ guard);
+
+ if (result > 0)
+ return result;
+
+ // Else just fall through for further handling
+ }
+
+ if (num_events > 0)
+ {
+ // Sort events into priority buckets
+ if (_bucketize_socket_events)
+ {
+ bucketize_socket_events();
+ }
+
+ // Handle the highest priority event
+ result = this->handle_socket(num_events,
+ guard);
+ }
+
+ return result;
+}
+
+int
+ACE_MT_Priority_Reactor::get_an_event_for_dispatching (ACE_Time_Value *max_wait_time)
+{
+ // If the reactor handler state has changed, clear any remembered
+ // ready bits and re-scan from the master wait_set.
+ if (this->state_changed_)
+ {
+ this->ready_set_.rd_mask_.reset ();
+ this->ready_set_.wr_mask_.reset ();
+ this->ready_set_.ex_mask_.reset ();
+
+ this->state_changed_ = false;
+ }
+ else
+ {
+ // This is a hack... somewhere, under certain conditions (which
+ // I don't understand...) the mask will have all of its bits clear,
+ // yet have a size_ > 0. This is an attempt to remedy the affect,
+ // without knowing why it happens.
+
+ this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
+ this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
+ this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
+ }
+
+ if (_num_socket_events > 0)
+ return _num_socket_events;
+
+ else
+ {
+ int num_events = wait_for_multiple_events (this->ready_set_,
+ max_wait_time);
+
+ _bucketize_socket_events = true;
+
+ if (num_events > 0)
+ preprocess_new_event_set();
+
+ return num_events;
+ }
+}
+
+int
+ACE_MT_Priority_Reactor::handle_timer(int & /*event_count*/,
+ ACE_TP_Token_Guard &guard)
+{
+ if (this->timer_queue_ == 0 || this->timer_queue_->is_empty())
+ { // Empty timer queue so cannot have any expired timers.
+ return 0;
+ }
+
+ // Get the current time
+ ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () +
+ this->timer_queue_->timer_skew ());
+
+ // Look for a node in the timer queue whose timer <= the present
+ // time.
+ ACE_Timer_Node_Dispatch_Info info;
+
+ if (this->timer_queue_->dispatch_info (cur_time,
+ info))
+ {
+ const void *upcall_act = 0;
+
+ // Preinvoke.
+ this->timer_queue_->preinvoke (info,
+ cur_time,
+ upcall_act);
+
+ // Release the token before dispatching notifies...
+ guard.release_token ();
+
+ // call the functor
+ this->timer_queue_->upcall (info,
+ cur_time);
+
+ // Postinvoke
+ this->timer_queue_->postinvoke (info,
+ cur_time,
+ upcall_act);
+
+ // We have dispatched a timer
+ return 1;
+ }
+
+ return 0;
+}
+
+int
+ACE_MT_Priority_Reactor::handle_notify(int & /*event_count*/,
+ ACE_TP_Token_Guard &guard)
+{
+
+ // Get the handle on which notify calls could have occured
+ ACE_HANDLE notify_handle =
+ this->get_the_notify_handle ();
+
+ int result = 0;
+
+ // The notify was not in the list returned by
+ // wait_for_multiple_events ().
+ if (notify_handle == ACE_INVALID_HANDLE)
+ return result;
+
+ // Now just do a read on the pipe..
+ ACE_Notification_Buffer buffer;
+
+ // Clear the handle of the read_mask of our <ready_set_>
+ this->ready_set_.rd_mask_.clr_bit (notify_handle);
+
+ // Keep reading notifies till we empty it or till we have a
+ // dispatchable buffer
+ while (this->notify_handler_->read_notify_pipe (notify_handle,
+ buffer) > 0)
+ {
+ // Just figure out whether we can read any buffer that has
+ // dispatchable info. If not we have just been unblocked by
+ // another thread trying to update the reactor. If we get any
+ // buffer that needs dispatching we will dispatch that after
+ // releasing the lock
+ if (this->notify_handler_->is_dispatchable (buffer) > 0)
+ {
+ // Release the token before dispatching notifies...
+ guard.release_token ();
+
+ // Dispatch the upcall for the notify
+ this->notify_handler_->dispatch_notify (buffer);
+
+ // We had a successful dispatch.
+ result = 1;
+
+ // break out of the while loop
+ break;
+ }
+
+
+ }
+
+ // If we did some work, then we just return 1 which will allow us
+ // to get out of here. If we return 0, then we will be asked to do
+ // some work ie. dispacth socket events
+ return result;
+}
+
+int
+ACE_MT_Priority_Reactor::handle_socket(int &event_count,
+ ACE_TP_Token_Guard &guard)
+{
+ // We got the lock, lets handle some I/O events.
+ ACE_Handle_Dispatch_Info dispatch_info;
+
+ if (this->get_sock_event_info (dispatch_info) == 0)
+ {
+ return 0;
+ }
+
+ // If there is any event handler that is ready to be dispatched, the
+ // dispatch information is recorded in dispatch_info.
+ if (!dispatch_info.dispatch ())
+ {
+ // Check for removed handlers.
+ if (dispatch_info.event_handler_ == 0)
+ {
+ this->handler_rep_.unbind(dispatch_info.handle_,
+ dispatch_info.mask_);
+ }
+
+ return 0;
+ }
+
+ // Suspend the handler so that other threads don't start dispatching
+ // it, if we can't suspend then return directly
+ if (dispatch_info.event_handler_ != this->notify_handler_)
+ if (this->suspend_i (dispatch_info.handle_) == -1)
+ return 0;
+
+ // Call add_reference() if needed.
+ if (dispatch_info.reference_counting_required_)
+ dispatch_info.event_handler_->add_reference ();
+
+ // Do any pre-dispatch processing
+ preprocess_chosen_handler(dispatch_info);
+
+ // Release the lock so that another thread can begin dispatching
+ guard.release_token ();
+
+ int result = 0;
+
+ // Dispatched an event
+ if (this->dispatch_sock_event (dispatch_info) == 0)
+ ++result;
+
+ return result;
+}
+
+ACE_HANDLE
+ACE_MT_Priority_Reactor::get_the_notify_handle (void)
+{
+ // Call the notify handler to get a handle on which we would have a
+ // notify waiting
+ ACE_HANDLE const read_handle =
+ this->notify_handler_->notify_handle ();
+
+ // Check whether the rd_mask has been set on that handle. If so
+ // return the handle.
+ if (read_handle != ACE_INVALID_HANDLE &&
+ this->ready_set_.rd_mask_.is_set (read_handle))
+ {
+ return read_handle;
+ }
+
+ // None found..
+ return ACE_INVALID_HANDLE;
+}
+
+int
+ACE_MT_Priority_Reactor::get_sock_event_info (ACE_Handle_Dispatch_Info &event)
+{
+ int found_io = 0;
+
+ // Iterate through buckets to find the highest-priority socket event
+ if (_num_socket_events > 0)
+ {
+ ACE_Handle_Dispatch_Info event_info;
+ _current_priority = ACE_Event_Handler::HI_PRIORITY;
+ while (_current_priority >= _min_priority)
+ {
+ if (!_bucket[_current_priority]->is_empty())
+ {
+ size_t size = _bucket[_current_priority]->size();
+ size_t count = 0;
+ while (count < size)
+ {
+ ++count;
+ --_num_socket_events;
+ _bucket[_current_priority]->dequeue_head(event_info);
+ if (!this->is_suspended_i(event_info.handle_))
+ {
+ found_io = 1;
+ event = event_info;
+ break;
+ }
+ }
+ if (found_io)
+ {
+ break;
+ }
+ }
+ else
+ {
+ // There are no more messages at this priority, so continue
+ // to the next lowest priority
+ --_current_priority;
+ }
+ }
+ }
+
+ return found_io;
+}
+
+void ACE_MT_Priority_Reactor::init_bucket()
+{
+ // Create a tuple allocator
+ ACE_NEW (this->_tuple_allocator,
+ TUPLE_ALLOCATOR (ACE_Select_Reactor::DEFAULT_SIZE));
+
+ // Create a list of pointers, one per priority level
+ ACE_NEW (this->_bucket,
+ QUEUE *[npriorities]);
+
+ // Create a bucket for each priority level
+ for (int i = 0; i < npriorities; ++i)
+ ACE_NEW (this->_bucket[i],
+ QUEUE (this->_tuple_allocator));
+}
+
+// Dispatches a single event handler
+int
+ACE_MT_Priority_Reactor::dispatch_sock_event (ACE_Handle_Dispatch_Info &dispatch_info)
+{
+ ACE_TRACE ("ACE_MT_Priority_Reactor::dispatch_socket_event");
+
+ ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
+ ACE_EH_PTMF const callback = dispatch_info.callback_;
+
+ // Check for removed handlers.
+ if (event_handler == 0)
+ return -1;
+
+ // Upcall. If the handler returns positive value (requesting a
+ // reactor callback) don't set the ready-bit because it will be
+ // ignored if the reactor state has changed. Just call back
+ // as many times as the handler requests it. Other threads are off
+ // handling other things.
+ int status = 1;
+
+ while (status > 0)
+ status = (event_handler->*callback) (dispatch_info.handle_);
+
+ // Post process socket event
+ return this->post_process_sock_event (dispatch_info, status);
+}
+
+int
+ACE_MT_Priority_Reactor::post_process_sock_event (ACE_Handle_Dispatch_Info &dispatch_info,
+ int status)
+{
+ int result = 0;
+
+ // First check if we really have to post process something, if not, then
+ // we don't acquire the token which saves us a lot of time.
+ if (status < 0 ||
+ (dispatch_info.event_handler_ != this->notify_handler_ &&
+ dispatch_info.resume_flag_ ==
+ ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
+ {
+ // Get the reactor token and with this token acquired remove first the
+ // handler and resume it at the same time. This must be atomic, see also
+ // bugzilla 2395. When this is not atomic it can be that we resume the
+ // handle after it is reused by the OS.
+ ACE_TP_Token_Guard guard (this->token_);
+
+ result = guard.acquire_token ();
+
+ // If the guard is NOT the owner just return the retval
+ if (!guard.is_owner ())
+ return result;
+
+ // A different event handler may have been registered during the
+ // upcall if the handle was closed and then reopened, for
+ // example. Make sure we're removing and/or resuming the event
+ // handler used during the upcall.
+ ACE_Event_Handler const * const eh =
+ this->handler_rep_.find (dispatch_info.handle_);
+
+ // Only remove or resume the event handler used during the
+ // upcall.
+ if (eh == dispatch_info.event_handler_)
+ {
+ if (status < 0)
+ {
+ result =
+ this->remove_handler_i (dispatch_info.handle_,
+ dispatch_info.mask_);
+ }
+
+ // Resume handler if required.
+ if (dispatch_info.event_handler_ != this->notify_handler_ &&
+ dispatch_info.resume_flag_ ==
+ ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
+ this->resume_i (dispatch_info.handle_);
+ }
+ }
+
+ // Call remove_reference() if needed.
+ if (dispatch_info.reference_counting_required_)
+ dispatch_info.event_handler_->remove_reference ();
+
+ return result;
+}
+
+void
+ACE_MT_Priority_Reactor::notify_handle (ACE_HANDLE,
+ ACE_Reactor_Mask,
+ ACE_Handle_Set &,
+ ACE_Event_Handler *eh,
+ ACE_EH_PTMF)
+{
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("ACE_MT_Priority_Reactor::notify_handle: ")
+ ACE_LIB_TEXT ("Wrong version of notify_handle() got called \n")));
+
+ ACE_ASSERT (eh == 0);
+ ACE_UNUSED_ARG (eh);
+}
+
+int ACE_MT_Priority_Reactor::bucketize_socket_events()
+{
+ int result(0);
+ _bucketize_socket_events = false;
+ ACE_HANDLE handle;
+
+ // Place each active handle in the appropriate priority bucket
+ ACE_Handle_Set_Iterator wr_handle_iter (this->ready_set_.wr_mask_);
+ while ((handle = wr_handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ ACE_Handle_Dispatch_Info event;
+
+ // To avoid dispatching an event whose associated handle
+ // is already suspended, don't place that event in the bucket
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Create a token containing all information
+ // necessary to dispatch the event at the appropriate
+ // moment
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::WRITE_MASK,
+ &ACE_Event_Handler::handle_output);
+
+
+ ++_num_socket_events;
+
+ // Store the token in the appropriate bucket
+ _bucket[event.event_handler_->priority()]->enqueue_tail(event);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ }
+
+ ACE_Handle_Set_Iterator ex_handle_iter (this->ready_set_.ex_mask_);
+ while ((handle = ex_handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ ACE_Handle_Dispatch_Info event;
+
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::EXCEPT_MASK,
+ &ACE_Event_Handler::handle_exception);
+
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ }
+
+ ACE_Handle_Set_Iterator rd_handle_iter (this->ready_set_.rd_mask_);
+ while ((handle = rd_handle_iter ()) != ACE_INVALID_HANDLE)
+ {
+ ACE_Handle_Dispatch_Info event;
+
+ if (this->is_suspended_i (handle))
+ continue;
+
+ // Remember this info
+ event.set (handle,
+ this->handler_rep_.find (handle),
+ ACE_Event_Handler::READ_MASK,
+ &ACE_Event_Handler::handle_input);
+
+ ++_num_socket_events;
+ _bucket[event.event_handler_->priority()]->enqueue_tail(event);
+ this->ready_set_.wr_mask_.clr_bit (handle);
+ this->ready_set_.ex_mask_.clr_bit (handle);
+ this->ready_set_.rd_mask_.clr_bit (handle);
+ }
+
+ return result;
+}
+
+ACE_END_VERSIONED_NAMESPACE_DECL
diff --git a/ACE/ace/MT_Priority_Reactor.h b/ACE/ace/MT_Priority_Reactor.h
new file mode 100644
index 00000000000..a8df0ae66c7
--- /dev/null
+++ b/ACE/ace/MT_Priority_Reactor.h
@@ -0,0 +1,252 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file MT_Priority_Reactor.h
+ *
+ * $Id$
+ *
+ * The ACE_MT_Priority_Reactor, like the ACE_TP_Reactor, uses the
+ * Leader/Followers pattern to demultiplex events among a pool of
+ * threads. When using a thread pool reactor, an application
+ * pre-spawns a _fixed_ number of threads. When these threads
+ * invoke the ACE_MT_Priority_Reactor's <handle_events> method, one thread
+ * will become the leader and wait for an event. The other
+ * follower threads will queue up waiting for their turn to become
+ * the leader. When an event occurs, the leader will pick a
+ * follower to become the leader and go on to handle the event.
+ * The consequence of using ACE_TP_Reactor is the amortization of
+ * the costs used to creating threads. The context switching cost
+ * will also reduce. More over, the total resources used by
+ * threads are bounded because there are a fixed number of threads.
+ *
+ * The ACE_MT_Priority_Reactor differs from the TP reactor in that it
+ * dispatches socket events in priority order, according to the priorites
+ * specified in each registered event handler
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ */
+//=============================================================================
+
+
+#ifndef _ACE_MT_PRIORITY_REACTOR_H_
+#define _ACE_MT_PRIORITY_REACTOR_H_
+
+#include /**/ "ace/pre.h"
+
+#include "ace/TP_Reactor.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+#include "ace/Malloc_T.h"
+#include "ace/Unbounded_Queue.h"
+
+#define npriorities ACE_Event_Handler::HI_PRIORITY-ACE_Event_Handler::LO_PRIORITY+1
+
+/**
+ * @class ACE_Handle_Dispatch_Info
+ *
+ * @brief This structure contains all information for needed to
+ * dispatch the corresponding event handler
+ */
+class ACE_Handle_Dispatch_Info
+{
+ public:
+
+ /// Default constructor
+ ACE_Handle_Dispatch_Info (void);
+
+ /// Method for setting
+ void set (ACE_HANDLE handle,
+ ACE_Event_Handler *event_handler,
+ ACE_Reactor_Mask mask,
+ ACE_EH_PTMF callback);
+
+ /// Dispatch indicates whether or not it's safe to dispatch
+ /// the associated handler
+ bool dispatch (void) const;
+
+ ACE_HANDLE handle_;
+ ACE_Event_Handler *event_handler_;
+ ACE_Reactor_Mask mask_;
+ ACE_EH_PTMF callback_;
+ int resume_flag_;
+ bool reference_counting_required_;
+
+ private:
+ bool dispatch_;
+};
+
+
+typedef ACE_Cached_Allocator
+< ACE_Node< ACE_Handle_Dispatch_Info >,
+ ACE_SYNCH_NULL_MUTEX > TUPLE_ALLOCATOR;
+
+typedef ACE_Unbounded_Queue_Iterator
+< ACE_Handle_Dispatch_Info > QUEUE_ITERATOR;
+
+/**
+ * @class ACE_MT_Priority_Reactor
+ *
+ * @brief Specialization of Select Reactor to support prioritized, thread-pool
+ * based event dispatching.
+ *
+ * The multi-threaded priority reactor combines the benefits of
+ * the thread-pool-based TP_Reactor with the prioritized dispatching of
+ * event handlers - a feature found in the single-threaded priority reactor
+ */
+class ACE_Export ACE_MT_Priority_Reactor : public ACE_TP_Reactor
+{
+public:
+
+ /// Initialize ACE_MT_Priority_Reactor with the default size.
+ ACE_MT_Priority_Reactor (ACE_Sig_Handler * = 0,
+ ACE_Timer_Queue * = 0,
+ int mask_signals = 1,
+ int s_queue = ACE_Select_Reactor_Token::FIFO);
+
+ /**
+ * Initialize the ACE_MT_Priority_Reactor to manage
+ * @a max_number_of_handles. If @a restart is non-0 then the
+ * ACE_Reactor's @c handle_events() method will be restarted
+ * automatically when @c EINTR occurs. If @a sh or
+ * @a tq are non-0 they are used as the signal handler and
+ * timer queue, respectively.
+ */
+ ACE_MT_Priority_Reactor (size_t max_number_of_handles,
+ int restart = 0,
+ ACE_Sig_Handler *sh = 0,
+ ACE_Timer_Queue *tq = 0,
+ int mask_signals = 1,
+ int s_queue = ACE_Select_Reactor_Token::FIFO);
+
+
+ /**
+ *Destructor
+ */
+ virtual ~ACE_MT_Priority_Reactor();
+
+ /**
+ * This event loop driver that blocks for @a max_wait_time before
+ * returning. It will return earlier if timer events, I/O events,
+ * or signal events occur. Note that @a max_wait_time can be 0, in
+ * which case this method blocks indefinitely until events occur.
+ *
+ * @a max_wait_time is decremented to reflect how much time this call
+ * took. For instance, if a time value of 3 seconds is passed to
+ * handle_events and an event occurs after 2 seconds,
+ * @a max_wait_time will equal 1 second. This can be used if an
+ * application wishes to handle events for some fixed amount of
+ * time.
+ *
+ * Returns the total number of ACE_Event_Handlers that were
+ * dispatched, 0 if the @a max_wait_time elapsed without dispatching
+ * any handlers, or -1 if something goes wrong.
+ */
+ virtual int handle_events (ACE_Time_Value *max_wait_time = 0);
+
+ virtual int handle_events (ACE_Time_Value &max_wait_time);
+
+
+ /// Called from handle events
+ static void no_op_sleep_hook (void *);
+
+ /// Declare the dynamic allocation hooks.
+ ACE_ALLOC_HOOK_DECLARE;
+
+ protected:
+
+ /// Template method from the base class.
+ virtual void clear_dispatch_mask (ACE_HANDLE handle,
+ ACE_Reactor_Mask mask);
+
+ /// Dispatch just 1 signal, timer, notification handlers
+ int dispatch_handler (ACE_Time_Value *max_wait_time,
+ ACE_TP_Token_Guard &guard);
+
+ /// Template method called when new, non-empty set of events is
+ /// returned from call to select
+ virtual void preprocess_new_event_set () {}
+
+ /// Template method called when an event handler has been
+ /// selected for dispatch
+ virtual void preprocess_chosen_handler (ACE_Handle_Dispatch_Info dispatch_info) {}
+
+ /// Get the event that needs dispatching. It could be either a
+ /// signal, timer, notification handlers or return possibly 1 I/O
+ /// handler for dispatching. In the most common use case, this would
+ /// return 1 I/O handler for dispatching
+ int get_an_event_for_dispatching (ACE_Time_Value *max_wait_time);
+
+ /// Handle timer events
+ int handle_timer(int &event_count,
+ ACE_TP_Token_Guard &g);
+
+ /// Handle notify events
+ int handle_notify (int &event_count,
+ ACE_TP_Token_Guard &g);
+
+ /// handle socket events
+ int handle_socket(int &event_count,
+ ACE_TP_Token_Guard &g);
+
+ /// This method shouldn't get called.
+ virtual void notify_handle (ACE_HANDLE handle,
+ ACE_Reactor_Mask mask,
+ ACE_Handle_Set &,
+ ACE_Event_Handler *eh,
+ ACE_EH_PTMF callback);
+ private:
+
+ typedef ACE_Unbounded_Queue< ACE_Handle_Dispatch_Info > QUEUE;
+
+ /// Deny access since member-wise won't work...
+ ACE_MT_Priority_Reactor (const ACE_MT_Priority_Reactor &);
+ ACE_MT_Priority_Reactor &operator = (const ACE_MT_Priority_Reactor &);
+
+ /// Get the handle of the notify pipe from the ready set if there is
+ /// an event in the notify pipe.
+ ACE_HANDLE get_the_notify_handle (void);
+
+ /// Get socket event dispatch information.
+ int get_sock_event_info (ACE_Handle_Dispatch_Info &info);
+
+ /// Allocates storage for event handler tokens
+ void init_bucket (void);
+
+ /// Notify the appropriate <callback> in the context of the <eh>
+ /// associated with <handle> that a particular event has occurred.
+ int dispatch_sock_event (ACE_Handle_Dispatch_Info &dispatch_info);
+
+ /// Clear the @a handle from the read_set
+ void clear_read_set (ACE_HANDLE handle);
+
+ int post_process_sock_event (ACE_Handle_Dispatch_Info &dispatch_info,int status);
+
+
+ /// Divide existing socket events into buckets by priority
+ int bucketize_socket_events();
+
+ // Private data members
+ QUEUE** _bucket;
+ bool _bucketize_socket_events;
+ ACE_Allocator* _tuple_allocator;
+ int _num_socket_events;
+ int _current_priority;
+ int _min_priority;
+ int _max_priority;
+};
+
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+#if defined (__ACE_INLINE__)
+#include "ace/MT_Priority_Reactor.inl"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+
+#endif /* ACE_MT_PRIORITY_REACTOR_H */
diff --git a/ACE/ace/PIP_Active_IO_Handler.cpp b/ACE/ace/PIP_Active_IO_Handler.cpp
new file mode 100644
index 00000000000..e6fe89bd348
--- /dev/null
+++ b/ACE/ace/PIP_Active_IO_Handler.cpp
@@ -0,0 +1,109 @@
+// $Id$
+
+#include "ace/PIP_Active_IO_Handler.h"
+
+/// Constructor
+ACE_PIP_Active_IO_Handler::ACE_PIP_Active_IO_Handler()
+ : shutdown_(false)
+{
+ // acquire the shutdown lock so that when shutdown_svc is called,
+ // the caller cannot return until shutdown has been completed and
+ // lock relinquished
+ this->shutdown_lock_.acquire();
+}
+
+/// Closes all remote connections.
+int ACE_PIP_Active_IO_Handler::handle_close (ACE_HANDLE handle,
+ ACE_Reactor_Mask close_mask)
+{
+ ACE_UNUSED_ARG(handle);
+ switch(close_mask)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ this->read_closed_ = true;
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ this->write_closed_ = true;
+ break;
+ };
+
+ if (read_closed_ && write_closed_)
+ {
+ // Close our end of the connection
+ this->peer_.close_reader();
+ this->peer_.close_writer();
+ delete this;
+ return -1;
+ }
+
+ return 0;
+}
+
+/// Enqueue a message to be sent
+int ACE_PIP_Active_IO_Handler::put_message (ACE_PIP_Protocol_Message* message)
+{
+ int result = this->outgoing_message_queue_.enqueue(message);
+ if (result > -1)
+ {
+ return result;
+ }
+
+ return -1;
+}
+
+int ACE_PIP_Active_IO_Handler::svc()
+{
+ int result(0);
+ ssize_t bytes_available(0);
+ char byte;
+
+ // Run until told to shutdown
+ while (!this->shutdown_)
+ {
+ // Peek to see if an incoming message available
+ bytes_available = this->peer_.recv(&byte, 1, MSG_PEEK);
+ if (bytes_available > 0)
+ {
+ this->handle_input();
+ }
+
+ // Handle outgoing messages
+ result = this->handle_output();
+ if (result == -2)
+ {
+ // Indicate to caller that the
+ // handler is no longer active
+ return -1;
+ }
+
+ bytes_available = 0;
+ }
+
+ return 0;
+}
+
+void ACE_PIP_Active_IO_Handler::shutdown_svc()
+{
+ this->shutdown_ = true;
+ this->shutdown_lock_.acquire();
+
+ this->handle_close(0,
+ ACE_Event_Handler::READ_MASK |
+ ACE_Event_Handler::WRITE_MASK);
+
+}
+
+int ACE_PIP_Active_IO_Handler::open(void*)
+{
+ int result = this->activate();
+ if (result > 0)
+ {
+ return 0;
+ }
+
+ return -1;
+}
+
+
+
+
diff --git a/ACE/ace/PIP_Active_IO_Handler.h b/ACE/ace/PIP_Active_IO_Handler.h
new file mode 100644
index 00000000000..9d2ed86dcde
--- /dev/null
+++ b/ACE/ace/PIP_Active_IO_Handler.h
@@ -0,0 +1,55 @@
+ /**
+ * @file PIP_Active_IO_Handler.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that manages network I/O in a dedicated thread
+*/
+
+
+#ifndef PIP_ACTIVE_IO_HANDLER_H
+#define PIP_ACTIVE_IO_HANDLER_H
+
+#include "ace/Mutex.h"
+#include "ace/PIP_IO_Handler.h"
+
+/**
+ * @class ACE_PIP_Active_IO_Handler
+ *
+ * @brief Performs network I/O in a dedicated thread
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ */
+class ACE_Export ACE_PIP_Active_IO_Handler :
+ public ACE_PIP_IO_Handler
+{
+ public:
+
+ /// Constructor
+ ACE_PIP_Active_IO_Handler ();
+
+ /// Enqueue a message to be sent
+ virtual int put_message (ACE_PIP_Protocol_Message* message);
+
+ /// Closes all remote connections.
+ virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
+
+ /// Performs message I/O
+ virtual int svc();
+
+ /// Shuts down the service. Result is handler deactivated and
+ /// deleted
+ void shutdown_svc();
+
+ virtual int open(void* = 0);
+
+ private:
+
+ bool shutdown_;
+ ACE_Mutex shutdown_lock_;
+};
+
+#endif /* _PIP_Active_IO_Handler_H_ */
diff --git a/ACE/ace/PIP_Connection_Manager.cpp b/ACE/ace/PIP_Connection_Manager.cpp
new file mode 100644
index 00000000000..04d4ede7150
--- /dev/null
+++ b/ACE/ace/PIP_Connection_Manager.cpp
@@ -0,0 +1,207 @@
+ /**
+ * @file PIP_Connection_Manager.cpp
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ */
+
+#include <ace/OS_NS_stdio.h>
+#include <ace/OS_NS_string.h>
+#include <stdio.h>
+#include <ace/INET_Addr.h>
+#include <ace/PIP_Connection_Manager.h>
+
+
+ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::connection_manager_ = 0;
+ACE_Mutex ACE_PIP_Connection_Manager::instance_lock_;
+bool ACE_PIP_Connection_Manager::delete_manager_ = false;
+
+/// Default Constructor
+ACE_PIP_Connection_Manager::ACE_PIP_Connection_Manager()
+{
+}
+
+/// Destructor
+ACE_PIP_Connection_Manager::~ACE_PIP_Connection_Manager()
+{
+
+}
+
+ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::instance()
+{
+ if (connection_manager_ == 0)
+ {
+ instance_lock_.acquire();
+
+ if (ACE_PIP_Connection_Manager::connection_manager_ == 0)
+ {
+ ACE_NEW_RETURN (ACE_PIP_Connection_Manager::connection_manager_,
+ ACE_PIP_Connection_Manager,
+ 0);
+
+ delete_manager_ = true;
+ }
+
+ instance_lock_.release();
+ }
+
+ return connection_manager_;
+}
+
+int ACE_PIP_Connection_Manager::establish_connections(ACE_UINT32 source_site_id)
+{
+ int result(0);
+
+ //establish connections
+ for (unsigned i = 0; i < (this->connection_definitions_)->size(); ++i)
+ {
+ if ((*(this->connection_definitions_))[i]->source_site_id ==
+ source_site_id)
+ {
+ ACE_INET_Addr address;
+ address.set((*(this->connection_definitions_))[i]->port,
+ (*(this->connection_definitions_))[i]->address);
+
+ if ((*(this->connection_definitions_))[i]->type ==
+ Connection_Definition::ACTIVE)
+ {
+ ACE_PIP_Active_IO_Handler* handler(0);
+ ACE_NEW_RETURN (handler, ACE_PIP_Active_IO_Handler, 0);
+
+ result = this->active_connector_.connect(handler, address);
+ if (result == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ handler->init(
+ (*(this->connection_definitions_))[i]->source_site_id,
+ (*(this->connection_definitions_))[i]->destination_site_id,
+ (*(this->connection_definitions_))[i]->priority);
+
+ this->handlers_.push_back(handler);
+ }
+ }
+ else
+ {
+ ACE_PIP_Reactive_IO_Handler* handler(0);
+ ACE_NEW_RETURN(handler, ACE_PIP_Reactive_IO_Handler, 0);
+ result = this->reactive_connector_.connect(handler, address);
+ if (result == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ handler->init(
+ (*(this->connection_definitions_))[i]->source_site_id,
+ (*(this->connection_definitions_))[i]->destination_site_id,
+ (*(this->connection_definitions_))[i]->priority);
+
+ this->handlers_.push_back(handler);
+ }
+ }
+ }
+
+ }
+
+ return result;
+}
+
+int ACE_PIP_Connection_Manager::process_connection_file(char* file_name)
+{
+ // Expecting the file to contain one tuple per line
+ // where each is of form (source_id, dest_id, dest_address,
+ // dest_port, priority)
+
+ ACE_TCHAR line[100];
+ char* token(0);
+ FILE* fp = ACE_OS::fopen(file_name, "r");
+ Connection_Definition* current_definition(0);
+ if (fp)
+ {
+ int num_entries(0);
+ fscanf(fp, "%d", &num_entries);
+ if (num_entries > 0)
+ {
+ ACE_NEW_RETURN(this->connection_definitions_,
+ ACE_Vector<Connection_Definition*>, 0);
+ for (int i = 0; i < num_entries; ++i)
+ {
+ ACE_NEW_RETURN(current_definition, Connection_Definition, 0);
+ fscanf(fp, "%s", line);
+ token = strtok(line, " (,)");
+ if (!token)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // Store the source site ID
+ current_definition->source_site_id = atoi(token);
+
+ token = ACE_OS::strtok(0, " (,)");
+ if (!token)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // destination site ID
+ current_definition->destination_site_id = atoi(token);
+
+ token = ACE_OS::strtok(0, " (,)");
+ if (!token)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // IP address
+ size_t length = ACE_OS::strlen(token);
+ ACE_NEW_RETURN(current_definition->address, char[length + 1], 0);
+ ACE_OS::strncpy(current_definition->address, token, length);
+ current_definition->address[length] = 0;
+
+ token = ACE_OS::strtok(0, " (,)");
+ if (!token)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // IP port
+ current_definition->port = atoi(token);
+
+ token = ACE_OS::strtok(0, " (,)");
+ if (!token)
+ {
+ delete current_definition;
+ return -1;
+ }
+
+ // Connection Priority
+ current_definition->priority = atoi(token);
+ current_definition->type = Connection_Definition::REACTIVE;
+ this->connection_definitions_->push_back(current_definition);
+ }
+ }
+ }
+ else
+ {
+ //error
+ return -1;
+ }
+
+ return 0;
+}
+
+const ACE_Vector<ACE_PIP_Connection_Manager::Connection_Definition*>*
+ACE_PIP_Connection_Manager::get_connections() const
+{
+ return this->connection_definitions_;
+}
+
diff --git a/ACE/ace/PIP_Connection_Manager.h b/ACE/ace/PIP_Connection_Manager.h
new file mode 100644
index 00000000000..fb7dd8e3064
--- /dev/null
+++ b/ACE/ace/PIP_Connection_Manager.h
@@ -0,0 +1,81 @@
+ /**
+ * @file PIP_Connection_Manager.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ */
+
+#ifndef PIP_CONNECTION_MANAGER_H
+#define PIP_CONNECTION_MANAGER_H
+
+#include <ace/Connector.h>
+#include <ace/PIP_Active_IO_Handler.h>
+#include <ace/PIP_Reactive_IO_Handler.h>
+#include <ace/Reactor.h>
+#include <ace/SOCK_Connector.h>
+#include <ace/Vector_T.h>
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include <fstream>
+#include <iostream>
+
+class ACE_Export ACE_PIP_Connection_Manager
+{
+ public:
+
+ /// Informationa associated with a connection
+ struct Connection_Definition
+ {
+ enum Handler_Type {ACTIVE, REACTIVE};
+
+ ACE_UINT32 source_site_id;
+ ACE_UINT32 destination_site_id;
+ ACE_TCHAR* address;
+ u_short port;
+ ACE_UINT32 priority;
+ Handler_Type type;
+ };
+
+ /// Default Constructor
+ ACE_PIP_Connection_Manager();
+
+ /// Destructor
+ virtual ~ACE_PIP_Connection_Manager();
+
+ /// obtain the single instance of the manager
+ static ACE_PIP_Connection_Manager* instance();
+
+ /// Extract all connection information from a file
+ virtual int process_connection_file(char* filename);
+
+ /// Establish all connection for which source_site_id is the source
+ virtual int establish_connections(ACE_UINT32 source_site_id);
+
+ const ACE_Vector<Connection_Definition*>* get_connections() const;
+
+ private:
+
+ ACE_Vector<Connection_Definition*>* connection_definitions_;
+
+ // The connector used to actively connect to a remote site
+ ACE_Connector<
+ ACE_PIP_Active_IO_Handler,
+ ACE_SOCK_Connector> active_connector_;
+
+ ACE_Connector<
+ ACE_PIP_Reactive_IO_Handler,
+ ACE_SOCK_Connector> reactive_connector_;
+
+ static ACE_PIP_Connection_Manager* connection_manager_;
+ static ACE_Mutex instance_lock_;
+ static bool delete_manager_;
+
+ ACE_Vector<ACE_PIP_IO_Handler*> handlers_;
+};
+
+#endif
diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.cpp b/ACE/ace/PIP_DA_Strategy_Adapter.cpp
new file mode 100644
index 00000000000..ecfd42a4747
--- /dev/null
+++ b/ACE/ace/PIP_DA_Strategy_Adapter.cpp
@@ -0,0 +1,4 @@
+// $Id$
+
+#include "PIP_DA_Strategy_Adapter.h"
+
diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.h b/ACE/ace/PIP_DA_Strategy_Adapter.h
new file mode 100644
index 00000000000..50028a69d77
--- /dev/null
+++ b/ACE/ace/PIP_DA_Strategy_Adapter.h
@@ -0,0 +1,258 @@
+ /**
+ * @file PIP_DA_Strategy_Adapter.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that adapts a deadlock avoidance strategy to additionally
+ * support priority inheritance protocol annotations
+*/
+
+
+#ifndef PIP_DA_STRATEGY_ADAPTER
+#define PIP_DA_STRATEGY_ADAPTER
+
+#include "ace/DA_Strategy_Base.h"
+#include "ace/Hash_Map_Manager.h"
+#include "ace/Unbounded_Set.h"
+#include "ace/Mutex.h"
+#include "ace/Null_Mutex.h"
+
+/**
+ * @class ACE_PIP_DA_Strategy_Adapter
+ * @brief Extends deadlock avoidance strategies
+ * to support priority inheritance annotations
+ *
+ * Deadlock avoidance strategies associate a resource cost annotation
+ * with each handle. This class extends the strategies to support
+ * the association of annotations with each priority at which the
+ * handle can be dispatched, i.e. the priority at which the corresponding
+ * thread resource can dispatch the handle
+*/
+template <typename Handle_Id, typename Lock>
+class ACE_PIP_DA_Strategy_Adapter
+{
+ public:
+
+ /// Constructor that takes the deadlock avoidance strategy that
+ /// the Strategy Adapter adapts.
+ ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base<ACE_UINT64>* DA_strategy);
+ ~ACE_PIP_DA_Strategy_Adapter();
+
+ /// Indicates whether allocating a thread to the handle
+ /// at the specified priority could potentially result in deadlock.
+ int is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority);
+
+ /// Grant the handle a thread at the specified priority.
+ void grant(Handle_Id handle, ACE_UINT32 priority);
+
+ /// Release the thread
+ void release(Handle_Id handle, ACE_UINT32 priority);
+
+ /// Determine the number of threads being managed by
+ /// the DA_Strategy adapter.
+ int get_max_threads();
+
+ /// Add an annotation value for the handle / priority pair.
+ int add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation);
+
+ /// Remove every annotation associated with this handle.
+ int remove_annotation (Handle_Id handle);
+ int remove_annotation (Handle_Id handle, ACE_UINT32 priority);
+
+private:
+
+ /// Associates each message handler with an internally generated id
+ /// which can be used, along with a priority, to lookup an annotation.
+ typedef ACE_Hash_Map_Manager_Ex<Handle_Id,
+ ACE_UINT32,
+ ACE_Hash<Handle_Id>,
+ ACE_Equal_To<Handle_Id>,
+ ACE_Null_Mutex> HANDLE_ID_MAP;
+
+ /// Associates each message handler with a set of potential priorities.
+ /// Message handler represented by internally generated id.
+ typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32,
+ ACE_Unbounded_Set<ACE_UINT32>*,
+ ACE_Hash<ACE_UINT32>,
+ ACE_Equal_To<ACE_UINT32>,
+ ACE_Null_Mutex> HANDLE_ID_PRIORITY_MAP;
+
+ /// Determines an id that uniquely identifies a handler/priority pair.
+ ACE_UINT64 hash_handle_id_and_priority(ACE_UINT32 handle_id,
+ ACE_UINT32 priority) const;
+
+ /// Generates an annotation ID given the actual handle and priority.
+ ACE_UINT64 get_annotation_id(Handle_Id handle, ACE_UINT32 priority);
+
+ DA_Strategy_Base<ACE_UINT64>* DA_strategy_;
+ HANDLE_ID_MAP handle_ids_;
+ HANDLE_ID_PRIORITY_MAP id_to_priority_map_;
+ Lock lock_;
+ ACE_UINT32 next_id_;
+};
+
+template <typename Handle_Id, typename Lock>
+ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base<ACE_UINT64>* DA_strategy)
+: DA_strategy_(DA_strategy)
+, next_id_(0)
+{
+}
+
+template <typename Handle_Id, typename Lock>
+ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::~ACE_PIP_DA_Strategy_Adapter()
+{
+ HANDLE_ID_PRIORITY_MAP::iterator it = this->id_to_priority_map_.begin();
+ for (; it != this->id_to_priority_map_.end(); ++it)
+ {
+ delete it->item();
+ }
+}
+
+template <typename Handle_Id, typename Lock>
+ACE_INLINE int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::get_max_threads()
+{
+ return this->DA_strategy_->get_max_threads();
+}
+
+template <typename Handle_Id, typename Lock>
+ACE_INLINE ACE_UINT64 ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ hash_handle_id_and_priority(ACE_UINT32 handle_id, ACE_UINT32 priority) const
+{
+ ACE_UINT64 result = handle_id;
+ result = (result << 32) | priority;
+ return result;
+}
+
+template <typename Handle_Id, typename Lock>
+int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_Guard<Lock> guard(this->lock_);
+ ACE_UINT64 annotation_id = get_annotation_id(handle, priority);
+ return this->DA_strategy_->is_deadlock_potential(annotation_id);
+}
+
+template <typename Handle_Id, typename Lock>
+void ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ grant(Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_Guard<Lock> guard(this->lock_);
+ ACE_UINT64 annotation_id = get_annotation_id(handle, priority);
+ return this->DA_strategy_->grant(annotation_id);
+}
+
+template <typename Handle_Id, typename Lock>
+void ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ release(Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_Guard<Lock> guard(this->lock_);
+ ACE_UINT64 annotation_id = get_annotation_id(handle, priority);
+ this->DA_strategy_->release(annotation_id);
+}
+
+template <typename Handle_Id, typename Lock>
+int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation)
+{
+ ACE_UINT32 internal_handle_id(0);
+ ACE_Unbounded_Set<ACE_UINT32>* priorities(0);
+
+ ACE_Guard<Lock> guard(this->lock_);
+ if (this->handle_ids_.find(handle, internal_handle_id) == -1)
+ {
+ // This is the first time handle has been encountered, so generate an
+ // internal handle id.
+ internal_handle_id = this->next_id_++;
+ this->handle_ids_.bind(handle, internal_handle_id);
+ priorities = new ACE_Unbounded_Set<ACE_UINT32>;
+ this->id_to_priority_map_.bind(internal_handle_id, priorities);
+ }
+ else
+ {
+ this->id_to_priority_map_.find(internal_handle_id, priorities);
+ }
+
+ priorities->insert(priority);
+
+ return this->DA_strategy_->add_annotation(
+ hash_handle_id_and_priority(internal_handle_id, priority), annotation);
+}
+
+template <typename Handle_Id, typename Lock>
+int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ remove_annotation (Handle_Id handle)
+{
+ ACE_Guard<Lock> guard(this->lock_);
+ ACE_UINT32 internal_handle_id(0);
+ if (this->handle_ids_.unbind(handle, internal_handle_id) != -1)
+ {
+ ACE_Unbounded_Set<ACE_UINT32>* priorities(0);
+ if (this->id_to_priority_map_.unbind(internal_handle_id, priorities) != -1)
+ {
+ for (ACE_Unbounded_Set<ACE_UINT32>::ITERATOR it = priorities->begin();
+ it != priorities->end();
+ ++it)
+ {
+ this->DA_strategy_->remove_annotation(
+ get_annotation_id(internal_handle_id, *it));
+ }
+
+ delete priorities;
+ }
+ }
+
+ return 0;
+}
+
+template <typename Handle_Id, typename Lock>
+int ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ remove_annotation (Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_Guard<Lock> guard(this->lock_);
+ ACE_UINT32 internal_handle_id(0);
+ int result(0);
+ if (this->handle_ids_.find(handle, internal_handle_id) != -1)
+ {
+ ACE_Unbounded_Set<ACE_UINT32>* priorities(0);
+ if (this->id_to_priority_map_.find(internal_handle_id, priorities) != -1)
+ {
+ if (priorities->remove(priority) != -1)
+ {
+ result = this->DA_strategy_->remove_annotation(
+ get_annotation_id(internal_handle_id, priority));
+ }
+ if (priorities->is_empty())
+ {
+ // This was the last annotation for this handle,
+ // so remove the handle information
+ this->id_to_priority_map_.unbind(internal_handle_id, priorities);
+ delete priorities;
+ this->handle_ids_.unbind(handle, internal_handle_id);
+ }
+ }
+ }
+
+ return result;
+}
+
+template <typename Handle_Id, typename Lock>
+ACE_UINT64 ACE_PIP_DA_Strategy_Adapter<Handle_Id, Lock>::
+ get_annotation_id(Handle_Id handle, ACE_UINT32 priority)
+{
+ ACE_UINT64 annotation_id(0);
+ ACE_UINT32 handle_id(0);
+
+ if (this->handle_ids_.find(handle, handle_id) != -1)
+ {
+ annotation_id = hash_handle_id_and_priority(handle_id, priority);
+ }
+
+ return annotation_id;
+}
+
+#endif
+
diff --git a/ACE/ace/PIP_Dispatcher.cpp b/ACE/ace/PIP_Dispatcher.cpp
new file mode 100644
index 00000000000..45d3ed63d6a
--- /dev/null
+++ b/ACE/ace/PIP_Dispatcher.cpp
@@ -0,0 +1,468 @@
+#include "ace/PIP_Dispatcher.h"
+#include "ace/PIP_Invocation_Manager.h"
+#include "ace/PIP_Messages.h"
+#include "ace/Reactor.h"
+
+ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::dispatcher_ = 0;
+ACE_Mutex ACE_PIP_Dispatcher::instance_lock_;
+bool ACE_PIP_Dispatcher::delete_dispatcher_ = false;
+bool ACE_PIP_Dispatcher::shutdown_ = false;
+
+/// Constructor
+ACE_PIP_Dispatcher::ACE_PIP_Dispatcher()
+ : current_highest_priority_(ACE_Event_Handler::LO_PRIORITY)
+ , current_lowest_priority_(ACE_Event_Handler::LO_PRIORITY)
+ , waiting_for_message_(false)
+ , num_threads_needed_(0)
+ , message_available_signal_(0)
+ , threads_available_signal_(0)
+ , DA_strategy_adapter_(0)
+ , num_pending_messages_(0)
+ , num_messages_received_(0)
+ , num_messages_dispatched_(0)
+{
+}
+
+/// Destructor
+ACE_PIP_Dispatcher::~ACE_PIP_Dispatcher()
+{
+ ACE_PIP_Protocol_Message* message(0);
+
+ // Destroy all messages that have yet to be dispatched
+ this->pending_messages_lock_.acquire();
+ while (this->pending_messages_by_message_id_.current_size() != 0)
+ {
+ this->pending_messages_by_message_id_.unbind(
+ this->pending_messages_by_message_id_.begin()->key(),
+ message);
+
+ if (message)
+ {
+ delete message;
+ message = 0;
+ }
+ }
+ this->pending_messages_lock_.release();
+}
+
+
+ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::instance()
+{
+ if (ACE_PIP_Dispatcher::dispatcher_ == 0)
+ {
+ instance_lock_.acquire();
+
+ if (ACE_PIP_Dispatcher::dispatcher_ == 0)
+ {
+ ACE_NEW_RETURN (ACE_PIP_Dispatcher::dispatcher_,
+ ACE_PIP_Dispatcher,
+ 0);
+
+ delete_dispatcher_ = true;
+ }
+
+ instance_lock_.release();
+ }
+
+ return dispatcher_;
+}
+
+/// Receive a message for eventual dispatching
+void ACE_PIP_Dispatcher::process_message(ACE_PIP_Protocol_Message* message)
+{
+ switch (message->message_type())
+ {
+ case ACE_PIP_Protocol_Message::ACCEL:
+ process_incoming_acceleration(message);
+ break;
+
+ case ACE_PIP_Protocol_Message::REQUEST:
+ process_incoming_request(message);
+ break;
+
+ case ACE_PIP_Protocol_Message::RESPONSE:
+ // Forward the response to the invocation manager
+ ACE_PIP_Invocation_Manager::instance()->process_inbound_response(message);
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+/// Signals the dispatcher to dispatch a new message if possible.
+int ACE_PIP_Dispatcher::handle_output (ACE_HANDLE)
+{
+ ACE_PIP_Protocol_Message* message(0);
+ bool message_dispatched(false);
+
+ while (!message_dispatched && !shutdown_)
+ {
+ // get the highest priority message
+ this->pending_messages_lock_.acquire();
+ message = retrieve_highest_priority_pending_message();
+ if (message)
+ {
+ ACE_PIP_Data_Message* data_message =
+ static_cast<ACE_PIP_Data_Message*>(message->next());
+
+ this->deadlock_avoidance_lock_.acquire();
+
+ /// If dispatching could potentially cause deadlock, try to accelerate all lower priority
+ /// messages and then wait for threads to become available
+ this->num_threads_needed_ = DA_strategy_adapter_->is_deadlock_potential(
+ data_message->destination_handler_ID(),
+ data_message->message_priority());
+
+ if (this->num_threads_needed_ > 0)
+ {
+ this->deadlock_avoidance_lock_.release();
+ find_and_accelerate_lower_priority_message(data_message->message_priority());
+
+ // Wait for signal indicating enough threads exist to dispatch the message
+ this->threads_available_signal_.acquire();
+
+ // Before grabing the deadlock avoidance lock, check to make sure
+ // we haven't been told to shutdown.
+ if (shutdown_)
+ break;
+
+ this->deadlock_avoidance_lock_.acquire();
+ }
+
+ // At this point, sufficient threads exist to dispatch the message
+ // without threat of deadlock, so grant a thread
+ DA_strategy_adapter_->grant(data_message->destination_handler_ID(),
+ data_message->message_priority());
+
+ this->deadlock_avoidance_lock_.release();
+
+ // Transfer the message to the "dispatched" list
+ this->dispatched_messages_lock_.acquire();
+ Dispatched_Message_Data dispatch_record;
+ dispatch_record.id = message->message_id();
+ dispatch_record.priority = data_message->message_priority();
+ this->dispatched_messages_data_.insert(dispatch_record);
+ this->dispatched_messages_lock_.release();
+
+ //-------------TEST DATA------------------
+ // store statistics to be printed later
+ Dispatch_Test_Data test_data;
+ test_data.id = message->message_id();
+ test_data.priority = data_message->message_priority();
+ test_data.num_pending = this->num_pending_messages_;
+ test_data.highest_priority = this->current_highest_priority_;
+ test_data.lowest_priority = this->current_lowest_priority_;
+ dispatch_records_.push_back(test_data);
+
+ dispatched_ids_.push_back(message->message_id());
+
+ ++num_messages_dispatched_;
+ --this->num_pending_messages_;
+ this->pending_messages_lock_.release();
+ //-----------------------------------------
+
+ // Request another thread to be associated with dispatcher
+ ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK);
+
+ message_dispatched = true;
+
+ // Pass the message to the invocation manager for processing
+ ACE_PIP_Invocation_Manager::instance()->process_inbound_request(message);
+
+ // All processing associated with the message has been completed
+ // so discard the record
+ this->dispatched_messages_lock_.acquire();
+ this->dispatched_messages_data_.erase(dispatch_record);
+ this->dispatched_messages_lock_.release();
+
+ // Cleanup message information and release the thread resource
+ this->deadlock_avoidance_lock_.acquire();
+ DA_strategy_adapter_->release(data_message->destination_handler_ID(),
+ data_message->message_priority());
+
+ if (this->num_threads_needed_ > 0)
+ {
+ --this->num_threads_needed_;
+ if (this->num_threads_needed_ == 0)
+ {
+ this->threads_available_signal_.release();
+ }
+ }
+
+ this->deadlock_avoidance_lock_.release();
+ }
+ else
+ {
+ // There are no messages to dispatch, so wait for one to arrive
+ this->waiting_for_message_ = true;
+ this->pending_messages_lock_.release();
+ this->message_available_signal_.acquire();
+
+ // Before dispatching a message, make sure we haven't been
+ // instructed to shutdown
+ if (shutdown_)
+ break;
+ }
+ }
+
+ return 0;
+}
+
+
+/// Initializes dispatcher
+void ACE_PIP_Dispatcher::init(ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter)
+{
+ DA_strategy_adapter_ = DA_strategy_adapter;
+ this->waiting_for_message_ = true;
+ ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK);
+}
+
+/// store the message
+void ACE_PIP_Dispatcher::process_incoming_request(ACE_PIP_Protocol_Message* message)
+{
+ // Store the message token 2 ways to enable efficient dispatching as well as
+ // efficient lookup for accelerations
+ this->pending_messages_lock_.acquire();
+
+ //-------TEST DATA------------------------
+ ++num_messages_received_;
+ ++this->num_pending_messages_;
+ received_ids_.push_back(message->message_id());
+
+ //------------------------------------------
+ ACE_UINT32 priority =
+ static_cast<ACE_PIP_Data_Message*>(message->next())->message_priority();
+
+ // update the priority upper and lower bounds. These values are stored to
+ // avoid checking the full range of priorities when dispatching messages
+ if (priority > this->current_highest_priority_)
+ {
+ this->current_highest_priority_ = priority;
+ }
+ else if (priority < this->current_lowest_priority_)
+ {
+ this->current_lowest_priority_ = priority;
+ }
+
+ PRIORITY_MESSAGE_LIST_MAP::iterator
+ message_iter = this->pending_messages_by_priority_.find(priority);
+
+ if (message_iter == this->pending_messages_by_priority_.end())
+ {
+ // Create a new entry for this priority level
+ std::list<ACE_PIP_Protocol_Message*> new_priority_list;
+ new_priority_list.push_back(message);
+ this->pending_messages_by_priority_.insert(
+ make_pair(priority, new_priority_list));
+ }
+ else
+ {
+ // Priority already exists, so add the message token to the list
+ message_iter->second.push_back(message);
+ }
+
+ this->pending_messages_by_message_id_.bind(message->message_id(), message);
+
+ if (this->waiting_for_message_)
+ {
+ this->waiting_for_message_ = false;
+
+ // Signal waiting dispatcher thread to dispatch new message
+ this->message_available_signal_.release();
+ }
+
+ this->pending_messages_lock_.release();
+
+}
+
+/// Find the highest priority message and return it
+ACE_PIP_Protocol_Message* ACE_PIP_Dispatcher::
+ retrieve_highest_priority_pending_message()
+{
+ ACE_PIP_Protocol_Message* message(0);
+ for (ACE_INT32 current_priority = (ACE_INT32)this->current_highest_priority_;
+ current_priority >= (ACE_INT32)this->current_lowest_priority_;
+ --current_priority)
+ {
+ PRIORITY_MESSAGE_LIST_MAP::iterator
+ pending_message_iter = this->pending_messages_by_priority_.find(current_priority);
+
+ for (; pending_message_iter != this->pending_messages_by_priority_.end();
+ ++pending_message_iter)
+ {
+ std::list<ACE_PIP_Protocol_Message*>::iterator next_message_iter =
+ pending_message_iter->second.begin();
+
+ if (next_message_iter != pending_message_iter->second.end())
+ {
+ // The highest-priority message has been found. Grab the message
+ // and remove it from both containers
+ message = *next_message_iter;
+ pending_message_iter->second.pop_front();
+ this->pending_messages_by_message_id_.unbind(message->message_id());
+ break;
+ }
+ else
+ {
+ // There are no messages at this priority. Since the search begins at
+ // the highest priority, lower the highest priority until a message
+ // is found
+ if (this->current_highest_priority_ > this->current_lowest_priority_)
+ {
+ --this->current_highest_priority_;
+ }
+ }
+ }
+
+ if (message)
+ {
+ break;
+ }
+ }
+
+ return message;
+}
+
+bool ACE_PIP_Dispatcher::
+find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority)
+{
+ bool found(false);
+ bool erased_this_pass(true);
+
+ this->dispatched_messages_lock_.acquire();
+
+ while(erased_this_pass)
+ {
+ erased_this_pass = false;
+
+ std::set<Dispatched_Message_Data>::iterator iter = this->dispatched_messages_data_.begin();
+
+ // Find all dispatched messages having priority lower than new_priority. For each
+ // send an acceleration message, and update the dispatch record
+ for (; iter != this->dispatched_messages_data_.end() &&
+ this->num_threads_needed_ > 0; ++iter)
+ {
+ if (iter->priority < new_priority)
+ {
+ // A message has been found that has a lower priority,
+ // so the send an acceleration message
+ ACE_PIP_Accel_Message* accel_message = new ACE_PIP_Accel_Message;
+ accel_message->old_priority(iter->priority);
+ accel_message->new_priority(new_priority);
+
+ ACE_PIP_Protocol_Message* protocol_message = new ACE_PIP_Protocol_Message;
+ protocol_message->message_type(ACE_PIP_Protocol_Message::ACCEL);
+ protocol_message->message_id(iter->id);
+ protocol_message->next(accel_message);
+
+ Dispatched_Message_Data dispatch_record = *iter;
+ this->dispatched_messages_data_.erase(iter);
+ dispatch_record.priority = new_priority;
+ this->dispatched_messages_data_.insert(dispatch_record);
+ ACE_PIP_Invocation_Manager::instance()->process_acceleration(protocol_message);
+ found = true;
+ erased_this_pass = true;
+ break;
+ }
+ }
+ }
+
+ this->dispatched_messages_lock_.release();
+ this->pending_messages_lock_.release();
+
+ return found;
+}
+
+void ACE_PIP_Dispatcher::shutdown()
+{
+ shutdown_ = true;
+
+ // Pulse signals so waiting threads can quit
+ this->message_available_signal_.release();
+ this->threads_available_signal_.release();
+}
+
+void ACE_PIP_Dispatcher::process_incoming_acceleration(ACE_PIP_Protocol_Message* message)
+{
+ bool updated_pending(false);
+ // Look for pending message. If the message is pending, update the priority, move it around in data structures, and quit
+
+ ACE_PIP_Accel_Message* accel_message =
+ static_cast<ACE_PIP_Accel_Message*>(message->next());
+
+ this->pending_messages_lock_.acquire();
+ ACE_Hash_Map_Entry<ACE_UINT64, ACE_PIP_Protocol_Message*>* entry(0);
+ if (this->pending_messages_by_message_id_.find(message->message_id(), entry) == 0)
+ {
+ ACE_PIP_Data_Message* data_message =
+ static_cast<ACE_PIP_Data_Message*>(entry->item()->next());
+
+ data_message->message_priority(accel_message->new_priority());
+
+ // move the message from one priority to the other
+ updated_pending = true;
+ }
+ this->pending_messages_lock_.release();
+
+ if (!updated_pending)
+ {
+ bool found(false);
+ ACE_Guard<ACE_Mutex> guard(this->dispatched_messages_lock_);
+ // Message is not pending, so must already be dispatche
+ std::set<Dispatched_Message_Data>::iterator iter = this->dispatched_messages_data_.begin();
+
+ // Find all dispatched messages having priority lower than new_priority. For each
+ // send an acceleration message, and update the dispatch record
+ for (; iter != this->dispatched_messages_data_.end(); ++iter)
+ {
+ if ((iter->id == message->message_id()) &&
+ (iter->priority < accel_message->new_priority()))
+ {
+ Dispatched_Message_Data dispatch_record = *iter;
+ this->dispatched_messages_data_.erase(iter);
+ dispatch_record.priority = accel_message->new_priority();
+ this->dispatched_messages_data_.insert(dispatch_record);
+ ACE_PIP_Invocation_Manager::instance()->process_acceleration(message);
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ for (size_t index = 0; index < received_ids_.size(); ++index)
+ {
+ if (received_ids_[index] == message->message_id())
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+ }
+}
+void ACE_PIP_Dispatcher::print_results()
+{
+ printf("----------------------DISPATCHER_RESULTS-------------\n");
+ printf("Num received: %d\n", num_messages_received_);
+ printf("Num dispatched: %d\n\n", num_messages_dispatched_);
+ printf("Received Ids: \n");
+
+ for (size_t index = 0; index < received_ids_.size(); ++index)
+ {
+ printf("%lld\n", received_ids_[index]);
+ }
+
+ printf("\nDispatched Ids: \n");
+ for (size_t dispatched_index = 0; dispatched_index < dispatched_ids_.size();
+ ++dispatched_index)
+ {
+ printf("%lld\n", dispatched_ids_[dispatched_index]);
+ }
+
+ printf("Num received: %d\n", received_ids_.size());
+ printf("Num dispatched: %d\n", dispatched_ids_.size());
+ printf("---------------------\n");
+}
diff --git a/ACE/ace/PIP_Dispatcher.h b/ACE/ace/PIP_Dispatcher.h
new file mode 100644
index 00000000000..31f59f5d358
--- /dev/null
+++ b/ACE/ace/PIP_Dispatcher.h
@@ -0,0 +1,188 @@
+ /**
+ * @file PIP_Dispatcher.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that dispatches priority inheritance protocol messages
+ * to the appropriate message handler.
+*/
+
+
+#ifndef _PIP_DISPATCHER_H_
+#define _PIP_DISPATCHER_H_
+
+// ACE definitions
+#include "ace/Event_Handler.h"
+#include "ace/Hash_Map_Manager.h"
+#include "ace/PIP_DA_Strategy_Adapter.h"
+#include "ace/PIP_Messages.h"
+#include "ace/RW_Thread_Mutex.h"
+#include "ace/Semaphore.h"
+#include "ace/Singleton.h"
+
+// STL definitions
+#include <list>
+#include <map>
+#include <set>
+#include <vector>
+
+// Forward Declarations
+class ACE_PIP_Protocol_Message;
+
+typedef std::map<ACE_UINT32, std::list<ACE_PIP_Protocol_Message*> >
+ PRIORITY_MESSAGE_LIST_MAP;
+
+// Associate each message with a message ID
+typedef ACE_Hash_Map_Manager_Ex<ACE_UINT64,
+ ACE_PIP_Protocol_Message*,
+ ACE_Hash<ACE_UINT64>,
+ ACE_Equal_To<ACE_UINT64>,
+ ACE_Null_Mutex> ID_MESSAGE_MAP;
+
+
+/**
+ * @class ACE_Dispatcher
+ * @brief Dispatches ACE_PIP_Priority_Messages in priority order
+ * message handlers. Additionally, notifies handlers when priority inversion is
+ * detected.
+ *
+ * The PIP_Message_Dispatcher implements the priority inheritance protocol.
+ * Upon receipt of messages, it determines the highest-priority message to
+ * be dispatched, and dispatches providing enough resources exist. If not enough exist,
+ * and a lower priority message has been dispatched, an acceleration message is sent
+ * to the corresponding handler to raise the priority of the message, thus
+ * mitigating the inversion.
+*/
+class ACE_Export ACE_PIP_Dispatcher : public ACE_Event_Handler
+{
+ public:
+
+ /// Constructor
+ ACE_PIP_Dispatcher();
+
+ /// Destructor
+ virtual ~ACE_PIP_Dispatcher();
+
+ /// obtain the single instance of the dispatcher
+ static ACE_PIP_Dispatcher* instance();
+
+ /// Receive a message for eventual dispatching
+ void process_message(ACE_PIP_Protocol_Message* message);
+
+ /// Signals the dispatcher to dispatch a new message if possible.
+ virtual int handle_output (ACE_HANDLE);
+
+ /// Initializes dispatcher
+ void init(ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter);
+
+ /// Tell the dispatcher to stop dispatching and release all threads ASAP
+ void shutdown();
+
+ /// Accelerate the appropriate message
+ void process_incoming_acceleration(ACE_PIP_Protocol_Message* message);
+
+ /// Print statistics
+ void print_results();
+
+ private:
+
+ // Dispatched_Message_Data stores the ID and priority
+ // of a dispatched message
+ class Dispatched_Message_Data
+ {
+ public:
+
+ bool operator<(const Dispatched_Message_Data& other) const
+ {
+ return (priority < other.priority);
+ }
+
+ bool operator==(const Dispatched_Message_Data& other) const
+ {
+ return (id == other.id);
+ }
+
+ bool operator!=(const Dispatched_Message_Data& other) const
+ {
+ return !(*this == other);
+ }
+
+ ACE_UINT64 id;
+ ACE_UINT32 priority;
+ };
+
+ class Dispatch_Test_Data
+ {
+ public:
+ ACE_UINT64 id;
+ ACE_UINT64 priority;
+ ACE_UINT32 num_pending;
+ ACE_UINT32 highest_priority;
+ ACE_UINT32 lowest_priority;
+ };
+
+ /// store the message
+ void process_incoming_request(ACE_PIP_Protocol_Message* message);
+
+ /// Find the highest priority message and return it
+ ACE_PIP_Protocol_Message* retrieve_highest_priority_pending_message();
+
+ bool find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority);
+
+
+ // Dispatched message data is stored to determine which messages are
+ // currently assigned to a thread. This is useful for finding messages
+ // whose priority needs to be accelerated in the case where an inversion
+ // is detected.
+ std::set<Dispatched_Message_Data> dispatched_messages_data_;
+ ACE_Mutex dispatched_messages_lock_;
+
+ ACE_UINT32 current_highest_priority_;
+ ACE_UINT32 current_lowest_priority_;
+
+ // Pending messages (those not dispatched) are stored in 2 ways for efficiency
+ // 1.) By message id - this is useful for managing priority accelerations
+ // because we can find the appropriate message in constant time
+ // 2.) By priority - this is useful for determining which message to dispatch next
+ // as messages are dispatched in priority order
+ PRIORITY_MESSAGE_LIST_MAP pending_messages_by_priority_;
+ ID_MESSAGE_MAP pending_messages_by_message_id_;
+ ACE_Mutex pending_messages_lock_;
+
+ // Indicates the dispatcher has a thread waiting to
+ // dispatch a message
+ bool waiting_for_message_;
+
+ // Number of threads that need to be returned in order to
+ // dispatch the current message
+ int num_threads_needed_;
+
+ ACE_Semaphore message_available_signal_;
+ ACE_Semaphore threads_available_signal_;
+
+ ACE_PIP_DA_Strategy_Adapter<ACE_UINT32, ACE_Null_Mutex>* DA_strategy_adapter_;
+ ACE_Mutex deadlock_avoidance_lock_;
+
+ static ACE_PIP_Dispatcher* dispatcher_;
+ static ACE_Mutex instance_lock_;
+ static bool delete_dispatcher_;
+ static bool shutdown_;
+
+ // Test variables
+ ACE_UINT32 num_pending_messages_;
+ ACE_UINT32 num_messages_received_;
+ ACE_UINT32 num_messages_dispatched_;
+
+ ACE_Vector<ACE_UINT64> received_ids_;
+ ACE_Vector<ACE_UINT64> dispatched_ids_;
+ ACE_Vector<Dispatch_Test_Data> dispatch_records_;
+};
+
+// Define a singleton class to make the dispatcher globally accessible
+typedef ACE_Singleton<ACE_PIP_Dispatcher, ACE_Mutex>
+ ACE_PIP_Dispatcher_Singleton;
+
+#endif
diff --git a/ACE/ace/PIP_IO_Handler.cpp b/ACE/ace/PIP_IO_Handler.cpp
new file mode 100644
index 00000000000..be40fc03a8b
--- /dev/null
+++ b/ACE/ace/PIP_IO_Handler.cpp
@@ -0,0 +1,151 @@
+// $Id$
+
+#include "ace/Guard_T.h"
+#include "ace/PIP_IO_Handler.h"
+#include "ace/PIP_Invocation_Manager.h"
+#include "ace/PIP_Dispatcher.h"
+
+/// Constructor
+ACE_PIP_IO_Handler::ACE_PIP_IO_Handler()
+ : read_closed_(false)
+ , write_closed_(false)
+ , priority_set_(false)
+ , site_id_(0)
+ , handler_id_(0)
+ , destination_site_id_(0)
+ , millisecond_(0, 1000)
+{
+ // Temporarily assign the priority to be highest possible.
+ // The first message received by the handler will be the priority
+ this->priority(ACE_Event_Handler::HI_PRIORITY);
+}
+
+/// Destructor
+ACE_PIP_IO_Handler::~ACE_PIP_IO_Handler( )
+{
+ // Tell the Invocation Manager to stop sending us messages
+ ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this);
+
+ // Delete all outgoing messages
+ ACE_PIP_Protocol_Message* message(0);
+ while (!outgoing_message_queue_.is_empty())
+ {
+ outgoing_message_queue_.dequeue_tail(message);
+ delete message;
+ }
+}
+
+/// Initialize the priority of the handler, and inform the other end
+/// of the priority
+void ACE_PIP_IO_Handler::init(ACE_UINT32 site_id,
+ ACE_UINT32 destination_site_id,
+ ACE_UINT32 priority)
+{
+ this->priority(priority);
+ site_id_ = site_id;
+ destination_site_id_ = destination_site_id;
+
+ // Inform other end of this connections priority
+ peer_.send(&priority, sizeof(priority));
+
+ // Inform other end of this end's site id
+ peer_.send(&site_id, sizeof(site_id));
+ priority_set_ = true;
+
+ // Register to receive outgoing messages
+ ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this);
+}
+
+void ACE_PIP_IO_Handler::extract_priority()
+{
+ ACE_UINT32 priority(0);
+ if (peer_.recv(&priority, sizeof(priority)) == sizeof(priority))
+ {
+ this->priority(priority);
+ }
+ else
+ {
+ this->priority(ACE_Event_Handler::LO_PRIORITY);
+ }
+
+ // Receive the other end's site id
+ if (peer_.recv(&destination_site_id_, sizeof(destination_site_id_)) != sizeof(destination_site_id_))
+ {
+ destination_site_id_ = 0;
+ }
+
+ priority_set_ = true;
+}
+
+/// Handles read event on socket.
+int ACE_PIP_IO_Handler::handle_input (ACE_HANDLE fd)
+{
+ ACE_UNUSED_ARG(fd);
+
+ int result(0);
+ int bytes_read(0);
+
+ if (!priority_set_)
+ {
+ // incoming message is the priority of this connection
+ extract_priority();
+ ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this);
+ }
+ else
+ {
+ ACE_PIP_Protocol_Message* message(0);
+ ACE_NEW_RETURN(message, ACE_PIP_Protocol_Message, 0);
+ // Read the next incoming message
+ bytes_read = message->deserialize(peer_);
+ if (bytes_read > 0)
+ {
+ ACE_PIP_Dispatcher::instance()->process_message(message);
+ }
+ else if (bytes_read < 0)
+ {
+ // The connection is broken, so handler should be deleted
+ delete message;
+ result = -1;
+ }
+ }
+
+ return result;
+}
+
+
+/// Handles output event on socket
+int ACE_PIP_IO_Handler::handle_output (ACE_HANDLE fd)
+{
+ ACE_UNUSED_ARG(fd);
+
+ int bytes_sent(0);
+ // determine if outgoing messages exist
+ ACE_PIP_Protocol_Message* message(0);
+
+ write_closed_ = false;
+ big_lock_.acquire();
+ if (outgoing_message_queue_.dequeue_tail(message) != -1)
+ {
+ bytes_sent = message->serialize(peer_);
+ delete message;
+ if (bytes_sent >= 0)
+ {
+ big_lock_.release();
+ return 0;
+ }
+ else
+ {
+ write_closed_ = true;
+ big_lock_.release();
+ // indicate the outgoing connection is closed
+ return -2;
+ }
+ }
+ else
+ {
+ // indicate that there was no message to output
+ big_lock_.release();
+ return -1;
+ }
+}
+
diff --git a/ACE/ace/PIP_IO_Handler.h b/ACE/ace/PIP_IO_Handler.h
new file mode 100644
index 00000000000..563a95303c2
--- /dev/null
+++ b/ACE/ace/PIP_IO_Handler.h
@@ -0,0 +1,103 @@
+ /**
+ * @file PIP_IO_Handler.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that manages network I/O
+*/
+
+#ifndef _PIP_IO_HANDLER_H_
+#define _PIP_IO_HANDLER_H_
+
+#include "ace/Message_Queue.h"
+#include "ace/Mutex.h"
+#include "ace/PIP_Messages.h"
+#include "ace/Svc_Handler.h"
+#include "ace/Thread_Mutex.h"
+
+// Typedefs
+typedef ACE_Message_Queue_Ex<ACE_PIP_Protocol_Message, ACE_NULL_SYNCH>
+ PROTO_MESSAGE_QUEUE_TYPE;
+
+/**
+ * @class ACE_PIP_IO_Handler
+ *
+ * @brief Performs network I/O
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ */
+class ACE_Export ACE_PIP_IO_Handler :
+ public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_MT_SYNCH>
+{
+ public:
+
+ /// Constructor
+ ACE_PIP_IO_Handler ();
+
+ /// Destructor
+ virtual ~ACE_PIP_IO_Handler();
+
+ /// Enqueue a message to be sent
+ virtual int put_message (ACE_PIP_Protocol_Message* message) = 0;
+
+ /// Initialize the priority of the handler, and inform the other end
+ /// of the priority
+ virtual void init(ACE_UINT32 site_id,
+ ACE_UINT32 destination_site_id,
+ ACE_UINT32 priority);
+
+ /// Handles read event on socket.
+ virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+
+ /// Handles read event on socket.
+ virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+
+ /// Set the site_id
+ void site_id(ACE_UINT32 site_id);
+
+ /// Get the site id
+ ACE_UINT32 site_id() const;
+
+ /// Get the other end's site id
+ ACE_UINT32 destination_site_id() const;
+
+ /// Get the handler id
+ void handler_id(ACE_UINT32 handler_id);
+
+ /// Set the handler id
+ ACE_UINT32 handler_id() const;
+
+ /// Get the address of the remote site
+ ACE_INET_Addr remote_address() const;
+
+ protected:
+
+ /// Reads priority from socket
+ void extract_priority();
+
+ // variables to track the state of the handler
+ bool read_closed_;
+ bool write_closed_;
+ bool priority_set_;
+
+ ACE_UINT32 site_id_;
+ ACE_UINT32 handler_id_;
+ ACE_UINT32 destination_site_id_;
+
+ const ACE_Time_Value millisecond_;
+
+ PROTO_MESSAGE_QUEUE_TYPE outgoing_message_queue_;
+ ACE_Thread_Mutex big_lock_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "PIP_IO_Handler.inl"
+#endif /* __ACE_INLINE__ */
+
+
+#endif /* _PIP_IO_Handler_H_ */
+
+
diff --git a/ACE/ace/PIP_IO_Handler.inl b/ACE/ace/PIP_IO_Handler.inl
new file mode 100644
index 00000000000..ed1768940aa
--- /dev/null
+++ b/ACE/ace/PIP_IO_Handler.inl
@@ -0,0 +1,36 @@
+// $Id:
+
+#include <ace/INET_Addr.h>
+#include <ace/PIP_Invocation_Manager.h>
+
+ACE_INLINE void ACE_PIP_IO_Handler::site_id(ACE_UINT32 site_id)
+{
+ site_id_ = site_id;
+}
+
+ACE_INLINE ACE_UINT32 ACE_PIP_IO_Handler::site_id() const
+{
+ return site_id_;
+}
+
+ACE_INLINE ACE_UINT32 ACE_PIP_IO_Handler::destination_site_id() const
+{
+ return destination_site_id_;
+}
+
+ACE_INLINE void ACE_PIP_IO_Handler::handler_id(ACE_UINT32 handler_id)
+{
+ handler_id_ = handler_id;
+}
+
+ACE_INLINE ACE_UINT32 ACE_PIP_IO_Handler::handler_id() const
+{
+ return handler_id_;
+}
+
+ACE_INLINE ACE_INET_Addr ACE_PIP_IO_Handler::remote_address() const
+{
+ ACE_INET_Addr addr;
+ peer_.get_remote_addr(addr);
+ return addr;
+}
diff --git a/ACE/ace/PIP_Invocation_Manager.cpp b/ACE/ace/PIP_Invocation_Manager.cpp
new file mode 100644
index 00000000000..d18d8789e82
--- /dev/null
+++ b/ACE/ace/PIP_Invocation_Manager.cpp
@@ -0,0 +1,316 @@
+
+#include "ace/PIP_Invocation_Manager.h"
+#include "ace/PIP_IO_Handler.h"
+#include <ace/PIP_Message_Handler.h>
+
+#include <iostream>
+ACE_PIP_Invocation_Manager* ACE_PIP_Invocation_Manager::invocation_manager_ = 0;
+ACE_Mutex ACE_PIP_Invocation_Manager::instance_lock_;
+bool ACE_PIP_Invocation_Manager::delete_manager_ = false;
+ACE_UINT64 ACE_PIP_Invocation_Manager::message_id_base_ = 0;
+ACE_UINT32 ACE_PIP_Invocation_Manager::site_id_ = 0;
+
+/// Constructor
+ACE_PIP_Invocation_Manager::ACE_PIP_Invocation_Manager()
+ : message_counter_(0)
+{
+}
+
+/// Destructor
+ACE_PIP_Invocation_Manager::~ACE_PIP_Invocation_Manager()
+{
+
+}
+
+/// Processes requests received from I/O handler
+void ACE_PIP_Invocation_Manager::process_inbound_request(ACE_PIP_Protocol_Message* message)
+{
+ ACE_PIP_Data_Message* payload =
+ static_cast<ACE_PIP_Data_Message*>(message->release_next());
+
+ ACE_UINT32 handler_id = payload->destination_handler_ID();
+ ACE_PIP_Message_Handler* handler(0);
+
+ big_lock_.acquire();
+ ACE_UINT64 message_id = message->message_id();
+ if (object_id_handler_map_.find(handler_id, handler) == 0)
+ {
+ // look to see if there are any accelerations. If so, accelerate.
+ // Map the message ID to a list of outgoing messages
+ in_out_id_map_.bind(message_id,
+ std::list<ACE_UINT64>());
+
+ // Keep a record of the message and its priority so
+ // it can be accelerated if necessary
+ Invocation_Data invocation_data;
+ invocation_data.site_id = site_id_;
+ invocation_data.priority = payload->message_priority();
+ invocation_data_map_.bind(message_id, invocation_data);
+
+ big_lock_.release();
+
+ // Pass the message to the message handler, deleting
+ // the corresponding struct
+ handler->process_incoming_message(payload->release_block(), message_id);
+ delete payload;
+ delete message;
+
+ // Once message processing has completed,
+ // clean-up any message residue
+ big_lock_.acquire();
+ in_out_id_map_.unbind(message_id);
+ }
+
+ big_lock_.release();
+}
+
+/// Processes request to be forwarded to another handler
+int ACE_PIP_Invocation_Manager::process_outbound_request(ACE_Message_Block* message,
+ ACE_UINT64 token,
+ ACE_Future<ACE_Message_Block*>*& response_holder)
+{
+ // Create a protocol message from the data block
+ ACE_PIP_Protocol_Message* protocol_message(0);
+
+ ACE_NEW_RETURN(protocol_message, ACE_PIP_Protocol_Message, -1);
+ protocol_message->message_type(ACE_PIP_Protocol_Message::REQUEST);
+ protocol_message->process_message_payload(message);
+
+ ACE_PIP_Data_Message* data_message =
+ static_cast<ACE_PIP_Data_Message*>(protocol_message->next());
+
+ // determine if the message expects a reply. If so, create a future for it.
+ if (data_message->reply_expected())
+ {
+ // create and store future
+ ACE_NEW_RETURN(response_holder, ACE_Future<ACE_Message_Block*>, -1);
+ }
+
+ Invocation_Data remote_info;
+ remote_info.response_holder = response_holder;
+ remote_info.site_id = data_message->destination_site_ID();
+ remote_info.priority = data_message->message_priority();
+
+ // Associate this message with token. This enables acceleration forwarding. Only
+ // Token == -1 indicates this is the start of a call chain
+
+ big_lock_.acquire();
+ ACE_UINT64 message_id = generate_message_id();
+ invocation_data_map_.bind(message_id, remote_info);
+ protocol_message->message_id(message_id);
+
+ if (token != 0)
+ {
+ ACE_Hash_Map_Entry<ACE_UINT64, std::list<ACE_UINT64> >* entry(0);
+ // map the originating message to the outgoing message
+ // so that accelerations can be forwarded appropriately
+ in_out_id_map_.find(token, entry);
+ if (entry)
+ {
+ entry->item().push_back(message_id);
+ }
+ }
+
+ // use the priority and address to determine which I/O handler to send to
+ // pass the message to the I/O handler
+ PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
+ site_to_handlers_map_.find(data_message->destination_site_ID(),
+ handler_map);
+
+ ACE_PIP_IO_Handler* IO_handler(0);
+ handler_map->find(data_message->message_priority(),
+ IO_handler);
+
+ big_lock_.release();
+
+ if (IO_handler)
+ {
+ IO_handler->put_message(protocol_message);
+ }
+
+ return 0;
+}
+
+ACE_PIP_Invocation_Manager* ACE_PIP_Invocation_Manager::instance()
+{
+ if (ACE_PIP_Invocation_Manager::invocation_manager_ == 0)
+ {
+ instance_lock_.acquire();
+
+ if (ACE_PIP_Invocation_Manager::invocation_manager_ == 0)
+ {
+ ACE_NEW_RETURN (ACE_PIP_Invocation_Manager::invocation_manager_,
+ ACE_PIP_Invocation_Manager,
+ 0);
+
+ delete_manager_ = true;
+ }
+
+ instance_lock_.release();
+ }
+
+ return invocation_manager_;
+}
+
+/// Process response received from a handler
+void ACE_PIP_Invocation_Manager::process_inbound_response(ACE_PIP_Protocol_Message* message)
+{
+ Invocation_Data remote_info;
+
+ ACE_Guard<ACE_Mutex> guard(big_lock_);
+
+ // Remove the child ID
+ if (invocation_data_map_.unbind(message->message_id(),
+ remote_info) != -1)
+ {
+ // Pass the received response to the message handler
+ // via a Future
+ remote_info.response_holder->set(message->next()->block());
+ remote_info.response_holder = 0;
+ }
+}
+
+/// Process response received from a handler
+void ACE_PIP_Invocation_Manager::process_outbound_response(ACE_Message_Block* message, ACE_UINT64 token)
+{
+ // Parse the message
+ ACE_PIP_Protocol_Message* response = new ACE_PIP_Protocol_Message;
+ response->process_message_payload(message);
+ response->message_type(ACE_PIP_Protocol_Message::RESPONSE);
+
+ // Lookup the appropriate IO handler, and pass down the message
+ ACE_PIP_Data_Message* data_message =
+ static_cast<ACE_PIP_Data_Message*>(response->next());
+
+ ACE_Guard<ACE_Mutex> guard(big_lock_);
+
+ response->message_id(token);
+ PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
+
+ if (site_to_handlers_map_.find
+ (data_message->destination_site_ID(),
+ handler_map) != -1)
+ {
+ ACE_PIP_IO_Handler* handler(0);
+ if (handler_map->find(data_message->message_priority(),
+ handler) != -1)
+ {
+ handler->put_message(response);
+ }
+ }
+}
+
+/// Process request to accelerate the priority of a process
+void ACE_PIP_Invocation_Manager::process_acceleration(ACE_PIP_Protocol_Message* message)
+{
+ ACE_PIP_Accel_Message* accel_message =
+ static_cast<ACE_PIP_Accel_Message*>(message->next());
+
+ ACE_Guard<ACE_Mutex> guard(big_lock_);
+
+ // Update the stored priority of the original message. This will enable subsequent upcalls
+ // to adjust their priority appropriately
+ Invocation_Data invocation_data;
+ if (invocation_data_map_.unbind(message->message_id(), invocation_data) == 0)
+ {
+ if (invocation_data.priority < accel_message->new_priority())
+ {
+ invocation_data.priority = accel_message->new_priority();
+ }
+
+ invocation_data_map_.bind(message->message_id(), invocation_data);
+
+ // Generate acceleration messages for each outgoing invocation
+ // resulting from processing of incoming request. Do so
+ // only if their priority is lower than the accelerated priority
+ ACE_Hash_Map_Entry<ACE_UINT64, std::list<ACE_UINT64> >* child_entry(0);
+ if (in_out_id_map_.find(message->message_id(), child_entry) == 0)
+ {
+ std::list<ACE_UINT64>::iterator child_iter = child_entry->item().begin();
+ for (; child_iter != child_entry->item().end(); ++child_iter)
+ {
+ if (invocation_data_map_.unbind(*child_iter, invocation_data) == 0)
+ {
+ if (invocation_data.priority < accel_message->new_priority())
+ {
+ invocation_data.priority = accel_message->new_priority();
+ invocation_data_map_.bind(*child_iter, invocation_data);
+ // Generate new message and send it to the appropriate site
+ PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
+ if (site_to_handlers_map_.find(invocation_data.site_id,
+ handler_map) == 0)
+ {
+ ACE_PIP_IO_Handler* handler(0);
+ if (handler_map->find(ACE_Event_Handler::HI_PRIORITY, handler) == 0)
+ {
+ ACE_PIP_Accel_Message* accel_copy = accel_message->copy();
+ ACE_PIP_Protocol_Message* proto_copy = message->copy();
+ proto_copy->next(accel_copy);
+ handler->put_message(proto_copy);
+ }
+ }
+ }
+
+ invocation_data_map_.bind(*child_iter, invocation_data);
+ }
+ }
+ }
+ }
+
+ // delete the acceleration message here
+}
+
+/// Register an IO handler that can send messages on invocation
+/// manager's behalf
+int ACE_PIP_Invocation_Manager::register_IO_handler(ACE_PIP_IO_Handler* handler)
+{
+ // Extract the priority and remote address of handler
+ ACE_UINT32 priority = handler->priority();
+ ACE_Guard<ACE_Mutex> guard(big_lock_);
+
+ // Map the destination site ID and priority to this handler
+ PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
+ if (site_to_handlers_map_.find(handler->destination_site_id(),
+ handler_map) == -1)
+ {
+ ACE_NEW_RETURN(handler_map, PRIORITY_TO_IO_HANDLER_MAP, -1);
+ }
+
+ handler_map->bind(priority, handler);
+ site_to_handlers_map_.bind(handler->destination_site_id(), handler_map);
+
+ return 0;
+}
+
+void ACE_PIP_Invocation_Manager::unregister_IO_handler(ACE_PIP_IO_Handler* handler)
+{
+ ACE_UINT32 priority = handler->priority();
+
+ ACE_Guard<ACE_Mutex> guard(big_lock_);
+
+ // unbind the handler
+ PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
+ if (site_to_handlers_map_.find(handler->destination_site_id(),
+ handler_map) != -1)
+ {
+ handler_map->unbind(priority, handler);
+ }
+}
+
+void ACE_PIP_Invocation_Manager::register_message_handler(ACE_PIP_Message_Handler* handler)
+{
+ // extract the object id from the handler
+ // map the object id to the handler
+ ACE_Guard<ACE_Mutex> guard(big_lock_);
+ object_id_handler_map_.bind(handler->handler_id(), handler);
+}
+
+ACE_UINT64 ACE_PIP_Invocation_Manager::generate_message_id()
+{
+ return (((ACE_UINT64)site_id_) << 32) + message_counter_++;
+}
+
+void ACE_PIP_Invocation_Manager::site_id(ACE_UINT64 site_id)
+{
+ site_id_ = site_id;
+}
diff --git a/ACE/ace/PIP_Invocation_Manager.h b/ACE/ace/PIP_Invocation_Manager.h
new file mode 100644
index 00000000000..ae04f7a60dd
--- /dev/null
+++ b/ACE/ace/PIP_Invocation_Manager.h
@@ -0,0 +1,150 @@
+ /**
+ * @file PIP_Invocation_Manager.h
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that tracks handler invocations at a particular site
+ */
+
+
+#ifndef _PIP_INVOCATION_MANAGER_H_
+#define _PIP_INVOCATION_MANAGER_H_
+
+#include "ace/Containers_T.h"
+#include "ace/Hash_Map_Manager.h"
+#include "ace/Message_Block.h"
+#include "ace/PIP_Messages.h"
+#include "ace/Singleton.h"
+#include "ace/Mutex.h"
+#include "ace/Null_Mutex.h"
+#include "ace/Future.h"
+
+#include <list>
+class ACE_PIP_IO_Handler;
+class ACE_PIP_Message_Handler;
+
+struct Invocation_Data
+{
+ ACE_Future<ACE_Message_Block*>* response_holder;
+ ACE_UINT32 site_id;
+ ACE_UINT32 priority;
+};
+
+// Typedefs
+typedef ACE_Hash_Map_Manager_Ex<ACE_UINT64,
+ std::list<ACE_UINT64>,
+ ACE_Hash<ACE_UINT64>,
+ ACE_Equal_To<ACE_UINT64>,
+ ACE_Null_Mutex> ID_TO_ID_LIST_MAP;
+
+typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32,
+ ACE_PIP_Message_Handler*,
+ ACE_Hash<ACE_UINT32>,
+ ACE_Equal_To<ACE_UINT32>,
+ ACE_Null_Mutex> ID_TO_HANDLER_MAP;
+
+typedef ACE_Hash_Map_Manager_Ex<ACE_UINT64,
+ Invocation_Data,
+ ACE_Hash<ACE_UINT64>,
+ ACE_Equal_To<ACE_UINT64>,
+ ACE_Null_Mutex> ID_TO_INVOCATION_RECORD_MAP;
+
+typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32,
+ ACE_PIP_IO_Handler*,
+ ACE_Hash<ACE_UINT32>,
+ ACE_Equal_To<ACE_UINT32>,
+ ACE_Null_Mutex> PRIORITY_TO_IO_HANDLER_MAP;
+
+typedef ACE_Hash_Map_Manager_Ex<ACE_UINT32,
+ PRIORITY_TO_IO_HANDLER_MAP*,
+ ACE_Hash<ACE_UINT64>,
+ ACE_Equal_To<ACE_UINT64>,
+ ACE_Null_Mutex> SITE_TO_IO_HANDLERS_MAP;
+
+/**
+ * @class ACE_PIP_Invocation_Manager
+ * @brief
+*/
+class ACE_Export ACE_PIP_Invocation_Manager
+{
+ public:
+
+ /// Constructor
+ ACE_PIP_Invocation_Manager();
+
+ /// Destructor
+ ~ACE_PIP_Invocation_Manager();
+
+ /// Get the singleton instance of the Invocation Manager
+ static ACE_PIP_Invocation_Manager* instance();
+
+ /// Associated a site ID with the Invocation Manager
+ static void site_id(ACE_UINT64 site_id);
+
+ /// Process request made on local handler
+ void process_inbound_request(ACE_PIP_Protocol_Message* message);
+
+ /// Processes request to be forwarded to another handler
+ int process_outbound_request(ACE_Message_Block* message,
+ ACE_UINT64 token,
+ ACE_Future<ACE_Message_Block*>*& response_holder);
+
+ /// Process response to message sent by local handler
+ void process_inbound_response(ACE_PIP_Protocol_Message* message);
+
+ /// Process response sent to remote handler
+ void process_outbound_response(ACE_Message_Block* message, ACE_UINT64 token);
+
+ /// Process request to accelerate the priority of a process
+ void process_acceleration(ACE_PIP_Protocol_Message* message);
+
+ /// Register an IO handler that can send messages of a certain priority
+ /// for the Invocation Manager
+ int register_IO_handler(ACE_PIP_IO_Handler* handler);
+
+ /// Un-register an IO handler
+ void unregister_IO_handler(ACE_PIP_IO_Handler* handler);
+
+ /// Register user-level message handler
+ void register_message_handler(ACE_PIP_Message_Handler* handler);
+
+ private:
+
+
+ ACE_UINT64 generate_message_id();
+
+ ACE_UINT64 message_counter_;
+
+ static ACE_UINT64 message_id_base_;
+
+ static ACE_UINT32 site_id_;
+
+ static ACE_PIP_Invocation_Manager* invocation_manager_;
+
+ static ACE_Mutex instance_lock_;
+
+ ACE_Mutex big_lock_;
+
+ static bool delete_manager_;
+
+ // Mapping of incoming messages to corresponding outgoing messages.
+ // This is used to track invocations resulting from an incoming invocation
+ // in order to pass acceleration messages along the chain
+ ID_TO_ID_LIST_MAP in_out_id_map_;
+
+ // Mapping of user-level handler ID to corresponding handler
+ ID_TO_HANDLER_MAP object_id_handler_map_;
+
+ // Mapping of site IDs to corresponding I/O handler map
+ SITE_TO_IO_HANDLERS_MAP site_to_handlers_map_;
+
+ // Maps message ID to data such as destination site
+ // and priority
+ ID_TO_INVOCATION_RECORD_MAP invocation_data_map_;
+};
+
+
+#endif
diff --git a/ACE/ace/PIP_Message_Handler.cpp b/ACE/ace/PIP_Message_Handler.cpp
new file mode 100644
index 00000000000..6f62815db5a
--- /dev/null
+++ b/ACE/ace/PIP_Message_Handler.cpp
@@ -0,0 +1,38 @@
+#include "ace/PIP_Message_Handler.h"
+
+ACE_PIP_Message_Handler::ACE_PIP_Message_Handler()
+ : handler_id_(0)
+ , site_id_(0)
+{
+
+}
+
+ACE_PIP_Message_Handler::ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 handler_id)
+ : handler_id_(handler_id)
+ , site_id_(site_id)
+{
+}
+
+void ACE_PIP_Message_Handler::send_request(ACE_Message_Block* message,
+ ACE_UINT64 message_id,
+ ACE_Message_Block*& response)
+{
+ ACE_Future<ACE_Message_Block*>* response_holder(0);
+ ACE_PIP_Invocation_Manager::instance()->process_outbound_request(message, message_id, response_holder);
+ if (response_holder)
+ {
+ if (response_holder->get(response) == -1)
+ {
+ response = 0;
+ }
+ }
+}
+
+void ACE_PIP_Message_Handler::send_response(ACE_Message_Block* message,
+ ACE_UINT64 message_id)
+{
+ ACE_PIP_Invocation_Manager::instance()->process_outbound_response(message, message_id);
+}
+
+
+
diff --git a/ACE/ace/PIP_Message_Handler.h b/ACE/ace/PIP_Message_Handler.h
new file mode 100644
index 00000000000..a38b0016a86
--- /dev/null
+++ b/ACE/ace/PIP_Message_Handler.h
@@ -0,0 +1,82 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file PIP_Message_Handler.h
+ *
+ * $Id:
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ */
+//=============================================================================
+
+
+#ifndef _PIP_MESSAGE_HANDLER_H_
+#define _PIP_MESSAGE_HANDLER_H_
+
+#include <ace/Message_Block.h>
+#include <ace/PIP_Messages.h>
+#include <ace/Event_Handler.h>
+#include <ace/PIP_Invocation_Manager.h>
+
+/**
+ * @class ACE_PIP_Message_Handler
+ * @brief ACE_PIP_Message_Handler is a user-level message handler interface.
+ *
+ * ACE_PIP_Message_Handler is a user-level message handler interface. It can
+ * be extended to provide user-level processing of messages before passing
+ * the message block to the priority-inheritance protocol infrastructure.
+ *
+ * Each handler has an associated (site id, handler id) tuple with which
+ * it can be address by other handlersy
+ */
+class ACE_Export ACE_PIP_Message_Handler
+{
+ public:
+
+ /// Constructor that sets site and handler ID
+ ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 handler_id);
+ ACE_PIP_Message_Handler();
+
+ /// Destroy handler
+ virtual ~ACE_PIP_Message_Handler(){}
+
+ /// Process message received by priority inheritance infrastructure
+ virtual void process_incoming_message(ACE_Message_Block* message,
+ ACE_UINT64 message_id) = 0;
+
+ /// Get the handler id
+ ACE_UINT32 handler_id() const;
+
+ /// Set the handler id
+ void handler_id(ACE_UINT32 id);
+
+ /// Get the site id
+ ACE_UINT32 site_id() const;
+
+ /// Set the site id
+ void site_id(ACE_UINT32 id);
+
+ protected:
+
+ ACE_UINT32 handler_id_;
+ ACE_UINT32 site_id_;
+ ACE_INET_Addr my_address_;
+
+ // Pass a message to a remote handler
+ virtual void send_request(ACE_Message_Block* message,
+ ACE_UINT64 message_id,
+ ACE_Message_Block*& response);
+
+ // Pass a response message to a remote handler
+ virtual void send_response(ACE_Message_Block* message,
+ ACE_UINT64 message_id);
+
+};
+
+#if defined (__ACE_INLINE__)
+#include "PIP_Message_Handler.inl"
+#endif /* __ACE_INLINE__ */
+
+#endif
+
diff --git a/ACE/ace/PIP_Message_Handler.inl b/ACE/ace/PIP_Message_Handler.inl
new file mode 100644
index 00000000000..5100d9e2c9a
--- /dev/null
+++ b/ACE/ace/PIP_Message_Handler.inl
@@ -0,0 +1,22 @@
+// $Id:
+
+ACE_INLINE ACE_UINT32 ACE_PIP_Message_Handler::handler_id() const
+{
+ return handler_id_;
+}
+
+ACE_INLINE void ACE_PIP_Message_Handler::handler_id(ACE_UINT32 id)
+{
+ handler_id_ = id;
+}
+
+ACE_INLINE ACE_UINT32 ACE_PIP_Message_Handler::site_id() const
+{
+ return site_id_;
+}
+
+ACE_INLINE void ACE_PIP_Message_Handler::site_id(ACE_UINT32 id)
+{
+ site_id_ = id;
+}
+
diff --git a/ACE/ace/PIP_Messages.cpp b/ACE/ace/PIP_Messages.cpp
new file mode 100644
index 00000000000..c9bc42ef544
--- /dev/null
+++ b/ACE/ace/PIP_Messages.cpp
@@ -0,0 +1,577 @@
+// $Id$
+
+#include "ace/OS_NS_stdlib.h"
+#include "ace/OS_NS_string.h"
+#include "ace/PIP_Messages.h"
+
+ACE_PIP_Message::ACE_PIP_Message()
+ : block_(0)
+ , dirty_(false)
+ , next_(0)
+{}
+
+ACE_PIP_Message::~ACE_PIP_Message()
+{
+ if (this->next_)
+ {
+ delete this->next_;
+ }
+
+ if (block_)
+ {
+ block_->release();
+ }
+}
+
+void ACE_PIP_Message::block(ACE_Message_Block* block)
+{
+ // Remove the other block if it exist.
+ if (block_)
+ {
+ block_->release();
+ }
+
+ block_ = block;
+
+ // Extract the values from the block.
+ unpack();
+}
+
+ACE_PIP_Data_Message::ACE_PIP_Data_Message()
+ : message_priority_(-1)
+ , reply_expected_(false)
+ , source_handler_id_(-1)
+ , source_site_id_(-1)
+ , destination_handler_id_(-1)
+ , destination_site_id_(-1)
+{
+}
+
+int ACE_PIP_Data_Message::serialize(ACE_SOCK_Stream& stream)
+{
+ int total_bytes_sent(0);
+
+ // Only serialize if there is a block. If not,
+ // there's nothing we can do but fail since we don't
+ // have enough information to create a block and unpack it.
+ if (block_)
+ {
+ if (this->dirty_)
+ {
+ pack();
+ }
+
+ ACE_Message_Block* curr_block = block_;
+ int bytes_sent(0);
+ while(curr_block)
+ {
+ bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length());
+ if (bytes_sent > 0)
+ {
+ total_bytes_sent += bytes_sent;
+ curr_block = curr_block->next();
+ }
+ else
+ {
+ total_bytes_sent = -1;
+ break;
+ }
+ }
+ }
+ else
+ {
+ total_bytes_sent = -1;
+ }
+
+ return total_bytes_sent;
+}
+
+int ACE_PIP_Data_Message::pack()
+{
+ char* write_ptr = block_->wr_ptr();
+ char* read_ptr = block_->rd_ptr();
+
+ block_->reset();
+
+ // Pack reply expected into buffer.
+ ACE_OS::memcpy(block_->wr_ptr(), &this->reply_expected_, sizeof(this->reply_expected_));
+ block_->wr_ptr(sizeof(this->reply_expected_));
+
+ // Pack the message priority into the buffer.
+ ACE_OS::memcpy(block_->wr_ptr(), &this->message_priority_, sizeof(this->message_priority_));
+ block_->wr_ptr(sizeof(this->message_priority_));
+
+ // Pack the destination handler ID into the buffer
+ ACE_OS::memcpy(block_->wr_ptr(), &this->destination_handler_id_, sizeof(this->destination_handler_id_));
+ block_->wr_ptr(sizeof(this->destination_handler_id_));
+
+ // Pack the source handler ID into the buffer
+ ACE_OS::memcpy(block_->wr_ptr(), &this->source_handler_id_, sizeof(this->source_handler_id_));
+ block_->wr_ptr(sizeof(this->source_handler_id_));
+
+ // Pack the destination site ID into the buffer
+ ACE_OS::memcpy(block_->wr_ptr(), &this->destination_site_id_, sizeof(this->destination_site_id_));
+ block_->wr_ptr(sizeof(this->destination_site_id_));
+
+ // Pack the source site ID into the buffer
+ ACE_OS::memcpy(block_->wr_ptr(), &this->source_site_id_, sizeof(this->source_site_id_));
+ block_->wr_ptr(sizeof(this->source_site_id_));
+
+ // Reset the buffer pointers to where they were so that the message length remains
+ // accurate.
+ block_->rd_ptr(read_ptr);
+ block_->wr_ptr(write_ptr);
+
+ return 0;
+}
+
+void ACE_PIP_Data_Message::unpack()
+{
+ if (block_)
+ {
+ char* write_ptr = block_->wr_ptr();
+ block_->reset();
+
+ // reply_expected_
+ ACE_OS::memcpy(&this->reply_expected_, block_->rd_ptr(), sizeof(this->reply_expected_));
+ block_->rd_ptr(sizeof(this->reply_expected_));
+
+ // message priority
+ ACE_OS::memcpy(&this->message_priority_, block_->rd_ptr(), sizeof(this->message_priority_));
+ block_->rd_ptr(sizeof(this->message_priority_));
+
+ // destination handler ID
+ ACE_OS::memcpy(&this->destination_handler_id_, block_->rd_ptr(), sizeof(this->destination_handler_id_));
+ block_->rd_ptr(sizeof(this->destination_handler_id_));
+
+ // source handler ID
+ ACE_OS::memcpy(&this->source_handler_id_, block_->rd_ptr(), sizeof(this->source_handler_id_));
+ block_->rd_ptr(sizeof(this->source_handler_id_));
+
+ // destination site ID
+ ACE_OS::memcpy(&this->destination_site_id_, block_->rd_ptr(), sizeof(this->destination_site_id_));
+ block_->rd_ptr(sizeof(this->destination_site_id_));
+
+ // source site ID
+ ACE_OS::memcpy(&this->source_site_id_, block_->rd_ptr(), sizeof(this->source_site_id_));
+ block_->rd_ptr(sizeof(this->source_site_id_));
+
+ block_->reset();
+ block_->wr_ptr(write_ptr);
+ }
+
+ this->dirty_ = false;
+}
+
+ACE_PIP_Accel_Message::ACE_PIP_Accel_Message()
+ : ACCEL_HEADER_LENGTH_(2*sizeof(ACE_UINT32))
+ , destination_address_(0)
+ , destination_port_(0)
+ , new_priority_(0)
+ , old_priority_(0)
+{
+}
+
+int ACE_PIP_Accel_Message::serialize(ACE_SOCK_Stream& stream)
+{
+ pack();
+
+ int bytes_sent = stream.send_n(block_->rd_ptr(), block_->length());
+ return bytes_sent;
+}
+
+int ACE_PIP_Accel_Message::pack()
+{
+ if (!block_)
+ {
+ ACE_NEW_RETURN(block_, ACE_Message_Block(ACCEL_HEADER_LENGTH_), -1);
+ this->dirty_ = true;
+ }
+
+ if (this->dirty_)
+ {
+
+ // Set the buffer pointers to the start of the buffer to
+ // ensure we're writing to the correct location
+ block_->reset();
+
+ // Pack the contents of the struct into the message block
+ ACE_OS::memcpy(block_->wr_ptr(), &this->old_priority_, sizeof(this->old_priority_));
+ block_->wr_ptr(sizeof(this->old_priority_));
+
+ ACE_OS::memcpy(block_->wr_ptr(), &this->new_priority_, sizeof(this->new_priority_));
+ block_->wr_ptr(sizeof(this->new_priority_));
+
+ this->dirty_ = false;
+ }
+
+ return 0;
+}
+
+void ACE_PIP_Accel_Message::unpack()
+{
+ if (block_)
+ {
+ char* write_ptr = block_->wr_ptr();
+ block_->reset();
+
+ this->old_priority_ = (*block_->rd_ptr());
+ block_->rd_ptr(sizeof(this->old_priority_));
+
+ this->new_priority_ = (*block_->rd_ptr());
+ block_->rd_ptr(sizeof (this->new_priority_));
+
+ // Reset the read and write pointers to their original location
+ // in the block.
+ block_->reset();
+ block_->wr_ptr(write_ptr);
+ }
+
+ this->dirty_ = false;
+}
+
+ACE_PIP_Accel_Message* ACE_PIP_Accel_Message::copy()
+{
+ ACE_PIP_Accel_Message* copy(0);
+ ACE_NEW_RETURN(copy, ACE_PIP_Accel_Message, 0);
+
+ copy->new_priority_ = this->new_priority_;
+ copy->old_priority_ = this->old_priority_;
+ copy->pack();
+
+ return copy;
+}
+
+ACE_PIP_Protocol_Message::ACE_PIP_Protocol_Message()
+ : FIXED_HEADER_LENGTH_(sizeof(Message_Type) +
+ sizeof(message_id_) +
+ sizeof(num_payload_blocks_))
+ , message_type_(NONE)
+ , num_payload_blocks_(0)
+ , message_id_(0)
+{
+}
+
+int ACE_PIP_Protocol_Message::serialize(ACE_SOCK_Stream& stream)
+{
+ int total_bytes_sent(0);
+
+ pack();
+
+ ACE_Message_Block* curr_block = block_;
+ int bytes_sent(0);
+
+ // Write each of the message blocks associated with this
+ // header into the stream
+ while(curr_block)
+ {
+ bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length());
+ if (bytes_sent > 0)
+ {
+ total_bytes_sent += bytes_sent;
+ curr_block = curr_block->next();
+ }
+ else
+ {
+ total_bytes_sent = -1;
+ break;
+ }
+ }
+ if ((total_bytes_sent > 0) && this->next_)
+ {
+ int next_sent = this->next_->serialize(stream);
+ if (next_sent > 0)
+ {
+ total_bytes_sent += next_sent;
+ }
+ else
+ {
+ total_bytes_sent = -1;
+ }
+ }
+ else
+ {
+ total_bytes_sent = -1;
+ }
+
+ return total_bytes_sent;
+}
+
+int ACE_PIP_Protocol_Message::deserialize(ACE_SOCK_Stream& stream)
+{
+ int total_bytes_received(-1);
+
+ ACE_Message_Block* header_block(0);
+
+ ACE_Message_Block* lengths_block(0);
+ ACE_Message_Block* curr_payload_block(0);
+ ACE_Message_Block* payload_blocks(0);
+
+ ACE_NEW_RETURN(header_block, ACE_Message_Block(FIXED_HEADER_LENGTH_), -1);
+ // Read the fixed-length portion of the protocol header.
+ int bytes_received = stream.recv_n(header_block->wr_ptr(), FIXED_HEADER_LENGTH_);
+ if (bytes_received == FIXED_HEADER_LENGTH_)
+ {
+ total_bytes_received = bytes_received;
+
+ // Determine number of data message blocks in the payload.
+ header_block->rd_ptr(FIXED_HEADER_LENGTH_ - sizeof(this->num_payload_blocks_));
+ ACE_OS::memcpy(&this->num_payload_blocks_, header_block->rd_ptr(),
+ sizeof(this->num_payload_blocks_));
+
+ header_block->reset();
+ header_block->wr_ptr(bytes_received);
+
+ // Extract the length of each payload block.
+ if (this->num_payload_blocks_ > 0)
+ {
+ // Read the lengths of each block.
+ int bytes_to_read = this->num_payload_blocks_ * sizeof(ACE_UINT32);
+ ACE_NEW_RETURN(lengths_block, ACE_Message_Block(bytes_to_read), -1);
+ bytes_received = stream.recv_n(lengths_block->wr_ptr(), bytes_to_read);
+
+ if (bytes_received == bytes_to_read)
+ {
+ total_bytes_received += bytes_received;
+ lengths_block->wr_ptr(bytes_received);
+
+ // The lengths of each block have been successfully written, so
+ // unpack them.
+ header_block->next(lengths_block);
+ block(header_block);
+
+ ACE_NEW_RETURN(curr_payload_block, ACE_Message_Block(payload_block_lengths_[0]), -1);
+ payload_blocks = curr_payload_block;
+ unsigned int i = 0;
+ for (; i < this->num_payload_blocks_ && bytes_received != -1; ++i)
+ {
+ // Read the block.
+ bytes_received = stream.recv_n(curr_payload_block->wr_ptr(),
+ payload_block_lengths_[i]);
+ if (bytes_received > 0)
+ {
+ total_bytes_received += bytes_received;
+ curr_payload_block->wr_ptr(bytes_received);
+ if (i < (this->num_payload_blocks_ - 1))
+ {
+ curr_payload_block->next(
+ new ACE_Message_Block(payload_block_lengths_[i + 1]));
+
+ curr_payload_block = curr_payload_block->next();
+ }
+ else
+ {
+ curr_payload_block->next(0);
+ }
+
+ }
+ else
+ {
+ total_bytes_received = -1;
+ break;
+ }
+ }
+ }
+ else
+ {
+ total_bytes_received = -1;
+ }
+ }
+ }
+ else
+ {
+ total_bytes_received = -1;
+ }
+
+ if (total_bytes_received > 0)
+ {
+ if (this->message_type_ == ACCEL)
+ {
+ ACE_NEW_RETURN(this->next_, ACE_PIP_Accel_Message, -1);
+ }
+ else
+ {
+ ACE_NEW_RETURN(this->next_, ACE_PIP_Data_Message, -1);
+ }
+
+ // Pass the payload blocks to the next message struct
+ // so it can unpack it.
+ this->next_->block(payload_blocks);
+ }
+ else if (block_)
+ {
+ // Something failed during reading, so cleanup any allocated memory.
+ block_->release();
+ }
+
+ return total_bytes_received;
+}
+
+void ACE_PIP_Protocol_Message::next(ACE_PIP_Message* next)
+{
+ // Determine the number and length of payload blocks.
+ payload_block_lengths_.clear();
+ this->num_payload_blocks_ = 0;
+ next->pack();
+ ACE_Message_Block* curr_block = next->block();
+ while (curr_block)
+ {
+ ++this->num_payload_blocks_;
+ payload_block_lengths_.push_back(curr_block->length());
+ curr_block = curr_block->next();
+ }
+
+ this->next_ = next;
+ this->dirty_ = true;
+}
+
+int ACE_PIP_Protocol_Message::process_message_payload(ACE_Message_Block* payload)
+{
+ payload_block_lengths_.clear();
+ this->num_payload_blocks_ = 0;
+
+ // Determine the length and number of payload blocks.
+ ACE_Message_Block* curr_block = payload;
+ while (curr_block)
+ {
+ ++this->num_payload_blocks_;
+ payload_block_lengths_.push_back(curr_block->length());
+ curr_block = curr_block->next();
+ }
+
+ if (!this->next_)
+ {
+ if (this->message_type_ == ACCEL)
+ {
+ ACE_NEW_RETURN(this->next_, ACE_PIP_Accel_Message, -1);
+ }
+ else
+ {
+ ACE_NEW_RETURN(this->next_, ACE_PIP_Data_Message, -1);
+ }
+ }
+
+ this->next_->block(payload);
+ this->dirty_ = true;
+
+ return 0;
+}
+
+int ACE_PIP_Protocol_Message::pack()
+{
+ if (!block_)
+ {
+ // Create the message buffer for the protocol header.
+ block_ = new ACE_Message_Block(FIXED_HEADER_LENGTH_);
+
+ // Create the message buffer for the list of payload block lengths.
+ block_->next(new ACE_Message_Block(this->num_payload_blocks_ * sizeof(ACE_UINT32)));
+ block_->next()->next(0);
+ this->dirty_ = true;
+ }
+ if (this->dirty_)
+ {
+ // Set the buffer pointers to the start of the buffer
+ // so that we write to the appropriate location.
+ block_->reset();
+
+ // pack the process Id.
+ ACE_OS::memcpy(block_->wr_ptr(), &message_id_, sizeof(message_id_));
+ block_->wr_ptr(sizeof (message_id_));
+
+ // Pack the message type.
+ ACE_OS::memcpy(block_->wr_ptr(), &this->message_type_, sizeof(this->message_type_));
+ block_->wr_ptr(sizeof(this->message_type_));
+
+ // Number of blocks in payload.
+ ACE_OS::memcpy(block_->wr_ptr(), &this->num_payload_blocks_, sizeof(this->num_payload_blocks_));
+ block_->wr_ptr(sizeof(this->num_payload_blocks_));
+
+ ACE_Message_Block* next_block = block_->next();
+ if (next_block)
+ {
+ next_block->reset();
+
+ // Write the block lengths into the message block.
+ for (unsigned int i = 0; i < this->num_payload_blocks_; ++i)
+ {
+ ACE_OS::memcpy(next_block->wr_ptr(),
+ &payload_block_lengths_[i],
+ sizeof(ACE_UINT32));
+
+ next_block->wr_ptr(sizeof(ACE_UINT32));
+ }
+ }
+
+ this->dirty_ = false;
+ }
+
+ return 0;
+}
+
+void ACE_PIP_Protocol_Message::unpack()
+{
+ if (block_)
+ {
+ char* write_ptr = block_->wr_ptr();
+ // char* read_ptr = block_->rd_ptr();
+ block_->reset();
+
+ // Extract the process ID.
+ ACE_OS::memcpy(&message_id_, block_->rd_ptr(), sizeof(message_id_));
+ block_->rd_ptr(sizeof (message_id_));
+
+ // Extract the message type.
+ ACE_OS::memcpy(&this->message_type_, block_->rd_ptr(), sizeof(this->message_type_));
+ block_->rd_ptr(sizeof(this->message_type_));
+
+ // Number of blocks in payload.
+ ACE_OS::memcpy(&this->num_payload_blocks_, block_->rd_ptr(),
+ sizeof(this->num_payload_blocks_));
+
+ block_->rd_ptr(sizeof(this->num_payload_blocks_));
+
+ // Reset buffer pointers to be where they were prior to unpacking.
+ block_->reset();
+ block_->wr_ptr(write_ptr);
+
+ // The next block holds the lengths of each payload block.
+ ACE_Message_Block* next_block = block_->next();
+ if (next_block)
+ {
+ write_ptr = next_block->wr_ptr();
+ next_block->reset();
+ payload_block_lengths_.resize(this->num_payload_blocks_, 0);
+ ACE_UINT32 block_length(0);
+
+ // Extract the lengths of each payload block, which will
+ // be used to recreate the structure of the original payload.
+ for (ACE_UINT32 i = 0; i < this->num_payload_blocks_; ++i)
+ {
+ ACE_OS::memcpy(&block_length, next_block->rd_ptr(), sizeof(block_length));
+ next_block->rd_ptr(sizeof(block_length));
+ payload_block_lengths_[i] = block_length;
+ }
+
+ // Reset the buffer pointers to where they were prior to unpacking.
+ next_block->reset();
+ next_block->wr_ptr(write_ptr);
+ }
+ }
+
+ this->dirty_ = false;
+}
+
+ACE_PIP_Protocol_Message* ACE_PIP_Protocol_Message::copy()
+{
+ ACE_PIP_Protocol_Message* message_copy(0);
+ ACE_NEW_RETURN(message_copy, ACE_PIP_Protocol_Message, 0);
+ message_copy->message_type_ = this->message_type_;
+ message_copy->num_payload_blocks_ = this->num_payload_blocks_;
+ for (ACE_UINT32 block_index = 0; block_index < this->num_payload_blocks_; ++block_index)
+ {
+ message_copy->payload_block_lengths_[block_index] = payload_block_lengths_[block_index];
+ }
+
+ return message_copy;
+}
diff --git a/ACE/ace/PIP_Messages.h b/ACE/ace/PIP_Messages.h
new file mode 100644
index 00000000000..4a8b3e95075
--- /dev/null
+++ b/ACE/ace/PIP_Messages.h
@@ -0,0 +1,255 @@
+ /**
+ * @file PIP_Messages
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a heirarchy of
+ * classes that represent the various messages used in the
+ * priority inheritance protocol
+*/
+
+#ifndef _PIP_MESSAGES_H_
+#define _PIP_MESSAGES_H_
+
+#include "ace/Message_Block.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Vector_T.h"
+
+/**
+ * @class ACE_PIP_Message
+ * @brief Base class for all messages used in
+ * the implementation of a distributed priority inheritance
+ * protocol.
+ *
+ * Base class for all messages used in the implementation of a distributed
+ * priority inheritance protocol. Provides an interface for message (de)serialization,
+ * message chaining, packing, unpacking, and payload ownership transfer
+ */
+class ACE_Export ACE_PIP_Message
+{
+ public:
+
+ ACE_PIP_Message();
+ virtual ~ACE_PIP_Message();
+
+ /// Send the contents of this message over the stream.
+ virtual int serialize(ACE_SOCK_Stream& stream) = 0;
+
+ /// Get the next message struct.
+ virtual ACE_PIP_Message* next();
+
+ /// Set the next message struct.
+ virtual void next(ACE_PIP_Message* next);
+
+ /// Returns the next message, making the caller
+ /// the new owner.
+ virtual ACE_PIP_Message* release_next();
+
+ /// Get the message block.
+ virtual ACE_Message_Block* block();
+
+ /// Set the message block and populate the message struct
+ /// with message contents.
+ virtual void block(ACE_Message_Block* block);
+
+ /// Get the message block, making the caller the new owner.
+ virtual ACE_Message_Block* release_block();
+
+ /// Place the values in the message struct into the message block.
+ virtual int pack() = 0;
+
+ /// Populate the message struct using values from the message block.
+ virtual void unpack() = 0;
+
+ /// This is temporarily public to facilitate testing.
+ /// It should eventually be made private.
+ ACE_Message_Block* block_;
+
+ protected:
+
+ // Indicates values in structure are newer than values in the
+ // message block.
+ bool dirty_;
+
+ ACE_PIP_Message* next_;
+};
+
+/**
+ * @class ACE_PIP_Data_Message
+ * @brief Structure representing the fields of an application-
+ * level protocol message and associated header values
+ *
+ * Structure representing the fields of an appliation level
+ * protocol message and associated header values. Structure is that
+ * of several contiguous ACE_Message_Block's. The message is configurable
+ * to support any application-level protocol that contains at least the following
+ * data: source address, destination address, reply expectation, and priority
+ *
+*/
+class ACE_Export ACE_PIP_Data_Message : public ACE_PIP_Message
+{
+ public:
+
+ ACE_PIP_Data_Message();
+ virtual ~ACE_PIP_Data_Message(){}
+
+ /// Send the contents of this message over the stream.
+ virtual int serialize(ACE_SOCK_Stream& stream);
+
+ /// Determine if a reply message is expected
+ bool reply_expected() const;
+ void reply_expected(bool expected);
+
+ /// Determine the priority at which this message should be handled
+ ACE_UINT32 message_priority() const;
+ void message_priority(ACE_UINT32 priority);
+
+ /// Determine the ID of the destination handler
+ ACE_UINT32 destination_handler_ID() const;
+ void destination_handler_ID(ACE_UINT32 ID);
+
+ /// Determine the ID of the sending handler
+ ACE_UINT32 source_handler_ID() const;
+ void source_handler_ID(ACE_UINT32 ID);
+
+ /// Determine the ID of the destination site
+ ACE_UINT32 destination_site_ID() const;
+ void destination_site_ID(ACE_UINT32 ID);
+
+ /// Determine the ID of the sending site
+ ACE_UINT32 source_site_ID() const;
+ void source_site_ID(ACE_UINT32 ID);
+
+ // Place the values from the struct into the message blocks.
+ virtual int pack();
+
+ // Extract the values from the message blocks into the structs.
+ virtual void unpack();
+
+ private:
+
+ ACE_UINT32 message_priority_;
+ bool reply_expected_;
+ ACE_UINT32 source_handler_id_;
+ ACE_UINT32 source_site_id_;
+ ACE_UINT32 destination_handler_id_;
+ ACE_UINT32 destination_site_id_;
+};
+
+/**
+ * @class ACE_PIP_Protocol_Message
+ * @brief Structure representing a message supported by the priority
+ * inheritance protocol
+ *
+*/
+
+class ACE_Export ACE_PIP_Accel_Message : public ACE_PIP_Message
+{
+ public:
+
+ ACE_PIP_Accel_Message();
+ virtual ~ACE_PIP_Accel_Message(){}
+
+ /// Send the contents of this message over the stream.
+ virtual int serialize(ACE_SOCK_Stream& stream);
+
+ ACE_UINT32 old_priority() const;
+ void old_priority(ACE_UINT32 priority);
+
+ ACE_UINT32 new_priority() const;
+ void new_priority(ACE_UINT32 priority);
+
+ /// Get the address of the application receiving the message.
+ ACE_UINT32 destination_address() const;
+ void destination_address(const ACE_UINT32& address);
+
+ u_short destination_port() const;
+ void destination_port(u_short port);
+
+ /// Place the values in the message struct into the message block.
+ virtual int pack();
+
+ /// Extract the values from the message block and store them in the struct.
+ virtual void unpack();
+
+ /// Return a copy of the this message
+ ACE_PIP_Accel_Message* copy();
+
+ private:
+
+ const ACE_UINT32 ACCEL_HEADER_LENGTH_;
+ ACE_UINT32 destination_address_;
+ u_short destination_port_;
+ ACE_UINT32 new_priority_;
+ ACE_UINT32 old_priority_;
+};
+
+/**
+ * @class ACE_PIP_Accel_Message
+ * @brief Structure representing an acceleration message
+ * used in the implementation of a priority inheritance protocol
+ *
+ * Structure representing an acceleration message used in the
+ * implementation of a priority inheritance protocol. Indicates the
+ * old and new priority of the targeted process, as well as the address
+ * of handler to which the associated message was sent.
+*/
+class ACE_Export ACE_PIP_Protocol_Message : public ACE_PIP_Message
+{
+ public:
+
+ enum Message_Type { NONE, ACCEL, DATA, REQUEST, RESPONSE };
+
+ ACE_PIP_Protocol_Message();
+ virtual ~ACE_PIP_Protocol_Message(){}
+
+ /// Send the contents of this message over the stream.
+ virtual int serialize(ACE_SOCK_Stream& stream);
+
+ /// Receive the contents of this message from the stream.
+ virtual int deserialize(ACE_SOCK_Stream& stream);
+
+ /// Set the next message in the chain.
+ virtual void next(ACE_PIP_Message* next);
+
+ /// Get the next message in the chain
+ virtual ACE_PIP_Message* next();
+
+ /// Determine the type of message this header has been tacked onto.
+ Message_Type message_type() const;
+ void message_type(Message_Type type);
+
+ /// Determine which call chain this message is associated with.
+ ACE_UINT64 message_id() const;
+ void message_id(ACE_UINT64 id);
+
+ /// Attach message block as payload of priority inheritance
+ /// protocol message.
+ int process_message_payload(ACE_Message_Block* payload);
+
+ virtual int pack();
+ virtual void unpack();
+
+ /// Make a copy of the header of this message, i.e. without
+ /// data or accel payload
+ ACE_PIP_Protocol_Message* copy();
+
+ const int FIXED_HEADER_LENGTH_;
+
+ private:
+
+ Message_Type message_type_;
+ ACE_UINT32 num_payload_blocks_;
+ ACE_Vector<ACE_UINT32> payload_block_lengths_;
+ ACE_UINT64 message_id_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "PIP_Messages.inl"
+#endif /* __ACE_INLINE__ */
+
+
+#endif
+
diff --git a/ACE/ace/PIP_Messages.inl b/ACE/ace/PIP_Messages.inl
new file mode 100644
index 00000000000..03145e441bf
--- /dev/null
+++ b/ACE/ace/PIP_Messages.inl
@@ -0,0 +1,186 @@
+/**************************************************
+ *
+ * ACE_PIP_Message - Inline Methods
+ *
+ **************************************************/
+inline ACE_PIP_Message* ACE_PIP_Message::next()
+{
+ return this->next_;
+}
+
+inline void ACE_PIP_Message::next(ACE_PIP_Message* message)
+{
+ this->next_ = message;
+}
+
+inline ACE_PIP_Message* ACE_PIP_Message::release_next()
+{
+ ACE_PIP_Message* temp = this->next_;
+ this->next_ = 0;
+ return temp;
+}
+
+inline ACE_Message_Block* ACE_PIP_Message::block()
+{
+ return this->block_;
+}
+
+inline ACE_Message_Block* ACE_PIP_Message::release_block()
+{
+ ACE_Message_Block* temp_block = this->block_;
+ this->block_ = 0;
+ this->dirty_ = true;
+ return temp_block;
+}
+
+/**************************************************
+ *
+ * ACE_PIP_Data_Message - Inline Methods
+ *
+ **************************************************/
+
+inline bool ACE_PIP_Data_Message::reply_expected() const
+{
+ return this->reply_expected_;
+}
+
+inline void ACE_PIP_Data_Message::reply_expected(bool expected)
+{
+ this->dirty_ = true;
+ this->reply_expected_ = expected;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::message_priority() const
+{
+ return this->message_priority_;
+}
+
+inline void ACE_PIP_Data_Message::message_priority(ACE_UINT32 priority)
+{
+ this->dirty_ = true;
+ this->message_priority_ = priority;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::destination_handler_ID() const
+{
+ return this->destination_handler_id_;
+}
+
+inline void ACE_PIP_Data_Message::destination_handler_ID(ACE_UINT32 ID)
+{
+ this->destination_handler_id_ = ID;
+ this->dirty_ = true;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::source_handler_ID() const
+{
+ return this->source_handler_id_;
+}
+
+inline void ACE_PIP_Data_Message::source_handler_ID(ACE_UINT32 ID)
+{
+ this->source_handler_id_ = ID;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::source_site_ID() const
+{
+ return this->source_site_id_;
+}
+
+inline void ACE_PIP_Data_Message::source_site_ID(ACE_UINT32 ID)
+{
+ this->source_site_id_ = ID;
+}
+
+inline ACE_UINT32 ACE_PIP_Data_Message::destination_site_ID() const
+{
+ return this->destination_site_id_;
+}
+
+inline void ACE_PIP_Data_Message::destination_site_ID(ACE_UINT32 ID)
+{
+ this->destination_site_id_ = ID;
+}
+
+/**************************************************
+ *
+ * ACE_PIP_Accel_Message - Inline Methods
+ *
+ **************************************************/
+
+inline ACE_UINT32 ACE_PIP_Accel_Message::old_priority() const
+{
+ return old_priority_;
+}
+
+inline void ACE_PIP_Accel_Message::old_priority(ACE_UINT32 priority)
+{
+ this->dirty_ = true;
+ old_priority_ = priority;
+}
+
+inline ACE_UINT32 ACE_PIP_Accel_Message::new_priority() const
+{
+ return new_priority_;
+}
+
+inline void ACE_PIP_Accel_Message::new_priority(ACE_UINT32 priority)
+{
+ this->dirty_ = true;
+ new_priority_ = priority;
+}
+
+inline ACE_UINT32 ACE_PIP_Accel_Message::destination_address() const
+{
+ return destination_address_;
+}
+
+inline void ACE_PIP_Accel_Message::destination_address(const ACE_UINT32& address)
+{
+ this->dirty_ = true;
+ destination_address_ = address;
+}
+
+inline u_short ACE_PIP_Accel_Message::destination_port() const
+{
+ return destination_port_;
+}
+
+inline void ACE_PIP_Accel_Message::destination_port(u_short port)
+{
+ destination_port_ = port;
+}
+
+/**************************************************
+ *
+ * ACE_PIP_Protocol_Message - Inline Methods
+ *
+ **************************************************/
+
+inline void ACE_PIP_Protocol_Message::
+ message_type(ACE_PIP_Protocol_Message::Message_Type type)
+{
+ message_type_ = type;
+ this->dirty_ = true;
+}
+
+inline ACE_PIP_Protocol_Message::Message_Type ACE_PIP_Protocol_Message::message_type() const
+{
+ return message_type_;
+}
+
+inline ACE_UINT64 ACE_PIP_Protocol_Message::message_id() const
+{
+ return message_id_;
+}
+
+inline void ACE_PIP_Protocol_Message::message_id(ACE_UINT64 id)
+{
+ this->dirty_ = true;
+ message_id_ = id;
+}
+
+inline ACE_PIP_Message* ACE_PIP_Protocol_Message::next()
+{
+ return this->next_;
+}
diff --git a/ACE/ace/PIP_Reactive_IO_Handler.cpp b/ACE/ace/PIP_Reactive_IO_Handler.cpp
new file mode 100644
index 00000000000..4eca6890705
--- /dev/null
+++ b/ACE/ace/PIP_Reactive_IO_Handler.cpp
@@ -0,0 +1,64 @@
+// $Id$
+
+#include "ace/OS_NS_sys_time.h"
+#include "ace/PIP_Reactive_IO_Handler.h"
+#include "ace/PIP_Invocation_Manager.h"
+
+/// Constructor
+ACE_PIP_Reactive_IO_Handler::ACE_PIP_Reactive_IO_Handler()
+{
+}
+
+ACE_PIP_Reactive_IO_Handler::~ACE_PIP_Reactive_IO_Handler()
+{
+}
+
+/// Closes all remote connections.
+int ACE_PIP_Reactive_IO_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
+{
+ ACE_UNUSED_ARG(handle);
+ switch(close_mask)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ read_closed_ = true;
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ write_closed_ = true;
+ break;
+ };
+
+ if (read_closed_ && write_closed_)
+ {
+ // Close our end of the connection
+ peer_.close_reader();
+ peer_.close_writer();
+
+ // un-register with invocation manager so it doesn't
+ // try to use the handler for IO
+ ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this);
+
+ delete this;
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/// Enqueue a message to be sent
+int ACE_PIP_Reactive_IO_Handler::put_message (ACE_PIP_Protocol_Message* message)
+{
+ big_lock_.acquire();
+ outgoing_message_queue_.enqueue_head(message);
+ big_lock_.release();
+
+ // Register so Reactor tells us to send the message
+ ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::WRITE_MASK);
+ ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);
+
+ return 0;
+}
+
+
+
+
diff --git a/ACE/ace/PIP_Reactive_IO_Handler.h b/ACE/ace/PIP_Reactive_IO_Handler.h
new file mode 100644
index 00000000000..ae50ebf9b27
--- /dev/null
+++ b/ACE/ace/PIP_Reactive_IO_Handler.h
@@ -0,0 +1,54 @@
+ /**
+ * @file PIP_IO_Handler.cpp
+ *
+ * // $Id$
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ *
+ * This file contains the specification for a class
+ * that manages network I/O
+*/
+
+
+#ifndef _PIP_REACTIVE_IO_HANDLER_H_
+#define _PIP_REACTIVE_IO_HANDLER_H_
+
+
+#include "ace/Message_Queue.h"
+#include "ace/PIP_IO_Handler.h"
+#include "ace/PIP_Messages.h"
+
+/**
+ * @class ACE_PIP_Reactive_IO_Handler
+ *
+ * @brief Performs reactive network I/O in
+ * the context of a distributed system
+ * employing the the priority inheritance
+ * protocol
+ *
+ * @author John Moore <ljohn7@gmail.com>
+ */
+class ACE_Export ACE_PIP_Reactive_IO_Handler :
+ public ACE_PIP_IO_Handler
+{
+ public:
+
+ /// Constructor
+ ACE_PIP_Reactive_IO_Handler ();
+ ~ACE_PIP_Reactive_IO_Handler();
+
+ /// Closes all remote connections.
+ virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
+
+ /// Enqueue a message to be sent
+ virtual int put_message (ACE_PIP_Protocol_Message* message);
+
+ private:
+
+};
+
+
+
+#endif /* _PIP_Reactive_IO_Handler_H_ */
+
+