From ec9c7fc2857ffd49059789828843c555d9c428ca Mon Sep 17 00:00:00 2001 From: jmoore Date: Fri, 25 Jul 2008 03:00:01 +0000 Subject: initial Priority Inheritance Protocol commit --- ACE/ace/MT_Priority_Reactor.cpp | 611 ++++++++++++++++++++++++++++++++++++ ACE/ace/MT_Priority_Reactor.h | 252 +++++++++++++++ ACE/ace/PIP_Active_IO_Handler.cpp | 109 +++++++ ACE/ace/PIP_Active_IO_Handler.h | 55 ++++ ACE/ace/PIP_Connection_Manager.cpp | 207 ++++++++++++ ACE/ace/PIP_Connection_Manager.h | 81 +++++ ACE/ace/PIP_DA_Strategy_Adapter.cpp | 4 + ACE/ace/PIP_DA_Strategy_Adapter.h | 258 +++++++++++++++ ACE/ace/PIP_Dispatcher.cpp | 468 +++++++++++++++++++++++++++ ACE/ace/PIP_Dispatcher.h | 188 +++++++++++ ACE/ace/PIP_IO_Handler.cpp | 151 +++++++++ ACE/ace/PIP_IO_Handler.h | 103 ++++++ ACE/ace/PIP_IO_Handler.inl | 36 +++ ACE/ace/PIP_Invocation_Manager.cpp | 316 +++++++++++++++++++ ACE/ace/PIP_Invocation_Manager.h | 150 +++++++++ ACE/ace/PIP_Message_Handler.cpp | 38 +++ ACE/ace/PIP_Message_Handler.h | 82 +++++ ACE/ace/PIP_Message_Handler.inl | 22 ++ ACE/ace/PIP_Messages.cpp | 577 ++++++++++++++++++++++++++++++++++ ACE/ace/PIP_Messages.h | 255 +++++++++++++++ ACE/ace/PIP_Messages.inl | 186 +++++++++++ ACE/ace/PIP_Reactive_IO_Handler.cpp | 64 ++++ ACE/ace/PIP_Reactive_IO_Handler.h | 54 ++++ 23 files changed, 4267 insertions(+) create mode 100644 ACE/ace/MT_Priority_Reactor.cpp create mode 100644 ACE/ace/MT_Priority_Reactor.h create mode 100644 ACE/ace/PIP_Active_IO_Handler.cpp create mode 100644 ACE/ace/PIP_Active_IO_Handler.h create mode 100644 ACE/ace/PIP_Connection_Manager.cpp create mode 100644 ACE/ace/PIP_Connection_Manager.h create mode 100644 ACE/ace/PIP_DA_Strategy_Adapter.cpp create mode 100644 ACE/ace/PIP_DA_Strategy_Adapter.h create mode 100644 ACE/ace/PIP_Dispatcher.cpp create mode 100644 ACE/ace/PIP_Dispatcher.h create mode 100644 ACE/ace/PIP_IO_Handler.cpp create mode 100644 ACE/ace/PIP_IO_Handler.h create mode 100644 ACE/ace/PIP_IO_Handler.inl create mode 100644 ACE/ace/PIP_Invocation_Manager.cpp create mode 100644 ACE/ace/PIP_Invocation_Manager.h create mode 100644 ACE/ace/PIP_Message_Handler.cpp create mode 100644 ACE/ace/PIP_Message_Handler.h create mode 100644 ACE/ace/PIP_Message_Handler.inl create mode 100644 ACE/ace/PIP_Messages.cpp create mode 100644 ACE/ace/PIP_Messages.h create mode 100644 ACE/ace/PIP_Messages.inl create mode 100644 ACE/ace/PIP_Reactive_IO_Handler.cpp create mode 100644 ACE/ace/PIP_Reactive_IO_Handler.h diff --git a/ACE/ace/MT_Priority_Reactor.cpp b/ACE/ace/MT_Priority_Reactor.cpp new file mode 100644 index 00000000000..582b59ad25b --- /dev/null +++ b/ACE/ace/MT_Priority_Reactor.cpp @@ -0,0 +1,611 @@ +// $Id$ + +#include "ace/MT_Priority_Reactor.h" +#include "ace/Thread.h" +#include "ace/Timer_Queue.h" +#include "ace/Sig_Handler.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_sys_time.h" + +#include // for debugging + +#if !defined (__ACE_INLINE__) +#include "ace/MT_Priority_Reactor.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID (ace, + MT_Priority_Reactor, + "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_ALLOC_HOOK_DEFINE (ACE_MT_Priority_Reactor) + +ACE_MT_Priority_Reactor::ACE_MT_Priority_Reactor (ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int mask_signals, + int s_queue) +: ACE_TP_Reactor (sh, tq, mask_signals, s_queue) + , _bucket(0) + , _bucketize_socket_events(false) + , _current_priority(ACE_Event_Handler::HI_PRIORITY) + , _max_priority(0) + , _min_priority(0) + , _num_socket_events(0) + , _tuple_allocator(0) +{ + ACE_TRACE ("ACE_MT_Priority_Reactor::ACE_MT_Priority_Reactor"); + this->init_bucket(); +} + +ACE_MT_Priority_Reactor::ACE_MT_Priority_Reactor (size_t max_number_of_handles, + int restart, + ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int mask_signals, + int s_queue) + : ACE_TP_Reactor (max_number_of_handles, restart, sh, tq, mask_signals, s_queue) + , _bucket(0) + , _current_priority(ACE_Event_Handler::HI_PRIORITY) + , _max_priority(0) + , _min_priority(0) + , _num_socket_events(0) + , _tuple_allocator(0) +{ + ACE_TRACE ("ACE_MT_Priority_Reactor::ACE_MT_Priority_Reactor"); + this->init_bucket(); +} + +ACE_MT_Priority_Reactor::~ACE_MT_Priority_Reactor() +{ + ACE_TRACE ("ACE_MT_Priority_Reactor::~ACE_MT_Priority_Reactor"); + for (int index = 0; index < npriorities; ++index) + { + delete _bucket[index]; + } + + delete [] _bucket; + delete _tuple_allocator; +} + +int +ACE_MT_Priority_Reactor::handle_events (ACE_Time_Value &max_wait_time) +{ + return this->handle_events (&max_wait_time); +} + +int +ACE_MT_Priority_Reactor::handle_events (ACE_Time_Value *max_wait_time) +{ + ACE_TRACE ("ACE_MT_Priority_Reactor::handle_events"); + + // Stash the current time -- the destructor of this object will + // automatically compute how much time elapsed since this method was + // called. + ACE_Countdown_Time countdown (max_wait_time); + + + // Instantiate the token guard which will try grabbing the token for + // this thread. + ACE_TP_Token_Guard guard (this->token_); + + int const result = guard.acquire_read_token (max_wait_time); + + // If the guard is NOT the owner just return the retval + if (!guard.is_owner ()) + return result; + + // After getting the lock just just for deactivation. + if (this->deactivated_) + return -1; + + // Update the countdown to reflect time waiting for the token. + countdown.update (); + + return this->dispatch_handler (max_wait_time, + guard); +} + +int +ACE_MT_Priority_Reactor::dispatch_handler (ACE_Time_Value *max_wait_time, + ACE_TP_Token_Guard &guard) +{ + int num_events = + this->get_an_event_for_dispatching (max_wait_time); + + int initial_event_count = num_events; + int result = 0; + + result = this->handle_timer (num_events, + guard); + + // If we've dispatched a timer event handler, return the thread + if (result > 0) + return result; + + // Else just go ahead fall through for further handling. + if (num_events > 0) + { + // Next dispatch the notification handlers (if there are any to + // dispatch). These are required to handle multiple-threads + // that are trying to update the . + result = this->handle_notify (_num_socket_events, + guard); + + if (result > 0) + return result; + + // Else just fall through for further handling + } + + if (num_events > 0) + { + // Sort events into priority buckets + if (_bucketize_socket_events) + { + bucketize_socket_events(); + } + + // Handle the highest priority event + result = this->handle_socket(num_events, + guard); + } + + return result; +} + +int +ACE_MT_Priority_Reactor::get_an_event_for_dispatching (ACE_Time_Value *max_wait_time) +{ + // If the reactor handler state has changed, clear any remembered + // ready bits and re-scan from the master wait_set. + if (this->state_changed_) + { + this->ready_set_.rd_mask_.reset (); + this->ready_set_.wr_mask_.reset (); + this->ready_set_.ex_mask_.reset (); + + this->state_changed_ = false; + } + else + { + // This is a hack... somewhere, under certain conditions (which + // I don't understand...) the mask will have all of its bits clear, + // yet have a size_ > 0. This is an attempt to remedy the affect, + // without knowing why it happens. + + this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ()); + this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ()); + this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ()); + } + + if (_num_socket_events > 0) + return _num_socket_events; + + else + { + int num_events = wait_for_multiple_events (this->ready_set_, + max_wait_time); + + _bucketize_socket_events = true; + + if (num_events > 0) + preprocess_new_event_set(); + + return num_events; + } +} + +int +ACE_MT_Priority_Reactor::handle_timer(int & /*event_count*/, + ACE_TP_Token_Guard &guard) +{ + if (this->timer_queue_ == 0 || this->timer_queue_->is_empty()) + { // Empty timer queue so cannot have any expired timers. + return 0; + } + + // Get the current time + ACE_Time_Value cur_time (this->timer_queue_->gettimeofday () + + this->timer_queue_->timer_skew ()); + + // Look for a node in the timer queue whose timer <= the present + // time. + ACE_Timer_Node_Dispatch_Info info; + + if (this->timer_queue_->dispatch_info (cur_time, + info)) + { + const void *upcall_act = 0; + + // Preinvoke. + this->timer_queue_->preinvoke (info, + cur_time, + upcall_act); + + // Release the token before dispatching notifies... + guard.release_token (); + + // call the functor + this->timer_queue_->upcall (info, + cur_time); + + // Postinvoke + this->timer_queue_->postinvoke (info, + cur_time, + upcall_act); + + // We have dispatched a timer + return 1; + } + + return 0; +} + +int +ACE_MT_Priority_Reactor::handle_notify(int & /*event_count*/, + ACE_TP_Token_Guard &guard) +{ + + // Get the handle on which notify calls could have occured + ACE_HANDLE notify_handle = + this->get_the_notify_handle (); + + int result = 0; + + // The notify was not in the list returned by + // wait_for_multiple_events (). + if (notify_handle == ACE_INVALID_HANDLE) + return result; + + // Now just do a read on the pipe.. + ACE_Notification_Buffer buffer; + + // Clear the handle of the read_mask of our + this->ready_set_.rd_mask_.clr_bit (notify_handle); + + // Keep reading notifies till we empty it or till we have a + // dispatchable buffer + while (this->notify_handler_->read_notify_pipe (notify_handle, + buffer) > 0) + { + // Just figure out whether we can read any buffer that has + // dispatchable info. If not we have just been unblocked by + // another thread trying to update the reactor. If we get any + // buffer that needs dispatching we will dispatch that after + // releasing the lock + if (this->notify_handler_->is_dispatchable (buffer) > 0) + { + // Release the token before dispatching notifies... + guard.release_token (); + + // Dispatch the upcall for the notify + this->notify_handler_->dispatch_notify (buffer); + + // We had a successful dispatch. + result = 1; + + // break out of the while loop + break; + } + + + } + + // If we did some work, then we just return 1 which will allow us + // to get out of here. If we return 0, then we will be asked to do + // some work ie. dispacth socket events + return result; +} + +int +ACE_MT_Priority_Reactor::handle_socket(int &event_count, + ACE_TP_Token_Guard &guard) +{ + // We got the lock, lets handle some I/O events. + ACE_Handle_Dispatch_Info dispatch_info; + + if (this->get_sock_event_info (dispatch_info) == 0) + { + return 0; + } + + // If there is any event handler that is ready to be dispatched, the + // dispatch information is recorded in dispatch_info. + if (!dispatch_info.dispatch ()) + { + // Check for removed handlers. + if (dispatch_info.event_handler_ == 0) + { + this->handler_rep_.unbind(dispatch_info.handle_, + dispatch_info.mask_); + } + + return 0; + } + + // Suspend the handler so that other threads don't start dispatching + // it, if we can't suspend then return directly + if (dispatch_info.event_handler_ != this->notify_handler_) + if (this->suspend_i (dispatch_info.handle_) == -1) + return 0; + + // Call add_reference() if needed. + if (dispatch_info.reference_counting_required_) + dispatch_info.event_handler_->add_reference (); + + // Do any pre-dispatch processing + preprocess_chosen_handler(dispatch_info); + + // Release the lock so that another thread can begin dispatching + guard.release_token (); + + int result = 0; + + // Dispatched an event + if (this->dispatch_sock_event (dispatch_info) == 0) + ++result; + + return result; +} + +ACE_HANDLE +ACE_MT_Priority_Reactor::get_the_notify_handle (void) +{ + // Call the notify handler to get a handle on which we would have a + // notify waiting + ACE_HANDLE const read_handle = + this->notify_handler_->notify_handle (); + + // Check whether the rd_mask has been set on that handle. If so + // return the handle. + if (read_handle != ACE_INVALID_HANDLE && + this->ready_set_.rd_mask_.is_set (read_handle)) + { + return read_handle; + } + + // None found.. + return ACE_INVALID_HANDLE; +} + +int +ACE_MT_Priority_Reactor::get_sock_event_info (ACE_Handle_Dispatch_Info &event) +{ + int found_io = 0; + + // Iterate through buckets to find the highest-priority socket event + if (_num_socket_events > 0) + { + ACE_Handle_Dispatch_Info event_info; + _current_priority = ACE_Event_Handler::HI_PRIORITY; + while (_current_priority >= _min_priority) + { + if (!_bucket[_current_priority]->is_empty()) + { + size_t size = _bucket[_current_priority]->size(); + size_t count = 0; + while (count < size) + { + ++count; + --_num_socket_events; + _bucket[_current_priority]->dequeue_head(event_info); + if (!this->is_suspended_i(event_info.handle_)) + { + found_io = 1; + event = event_info; + break; + } + } + if (found_io) + { + break; + } + } + else + { + // There are no more messages at this priority, so continue + // to the next lowest priority + --_current_priority; + } + } + } + + return found_io; +} + +void ACE_MT_Priority_Reactor::init_bucket() +{ + // Create a tuple allocator + ACE_NEW (this->_tuple_allocator, + TUPLE_ALLOCATOR (ACE_Select_Reactor::DEFAULT_SIZE)); + + // Create a list of pointers, one per priority level + ACE_NEW (this->_bucket, + QUEUE *[npriorities]); + + // Create a bucket for each priority level + for (int i = 0; i < npriorities; ++i) + ACE_NEW (this->_bucket[i], + QUEUE (this->_tuple_allocator)); +} + +// Dispatches a single event handler +int +ACE_MT_Priority_Reactor::dispatch_sock_event (ACE_Handle_Dispatch_Info &dispatch_info) +{ + ACE_TRACE ("ACE_MT_Priority_Reactor::dispatch_socket_event"); + + ACE_Event_Handler * const event_handler = dispatch_info.event_handler_; + ACE_EH_PTMF const callback = dispatch_info.callback_; + + // Check for removed handlers. + if (event_handler == 0) + return -1; + + // Upcall. If the handler returns positive value (requesting a + // reactor callback) don't set the ready-bit because it will be + // ignored if the reactor state has changed. Just call back + // as many times as the handler requests it. Other threads are off + // handling other things. + int status = 1; + + while (status > 0) + status = (event_handler->*callback) (dispatch_info.handle_); + + // Post process socket event + return this->post_process_sock_event (dispatch_info, status); +} + +int +ACE_MT_Priority_Reactor::post_process_sock_event (ACE_Handle_Dispatch_Info &dispatch_info, + int status) +{ + int result = 0; + + // First check if we really have to post process something, if not, then + // we don't acquire the token which saves us a lot of time. + if (status < 0 || + (dispatch_info.event_handler_ != this->notify_handler_ && + dispatch_info.resume_flag_ == + ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)) + { + // Get the reactor token and with this token acquired remove first the + // handler and resume it at the same time. This must be atomic, see also + // bugzilla 2395. When this is not atomic it can be that we resume the + // handle after it is reused by the OS. + ACE_TP_Token_Guard guard (this->token_); + + result = guard.acquire_token (); + + // If the guard is NOT the owner just return the retval + if (!guard.is_owner ()) + return result; + + // A different event handler may have been registered during the + // upcall if the handle was closed and then reopened, for + // example. Make sure we're removing and/or resuming the event + // handler used during the upcall. + ACE_Event_Handler const * const eh = + this->handler_rep_.find (dispatch_info.handle_); + + // Only remove or resume the event handler used during the + // upcall. + if (eh == dispatch_info.event_handler_) + { + if (status < 0) + { + result = + this->remove_handler_i (dispatch_info.handle_, + dispatch_info.mask_); + } + + // Resume handler if required. + if (dispatch_info.event_handler_ != this->notify_handler_ && + dispatch_info.resume_flag_ == + ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER) + this->resume_i (dispatch_info.handle_); + } + } + + // Call remove_reference() if needed. + if (dispatch_info.reference_counting_required_) + dispatch_info.event_handler_->remove_reference (); + + return result; +} + +void +ACE_MT_Priority_Reactor::notify_handle (ACE_HANDLE, + ACE_Reactor_Mask, + ACE_Handle_Set &, + ACE_Event_Handler *eh, + ACE_EH_PTMF) +{ + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("ACE_MT_Priority_Reactor::notify_handle: ") + ACE_LIB_TEXT ("Wrong version of notify_handle() got called \n"))); + + ACE_ASSERT (eh == 0); + ACE_UNUSED_ARG (eh); +} + +int ACE_MT_Priority_Reactor::bucketize_socket_events() +{ + int result(0); + _bucketize_socket_events = false; + ACE_HANDLE handle; + + // Place each active handle in the appropriate priority bucket + ACE_Handle_Set_Iterator wr_handle_iter (this->ready_set_.wr_mask_); + while ((handle = wr_handle_iter ()) != ACE_INVALID_HANDLE) + { + ACE_Handle_Dispatch_Info event; + + // To avoid dispatching an event whose associated handle + // is already suspended, don't place that event in the bucket + if (this->is_suspended_i (handle)) + continue; + + // Create a token containing all information + // necessary to dispatch the event at the appropriate + // moment + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::WRITE_MASK, + &ACE_Event_Handler::handle_output); + + + ++_num_socket_events; + + // Store the token in the appropriate bucket + _bucket[event.event_handler_->priority()]->enqueue_tail(event); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.ex_mask_.clr_bit (handle); + this->ready_set_.rd_mask_.clr_bit (handle); + } + + ACE_Handle_Set_Iterator ex_handle_iter (this->ready_set_.ex_mask_); + while ((handle = ex_handle_iter ()) != ACE_INVALID_HANDLE) + { + ACE_Handle_Dispatch_Info event; + + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::EXCEPT_MASK, + &ACE_Event_Handler::handle_exception); + + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.ex_mask_.clr_bit (handle); + this->ready_set_.rd_mask_.clr_bit (handle); + } + + ACE_Handle_Set_Iterator rd_handle_iter (this->ready_set_.rd_mask_); + while ((handle = rd_handle_iter ()) != ACE_INVALID_HANDLE) + { + ACE_Handle_Dispatch_Info event; + + if (this->is_suspended_i (handle)) + continue; + + // Remember this info + event.set (handle, + this->handler_rep_.find (handle), + ACE_Event_Handler::READ_MASK, + &ACE_Event_Handler::handle_input); + + ++_num_socket_events; + _bucket[event.event_handler_->priority()]->enqueue_tail(event); + this->ready_set_.wr_mask_.clr_bit (handle); + this->ready_set_.ex_mask_.clr_bit (handle); + this->ready_set_.rd_mask_.clr_bit (handle); + } + + return result; +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/ace/MT_Priority_Reactor.h b/ACE/ace/MT_Priority_Reactor.h new file mode 100644 index 00000000000..a8df0ae66c7 --- /dev/null +++ b/ACE/ace/MT_Priority_Reactor.h @@ -0,0 +1,252 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file MT_Priority_Reactor.h + * + * $Id$ + * + * The ACE_MT_Priority_Reactor, like the ACE_TP_Reactor, uses the + * Leader/Followers pattern to demultiplex events among a pool of + * threads. When using a thread pool reactor, an application + * pre-spawns a _fixed_ number of threads. When these threads + * invoke the ACE_MT_Priority_Reactor's method, one thread + * will become the leader and wait for an event. The other + * follower threads will queue up waiting for their turn to become + * the leader. When an event occurs, the leader will pick a + * follower to become the leader and go on to handle the event. + * The consequence of using ACE_TP_Reactor is the amortization of + * the costs used to creating threads. The context switching cost + * will also reduce. More over, the total resources used by + * threads are bounded because there are a fixed number of threads. + * + * The ACE_MT_Priority_Reactor differs from the TP reactor in that it + * dispatches socket events in priority order, according to the priorites + * specified in each registered event handler + * + * @author John Moore + */ +//============================================================================= + + +#ifndef _ACE_MT_PRIORITY_REACTOR_H_ +#define _ACE_MT_PRIORITY_REACTOR_H_ + +#include /**/ "ace/pre.h" + +#include "ace/TP_Reactor.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +#include "ace/Malloc_T.h" +#include "ace/Unbounded_Queue.h" + +#define npriorities ACE_Event_Handler::HI_PRIORITY-ACE_Event_Handler::LO_PRIORITY+1 + +/** + * @class ACE_Handle_Dispatch_Info + * + * @brief This structure contains all information for needed to + * dispatch the corresponding event handler + */ +class ACE_Handle_Dispatch_Info +{ + public: + + /// Default constructor + ACE_Handle_Dispatch_Info (void); + + /// Method for setting + void set (ACE_HANDLE handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask, + ACE_EH_PTMF callback); + + /// Dispatch indicates whether or not it's safe to dispatch + /// the associated handler + bool dispatch (void) const; + + ACE_HANDLE handle_; + ACE_Event_Handler *event_handler_; + ACE_Reactor_Mask mask_; + ACE_EH_PTMF callback_; + int resume_flag_; + bool reference_counting_required_; + + private: + bool dispatch_; +}; + + +typedef ACE_Cached_Allocator +< ACE_Node< ACE_Handle_Dispatch_Info >, + ACE_SYNCH_NULL_MUTEX > TUPLE_ALLOCATOR; + +typedef ACE_Unbounded_Queue_Iterator +< ACE_Handle_Dispatch_Info > QUEUE_ITERATOR; + +/** + * @class ACE_MT_Priority_Reactor + * + * @brief Specialization of Select Reactor to support prioritized, thread-pool + * based event dispatching. + * + * The multi-threaded priority reactor combines the benefits of + * the thread-pool-based TP_Reactor with the prioritized dispatching of + * event handlers - a feature found in the single-threaded priority reactor + */ +class ACE_Export ACE_MT_Priority_Reactor : public ACE_TP_Reactor +{ +public: + + /// Initialize ACE_MT_Priority_Reactor with the default size. + ACE_MT_Priority_Reactor (ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0, + int mask_signals = 1, + int s_queue = ACE_Select_Reactor_Token::FIFO); + + /** + * Initialize the ACE_MT_Priority_Reactor to manage + * @a max_number_of_handles. If @a restart is non-0 then the + * ACE_Reactor's @c handle_events() method will be restarted + * automatically when @c EINTR occurs. If @a sh or + * @a tq are non-0 they are used as the signal handler and + * timer queue, respectively. + */ + ACE_MT_Priority_Reactor (size_t max_number_of_handles, + int restart = 0, + ACE_Sig_Handler *sh = 0, + ACE_Timer_Queue *tq = 0, + int mask_signals = 1, + int s_queue = ACE_Select_Reactor_Token::FIFO); + + + /** + *Destructor + */ + virtual ~ACE_MT_Priority_Reactor(); + + /** + * This event loop driver that blocks for @a max_wait_time before + * returning. It will return earlier if timer events, I/O events, + * or signal events occur. Note that @a max_wait_time can be 0, in + * which case this method blocks indefinitely until events occur. + * + * @a max_wait_time is decremented to reflect how much time this call + * took. For instance, if a time value of 3 seconds is passed to + * handle_events and an event occurs after 2 seconds, + * @a max_wait_time will equal 1 second. This can be used if an + * application wishes to handle events for some fixed amount of + * time. + * + * Returns the total number of ACE_Event_Handlers that were + * dispatched, 0 if the @a max_wait_time elapsed without dispatching + * any handlers, or -1 if something goes wrong. + */ + virtual int handle_events (ACE_Time_Value *max_wait_time = 0); + + virtual int handle_events (ACE_Time_Value &max_wait_time); + + + /// Called from handle events + static void no_op_sleep_hook (void *); + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + + protected: + + /// Template method from the base class. + virtual void clear_dispatch_mask (ACE_HANDLE handle, + ACE_Reactor_Mask mask); + + /// Dispatch just 1 signal, timer, notification handlers + int dispatch_handler (ACE_Time_Value *max_wait_time, + ACE_TP_Token_Guard &guard); + + /// Template method called when new, non-empty set of events is + /// returned from call to select + virtual void preprocess_new_event_set () {} + + /// Template method called when an event handler has been + /// selected for dispatch + virtual void preprocess_chosen_handler (ACE_Handle_Dispatch_Info dispatch_info) {} + + /// Get the event that needs dispatching. It could be either a + /// signal, timer, notification handlers or return possibly 1 I/O + /// handler for dispatching. In the most common use case, this would + /// return 1 I/O handler for dispatching + int get_an_event_for_dispatching (ACE_Time_Value *max_wait_time); + + /// Handle timer events + int handle_timer(int &event_count, + ACE_TP_Token_Guard &g); + + /// Handle notify events + int handle_notify (int &event_count, + ACE_TP_Token_Guard &g); + + /// handle socket events + int handle_socket(int &event_count, + ACE_TP_Token_Guard &g); + + /// This method shouldn't get called. + virtual void notify_handle (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Handle_Set &, + ACE_Event_Handler *eh, + ACE_EH_PTMF callback); + private: + + typedef ACE_Unbounded_Queue< ACE_Handle_Dispatch_Info > QUEUE; + + /// Deny access since member-wise won't work... + ACE_MT_Priority_Reactor (const ACE_MT_Priority_Reactor &); + ACE_MT_Priority_Reactor &operator = (const ACE_MT_Priority_Reactor &); + + /// Get the handle of the notify pipe from the ready set if there is + /// an event in the notify pipe. + ACE_HANDLE get_the_notify_handle (void); + + /// Get socket event dispatch information. + int get_sock_event_info (ACE_Handle_Dispatch_Info &info); + + /// Allocates storage for event handler tokens + void init_bucket (void); + + /// Notify the appropriate in the context of the + /// associated with that a particular event has occurred. + int dispatch_sock_event (ACE_Handle_Dispatch_Info &dispatch_info); + + /// Clear the @a handle from the read_set + void clear_read_set (ACE_HANDLE handle); + + int post_process_sock_event (ACE_Handle_Dispatch_Info &dispatch_info,int status); + + + /// Divide existing socket events into buckets by priority + int bucketize_socket_events(); + + // Private data members + QUEUE** _bucket; + bool _bucketize_socket_events; + ACE_Allocator* _tuple_allocator; + int _num_socket_events; + int _current_priority; + int _min_priority; + int _max_priority; +}; + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "ace/MT_Priority_Reactor.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* ACE_MT_PRIORITY_REACTOR_H */ diff --git a/ACE/ace/PIP_Active_IO_Handler.cpp b/ACE/ace/PIP_Active_IO_Handler.cpp new file mode 100644 index 00000000000..e6fe89bd348 --- /dev/null +++ b/ACE/ace/PIP_Active_IO_Handler.cpp @@ -0,0 +1,109 @@ +// $Id$ + +#include "ace/PIP_Active_IO_Handler.h" + +/// Constructor +ACE_PIP_Active_IO_Handler::ACE_PIP_Active_IO_Handler() + : shutdown_(false) +{ + // acquire the shutdown lock so that when shutdown_svc is called, + // the caller cannot return until shutdown has been completed and + // lock relinquished + this->shutdown_lock_.acquire(); +} + +/// Closes all remote connections. +int ACE_PIP_Active_IO_Handler::handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask) +{ + ACE_UNUSED_ARG(handle); + switch(close_mask) + { + case ACE_Event_Handler::READ_MASK: + this->read_closed_ = true; + break; + case ACE_Event_Handler::WRITE_MASK: + this->write_closed_ = true; + break; + }; + + if (read_closed_ && write_closed_) + { + // Close our end of the connection + this->peer_.close_reader(); + this->peer_.close_writer(); + delete this; + return -1; + } + + return 0; +} + +/// Enqueue a message to be sent +int ACE_PIP_Active_IO_Handler::put_message (ACE_PIP_Protocol_Message* message) +{ + int result = this->outgoing_message_queue_.enqueue(message); + if (result > -1) + { + return result; + } + + return -1; +} + +int ACE_PIP_Active_IO_Handler::svc() +{ + int result(0); + ssize_t bytes_available(0); + char byte; + + // Run until told to shutdown + while (!this->shutdown_) + { + // Peek to see if an incoming message available + bytes_available = this->peer_.recv(&byte, 1, MSG_PEEK); + if (bytes_available > 0) + { + this->handle_input(); + } + + // Handle outgoing messages + result = this->handle_output(); + if (result == -2) + { + // Indicate to caller that the + // handler is no longer active + return -1; + } + + bytes_available = 0; + } + + return 0; +} + +void ACE_PIP_Active_IO_Handler::shutdown_svc() +{ + this->shutdown_ = true; + this->shutdown_lock_.acquire(); + + this->handle_close(0, + ACE_Event_Handler::READ_MASK | + ACE_Event_Handler::WRITE_MASK); + +} + +int ACE_PIP_Active_IO_Handler::open(void*) +{ + int result = this->activate(); + if (result > 0) + { + return 0; + } + + return -1; +} + + + + diff --git a/ACE/ace/PIP_Active_IO_Handler.h b/ACE/ace/PIP_Active_IO_Handler.h new file mode 100644 index 00000000000..9d2ed86dcde --- /dev/null +++ b/ACE/ace/PIP_Active_IO_Handler.h @@ -0,0 +1,55 @@ + /** + * @file PIP_Active_IO_Handler.h + * + * // $Id$ + * + * @author John Moore + * + * This file contains the specification for a class + * that manages network I/O in a dedicated thread +*/ + + +#ifndef PIP_ACTIVE_IO_HANDLER_H +#define PIP_ACTIVE_IO_HANDLER_H + +#include "ace/Mutex.h" +#include "ace/PIP_IO_Handler.h" + +/** + * @class ACE_PIP_Active_IO_Handler + * + * @brief Performs network I/O in a dedicated thread + * + * @author John Moore + */ +class ACE_Export ACE_PIP_Active_IO_Handler : + public ACE_PIP_IO_Handler +{ + public: + + /// Constructor + ACE_PIP_Active_IO_Handler (); + + /// Enqueue a message to be sent + virtual int put_message (ACE_PIP_Protocol_Message* message); + + /// Closes all remote connections. + virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); + + /// Performs message I/O + virtual int svc(); + + /// Shuts down the service. Result is handler deactivated and + /// deleted + void shutdown_svc(); + + virtual int open(void* = 0); + + private: + + bool shutdown_; + ACE_Mutex shutdown_lock_; +}; + +#endif /* _PIP_Active_IO_Handler_H_ */ diff --git a/ACE/ace/PIP_Connection_Manager.cpp b/ACE/ace/PIP_Connection_Manager.cpp new file mode 100644 index 00000000000..04d4ede7150 --- /dev/null +++ b/ACE/ace/PIP_Connection_Manager.cpp @@ -0,0 +1,207 @@ + /** + * @file PIP_Connection_Manager.cpp + * + * // $Id$ + * + * @author John Moore + * + */ + +#include +#include +#include +#include +#include + + +ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::connection_manager_ = 0; +ACE_Mutex ACE_PIP_Connection_Manager::instance_lock_; +bool ACE_PIP_Connection_Manager::delete_manager_ = false; + +/// Default Constructor +ACE_PIP_Connection_Manager::ACE_PIP_Connection_Manager() +{ +} + +/// Destructor +ACE_PIP_Connection_Manager::~ACE_PIP_Connection_Manager() +{ + +} + +ACE_PIP_Connection_Manager* ACE_PIP_Connection_Manager::instance() +{ + if (connection_manager_ == 0) + { + instance_lock_.acquire(); + + if (ACE_PIP_Connection_Manager::connection_manager_ == 0) + { + ACE_NEW_RETURN (ACE_PIP_Connection_Manager::connection_manager_, + ACE_PIP_Connection_Manager, + 0); + + delete_manager_ = true; + } + + instance_lock_.release(); + } + + return connection_manager_; +} + +int ACE_PIP_Connection_Manager::establish_connections(ACE_UINT32 source_site_id) +{ + int result(0); + + //establish connections + for (unsigned i = 0; i < (this->connection_definitions_)->size(); ++i) + { + if ((*(this->connection_definitions_))[i]->source_site_id == + source_site_id) + { + ACE_INET_Addr address; + address.set((*(this->connection_definitions_))[i]->port, + (*(this->connection_definitions_))[i]->address); + + if ((*(this->connection_definitions_))[i]->type == + Connection_Definition::ACTIVE) + { + ACE_PIP_Active_IO_Handler* handler(0); + ACE_NEW_RETURN (handler, ACE_PIP_Active_IO_Handler, 0); + + result = this->active_connector_.connect(handler, address); + if (result == -1) + { + return -1; + } + else + { + handler->init( + (*(this->connection_definitions_))[i]->source_site_id, + (*(this->connection_definitions_))[i]->destination_site_id, + (*(this->connection_definitions_))[i]->priority); + + this->handlers_.push_back(handler); + } + } + else + { + ACE_PIP_Reactive_IO_Handler* handler(0); + ACE_NEW_RETURN(handler, ACE_PIP_Reactive_IO_Handler, 0); + result = this->reactive_connector_.connect(handler, address); + if (result == -1) + { + return -1; + } + else + { + handler->init( + (*(this->connection_definitions_))[i]->source_site_id, + (*(this->connection_definitions_))[i]->destination_site_id, + (*(this->connection_definitions_))[i]->priority); + + this->handlers_.push_back(handler); + } + } + } + + } + + return result; +} + +int ACE_PIP_Connection_Manager::process_connection_file(char* file_name) +{ + // Expecting the file to contain one tuple per line + // where each is of form (source_id, dest_id, dest_address, + // dest_port, priority) + + ACE_TCHAR line[100]; + char* token(0); + FILE* fp = ACE_OS::fopen(file_name, "r"); + Connection_Definition* current_definition(0); + if (fp) + { + int num_entries(0); + fscanf(fp, "%d", &num_entries); + if (num_entries > 0) + { + ACE_NEW_RETURN(this->connection_definitions_, + ACE_Vector, 0); + for (int i = 0; i < num_entries; ++i) + { + ACE_NEW_RETURN(current_definition, Connection_Definition, 0); + fscanf(fp, "%s", line); + token = strtok(line, " (,)"); + if (!token) + { + delete current_definition; + return -1; + } + + // Store the source site ID + current_definition->source_site_id = atoi(token); + + token = ACE_OS::strtok(0, " (,)"); + if (!token) + { + delete current_definition; + return -1; + } + + // destination site ID + current_definition->destination_site_id = atoi(token); + + token = ACE_OS::strtok(0, " (,)"); + if (!token) + { + delete current_definition; + return -1; + } + + // IP address + size_t length = ACE_OS::strlen(token); + ACE_NEW_RETURN(current_definition->address, char[length + 1], 0); + ACE_OS::strncpy(current_definition->address, token, length); + current_definition->address[length] = 0; + + token = ACE_OS::strtok(0, " (,)"); + if (!token) + { + delete current_definition; + return -1; + } + + // IP port + current_definition->port = atoi(token); + + token = ACE_OS::strtok(0, " (,)"); + if (!token) + { + delete current_definition; + return -1; + } + + // Connection Priority + current_definition->priority = atoi(token); + current_definition->type = Connection_Definition::REACTIVE; + this->connection_definitions_->push_back(current_definition); + } + } + } + else + { + //error + return -1; + } + + return 0; +} + +const ACE_Vector* +ACE_PIP_Connection_Manager::get_connections() const +{ + return this->connection_definitions_; +} + diff --git a/ACE/ace/PIP_Connection_Manager.h b/ACE/ace/PIP_Connection_Manager.h new file mode 100644 index 00000000000..fb7dd8e3064 --- /dev/null +++ b/ACE/ace/PIP_Connection_Manager.h @@ -0,0 +1,81 @@ + /** + * @file PIP_Connection_Manager.h + * + * // $Id$ + * + * @author John Moore + * + */ + +#ifndef PIP_CONNECTION_MANAGER_H +#define PIP_CONNECTION_MANAGER_H + +#include +#include +#include +#include +#include +#include + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include +#include + +class ACE_Export ACE_PIP_Connection_Manager +{ + public: + + /// Informationa associated with a connection + struct Connection_Definition + { + enum Handler_Type {ACTIVE, REACTIVE}; + + ACE_UINT32 source_site_id; + ACE_UINT32 destination_site_id; + ACE_TCHAR* address; + u_short port; + ACE_UINT32 priority; + Handler_Type type; + }; + + /// Default Constructor + ACE_PIP_Connection_Manager(); + + /// Destructor + virtual ~ACE_PIP_Connection_Manager(); + + /// obtain the single instance of the manager + static ACE_PIP_Connection_Manager* instance(); + + /// Extract all connection information from a file + virtual int process_connection_file(char* filename); + + /// Establish all connection for which source_site_id is the source + virtual int establish_connections(ACE_UINT32 source_site_id); + + const ACE_Vector* get_connections() const; + + private: + + ACE_Vector* connection_definitions_; + + // The connector used to actively connect to a remote site + ACE_Connector< + ACE_PIP_Active_IO_Handler, + ACE_SOCK_Connector> active_connector_; + + ACE_Connector< + ACE_PIP_Reactive_IO_Handler, + ACE_SOCK_Connector> reactive_connector_; + + static ACE_PIP_Connection_Manager* connection_manager_; + static ACE_Mutex instance_lock_; + static bool delete_manager_; + + ACE_Vector handlers_; +}; + +#endif diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.cpp b/ACE/ace/PIP_DA_Strategy_Adapter.cpp new file mode 100644 index 00000000000..ecfd42a4747 --- /dev/null +++ b/ACE/ace/PIP_DA_Strategy_Adapter.cpp @@ -0,0 +1,4 @@ +// $Id$ + +#include "PIP_DA_Strategy_Adapter.h" + diff --git a/ACE/ace/PIP_DA_Strategy_Adapter.h b/ACE/ace/PIP_DA_Strategy_Adapter.h new file mode 100644 index 00000000000..50028a69d77 --- /dev/null +++ b/ACE/ace/PIP_DA_Strategy_Adapter.h @@ -0,0 +1,258 @@ + /** + * @file PIP_DA_Strategy_Adapter.h + * + * // $Id$ + * + * @author John Moore + * + * This file contains the specification for a class + * that adapts a deadlock avoidance strategy to additionally + * support priority inheritance protocol annotations +*/ + + +#ifndef PIP_DA_STRATEGY_ADAPTER +#define PIP_DA_STRATEGY_ADAPTER + +#include "ace/DA_Strategy_Base.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/Unbounded_Set.h" +#include "ace/Mutex.h" +#include "ace/Null_Mutex.h" + +/** + * @class ACE_PIP_DA_Strategy_Adapter + * @brief Extends deadlock avoidance strategies + * to support priority inheritance annotations + * + * Deadlock avoidance strategies associate a resource cost annotation + * with each handle. This class extends the strategies to support + * the association of annotations with each priority at which the + * handle can be dispatched, i.e. the priority at which the corresponding + * thread resource can dispatch the handle +*/ +template +class ACE_PIP_DA_Strategy_Adapter +{ + public: + + /// Constructor that takes the deadlock avoidance strategy that + /// the Strategy Adapter adapts. + ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base* DA_strategy); + ~ACE_PIP_DA_Strategy_Adapter(); + + /// Indicates whether allocating a thread to the handle + /// at the specified priority could potentially result in deadlock. + int is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority); + + /// Grant the handle a thread at the specified priority. + void grant(Handle_Id handle, ACE_UINT32 priority); + + /// Release the thread + void release(Handle_Id handle, ACE_UINT32 priority); + + /// Determine the number of threads being managed by + /// the DA_Strategy adapter. + int get_max_threads(); + + /// Add an annotation value for the handle / priority pair. + int add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation); + + /// Remove every annotation associated with this handle. + int remove_annotation (Handle_Id handle); + int remove_annotation (Handle_Id handle, ACE_UINT32 priority); + +private: + + /// Associates each message handler with an internally generated id + /// which can be used, along with a priority, to lookup an annotation. + typedef ACE_Hash_Map_Manager_Ex, + ACE_Equal_To, + ACE_Null_Mutex> HANDLE_ID_MAP; + + /// Associates each message handler with a set of potential priorities. + /// Message handler represented by internally generated id. + typedef ACE_Hash_Map_Manager_Ex*, + ACE_Hash, + ACE_Equal_To, + ACE_Null_Mutex> HANDLE_ID_PRIORITY_MAP; + + /// Determines an id that uniquely identifies a handler/priority pair. + ACE_UINT64 hash_handle_id_and_priority(ACE_UINT32 handle_id, + ACE_UINT32 priority) const; + + /// Generates an annotation ID given the actual handle and priority. + ACE_UINT64 get_annotation_id(Handle_Id handle, ACE_UINT32 priority); + + DA_Strategy_Base* DA_strategy_; + HANDLE_ID_MAP handle_ids_; + HANDLE_ID_PRIORITY_MAP id_to_priority_map_; + Lock lock_; + ACE_UINT32 next_id_; +}; + +template +ACE_PIP_DA_Strategy_Adapter:: + ACE_PIP_DA_Strategy_Adapter(DA_Strategy_Base* DA_strategy) +: DA_strategy_(DA_strategy) +, next_id_(0) +{ +} + +template +ACE_PIP_DA_Strategy_Adapter::~ACE_PIP_DA_Strategy_Adapter() +{ + HANDLE_ID_PRIORITY_MAP::iterator it = this->id_to_priority_map_.begin(); + for (; it != this->id_to_priority_map_.end(); ++it) + { + delete it->item(); + } +} + +template +ACE_INLINE int ACE_PIP_DA_Strategy_Adapter::get_max_threads() +{ + return this->DA_strategy_->get_max_threads(); +} + +template +ACE_INLINE ACE_UINT64 ACE_PIP_DA_Strategy_Adapter:: + hash_handle_id_and_priority(ACE_UINT32 handle_id, ACE_UINT32 priority) const +{ + ACE_UINT64 result = handle_id; + result = (result << 32) | priority; + return result; +} + +template +int ACE_PIP_DA_Strategy_Adapter:: + is_deadlock_potential(Handle_Id handle, ACE_UINT32 priority) +{ + ACE_Guard guard(this->lock_); + ACE_UINT64 annotation_id = get_annotation_id(handle, priority); + return this->DA_strategy_->is_deadlock_potential(annotation_id); +} + +template +void ACE_PIP_DA_Strategy_Adapter:: + grant(Handle_Id handle, ACE_UINT32 priority) +{ + ACE_Guard guard(this->lock_); + ACE_UINT64 annotation_id = get_annotation_id(handle, priority); + return this->DA_strategy_->grant(annotation_id); +} + +template +void ACE_PIP_DA_Strategy_Adapter:: + release(Handle_Id handle, ACE_UINT32 priority) +{ + ACE_Guard guard(this->lock_); + ACE_UINT64 annotation_id = get_annotation_id(handle, priority); + this->DA_strategy_->release(annotation_id); +} + +template +int ACE_PIP_DA_Strategy_Adapter:: + add_annotation (Handle_Id handle, ACE_UINT32 priority, int annotation) +{ + ACE_UINT32 internal_handle_id(0); + ACE_Unbounded_Set* priorities(0); + + ACE_Guard guard(this->lock_); + if (this->handle_ids_.find(handle, internal_handle_id) == -1) + { + // This is the first time handle has been encountered, so generate an + // internal handle id. + internal_handle_id = this->next_id_++; + this->handle_ids_.bind(handle, internal_handle_id); + priorities = new ACE_Unbounded_Set; + this->id_to_priority_map_.bind(internal_handle_id, priorities); + } + else + { + this->id_to_priority_map_.find(internal_handle_id, priorities); + } + + priorities->insert(priority); + + return this->DA_strategy_->add_annotation( + hash_handle_id_and_priority(internal_handle_id, priority), annotation); +} + +template +int ACE_PIP_DA_Strategy_Adapter:: + remove_annotation (Handle_Id handle) +{ + ACE_Guard guard(this->lock_); + ACE_UINT32 internal_handle_id(0); + if (this->handle_ids_.unbind(handle, internal_handle_id) != -1) + { + ACE_Unbounded_Set* priorities(0); + if (this->id_to_priority_map_.unbind(internal_handle_id, priorities) != -1) + { + for (ACE_Unbounded_Set::ITERATOR it = priorities->begin(); + it != priorities->end(); + ++it) + { + this->DA_strategy_->remove_annotation( + get_annotation_id(internal_handle_id, *it)); + } + + delete priorities; + } + } + + return 0; +} + +template +int ACE_PIP_DA_Strategy_Adapter:: + remove_annotation (Handle_Id handle, ACE_UINT32 priority) +{ + ACE_Guard guard(this->lock_); + ACE_UINT32 internal_handle_id(0); + int result(0); + if (this->handle_ids_.find(handle, internal_handle_id) != -1) + { + ACE_Unbounded_Set* priorities(0); + if (this->id_to_priority_map_.find(internal_handle_id, priorities) != -1) + { + if (priorities->remove(priority) != -1) + { + result = this->DA_strategy_->remove_annotation( + get_annotation_id(internal_handle_id, priority)); + } + if (priorities->is_empty()) + { + // This was the last annotation for this handle, + // so remove the handle information + this->id_to_priority_map_.unbind(internal_handle_id, priorities); + delete priorities; + this->handle_ids_.unbind(handle, internal_handle_id); + } + } + } + + return result; +} + +template +ACE_UINT64 ACE_PIP_DA_Strategy_Adapter:: + get_annotation_id(Handle_Id handle, ACE_UINT32 priority) +{ + ACE_UINT64 annotation_id(0); + ACE_UINT32 handle_id(0); + + if (this->handle_ids_.find(handle, handle_id) != -1) + { + annotation_id = hash_handle_id_and_priority(handle_id, priority); + } + + return annotation_id; +} + +#endif + diff --git a/ACE/ace/PIP_Dispatcher.cpp b/ACE/ace/PIP_Dispatcher.cpp new file mode 100644 index 00000000000..45d3ed63d6a --- /dev/null +++ b/ACE/ace/PIP_Dispatcher.cpp @@ -0,0 +1,468 @@ +#include "ace/PIP_Dispatcher.h" +#include "ace/PIP_Invocation_Manager.h" +#include "ace/PIP_Messages.h" +#include "ace/Reactor.h" + +ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::dispatcher_ = 0; +ACE_Mutex ACE_PIP_Dispatcher::instance_lock_; +bool ACE_PIP_Dispatcher::delete_dispatcher_ = false; +bool ACE_PIP_Dispatcher::shutdown_ = false; + +/// Constructor +ACE_PIP_Dispatcher::ACE_PIP_Dispatcher() + : current_highest_priority_(ACE_Event_Handler::LO_PRIORITY) + , current_lowest_priority_(ACE_Event_Handler::LO_PRIORITY) + , waiting_for_message_(false) + , num_threads_needed_(0) + , message_available_signal_(0) + , threads_available_signal_(0) + , DA_strategy_adapter_(0) + , num_pending_messages_(0) + , num_messages_received_(0) + , num_messages_dispatched_(0) +{ +} + +/// Destructor +ACE_PIP_Dispatcher::~ACE_PIP_Dispatcher() +{ + ACE_PIP_Protocol_Message* message(0); + + // Destroy all messages that have yet to be dispatched + this->pending_messages_lock_.acquire(); + while (this->pending_messages_by_message_id_.current_size() != 0) + { + this->pending_messages_by_message_id_.unbind( + this->pending_messages_by_message_id_.begin()->key(), + message); + + if (message) + { + delete message; + message = 0; + } + } + this->pending_messages_lock_.release(); +} + + +ACE_PIP_Dispatcher* ACE_PIP_Dispatcher::instance() +{ + if (ACE_PIP_Dispatcher::dispatcher_ == 0) + { + instance_lock_.acquire(); + + if (ACE_PIP_Dispatcher::dispatcher_ == 0) + { + ACE_NEW_RETURN (ACE_PIP_Dispatcher::dispatcher_, + ACE_PIP_Dispatcher, + 0); + + delete_dispatcher_ = true; + } + + instance_lock_.release(); + } + + return dispatcher_; +} + +/// Receive a message for eventual dispatching +void ACE_PIP_Dispatcher::process_message(ACE_PIP_Protocol_Message* message) +{ + switch (message->message_type()) + { + case ACE_PIP_Protocol_Message::ACCEL: + process_incoming_acceleration(message); + break; + + case ACE_PIP_Protocol_Message::REQUEST: + process_incoming_request(message); + break; + + case ACE_PIP_Protocol_Message::RESPONSE: + // Forward the response to the invocation manager + ACE_PIP_Invocation_Manager::instance()->process_inbound_response(message); + break; + + default: + break; + } +} + + +/// Signals the dispatcher to dispatch a new message if possible. +int ACE_PIP_Dispatcher::handle_output (ACE_HANDLE) +{ + ACE_PIP_Protocol_Message* message(0); + bool message_dispatched(false); + + while (!message_dispatched && !shutdown_) + { + // get the highest priority message + this->pending_messages_lock_.acquire(); + message = retrieve_highest_priority_pending_message(); + if (message) + { + ACE_PIP_Data_Message* data_message = + static_cast(message->next()); + + this->deadlock_avoidance_lock_.acquire(); + + /// If dispatching could potentially cause deadlock, try to accelerate all lower priority + /// messages and then wait for threads to become available + this->num_threads_needed_ = DA_strategy_adapter_->is_deadlock_potential( + data_message->destination_handler_ID(), + data_message->message_priority()); + + if (this->num_threads_needed_ > 0) + { + this->deadlock_avoidance_lock_.release(); + find_and_accelerate_lower_priority_message(data_message->message_priority()); + + // Wait for signal indicating enough threads exist to dispatch the message + this->threads_available_signal_.acquire(); + + // Before grabing the deadlock avoidance lock, check to make sure + // we haven't been told to shutdown. + if (shutdown_) + break; + + this->deadlock_avoidance_lock_.acquire(); + } + + // At this point, sufficient threads exist to dispatch the message + // without threat of deadlock, so grant a thread + DA_strategy_adapter_->grant(data_message->destination_handler_ID(), + data_message->message_priority()); + + this->deadlock_avoidance_lock_.release(); + + // Transfer the message to the "dispatched" list + this->dispatched_messages_lock_.acquire(); + Dispatched_Message_Data dispatch_record; + dispatch_record.id = message->message_id(); + dispatch_record.priority = data_message->message_priority(); + this->dispatched_messages_data_.insert(dispatch_record); + this->dispatched_messages_lock_.release(); + + //-------------TEST DATA------------------ + // store statistics to be printed later + Dispatch_Test_Data test_data; + test_data.id = message->message_id(); + test_data.priority = data_message->message_priority(); + test_data.num_pending = this->num_pending_messages_; + test_data.highest_priority = this->current_highest_priority_; + test_data.lowest_priority = this->current_lowest_priority_; + dispatch_records_.push_back(test_data); + + dispatched_ids_.push_back(message->message_id()); + + ++num_messages_dispatched_; + --this->num_pending_messages_; + this->pending_messages_lock_.release(); + //----------------------------------------- + + // Request another thread to be associated with dispatcher + ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK); + + message_dispatched = true; + + // Pass the message to the invocation manager for processing + ACE_PIP_Invocation_Manager::instance()->process_inbound_request(message); + + // All processing associated with the message has been completed + // so discard the record + this->dispatched_messages_lock_.acquire(); + this->dispatched_messages_data_.erase(dispatch_record); + this->dispatched_messages_lock_.release(); + + // Cleanup message information and release the thread resource + this->deadlock_avoidance_lock_.acquire(); + DA_strategy_adapter_->release(data_message->destination_handler_ID(), + data_message->message_priority()); + + if (this->num_threads_needed_ > 0) + { + --this->num_threads_needed_; + if (this->num_threads_needed_ == 0) + { + this->threads_available_signal_.release(); + } + } + + this->deadlock_avoidance_lock_.release(); + } + else + { + // There are no messages to dispatch, so wait for one to arrive + this->waiting_for_message_ = true; + this->pending_messages_lock_.release(); + this->message_available_signal_.acquire(); + + // Before dispatching a message, make sure we haven't been + // instructed to shutdown + if (shutdown_) + break; + } + } + + return 0; +} + + +/// Initializes dispatcher +void ACE_PIP_Dispatcher::init(ACE_PIP_DA_Strategy_Adapter* DA_strategy_adapter) +{ + DA_strategy_adapter_ = DA_strategy_adapter; + this->waiting_for_message_ = true; + ACE_Reactor::instance()->notify(this, ACE_Event_Handler::WRITE_MASK); +} + +/// store the message +void ACE_PIP_Dispatcher::process_incoming_request(ACE_PIP_Protocol_Message* message) +{ + // Store the message token 2 ways to enable efficient dispatching as well as + // efficient lookup for accelerations + this->pending_messages_lock_.acquire(); + + //-------TEST DATA------------------------ + ++num_messages_received_; + ++this->num_pending_messages_; + received_ids_.push_back(message->message_id()); + + //------------------------------------------ + ACE_UINT32 priority = + static_cast(message->next())->message_priority(); + + // update the priority upper and lower bounds. These values are stored to + // avoid checking the full range of priorities when dispatching messages + if (priority > this->current_highest_priority_) + { + this->current_highest_priority_ = priority; + } + else if (priority < this->current_lowest_priority_) + { + this->current_lowest_priority_ = priority; + } + + PRIORITY_MESSAGE_LIST_MAP::iterator + message_iter = this->pending_messages_by_priority_.find(priority); + + if (message_iter == this->pending_messages_by_priority_.end()) + { + // Create a new entry for this priority level + std::list new_priority_list; + new_priority_list.push_back(message); + this->pending_messages_by_priority_.insert( + make_pair(priority, new_priority_list)); + } + else + { + // Priority already exists, so add the message token to the list + message_iter->second.push_back(message); + } + + this->pending_messages_by_message_id_.bind(message->message_id(), message); + + if (this->waiting_for_message_) + { + this->waiting_for_message_ = false; + + // Signal waiting dispatcher thread to dispatch new message + this->message_available_signal_.release(); + } + + this->pending_messages_lock_.release(); + +} + +/// Find the highest priority message and return it +ACE_PIP_Protocol_Message* ACE_PIP_Dispatcher:: + retrieve_highest_priority_pending_message() +{ + ACE_PIP_Protocol_Message* message(0); + for (ACE_INT32 current_priority = (ACE_INT32)this->current_highest_priority_; + current_priority >= (ACE_INT32)this->current_lowest_priority_; + --current_priority) + { + PRIORITY_MESSAGE_LIST_MAP::iterator + pending_message_iter = this->pending_messages_by_priority_.find(current_priority); + + for (; pending_message_iter != this->pending_messages_by_priority_.end(); + ++pending_message_iter) + { + std::list::iterator next_message_iter = + pending_message_iter->second.begin(); + + if (next_message_iter != pending_message_iter->second.end()) + { + // The highest-priority message has been found. Grab the message + // and remove it from both containers + message = *next_message_iter; + pending_message_iter->second.pop_front(); + this->pending_messages_by_message_id_.unbind(message->message_id()); + break; + } + else + { + // There are no messages at this priority. Since the search begins at + // the highest priority, lower the highest priority until a message + // is found + if (this->current_highest_priority_ > this->current_lowest_priority_) + { + --this->current_highest_priority_; + } + } + } + + if (message) + { + break; + } + } + + return message; +} + +bool ACE_PIP_Dispatcher:: +find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority) +{ + bool found(false); + bool erased_this_pass(true); + + this->dispatched_messages_lock_.acquire(); + + while(erased_this_pass) + { + erased_this_pass = false; + + std::set::iterator iter = this->dispatched_messages_data_.begin(); + + // Find all dispatched messages having priority lower than new_priority. For each + // send an acceleration message, and update the dispatch record + for (; iter != this->dispatched_messages_data_.end() && + this->num_threads_needed_ > 0; ++iter) + { + if (iter->priority < new_priority) + { + // A message has been found that has a lower priority, + // so the send an acceleration message + ACE_PIP_Accel_Message* accel_message = new ACE_PIP_Accel_Message; + accel_message->old_priority(iter->priority); + accel_message->new_priority(new_priority); + + ACE_PIP_Protocol_Message* protocol_message = new ACE_PIP_Protocol_Message; + protocol_message->message_type(ACE_PIP_Protocol_Message::ACCEL); + protocol_message->message_id(iter->id); + protocol_message->next(accel_message); + + Dispatched_Message_Data dispatch_record = *iter; + this->dispatched_messages_data_.erase(iter); + dispatch_record.priority = new_priority; + this->dispatched_messages_data_.insert(dispatch_record); + ACE_PIP_Invocation_Manager::instance()->process_acceleration(protocol_message); + found = true; + erased_this_pass = true; + break; + } + } + } + + this->dispatched_messages_lock_.release(); + this->pending_messages_lock_.release(); + + return found; +} + +void ACE_PIP_Dispatcher::shutdown() +{ + shutdown_ = true; + + // Pulse signals so waiting threads can quit + this->message_available_signal_.release(); + this->threads_available_signal_.release(); +} + +void ACE_PIP_Dispatcher::process_incoming_acceleration(ACE_PIP_Protocol_Message* message) +{ + bool updated_pending(false); + // Look for pending message. If the message is pending, update the priority, move it around in data structures, and quit + + ACE_PIP_Accel_Message* accel_message = + static_cast(message->next()); + + this->pending_messages_lock_.acquire(); + ACE_Hash_Map_Entry* entry(0); + if (this->pending_messages_by_message_id_.find(message->message_id(), entry) == 0) + { + ACE_PIP_Data_Message* data_message = + static_cast(entry->item()->next()); + + data_message->message_priority(accel_message->new_priority()); + + // move the message from one priority to the other + updated_pending = true; + } + this->pending_messages_lock_.release(); + + if (!updated_pending) + { + bool found(false); + ACE_Guard guard(this->dispatched_messages_lock_); + // Message is not pending, so must already be dispatche + std::set::iterator iter = this->dispatched_messages_data_.begin(); + + // Find all dispatched messages having priority lower than new_priority. For each + // send an acceleration message, and update the dispatch record + for (; iter != this->dispatched_messages_data_.end(); ++iter) + { + if ((iter->id == message->message_id()) && + (iter->priority < accel_message->new_priority())) + { + Dispatched_Message_Data dispatch_record = *iter; + this->dispatched_messages_data_.erase(iter); + dispatch_record.priority = accel_message->new_priority(); + this->dispatched_messages_data_.insert(dispatch_record); + ACE_PIP_Invocation_Manager::instance()->process_acceleration(message); + found = true; + break; + } + } + + if (!found) + { + for (size_t index = 0; index < received_ids_.size(); ++index) + { + if (received_ids_[index] == message->message_id()) + { + found = true; + break; + } + } + } + } +} +void ACE_PIP_Dispatcher::print_results() +{ + printf("----------------------DISPATCHER_RESULTS-------------\n"); + printf("Num received: %d\n", num_messages_received_); + printf("Num dispatched: %d\n\n", num_messages_dispatched_); + printf("Received Ids: \n"); + + for (size_t index = 0; index < received_ids_.size(); ++index) + { + printf("%lld\n", received_ids_[index]); + } + + printf("\nDispatched Ids: \n"); + for (size_t dispatched_index = 0; dispatched_index < dispatched_ids_.size(); + ++dispatched_index) + { + printf("%lld\n", dispatched_ids_[dispatched_index]); + } + + printf("Num received: %d\n", received_ids_.size()); + printf("Num dispatched: %d\n", dispatched_ids_.size()); + printf("---------------------\n"); +} diff --git a/ACE/ace/PIP_Dispatcher.h b/ACE/ace/PIP_Dispatcher.h new file mode 100644 index 00000000000..31f59f5d358 --- /dev/null +++ b/ACE/ace/PIP_Dispatcher.h @@ -0,0 +1,188 @@ + /** + * @file PIP_Dispatcher.h + * + * // $Id$ + * + * @author John Moore + * + * This file contains the specification for a class + * that dispatches priority inheritance protocol messages + * to the appropriate message handler. +*/ + + +#ifndef _PIP_DISPATCHER_H_ +#define _PIP_DISPATCHER_H_ + +// ACE definitions +#include "ace/Event_Handler.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/PIP_DA_Strategy_Adapter.h" +#include "ace/PIP_Messages.h" +#include "ace/RW_Thread_Mutex.h" +#include "ace/Semaphore.h" +#include "ace/Singleton.h" + +// STL definitions +#include +#include +#include +#include + +// Forward Declarations +class ACE_PIP_Protocol_Message; + +typedef std::map > + PRIORITY_MESSAGE_LIST_MAP; + +// Associate each message with a message ID +typedef ACE_Hash_Map_Manager_Ex, + ACE_Equal_To, + ACE_Null_Mutex> ID_MESSAGE_MAP; + + +/** + * @class ACE_Dispatcher + * @brief Dispatches ACE_PIP_Priority_Messages in priority order + * message handlers. Additionally, notifies handlers when priority inversion is + * detected. + * + * The PIP_Message_Dispatcher implements the priority inheritance protocol. + * Upon receipt of messages, it determines the highest-priority message to + * be dispatched, and dispatches providing enough resources exist. If not enough exist, + * and a lower priority message has been dispatched, an acceleration message is sent + * to the corresponding handler to raise the priority of the message, thus + * mitigating the inversion. +*/ +class ACE_Export ACE_PIP_Dispatcher : public ACE_Event_Handler +{ + public: + + /// Constructor + ACE_PIP_Dispatcher(); + + /// Destructor + virtual ~ACE_PIP_Dispatcher(); + + /// obtain the single instance of the dispatcher + static ACE_PIP_Dispatcher* instance(); + + /// Receive a message for eventual dispatching + void process_message(ACE_PIP_Protocol_Message* message); + + /// Signals the dispatcher to dispatch a new message if possible. + virtual int handle_output (ACE_HANDLE); + + /// Initializes dispatcher + void init(ACE_PIP_DA_Strategy_Adapter* DA_strategy_adapter); + + /// Tell the dispatcher to stop dispatching and release all threads ASAP + void shutdown(); + + /// Accelerate the appropriate message + void process_incoming_acceleration(ACE_PIP_Protocol_Message* message); + + /// Print statistics + void print_results(); + + private: + + // Dispatched_Message_Data stores the ID and priority + // of a dispatched message + class Dispatched_Message_Data + { + public: + + bool operator<(const Dispatched_Message_Data& other) const + { + return (priority < other.priority); + } + + bool operator==(const Dispatched_Message_Data& other) const + { + return (id == other.id); + } + + bool operator!=(const Dispatched_Message_Data& other) const + { + return !(*this == other); + } + + ACE_UINT64 id; + ACE_UINT32 priority; + }; + + class Dispatch_Test_Data + { + public: + ACE_UINT64 id; + ACE_UINT64 priority; + ACE_UINT32 num_pending; + ACE_UINT32 highest_priority; + ACE_UINT32 lowest_priority; + }; + + /// store the message + void process_incoming_request(ACE_PIP_Protocol_Message* message); + + /// Find the highest priority message and return it + ACE_PIP_Protocol_Message* retrieve_highest_priority_pending_message(); + + bool find_and_accelerate_lower_priority_message(ACE_UINT32 new_priority); + + + // Dispatched message data is stored to determine which messages are + // currently assigned to a thread. This is useful for finding messages + // whose priority needs to be accelerated in the case where an inversion + // is detected. + std::set dispatched_messages_data_; + ACE_Mutex dispatched_messages_lock_; + + ACE_UINT32 current_highest_priority_; + ACE_UINT32 current_lowest_priority_; + + // Pending messages (those not dispatched) are stored in 2 ways for efficiency + // 1.) By message id - this is useful for managing priority accelerations + // because we can find the appropriate message in constant time + // 2.) By priority - this is useful for determining which message to dispatch next + // as messages are dispatched in priority order + PRIORITY_MESSAGE_LIST_MAP pending_messages_by_priority_; + ID_MESSAGE_MAP pending_messages_by_message_id_; + ACE_Mutex pending_messages_lock_; + + // Indicates the dispatcher has a thread waiting to + // dispatch a message + bool waiting_for_message_; + + // Number of threads that need to be returned in order to + // dispatch the current message + int num_threads_needed_; + + ACE_Semaphore message_available_signal_; + ACE_Semaphore threads_available_signal_; + + ACE_PIP_DA_Strategy_Adapter* DA_strategy_adapter_; + ACE_Mutex deadlock_avoidance_lock_; + + static ACE_PIP_Dispatcher* dispatcher_; + static ACE_Mutex instance_lock_; + static bool delete_dispatcher_; + static bool shutdown_; + + // Test variables + ACE_UINT32 num_pending_messages_; + ACE_UINT32 num_messages_received_; + ACE_UINT32 num_messages_dispatched_; + + ACE_Vector received_ids_; + ACE_Vector dispatched_ids_; + ACE_Vector dispatch_records_; +}; + +// Define a singleton class to make the dispatcher globally accessible +typedef ACE_Singleton + ACE_PIP_Dispatcher_Singleton; + +#endif diff --git a/ACE/ace/PIP_IO_Handler.cpp b/ACE/ace/PIP_IO_Handler.cpp new file mode 100644 index 00000000000..be40fc03a8b --- /dev/null +++ b/ACE/ace/PIP_IO_Handler.cpp @@ -0,0 +1,151 @@ +// $Id$ + +#include "ace/Guard_T.h" +#include "ace/PIP_IO_Handler.h" +#include "ace/PIP_Invocation_Manager.h" +#include "ace/PIP_Dispatcher.h" + +/// Constructor +ACE_PIP_IO_Handler::ACE_PIP_IO_Handler() + : read_closed_(false) + , write_closed_(false) + , priority_set_(false) + , site_id_(0) + , handler_id_(0) + , destination_site_id_(0) + , millisecond_(0, 1000) +{ + // Temporarily assign the priority to be highest possible. + // The first message received by the handler will be the priority + this->priority(ACE_Event_Handler::HI_PRIORITY); +} + +/// Destructor +ACE_PIP_IO_Handler::~ACE_PIP_IO_Handler( ) +{ + // Tell the Invocation Manager to stop sending us messages + ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this); + + // Delete all outgoing messages + ACE_PIP_Protocol_Message* message(0); + while (!outgoing_message_queue_.is_empty()) + { + outgoing_message_queue_.dequeue_tail(message); + delete message; + } +} + +/// Initialize the priority of the handler, and inform the other end +/// of the priority +void ACE_PIP_IO_Handler::init(ACE_UINT32 site_id, + ACE_UINT32 destination_site_id, + ACE_UINT32 priority) +{ + this->priority(priority); + site_id_ = site_id; + destination_site_id_ = destination_site_id; + + // Inform other end of this connections priority + peer_.send(&priority, sizeof(priority)); + + // Inform other end of this end's site id + peer_.send(&site_id, sizeof(site_id)); + priority_set_ = true; + + // Register to receive outgoing messages + ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this); +} + +void ACE_PIP_IO_Handler::extract_priority() +{ + ACE_UINT32 priority(0); + if (peer_.recv(&priority, sizeof(priority)) == sizeof(priority)) + { + this->priority(priority); + } + else + { + this->priority(ACE_Event_Handler::LO_PRIORITY); + } + + // Receive the other end's site id + if (peer_.recv(&destination_site_id_, sizeof(destination_site_id_)) != sizeof(destination_site_id_)) + { + destination_site_id_ = 0; + } + + priority_set_ = true; +} + +/// Handles read event on socket. +int ACE_PIP_IO_Handler::handle_input (ACE_HANDLE fd) +{ + ACE_UNUSED_ARG(fd); + + int result(0); + int bytes_read(0); + + if (!priority_set_) + { + // incoming message is the priority of this connection + extract_priority(); + ACE_PIP_Invocation_Manager::instance()->register_IO_handler(this); + } + else + { + ACE_PIP_Protocol_Message* message(0); + ACE_NEW_RETURN(message, ACE_PIP_Protocol_Message, 0); + // Read the next incoming message + bytes_read = message->deserialize(peer_); + if (bytes_read > 0) + { + ACE_PIP_Dispatcher::instance()->process_message(message); + } + else if (bytes_read < 0) + { + // The connection is broken, so handler should be deleted + delete message; + result = -1; + } + } + + return result; +} + + +/// Handles output event on socket +int ACE_PIP_IO_Handler::handle_output (ACE_HANDLE fd) +{ + ACE_UNUSED_ARG(fd); + + int bytes_sent(0); + // determine if outgoing messages exist + ACE_PIP_Protocol_Message* message(0); + + write_closed_ = false; + big_lock_.acquire(); + if (outgoing_message_queue_.dequeue_tail(message) != -1) + { + bytes_sent = message->serialize(peer_); + delete message; + if (bytes_sent >= 0) + { + big_lock_.release(); + return 0; + } + else + { + write_closed_ = true; + big_lock_.release(); + // indicate the outgoing connection is closed + return -2; + } + } + else + { + // indicate that there was no message to output + big_lock_.release(); + return -1; + } +} + diff --git a/ACE/ace/PIP_IO_Handler.h b/ACE/ace/PIP_IO_Handler.h new file mode 100644 index 00000000000..563a95303c2 --- /dev/null +++ b/ACE/ace/PIP_IO_Handler.h @@ -0,0 +1,103 @@ + /** + * @file PIP_IO_Handler.h + * + * // $Id$ + * + * @author John Moore + * + * This file contains the specification for a class + * that manages network I/O +*/ + +#ifndef _PIP_IO_HANDLER_H_ +#define _PIP_IO_HANDLER_H_ + +#include "ace/Message_Queue.h" +#include "ace/Mutex.h" +#include "ace/PIP_Messages.h" +#include "ace/Svc_Handler.h" +#include "ace/Thread_Mutex.h" + +// Typedefs +typedef ACE_Message_Queue_Ex + PROTO_MESSAGE_QUEUE_TYPE; + +/** + * @class ACE_PIP_IO_Handler + * + * @brief Performs network I/O + * + * @author John Moore + */ +class ACE_Export ACE_PIP_IO_Handler : + public ACE_Svc_Handler +{ + public: + + /// Constructor + ACE_PIP_IO_Handler (); + + /// Destructor + virtual ~ACE_PIP_IO_Handler(); + + /// Enqueue a message to be sent + virtual int put_message (ACE_PIP_Protocol_Message* message) = 0; + + /// Initialize the priority of the handler, and inform the other end + /// of the priority + virtual void init(ACE_UINT32 site_id, + ACE_UINT32 destination_site_id, + ACE_UINT32 priority); + + /// Handles read event on socket. + virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); + + /// Handles read event on socket. + virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE); + + /// Set the site_id + void site_id(ACE_UINT32 site_id); + + /// Get the site id + ACE_UINT32 site_id() const; + + /// Get the other end's site id + ACE_UINT32 destination_site_id() const; + + /// Get the handler id + void handler_id(ACE_UINT32 handler_id); + + /// Set the handler id + ACE_UINT32 handler_id() const; + + /// Get the address of the remote site + ACE_INET_Addr remote_address() const; + + protected: + + /// Reads priority from socket + void extract_priority(); + + // variables to track the state of the handler + bool read_closed_; + bool write_closed_; + bool priority_set_; + + ACE_UINT32 site_id_; + ACE_UINT32 handler_id_; + ACE_UINT32 destination_site_id_; + + const ACE_Time_Value millisecond_; + + PROTO_MESSAGE_QUEUE_TYPE outgoing_message_queue_; + ACE_Thread_Mutex big_lock_; +}; + +#if defined (__ACE_INLINE__) +#include "PIP_IO_Handler.inl" +#endif /* __ACE_INLINE__ */ + + +#endif /* _PIP_IO_Handler_H_ */ + + diff --git a/ACE/ace/PIP_IO_Handler.inl b/ACE/ace/PIP_IO_Handler.inl new file mode 100644 index 00000000000..ed1768940aa --- /dev/null +++ b/ACE/ace/PIP_IO_Handler.inl @@ -0,0 +1,36 @@ +// $Id: + +#include +#include + +ACE_INLINE void ACE_PIP_IO_Handler::site_id(ACE_UINT32 site_id) +{ + site_id_ = site_id; +} + +ACE_INLINE ACE_UINT32 ACE_PIP_IO_Handler::site_id() const +{ + return site_id_; +} + +ACE_INLINE ACE_UINT32 ACE_PIP_IO_Handler::destination_site_id() const +{ + return destination_site_id_; +} + +ACE_INLINE void ACE_PIP_IO_Handler::handler_id(ACE_UINT32 handler_id) +{ + handler_id_ = handler_id; +} + +ACE_INLINE ACE_UINT32 ACE_PIP_IO_Handler::handler_id() const +{ + return handler_id_; +} + +ACE_INLINE ACE_INET_Addr ACE_PIP_IO_Handler::remote_address() const +{ + ACE_INET_Addr addr; + peer_.get_remote_addr(addr); + return addr; +} diff --git a/ACE/ace/PIP_Invocation_Manager.cpp b/ACE/ace/PIP_Invocation_Manager.cpp new file mode 100644 index 00000000000..d18d8789e82 --- /dev/null +++ b/ACE/ace/PIP_Invocation_Manager.cpp @@ -0,0 +1,316 @@ + +#include "ace/PIP_Invocation_Manager.h" +#include "ace/PIP_IO_Handler.h" +#include + +#include +ACE_PIP_Invocation_Manager* ACE_PIP_Invocation_Manager::invocation_manager_ = 0; +ACE_Mutex ACE_PIP_Invocation_Manager::instance_lock_; +bool ACE_PIP_Invocation_Manager::delete_manager_ = false; +ACE_UINT64 ACE_PIP_Invocation_Manager::message_id_base_ = 0; +ACE_UINT32 ACE_PIP_Invocation_Manager::site_id_ = 0; + +/// Constructor +ACE_PIP_Invocation_Manager::ACE_PIP_Invocation_Manager() + : message_counter_(0) +{ +} + +/// Destructor +ACE_PIP_Invocation_Manager::~ACE_PIP_Invocation_Manager() +{ + +} + +/// Processes requests received from I/O handler +void ACE_PIP_Invocation_Manager::process_inbound_request(ACE_PIP_Protocol_Message* message) +{ + ACE_PIP_Data_Message* payload = + static_cast(message->release_next()); + + ACE_UINT32 handler_id = payload->destination_handler_ID(); + ACE_PIP_Message_Handler* handler(0); + + big_lock_.acquire(); + ACE_UINT64 message_id = message->message_id(); + if (object_id_handler_map_.find(handler_id, handler) == 0) + { + // look to see if there are any accelerations. If so, accelerate. + // Map the message ID to a list of outgoing messages + in_out_id_map_.bind(message_id, + std::list()); + + // Keep a record of the message and its priority so + // it can be accelerated if necessary + Invocation_Data invocation_data; + invocation_data.site_id = site_id_; + invocation_data.priority = payload->message_priority(); + invocation_data_map_.bind(message_id, invocation_data); + + big_lock_.release(); + + // Pass the message to the message handler, deleting + // the corresponding struct + handler->process_incoming_message(payload->release_block(), message_id); + delete payload; + delete message; + + // Once message processing has completed, + // clean-up any message residue + big_lock_.acquire(); + in_out_id_map_.unbind(message_id); + } + + big_lock_.release(); +} + +/// Processes request to be forwarded to another handler +int ACE_PIP_Invocation_Manager::process_outbound_request(ACE_Message_Block* message, + ACE_UINT64 token, + ACE_Future*& response_holder) +{ + // Create a protocol message from the data block + ACE_PIP_Protocol_Message* protocol_message(0); + + ACE_NEW_RETURN(protocol_message, ACE_PIP_Protocol_Message, -1); + protocol_message->message_type(ACE_PIP_Protocol_Message::REQUEST); + protocol_message->process_message_payload(message); + + ACE_PIP_Data_Message* data_message = + static_cast(protocol_message->next()); + + // determine if the message expects a reply. If so, create a future for it. + if (data_message->reply_expected()) + { + // create and store future + ACE_NEW_RETURN(response_holder, ACE_Future, -1); + } + + Invocation_Data remote_info; + remote_info.response_holder = response_holder; + remote_info.site_id = data_message->destination_site_ID(); + remote_info.priority = data_message->message_priority(); + + // Associate this message with token. This enables acceleration forwarding. Only + // Token == -1 indicates this is the start of a call chain + + big_lock_.acquire(); + ACE_UINT64 message_id = generate_message_id(); + invocation_data_map_.bind(message_id, remote_info); + protocol_message->message_id(message_id); + + if (token != 0) + { + ACE_Hash_Map_Entry >* entry(0); + // map the originating message to the outgoing message + // so that accelerations can be forwarded appropriately + in_out_id_map_.find(token, entry); + if (entry) + { + entry->item().push_back(message_id); + } + } + + // use the priority and address to determine which I/O handler to send to + // pass the message to the I/O handler + PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); + site_to_handlers_map_.find(data_message->destination_site_ID(), + handler_map); + + ACE_PIP_IO_Handler* IO_handler(0); + handler_map->find(data_message->message_priority(), + IO_handler); + + big_lock_.release(); + + if (IO_handler) + { + IO_handler->put_message(protocol_message); + } + + return 0; +} + +ACE_PIP_Invocation_Manager* ACE_PIP_Invocation_Manager::instance() +{ + if (ACE_PIP_Invocation_Manager::invocation_manager_ == 0) + { + instance_lock_.acquire(); + + if (ACE_PIP_Invocation_Manager::invocation_manager_ == 0) + { + ACE_NEW_RETURN (ACE_PIP_Invocation_Manager::invocation_manager_, + ACE_PIP_Invocation_Manager, + 0); + + delete_manager_ = true; + } + + instance_lock_.release(); + } + + return invocation_manager_; +} + +/// Process response received from a handler +void ACE_PIP_Invocation_Manager::process_inbound_response(ACE_PIP_Protocol_Message* message) +{ + Invocation_Data remote_info; + + ACE_Guard guard(big_lock_); + + // Remove the child ID + if (invocation_data_map_.unbind(message->message_id(), + remote_info) != -1) + { + // Pass the received response to the message handler + // via a Future + remote_info.response_holder->set(message->next()->block()); + remote_info.response_holder = 0; + } +} + +/// Process response received from a handler +void ACE_PIP_Invocation_Manager::process_outbound_response(ACE_Message_Block* message, ACE_UINT64 token) +{ + // Parse the message + ACE_PIP_Protocol_Message* response = new ACE_PIP_Protocol_Message; + response->process_message_payload(message); + response->message_type(ACE_PIP_Protocol_Message::RESPONSE); + + // Lookup the appropriate IO handler, and pass down the message + ACE_PIP_Data_Message* data_message = + static_cast(response->next()); + + ACE_Guard guard(big_lock_); + + response->message_id(token); + PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); + + if (site_to_handlers_map_.find + (data_message->destination_site_ID(), + handler_map) != -1) + { + ACE_PIP_IO_Handler* handler(0); + if (handler_map->find(data_message->message_priority(), + handler) != -1) + { + handler->put_message(response); + } + } +} + +/// Process request to accelerate the priority of a process +void ACE_PIP_Invocation_Manager::process_acceleration(ACE_PIP_Protocol_Message* message) +{ + ACE_PIP_Accel_Message* accel_message = + static_cast(message->next()); + + ACE_Guard guard(big_lock_); + + // Update the stored priority of the original message. This will enable subsequent upcalls + // to adjust their priority appropriately + Invocation_Data invocation_data; + if (invocation_data_map_.unbind(message->message_id(), invocation_data) == 0) + { + if (invocation_data.priority < accel_message->new_priority()) + { + invocation_data.priority = accel_message->new_priority(); + } + + invocation_data_map_.bind(message->message_id(), invocation_data); + + // Generate acceleration messages for each outgoing invocation + // resulting from processing of incoming request. Do so + // only if their priority is lower than the accelerated priority + ACE_Hash_Map_Entry >* child_entry(0); + if (in_out_id_map_.find(message->message_id(), child_entry) == 0) + { + std::list::iterator child_iter = child_entry->item().begin(); + for (; child_iter != child_entry->item().end(); ++child_iter) + { + if (invocation_data_map_.unbind(*child_iter, invocation_data) == 0) + { + if (invocation_data.priority < accel_message->new_priority()) + { + invocation_data.priority = accel_message->new_priority(); + invocation_data_map_.bind(*child_iter, invocation_data); + // Generate new message and send it to the appropriate site + PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); + if (site_to_handlers_map_.find(invocation_data.site_id, + handler_map) == 0) + { + ACE_PIP_IO_Handler* handler(0); + if (handler_map->find(ACE_Event_Handler::HI_PRIORITY, handler) == 0) + { + ACE_PIP_Accel_Message* accel_copy = accel_message->copy(); + ACE_PIP_Protocol_Message* proto_copy = message->copy(); + proto_copy->next(accel_copy); + handler->put_message(proto_copy); + } + } + } + + invocation_data_map_.bind(*child_iter, invocation_data); + } + } + } + } + + // delete the acceleration message here +} + +/// Register an IO handler that can send messages on invocation +/// manager's behalf +int ACE_PIP_Invocation_Manager::register_IO_handler(ACE_PIP_IO_Handler* handler) +{ + // Extract the priority and remote address of handler + ACE_UINT32 priority = handler->priority(); + ACE_Guard guard(big_lock_); + + // Map the destination site ID and priority to this handler + PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); + if (site_to_handlers_map_.find(handler->destination_site_id(), + handler_map) == -1) + { + ACE_NEW_RETURN(handler_map, PRIORITY_TO_IO_HANDLER_MAP, -1); + } + + handler_map->bind(priority, handler); + site_to_handlers_map_.bind(handler->destination_site_id(), handler_map); + + return 0; +} + +void ACE_PIP_Invocation_Manager::unregister_IO_handler(ACE_PIP_IO_Handler* handler) +{ + ACE_UINT32 priority = handler->priority(); + + ACE_Guard guard(big_lock_); + + // unbind the handler + PRIORITY_TO_IO_HANDLER_MAP* handler_map(0); + if (site_to_handlers_map_.find(handler->destination_site_id(), + handler_map) != -1) + { + handler_map->unbind(priority, handler); + } +} + +void ACE_PIP_Invocation_Manager::register_message_handler(ACE_PIP_Message_Handler* handler) +{ + // extract the object id from the handler + // map the object id to the handler + ACE_Guard guard(big_lock_); + object_id_handler_map_.bind(handler->handler_id(), handler); +} + +ACE_UINT64 ACE_PIP_Invocation_Manager::generate_message_id() +{ + return (((ACE_UINT64)site_id_) << 32) + message_counter_++; +} + +void ACE_PIP_Invocation_Manager::site_id(ACE_UINT64 site_id) +{ + site_id_ = site_id; +} diff --git a/ACE/ace/PIP_Invocation_Manager.h b/ACE/ace/PIP_Invocation_Manager.h new file mode 100644 index 00000000000..ae04f7a60dd --- /dev/null +++ b/ACE/ace/PIP_Invocation_Manager.h @@ -0,0 +1,150 @@ + /** + * @file PIP_Invocation_Manager.h + * + * // $Id$ + * + * @author John Moore + * + * This file contains the specification for a class + * that tracks handler invocations at a particular site + */ + + +#ifndef _PIP_INVOCATION_MANAGER_H_ +#define _PIP_INVOCATION_MANAGER_H_ + +#include "ace/Containers_T.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/Message_Block.h" +#include "ace/PIP_Messages.h" +#include "ace/Singleton.h" +#include "ace/Mutex.h" +#include "ace/Null_Mutex.h" +#include "ace/Future.h" + +#include +class ACE_PIP_IO_Handler; +class ACE_PIP_Message_Handler; + +struct Invocation_Data +{ + ACE_Future* response_holder; + ACE_UINT32 site_id; + ACE_UINT32 priority; +}; + +// Typedefs +typedef ACE_Hash_Map_Manager_Ex, + ACE_Hash, + ACE_Equal_To, + ACE_Null_Mutex> ID_TO_ID_LIST_MAP; + +typedef ACE_Hash_Map_Manager_Ex, + ACE_Equal_To, + ACE_Null_Mutex> ID_TO_HANDLER_MAP; + +typedef ACE_Hash_Map_Manager_Ex, + ACE_Equal_To, + ACE_Null_Mutex> ID_TO_INVOCATION_RECORD_MAP; + +typedef ACE_Hash_Map_Manager_Ex, + ACE_Equal_To, + ACE_Null_Mutex> PRIORITY_TO_IO_HANDLER_MAP; + +typedef ACE_Hash_Map_Manager_Ex, + ACE_Equal_To, + ACE_Null_Mutex> SITE_TO_IO_HANDLERS_MAP; + +/** + * @class ACE_PIP_Invocation_Manager + * @brief +*/ +class ACE_Export ACE_PIP_Invocation_Manager +{ + public: + + /// Constructor + ACE_PIP_Invocation_Manager(); + + /// Destructor + ~ACE_PIP_Invocation_Manager(); + + /// Get the singleton instance of the Invocation Manager + static ACE_PIP_Invocation_Manager* instance(); + + /// Associated a site ID with the Invocation Manager + static void site_id(ACE_UINT64 site_id); + + /// Process request made on local handler + void process_inbound_request(ACE_PIP_Protocol_Message* message); + + /// Processes request to be forwarded to another handler + int process_outbound_request(ACE_Message_Block* message, + ACE_UINT64 token, + ACE_Future*& response_holder); + + /// Process response to message sent by local handler + void process_inbound_response(ACE_PIP_Protocol_Message* message); + + /// Process response sent to remote handler + void process_outbound_response(ACE_Message_Block* message, ACE_UINT64 token); + + /// Process request to accelerate the priority of a process + void process_acceleration(ACE_PIP_Protocol_Message* message); + + /// Register an IO handler that can send messages of a certain priority + /// for the Invocation Manager + int register_IO_handler(ACE_PIP_IO_Handler* handler); + + /// Un-register an IO handler + void unregister_IO_handler(ACE_PIP_IO_Handler* handler); + + /// Register user-level message handler + void register_message_handler(ACE_PIP_Message_Handler* handler); + + private: + + + ACE_UINT64 generate_message_id(); + + ACE_UINT64 message_counter_; + + static ACE_UINT64 message_id_base_; + + static ACE_UINT32 site_id_; + + static ACE_PIP_Invocation_Manager* invocation_manager_; + + static ACE_Mutex instance_lock_; + + ACE_Mutex big_lock_; + + static bool delete_manager_; + + // Mapping of incoming messages to corresponding outgoing messages. + // This is used to track invocations resulting from an incoming invocation + // in order to pass acceleration messages along the chain + ID_TO_ID_LIST_MAP in_out_id_map_; + + // Mapping of user-level handler ID to corresponding handler + ID_TO_HANDLER_MAP object_id_handler_map_; + + // Mapping of site IDs to corresponding I/O handler map + SITE_TO_IO_HANDLERS_MAP site_to_handlers_map_; + + // Maps message ID to data such as destination site + // and priority + ID_TO_INVOCATION_RECORD_MAP invocation_data_map_; +}; + + +#endif diff --git a/ACE/ace/PIP_Message_Handler.cpp b/ACE/ace/PIP_Message_Handler.cpp new file mode 100644 index 00000000000..6f62815db5a --- /dev/null +++ b/ACE/ace/PIP_Message_Handler.cpp @@ -0,0 +1,38 @@ +#include "ace/PIP_Message_Handler.h" + +ACE_PIP_Message_Handler::ACE_PIP_Message_Handler() + : handler_id_(0) + , site_id_(0) +{ + +} + +ACE_PIP_Message_Handler::ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 handler_id) + : handler_id_(handler_id) + , site_id_(site_id) +{ +} + +void ACE_PIP_Message_Handler::send_request(ACE_Message_Block* message, + ACE_UINT64 message_id, + ACE_Message_Block*& response) +{ + ACE_Future* response_holder(0); + ACE_PIP_Invocation_Manager::instance()->process_outbound_request(message, message_id, response_holder); + if (response_holder) + { + if (response_holder->get(response) == -1) + { + response = 0; + } + } +} + +void ACE_PIP_Message_Handler::send_response(ACE_Message_Block* message, + ACE_UINT64 message_id) +{ + ACE_PIP_Invocation_Manager::instance()->process_outbound_response(message, message_id); +} + + + diff --git a/ACE/ace/PIP_Message_Handler.h b/ACE/ace/PIP_Message_Handler.h new file mode 100644 index 00000000000..a38b0016a86 --- /dev/null +++ b/ACE/ace/PIP_Message_Handler.h @@ -0,0 +1,82 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file PIP_Message_Handler.h + * + * $Id: + * + * @author John Moore + */ +//============================================================================= + + +#ifndef _PIP_MESSAGE_HANDLER_H_ +#define _PIP_MESSAGE_HANDLER_H_ + +#include +#include +#include +#include + +/** + * @class ACE_PIP_Message_Handler + * @brief ACE_PIP_Message_Handler is a user-level message handler interface. + * + * ACE_PIP_Message_Handler is a user-level message handler interface. It can + * be extended to provide user-level processing of messages before passing + * the message block to the priority-inheritance protocol infrastructure. + * + * Each handler has an associated (site id, handler id) tuple with which + * it can be address by other handlersy + */ +class ACE_Export ACE_PIP_Message_Handler +{ + public: + + /// Constructor that sets site and handler ID + ACE_PIP_Message_Handler(ACE_UINT32 site_id, ACE_UINT32 handler_id); + ACE_PIP_Message_Handler(); + + /// Destroy handler + virtual ~ACE_PIP_Message_Handler(){} + + /// Process message received by priority inheritance infrastructure + virtual void process_incoming_message(ACE_Message_Block* message, + ACE_UINT64 message_id) = 0; + + /// Get the handler id + ACE_UINT32 handler_id() const; + + /// Set the handler id + void handler_id(ACE_UINT32 id); + + /// Get the site id + ACE_UINT32 site_id() const; + + /// Set the site id + void site_id(ACE_UINT32 id); + + protected: + + ACE_UINT32 handler_id_; + ACE_UINT32 site_id_; + ACE_INET_Addr my_address_; + + // Pass a message to a remote handler + virtual void send_request(ACE_Message_Block* message, + ACE_UINT64 message_id, + ACE_Message_Block*& response); + + // Pass a response message to a remote handler + virtual void send_response(ACE_Message_Block* message, + ACE_UINT64 message_id); + +}; + +#if defined (__ACE_INLINE__) +#include "PIP_Message_Handler.inl" +#endif /* __ACE_INLINE__ */ + +#endif + diff --git a/ACE/ace/PIP_Message_Handler.inl b/ACE/ace/PIP_Message_Handler.inl new file mode 100644 index 00000000000..5100d9e2c9a --- /dev/null +++ b/ACE/ace/PIP_Message_Handler.inl @@ -0,0 +1,22 @@ +// $Id: + +ACE_INLINE ACE_UINT32 ACE_PIP_Message_Handler::handler_id() const +{ + return handler_id_; +} + +ACE_INLINE void ACE_PIP_Message_Handler::handler_id(ACE_UINT32 id) +{ + handler_id_ = id; +} + +ACE_INLINE ACE_UINT32 ACE_PIP_Message_Handler::site_id() const +{ + return site_id_; +} + +ACE_INLINE void ACE_PIP_Message_Handler::site_id(ACE_UINT32 id) +{ + site_id_ = id; +} + diff --git a/ACE/ace/PIP_Messages.cpp b/ACE/ace/PIP_Messages.cpp new file mode 100644 index 00000000000..c9bc42ef544 --- /dev/null +++ b/ACE/ace/PIP_Messages.cpp @@ -0,0 +1,577 @@ +// $Id$ + +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_string.h" +#include "ace/PIP_Messages.h" + +ACE_PIP_Message::ACE_PIP_Message() + : block_(0) + , dirty_(false) + , next_(0) +{} + +ACE_PIP_Message::~ACE_PIP_Message() +{ + if (this->next_) + { + delete this->next_; + } + + if (block_) + { + block_->release(); + } +} + +void ACE_PIP_Message::block(ACE_Message_Block* block) +{ + // Remove the other block if it exist. + if (block_) + { + block_->release(); + } + + block_ = block; + + // Extract the values from the block. + unpack(); +} + +ACE_PIP_Data_Message::ACE_PIP_Data_Message() + : message_priority_(-1) + , reply_expected_(false) + , source_handler_id_(-1) + , source_site_id_(-1) + , destination_handler_id_(-1) + , destination_site_id_(-1) +{ +} + +int ACE_PIP_Data_Message::serialize(ACE_SOCK_Stream& stream) +{ + int total_bytes_sent(0); + + // Only serialize if there is a block. If not, + // there's nothing we can do but fail since we don't + // have enough information to create a block and unpack it. + if (block_) + { + if (this->dirty_) + { + pack(); + } + + ACE_Message_Block* curr_block = block_; + int bytes_sent(0); + while(curr_block) + { + bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length()); + if (bytes_sent > 0) + { + total_bytes_sent += bytes_sent; + curr_block = curr_block->next(); + } + else + { + total_bytes_sent = -1; + break; + } + } + } + else + { + total_bytes_sent = -1; + } + + return total_bytes_sent; +} + +int ACE_PIP_Data_Message::pack() +{ + char* write_ptr = block_->wr_ptr(); + char* read_ptr = block_->rd_ptr(); + + block_->reset(); + + // Pack reply expected into buffer. + ACE_OS::memcpy(block_->wr_ptr(), &this->reply_expected_, sizeof(this->reply_expected_)); + block_->wr_ptr(sizeof(this->reply_expected_)); + + // Pack the message priority into the buffer. + ACE_OS::memcpy(block_->wr_ptr(), &this->message_priority_, sizeof(this->message_priority_)); + block_->wr_ptr(sizeof(this->message_priority_)); + + // Pack the destination handler ID into the buffer + ACE_OS::memcpy(block_->wr_ptr(), &this->destination_handler_id_, sizeof(this->destination_handler_id_)); + block_->wr_ptr(sizeof(this->destination_handler_id_)); + + // Pack the source handler ID into the buffer + ACE_OS::memcpy(block_->wr_ptr(), &this->source_handler_id_, sizeof(this->source_handler_id_)); + block_->wr_ptr(sizeof(this->source_handler_id_)); + + // Pack the destination site ID into the buffer + ACE_OS::memcpy(block_->wr_ptr(), &this->destination_site_id_, sizeof(this->destination_site_id_)); + block_->wr_ptr(sizeof(this->destination_site_id_)); + + // Pack the source site ID into the buffer + ACE_OS::memcpy(block_->wr_ptr(), &this->source_site_id_, sizeof(this->source_site_id_)); + block_->wr_ptr(sizeof(this->source_site_id_)); + + // Reset the buffer pointers to where they were so that the message length remains + // accurate. + block_->rd_ptr(read_ptr); + block_->wr_ptr(write_ptr); + + return 0; +} + +void ACE_PIP_Data_Message::unpack() +{ + if (block_) + { + char* write_ptr = block_->wr_ptr(); + block_->reset(); + + // reply_expected_ + ACE_OS::memcpy(&this->reply_expected_, block_->rd_ptr(), sizeof(this->reply_expected_)); + block_->rd_ptr(sizeof(this->reply_expected_)); + + // message priority + ACE_OS::memcpy(&this->message_priority_, block_->rd_ptr(), sizeof(this->message_priority_)); + block_->rd_ptr(sizeof(this->message_priority_)); + + // destination handler ID + ACE_OS::memcpy(&this->destination_handler_id_, block_->rd_ptr(), sizeof(this->destination_handler_id_)); + block_->rd_ptr(sizeof(this->destination_handler_id_)); + + // source handler ID + ACE_OS::memcpy(&this->source_handler_id_, block_->rd_ptr(), sizeof(this->source_handler_id_)); + block_->rd_ptr(sizeof(this->source_handler_id_)); + + // destination site ID + ACE_OS::memcpy(&this->destination_site_id_, block_->rd_ptr(), sizeof(this->destination_site_id_)); + block_->rd_ptr(sizeof(this->destination_site_id_)); + + // source site ID + ACE_OS::memcpy(&this->source_site_id_, block_->rd_ptr(), sizeof(this->source_site_id_)); + block_->rd_ptr(sizeof(this->source_site_id_)); + + block_->reset(); + block_->wr_ptr(write_ptr); + } + + this->dirty_ = false; +} + +ACE_PIP_Accel_Message::ACE_PIP_Accel_Message() + : ACCEL_HEADER_LENGTH_(2*sizeof(ACE_UINT32)) + , destination_address_(0) + , destination_port_(0) + , new_priority_(0) + , old_priority_(0) +{ +} + +int ACE_PIP_Accel_Message::serialize(ACE_SOCK_Stream& stream) +{ + pack(); + + int bytes_sent = stream.send_n(block_->rd_ptr(), block_->length()); + return bytes_sent; +} + +int ACE_PIP_Accel_Message::pack() +{ + if (!block_) + { + ACE_NEW_RETURN(block_, ACE_Message_Block(ACCEL_HEADER_LENGTH_), -1); + this->dirty_ = true; + } + + if (this->dirty_) + { + + // Set the buffer pointers to the start of the buffer to + // ensure we're writing to the correct location + block_->reset(); + + // Pack the contents of the struct into the message block + ACE_OS::memcpy(block_->wr_ptr(), &this->old_priority_, sizeof(this->old_priority_)); + block_->wr_ptr(sizeof(this->old_priority_)); + + ACE_OS::memcpy(block_->wr_ptr(), &this->new_priority_, sizeof(this->new_priority_)); + block_->wr_ptr(sizeof(this->new_priority_)); + + this->dirty_ = false; + } + + return 0; +} + +void ACE_PIP_Accel_Message::unpack() +{ + if (block_) + { + char* write_ptr = block_->wr_ptr(); + block_->reset(); + + this->old_priority_ = (*block_->rd_ptr()); + block_->rd_ptr(sizeof(this->old_priority_)); + + this->new_priority_ = (*block_->rd_ptr()); + block_->rd_ptr(sizeof (this->new_priority_)); + + // Reset the read and write pointers to their original location + // in the block. + block_->reset(); + block_->wr_ptr(write_ptr); + } + + this->dirty_ = false; +} + +ACE_PIP_Accel_Message* ACE_PIP_Accel_Message::copy() +{ + ACE_PIP_Accel_Message* copy(0); + ACE_NEW_RETURN(copy, ACE_PIP_Accel_Message, 0); + + copy->new_priority_ = this->new_priority_; + copy->old_priority_ = this->old_priority_; + copy->pack(); + + return copy; +} + +ACE_PIP_Protocol_Message::ACE_PIP_Protocol_Message() + : FIXED_HEADER_LENGTH_(sizeof(Message_Type) + + sizeof(message_id_) + + sizeof(num_payload_blocks_)) + , message_type_(NONE) + , num_payload_blocks_(0) + , message_id_(0) +{ +} + +int ACE_PIP_Protocol_Message::serialize(ACE_SOCK_Stream& stream) +{ + int total_bytes_sent(0); + + pack(); + + ACE_Message_Block* curr_block = block_; + int bytes_sent(0); + + // Write each of the message blocks associated with this + // header into the stream + while(curr_block) + { + bytes_sent = stream.send_n(curr_block->rd_ptr(), curr_block->length()); + if (bytes_sent > 0) + { + total_bytes_sent += bytes_sent; + curr_block = curr_block->next(); + } + else + { + total_bytes_sent = -1; + break; + } + } + if ((total_bytes_sent > 0) && this->next_) + { + int next_sent = this->next_->serialize(stream); + if (next_sent > 0) + { + total_bytes_sent += next_sent; + } + else + { + total_bytes_sent = -1; + } + } + else + { + total_bytes_sent = -1; + } + + return total_bytes_sent; +} + +int ACE_PIP_Protocol_Message::deserialize(ACE_SOCK_Stream& stream) +{ + int total_bytes_received(-1); + + ACE_Message_Block* header_block(0); + + ACE_Message_Block* lengths_block(0); + ACE_Message_Block* curr_payload_block(0); + ACE_Message_Block* payload_blocks(0); + + ACE_NEW_RETURN(header_block, ACE_Message_Block(FIXED_HEADER_LENGTH_), -1); + // Read the fixed-length portion of the protocol header. + int bytes_received = stream.recv_n(header_block->wr_ptr(), FIXED_HEADER_LENGTH_); + if (bytes_received == FIXED_HEADER_LENGTH_) + { + total_bytes_received = bytes_received; + + // Determine number of data message blocks in the payload. + header_block->rd_ptr(FIXED_HEADER_LENGTH_ - sizeof(this->num_payload_blocks_)); + ACE_OS::memcpy(&this->num_payload_blocks_, header_block->rd_ptr(), + sizeof(this->num_payload_blocks_)); + + header_block->reset(); + header_block->wr_ptr(bytes_received); + + // Extract the length of each payload block. + if (this->num_payload_blocks_ > 0) + { + // Read the lengths of each block. + int bytes_to_read = this->num_payload_blocks_ * sizeof(ACE_UINT32); + ACE_NEW_RETURN(lengths_block, ACE_Message_Block(bytes_to_read), -1); + bytes_received = stream.recv_n(lengths_block->wr_ptr(), bytes_to_read); + + if (bytes_received == bytes_to_read) + { + total_bytes_received += bytes_received; + lengths_block->wr_ptr(bytes_received); + + // The lengths of each block have been successfully written, so + // unpack them. + header_block->next(lengths_block); + block(header_block); + + ACE_NEW_RETURN(curr_payload_block, ACE_Message_Block(payload_block_lengths_[0]), -1); + payload_blocks = curr_payload_block; + unsigned int i = 0; + for (; i < this->num_payload_blocks_ && bytes_received != -1; ++i) + { + // Read the block. + bytes_received = stream.recv_n(curr_payload_block->wr_ptr(), + payload_block_lengths_[i]); + if (bytes_received > 0) + { + total_bytes_received += bytes_received; + curr_payload_block->wr_ptr(bytes_received); + if (i < (this->num_payload_blocks_ - 1)) + { + curr_payload_block->next( + new ACE_Message_Block(payload_block_lengths_[i + 1])); + + curr_payload_block = curr_payload_block->next(); + } + else + { + curr_payload_block->next(0); + } + + } + else + { + total_bytes_received = -1; + break; + } + } + } + else + { + total_bytes_received = -1; + } + } + } + else + { + total_bytes_received = -1; + } + + if (total_bytes_received > 0) + { + if (this->message_type_ == ACCEL) + { + ACE_NEW_RETURN(this->next_, ACE_PIP_Accel_Message, -1); + } + else + { + ACE_NEW_RETURN(this->next_, ACE_PIP_Data_Message, -1); + } + + // Pass the payload blocks to the next message struct + // so it can unpack it. + this->next_->block(payload_blocks); + } + else if (block_) + { + // Something failed during reading, so cleanup any allocated memory. + block_->release(); + } + + return total_bytes_received; +} + +void ACE_PIP_Protocol_Message::next(ACE_PIP_Message* next) +{ + // Determine the number and length of payload blocks. + payload_block_lengths_.clear(); + this->num_payload_blocks_ = 0; + next->pack(); + ACE_Message_Block* curr_block = next->block(); + while (curr_block) + { + ++this->num_payload_blocks_; + payload_block_lengths_.push_back(curr_block->length()); + curr_block = curr_block->next(); + } + + this->next_ = next; + this->dirty_ = true; +} + +int ACE_PIP_Protocol_Message::process_message_payload(ACE_Message_Block* payload) +{ + payload_block_lengths_.clear(); + this->num_payload_blocks_ = 0; + + // Determine the length and number of payload blocks. + ACE_Message_Block* curr_block = payload; + while (curr_block) + { + ++this->num_payload_blocks_; + payload_block_lengths_.push_back(curr_block->length()); + curr_block = curr_block->next(); + } + + if (!this->next_) + { + if (this->message_type_ == ACCEL) + { + ACE_NEW_RETURN(this->next_, ACE_PIP_Accel_Message, -1); + } + else + { + ACE_NEW_RETURN(this->next_, ACE_PIP_Data_Message, -1); + } + } + + this->next_->block(payload); + this->dirty_ = true; + + return 0; +} + +int ACE_PIP_Protocol_Message::pack() +{ + if (!block_) + { + // Create the message buffer for the protocol header. + block_ = new ACE_Message_Block(FIXED_HEADER_LENGTH_); + + // Create the message buffer for the list of payload block lengths. + block_->next(new ACE_Message_Block(this->num_payload_blocks_ * sizeof(ACE_UINT32))); + block_->next()->next(0); + this->dirty_ = true; + } + if (this->dirty_) + { + // Set the buffer pointers to the start of the buffer + // so that we write to the appropriate location. + block_->reset(); + + // pack the process Id. + ACE_OS::memcpy(block_->wr_ptr(), &message_id_, sizeof(message_id_)); + block_->wr_ptr(sizeof (message_id_)); + + // Pack the message type. + ACE_OS::memcpy(block_->wr_ptr(), &this->message_type_, sizeof(this->message_type_)); + block_->wr_ptr(sizeof(this->message_type_)); + + // Number of blocks in payload. + ACE_OS::memcpy(block_->wr_ptr(), &this->num_payload_blocks_, sizeof(this->num_payload_blocks_)); + block_->wr_ptr(sizeof(this->num_payload_blocks_)); + + ACE_Message_Block* next_block = block_->next(); + if (next_block) + { + next_block->reset(); + + // Write the block lengths into the message block. + for (unsigned int i = 0; i < this->num_payload_blocks_; ++i) + { + ACE_OS::memcpy(next_block->wr_ptr(), + &payload_block_lengths_[i], + sizeof(ACE_UINT32)); + + next_block->wr_ptr(sizeof(ACE_UINT32)); + } + } + + this->dirty_ = false; + } + + return 0; +} + +void ACE_PIP_Protocol_Message::unpack() +{ + if (block_) + { + char* write_ptr = block_->wr_ptr(); + // char* read_ptr = block_->rd_ptr(); + block_->reset(); + + // Extract the process ID. + ACE_OS::memcpy(&message_id_, block_->rd_ptr(), sizeof(message_id_)); + block_->rd_ptr(sizeof (message_id_)); + + // Extract the message type. + ACE_OS::memcpy(&this->message_type_, block_->rd_ptr(), sizeof(this->message_type_)); + block_->rd_ptr(sizeof(this->message_type_)); + + // Number of blocks in payload. + ACE_OS::memcpy(&this->num_payload_blocks_, block_->rd_ptr(), + sizeof(this->num_payload_blocks_)); + + block_->rd_ptr(sizeof(this->num_payload_blocks_)); + + // Reset buffer pointers to be where they were prior to unpacking. + block_->reset(); + block_->wr_ptr(write_ptr); + + // The next block holds the lengths of each payload block. + ACE_Message_Block* next_block = block_->next(); + if (next_block) + { + write_ptr = next_block->wr_ptr(); + next_block->reset(); + payload_block_lengths_.resize(this->num_payload_blocks_, 0); + ACE_UINT32 block_length(0); + + // Extract the lengths of each payload block, which will + // be used to recreate the structure of the original payload. + for (ACE_UINT32 i = 0; i < this->num_payload_blocks_; ++i) + { + ACE_OS::memcpy(&block_length, next_block->rd_ptr(), sizeof(block_length)); + next_block->rd_ptr(sizeof(block_length)); + payload_block_lengths_[i] = block_length; + } + + // Reset the buffer pointers to where they were prior to unpacking. + next_block->reset(); + next_block->wr_ptr(write_ptr); + } + } + + this->dirty_ = false; +} + +ACE_PIP_Protocol_Message* ACE_PIP_Protocol_Message::copy() +{ + ACE_PIP_Protocol_Message* message_copy(0); + ACE_NEW_RETURN(message_copy, ACE_PIP_Protocol_Message, 0); + message_copy->message_type_ = this->message_type_; + message_copy->num_payload_blocks_ = this->num_payload_blocks_; + for (ACE_UINT32 block_index = 0; block_index < this->num_payload_blocks_; ++block_index) + { + message_copy->payload_block_lengths_[block_index] = payload_block_lengths_[block_index]; + } + + return message_copy; +} diff --git a/ACE/ace/PIP_Messages.h b/ACE/ace/PIP_Messages.h new file mode 100644 index 00000000000..4a8b3e95075 --- /dev/null +++ b/ACE/ace/PIP_Messages.h @@ -0,0 +1,255 @@ + /** + * @file PIP_Messages + * + * // $Id$ + * + * @author John Moore + * + * This file contains the specification for a heirarchy of + * classes that represent the various messages used in the + * priority inheritance protocol +*/ + +#ifndef _PIP_MESSAGES_H_ +#define _PIP_MESSAGES_H_ + +#include "ace/Message_Block.h" +#include "ace/SOCK_Stream.h" +#include "ace/Vector_T.h" + +/** + * @class ACE_PIP_Message + * @brief Base class for all messages used in + * the implementation of a distributed priority inheritance + * protocol. + * + * Base class for all messages used in the implementation of a distributed + * priority inheritance protocol. Provides an interface for message (de)serialization, + * message chaining, packing, unpacking, and payload ownership transfer + */ +class ACE_Export ACE_PIP_Message +{ + public: + + ACE_PIP_Message(); + virtual ~ACE_PIP_Message(); + + /// Send the contents of this message over the stream. + virtual int serialize(ACE_SOCK_Stream& stream) = 0; + + /// Get the next message struct. + virtual ACE_PIP_Message* next(); + + /// Set the next message struct. + virtual void next(ACE_PIP_Message* next); + + /// Returns the next message, making the caller + /// the new owner. + virtual ACE_PIP_Message* release_next(); + + /// Get the message block. + virtual ACE_Message_Block* block(); + + /// Set the message block and populate the message struct + /// with message contents. + virtual void block(ACE_Message_Block* block); + + /// Get the message block, making the caller the new owner. + virtual ACE_Message_Block* release_block(); + + /// Place the values in the message struct into the message block. + virtual int pack() = 0; + + /// Populate the message struct using values from the message block. + virtual void unpack() = 0; + + /// This is temporarily public to facilitate testing. + /// It should eventually be made private. + ACE_Message_Block* block_; + + protected: + + // Indicates values in structure are newer than values in the + // message block. + bool dirty_; + + ACE_PIP_Message* next_; +}; + +/** + * @class ACE_PIP_Data_Message + * @brief Structure representing the fields of an application- + * level protocol message and associated header values + * + * Structure representing the fields of an appliation level + * protocol message and associated header values. Structure is that + * of several contiguous ACE_Message_Block's. The message is configurable + * to support any application-level protocol that contains at least the following + * data: source address, destination address, reply expectation, and priority + * +*/ +class ACE_Export ACE_PIP_Data_Message : public ACE_PIP_Message +{ + public: + + ACE_PIP_Data_Message(); + virtual ~ACE_PIP_Data_Message(){} + + /// Send the contents of this message over the stream. + virtual int serialize(ACE_SOCK_Stream& stream); + + /// Determine if a reply message is expected + bool reply_expected() const; + void reply_expected(bool expected); + + /// Determine the priority at which this message should be handled + ACE_UINT32 message_priority() const; + void message_priority(ACE_UINT32 priority); + + /// Determine the ID of the destination handler + ACE_UINT32 destination_handler_ID() const; + void destination_handler_ID(ACE_UINT32 ID); + + /// Determine the ID of the sending handler + ACE_UINT32 source_handler_ID() const; + void source_handler_ID(ACE_UINT32 ID); + + /// Determine the ID of the destination site + ACE_UINT32 destination_site_ID() const; + void destination_site_ID(ACE_UINT32 ID); + + /// Determine the ID of the sending site + ACE_UINT32 source_site_ID() const; + void source_site_ID(ACE_UINT32 ID); + + // Place the values from the struct into the message blocks. + virtual int pack(); + + // Extract the values from the message blocks into the structs. + virtual void unpack(); + + private: + + ACE_UINT32 message_priority_; + bool reply_expected_; + ACE_UINT32 source_handler_id_; + ACE_UINT32 source_site_id_; + ACE_UINT32 destination_handler_id_; + ACE_UINT32 destination_site_id_; +}; + +/** + * @class ACE_PIP_Protocol_Message + * @brief Structure representing a message supported by the priority + * inheritance protocol + * +*/ + +class ACE_Export ACE_PIP_Accel_Message : public ACE_PIP_Message +{ + public: + + ACE_PIP_Accel_Message(); + virtual ~ACE_PIP_Accel_Message(){} + + /// Send the contents of this message over the stream. + virtual int serialize(ACE_SOCK_Stream& stream); + + ACE_UINT32 old_priority() const; + void old_priority(ACE_UINT32 priority); + + ACE_UINT32 new_priority() const; + void new_priority(ACE_UINT32 priority); + + /// Get the address of the application receiving the message. + ACE_UINT32 destination_address() const; + void destination_address(const ACE_UINT32& address); + + u_short destination_port() const; + void destination_port(u_short port); + + /// Place the values in the message struct into the message block. + virtual int pack(); + + /// Extract the values from the message block and store them in the struct. + virtual void unpack(); + + /// Return a copy of the this message + ACE_PIP_Accel_Message* copy(); + + private: + + const ACE_UINT32 ACCEL_HEADER_LENGTH_; + ACE_UINT32 destination_address_; + u_short destination_port_; + ACE_UINT32 new_priority_; + ACE_UINT32 old_priority_; +}; + +/** + * @class ACE_PIP_Accel_Message + * @brief Structure representing an acceleration message + * used in the implementation of a priority inheritance protocol + * + * Structure representing an acceleration message used in the + * implementation of a priority inheritance protocol. Indicates the + * old and new priority of the targeted process, as well as the address + * of handler to which the associated message was sent. +*/ +class ACE_Export ACE_PIP_Protocol_Message : public ACE_PIP_Message +{ + public: + + enum Message_Type { NONE, ACCEL, DATA, REQUEST, RESPONSE }; + + ACE_PIP_Protocol_Message(); + virtual ~ACE_PIP_Protocol_Message(){} + + /// Send the contents of this message over the stream. + virtual int serialize(ACE_SOCK_Stream& stream); + + /// Receive the contents of this message from the stream. + virtual int deserialize(ACE_SOCK_Stream& stream); + + /// Set the next message in the chain. + virtual void next(ACE_PIP_Message* next); + + /// Get the next message in the chain + virtual ACE_PIP_Message* next(); + + /// Determine the type of message this header has been tacked onto. + Message_Type message_type() const; + void message_type(Message_Type type); + + /// Determine which call chain this message is associated with. + ACE_UINT64 message_id() const; + void message_id(ACE_UINT64 id); + + /// Attach message block as payload of priority inheritance + /// protocol message. + int process_message_payload(ACE_Message_Block* payload); + + virtual int pack(); + virtual void unpack(); + + /// Make a copy of the header of this message, i.e. without + /// data or accel payload + ACE_PIP_Protocol_Message* copy(); + + const int FIXED_HEADER_LENGTH_; + + private: + + Message_Type message_type_; + ACE_UINT32 num_payload_blocks_; + ACE_Vector payload_block_lengths_; + ACE_UINT64 message_id_; +}; + +#if defined (__ACE_INLINE__) +#include "PIP_Messages.inl" +#endif /* __ACE_INLINE__ */ + + +#endif + diff --git a/ACE/ace/PIP_Messages.inl b/ACE/ace/PIP_Messages.inl new file mode 100644 index 00000000000..03145e441bf --- /dev/null +++ b/ACE/ace/PIP_Messages.inl @@ -0,0 +1,186 @@ +/************************************************** + * + * ACE_PIP_Message - Inline Methods + * + **************************************************/ +inline ACE_PIP_Message* ACE_PIP_Message::next() +{ + return this->next_; +} + +inline void ACE_PIP_Message::next(ACE_PIP_Message* message) +{ + this->next_ = message; +} + +inline ACE_PIP_Message* ACE_PIP_Message::release_next() +{ + ACE_PIP_Message* temp = this->next_; + this->next_ = 0; + return temp; +} + +inline ACE_Message_Block* ACE_PIP_Message::block() +{ + return this->block_; +} + +inline ACE_Message_Block* ACE_PIP_Message::release_block() +{ + ACE_Message_Block* temp_block = this->block_; + this->block_ = 0; + this->dirty_ = true; + return temp_block; +} + +/************************************************** + * + * ACE_PIP_Data_Message - Inline Methods + * + **************************************************/ + +inline bool ACE_PIP_Data_Message::reply_expected() const +{ + return this->reply_expected_; +} + +inline void ACE_PIP_Data_Message::reply_expected(bool expected) +{ + this->dirty_ = true; + this->reply_expected_ = expected; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::message_priority() const +{ + return this->message_priority_; +} + +inline void ACE_PIP_Data_Message::message_priority(ACE_UINT32 priority) +{ + this->dirty_ = true; + this->message_priority_ = priority; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::destination_handler_ID() const +{ + return this->destination_handler_id_; +} + +inline void ACE_PIP_Data_Message::destination_handler_ID(ACE_UINT32 ID) +{ + this->destination_handler_id_ = ID; + this->dirty_ = true; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::source_handler_ID() const +{ + return this->source_handler_id_; +} + +inline void ACE_PIP_Data_Message::source_handler_ID(ACE_UINT32 ID) +{ + this->source_handler_id_ = ID; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::source_site_ID() const +{ + return this->source_site_id_; +} + +inline void ACE_PIP_Data_Message::source_site_ID(ACE_UINT32 ID) +{ + this->source_site_id_ = ID; +} + +inline ACE_UINT32 ACE_PIP_Data_Message::destination_site_ID() const +{ + return this->destination_site_id_; +} + +inline void ACE_PIP_Data_Message::destination_site_ID(ACE_UINT32 ID) +{ + this->destination_site_id_ = ID; +} + +/************************************************** + * + * ACE_PIP_Accel_Message - Inline Methods + * + **************************************************/ + +inline ACE_UINT32 ACE_PIP_Accel_Message::old_priority() const +{ + return old_priority_; +} + +inline void ACE_PIP_Accel_Message::old_priority(ACE_UINT32 priority) +{ + this->dirty_ = true; + old_priority_ = priority; +} + +inline ACE_UINT32 ACE_PIP_Accel_Message::new_priority() const +{ + return new_priority_; +} + +inline void ACE_PIP_Accel_Message::new_priority(ACE_UINT32 priority) +{ + this->dirty_ = true; + new_priority_ = priority; +} + +inline ACE_UINT32 ACE_PIP_Accel_Message::destination_address() const +{ + return destination_address_; +} + +inline void ACE_PIP_Accel_Message::destination_address(const ACE_UINT32& address) +{ + this->dirty_ = true; + destination_address_ = address; +} + +inline u_short ACE_PIP_Accel_Message::destination_port() const +{ + return destination_port_; +} + +inline void ACE_PIP_Accel_Message::destination_port(u_short port) +{ + destination_port_ = port; +} + +/************************************************** + * + * ACE_PIP_Protocol_Message - Inline Methods + * + **************************************************/ + +inline void ACE_PIP_Protocol_Message:: + message_type(ACE_PIP_Protocol_Message::Message_Type type) +{ + message_type_ = type; + this->dirty_ = true; +} + +inline ACE_PIP_Protocol_Message::Message_Type ACE_PIP_Protocol_Message::message_type() const +{ + return message_type_; +} + +inline ACE_UINT64 ACE_PIP_Protocol_Message::message_id() const +{ + return message_id_; +} + +inline void ACE_PIP_Protocol_Message::message_id(ACE_UINT64 id) +{ + this->dirty_ = true; + message_id_ = id; +} + +inline ACE_PIP_Message* ACE_PIP_Protocol_Message::next() +{ + return this->next_; +} diff --git a/ACE/ace/PIP_Reactive_IO_Handler.cpp b/ACE/ace/PIP_Reactive_IO_Handler.cpp new file mode 100644 index 00000000000..4eca6890705 --- /dev/null +++ b/ACE/ace/PIP_Reactive_IO_Handler.cpp @@ -0,0 +1,64 @@ +// $Id$ + +#include "ace/OS_NS_sys_time.h" +#include "ace/PIP_Reactive_IO_Handler.h" +#include "ace/PIP_Invocation_Manager.h" + +/// Constructor +ACE_PIP_Reactive_IO_Handler::ACE_PIP_Reactive_IO_Handler() +{ +} + +ACE_PIP_Reactive_IO_Handler::~ACE_PIP_Reactive_IO_Handler() +{ +} + +/// Closes all remote connections. +int ACE_PIP_Reactive_IO_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) +{ + ACE_UNUSED_ARG(handle); + switch(close_mask) + { + case ACE_Event_Handler::READ_MASK: + read_closed_ = true; + break; + case ACE_Event_Handler::WRITE_MASK: + write_closed_ = true; + break; + }; + + if (read_closed_ && write_closed_) + { + // Close our end of the connection + peer_.close_reader(); + peer_.close_writer(); + + // un-register with invocation manager so it doesn't + // try to use the handler for IO + ACE_PIP_Invocation_Manager::instance()->unregister_IO_handler(this); + + delete this; + return -1; + } + + return 0; +} + + +/// Enqueue a message to be sent +int ACE_PIP_Reactive_IO_Handler::put_message (ACE_PIP_Protocol_Message* message) +{ + big_lock_.acquire(); + outgoing_message_queue_.enqueue_head(message); + big_lock_.release(); + + // Register so Reactor tells us to send the message + ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::WRITE_MASK); + ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK); + + return 0; +} + + + + diff --git a/ACE/ace/PIP_Reactive_IO_Handler.h b/ACE/ace/PIP_Reactive_IO_Handler.h new file mode 100644 index 00000000000..ae50ebf9b27 --- /dev/null +++ b/ACE/ace/PIP_Reactive_IO_Handler.h @@ -0,0 +1,54 @@ + /** + * @file PIP_IO_Handler.cpp + * + * // $Id$ + * + * @author John Moore + * + * This file contains the specification for a class + * that manages network I/O +*/ + + +#ifndef _PIP_REACTIVE_IO_HANDLER_H_ +#define _PIP_REACTIVE_IO_HANDLER_H_ + + +#include "ace/Message_Queue.h" +#include "ace/PIP_IO_Handler.h" +#include "ace/PIP_Messages.h" + +/** + * @class ACE_PIP_Reactive_IO_Handler + * + * @brief Performs reactive network I/O in + * the context of a distributed system + * employing the the priority inheritance + * protocol + * + * @author John Moore + */ +class ACE_Export ACE_PIP_Reactive_IO_Handler : + public ACE_PIP_IO_Handler +{ + public: + + /// Constructor + ACE_PIP_Reactive_IO_Handler (); + ~ACE_PIP_Reactive_IO_Handler(); + + /// Closes all remote connections. + virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); + + /// Enqueue a message to be sent + virtual int put_message (ACE_PIP_Protocol_Message* message); + + private: + +}; + + + +#endif /* _PIP_Reactive_IO_Handler_H_ */ + + -- cgit v1.2.1