diff options
-rw-r--r-- | ChangeLog-97b | 43 | ||||
-rw-r--r-- | ace/Malloc_T.cpp | 3 | ||||
-rw-r--r-- | ace/Priority_Reactor.cpp | 184 | ||||
-rw-r--r-- | ace/Priority_Reactor.h | 90 | ||||
-rw-r--r-- | ace/Priority_Reactor.i | 18 | ||||
-rw-r--r-- | ace/Sched_Params.h | 40 | ||||
-rw-r--r-- | ace/Sched_Params.i | 50 | ||||
-rw-r--r-- | ace/Select_Reactor.cpp | 134 | ||||
-rw-r--r-- | ace/Select_Reactor.h | 45 | ||||
-rw-r--r-- | tests/Makefile | 152 | ||||
-rw-r--r-- | tests/Priority_Reactor_Test.cpp | 313 | ||||
-rw-r--r-- | tests/Priority_Reactor_Test.h | 56 | ||||
-rw-r--r-- | tests/Priority_Task_Test.cpp | 12 |
13 files changed, 1017 insertions, 123 deletions
diff --git a/ChangeLog-97b b/ChangeLog-97b index 2e81451d51f..807c0ee14b3 100644 --- a/ChangeLog-97b +++ b/ChangeLog-97b @@ -1,3 +1,46 @@ +Sun Oct 12 00:26:56 1997 Carlos O'Ryan <coryan@macarena.cs.wustl.edu> + + * ace/Makefile: + * ace/Priority_Reactor.h: + * ace/Priority_Reactor.i: + * ace/Priority_Reactor.cpp: + Augmentes Select_Reactor, adding priority based dispatching for + the I/O Event_Handlers, the only feature supported is + dispatching in the order defined by the priorities. + Each Event_Handler defines its priority, if the priority is out + of range the culprit is "punished" by dispatching at the lowest + priority. + Care has been exercised to avoid dynamic memory allocation. + + * tests/Makefile: + * tests/Priority_Reactor_Test.h: + * tests/Priority_Reactor_Test.cpp: + Added small tests of the Priority_Reactor, the test runs an + Acceptor on the main thread and creates several threads (or + processes if the plaform does not support threads) that connect + to this connector. The writing threads send several short + messages, the main thread receives them using one Svc_Handler + per writer, dispatched at different priorities. + The test itself is interesting, it shows how to write very + simple Svc_Handler, Connectors and Acceptors. + + * ace/Select_Reactor.h: + * ace/Select_Reactor.cpp: + The dispatching of all the handles in a "group" (READ, WRITE or + EXCEPT) was encapsulated in a single routine. + + * ace/Malloc_T.cpp: + In the Cached_Allocator memory was allocated as an arrays of + char, it must be released the same way. + + * ace/Sched_Params.h: + * ace/Sched_Params.i: + Added a new class (ACE_Sched_Priority_Iterator) to iterate over + the priorities. + + * tests/Priority_Task_Test.cpp: + Added some comments. + Sat Oct 10 16:23:49 1997 Steve Huston <shuston@riverace.com> * tests/SOCK_Connector_Test.cpp: Passes the test if the should-fail diff --git a/ace/Malloc_T.cpp b/ace/Malloc_T.cpp index 96ab8b2a1a4..837d6347e29 100644 --- a/ace/Malloc_T.cpp +++ b/ace/Malloc_T.cpp @@ -28,7 +28,8 @@ ACE_Cached_Allocator<T, ACE_LOCK>::ACE_Cached_Allocator (size_t n_chunks) template <class T, class ACE_LOCK> ACE_Cached_Allocator<T, ACE_LOCK>::~ACE_Cached_Allocator (void) { - delete [] this->pool_; + char* tmp = (char*)this->pool_; + delete [] tmp; } ACE_ALLOC_HOOK_DEFINE(ACE_Malloc) diff --git a/ace/Priority_Reactor.cpp b/ace/Priority_Reactor.cpp new file mode 100644 index 00000000000..65776220141 --- /dev/null +++ b/ace/Priority_Reactor.cpp @@ -0,0 +1,184 @@ +// $Id$ + +#define ACE_BUILD_DLL + +#include "ace/Priority_Reactor.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Priority_Reactor.i" +#endif /* __ACE_INLINE__ */ + +typedef ACE_Unbounded_Queue_Iterator<ACE_Event_Tuple> QUEUE_ITERATOR; +// Its iterator. + +typedef ACE_Cached_Allocator<ACE_Node<ACE_Event_Tuple>, ACE_SYNCH_NULL_MUTEX> TUPLE_ALLOCATOR; +// Defines the memory allocator used, no need for locking because it +// is only used in one thread of control. + +ACE_ALLOC_HOOK_DEFINE(ACE_Priority_Reactor) + +// Initialize ACE_Select_Reactor. + +const int npriorities = + ACE_Event_Handler::HI_PRIORITY - ACE_Event_Handler::LO_PRIORITY + 1; + +ACE_INLINE +void ACE_Priority_Reactor::init_bucket (void) +{ + // Allocate enough space for all the handles. + // TODO: This can be wrong, maybe we should use other kind of + // allocator here? + ACE_NEW (this->tuple_allocator_, + TUPLE_ALLOCATOR (ACE_Select_Reactor::DEFAULT_SIZE)); + + // The event handlers are assigned to a new As the Event + ACE_NEW (this->bucket, QUEUE*[npriorities]); + // This loops "ensures" exception safety. + int i; + for (i = 0; i < npriorities; ++i) + { + this->bucket[i] = 0; + } + for (i = 0; i < npriorities; ++i) + { + ACE_NEW (this->bucket[i], QUEUE (this->tuple_allocator_)); + } +} + +ACE_Priority_Reactor::ACE_Priority_Reactor (ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq) + : ACE_Select_Reactor(sh, tq), + bucket (0), + tuple_allocator_ (0) +{ + ACE_TRACE ("ACE_Priority_Reactor::ACE_Priority_Reactor"); + this->init_bucket (); +} + +ACE_Priority_Reactor::ACE_Priority_Reactor (size_t size, + int rs, + ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq) + : ACE_Select_Reactor (size, rs, sh, tq), + bucket (0), + tuple_allocator_ (0) +{ + ACE_TRACE ("ACE_Priority_Reactor::ACE_Priority_Reactor"); + this->init_bucket (); +} + +ACE_Priority_Reactor::~ACE_Priority_Reactor (void) +{ + ACE_TRACE ("ACE_Priority_Reactor::~ACE_Priority_Reactor"); + for (int i = 0; i < npriorities; ++i) + { + delete this->bucket[i]; + } + delete[] this->bucket; + delete tuple_allocator_; +} + +int +ACE_Priority_Reactor::dispatch_io_set (int number_of_active_handles, + int& number_dispatched, + int mask, + ACE_Handle_Set& dispatch_mask, + ACE_Handle_Set& ready_mask, + ACE_EH_PTMF callback) +{ + ACE_TRACE ("ACE_Priority_Reactor::dispatch_io_set"); + + if (number_of_active_handles == 0) + { + return 0; + } + + // ACE_DEBUG ((LM_DEBUG, "ACE_Priority_Reactor::dispatch_io_set\n")); + + ACE_HANDLE handle; + + + // The range for which there exists any Event_Tuple is computed on + // the ordering loop, minimizing iterations on the dispatching + // loop. + int min_priority = ACE_Event_Handler::HI_PRIORITY; + int max_priority = ACE_Event_Handler::LO_PRIORITY; + + ACE_Handle_Set_Iterator handle_iter (dispatch_mask); + + while ((handle = handle_iter ()) != ACE_INVALID_HANDLE) + { + ACE_Event_Tuple et (this->handler_rep_.find (handle), handle); + int prio = et.event_handler_->priority (); + + // If the priority is out of range assign the minimum priority. + if (prio < ACE_Event_Handler::LO_PRIORITY + || prio > ACE_Event_Handler::HI_PRIORITY) + { + prio = ACE_Event_Handler::LO_PRIORITY; + } + + bucket[prio]->enqueue_tail (et); + // Update the priority ranges.... + if (min_priority > prio) + { + min_priority = prio; + } + if (max_priority < prio) + { + max_priority = prio; + } + } + + // ACE_DEBUG ((LM_DEBUG, "dispatching.... %d\n", number_of_active_handles)); + + for (int i = max_priority; i >= min_priority; --i) + { + // Remove all the entries from the wrappers + while (!bucket[i]->is_empty () + && number_dispatched < number_of_active_handles + && this->state_changed_ == 0) + { + ACE_Event_Tuple et; + bucket[i]->dequeue_head (et); + this->notify_handle (et.handle_, + mask, + ready_mask, + et.event_handler_, + callback); + number_dispatched++; + } + // Even if we are aborting the loop due to this->state_changed + // or another error we still want to cleanup the buckets. + bucket[i]->reset (); + } + + if (number_dispatched > 0 && this->state_changed_) + { + return -1; + } + + return 0; +} + +void +ACE_Priority_Reactor::dump (void) const +{ + ACE_TRACE ("ACE_Priority_Reactor::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + ACE_Select_Reactor::dump (); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Unbounded_Queue<ACE_Event_Tuple>; +template class ACE_Unbounded_Queue_Iterator<ACE_Event_Tuple>; +template class ACE_Cached_Allocator<ACE_Node<ACE_Event_Tuple>, ACE_SYNCH_NULL_MUTEX>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Unbounded_Queue<ACE_Event_Tuple> +#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_Event_Tuple> +#pragme instantiate ACE_Cached_Allocator<ACE_Node<ACE_Event_Tuple>, ACE_SYNCH_NULL_MUTEX> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/Priority_Reactor.h b/ace/Priority_Reactor.h new file mode 100644 index 00000000000..eef9fc9ea20 --- /dev/null +++ b/ace/Priority_Reactor.h @@ -0,0 +1,90 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// Priority_Reactor.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (ACE_PRIORITY_REACTOR_H) +#define ACE_PRIORITY_REACTOR_H + +#include "ace/Containers.h" +#include "ace/Select_Reactor.h" + +class ACE_Export ACE_Priority_Reactor : public ACE_Select_Reactor + // = TITLE + // + // Implements priority based dispatching. + // + // = DESCRIPTION + // + // This class refines the dispatching mechanism for the + // Select_Reactor by taking advantage of the priority method on + // ACE_Event_Handler. +{ +public: + // = Initialization and termination methods. + + ACE_Priority_Reactor (ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0); + // Initialize <ACE_Priority_Reactor> with the default size. + + ACE_Priority_Reactor (size_t size, + int restart = 0, + ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0); + // Initialize <ACE_Priority_Reactor> with size <size>. + + virtual ~ACE_Priority_Reactor (void); + // Close down the select_reactor and release all of its resources. + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +protected: + // = Dispatching methods. + + virtual int dispatch_io_set (int number_of_active_handles, + int& number_dispatched, + int mask, + ACE_Handle_Set& dispatch_mask, + ACE_Handle_Set& ready_mask, + ACE_EH_PTMF callback); + // We simply override this function to implement the priority + // dispatching. + +private: + void init_bucket (void); + // A small helper to initialize the bucket. + + typedef ACE_Unbounded_Queue<ACE_Event_Tuple> QUEUE; + QUEUE** bucket; + // There is a queue per-priority, which simply holds the + // Event_Handlers until we knwo who goes first. + + ACE_Allocator* tuple_allocator_; + // The queues themselves use this allocator to minimize dynamic + // memory usage. + + ACE_Priority_Reactor (const ACE_Select_Reactor &); + ACE_Priority_Reactor &operator = (const ACE_Select_Reactor &); + // Deny access since member-wise won't work... +}; + +#if defined (__ACE_INLINE__) +#include "ace/Priority_Reactor.i" +#endif /* __ACE_INLINE__ */ + +#endif /* ACE_PRIORITY_REACTOR_H */ diff --git a/ace/Priority_Reactor.i b/ace/Priority_Reactor.i new file mode 100644 index 00000000000..8876a83cfdb --- /dev/null +++ b/ace/Priority_Reactor.i @@ -0,0 +1,18 @@ +/* -*- C++ -*- */ +// $Id$ + +ACE_INLINE +ACE_Event_Tuple::ACE_Event_Tuple (void) +: event_handler_ (0), + handle_ (ACE_INVALID_HANDLE) +{ +} + +ACE_INLINE +ACE_Event_Tuple::ACE_Event_Tuple (ACE_Event_Handler* eh, + ACE_HANDLE h) +: event_handler_ (eh), + handle_ (h) +{ +} + diff --git a/ace/Sched_Params.h b/ace/Sched_Params.h index 4d95b8f1017..e8f14b31df7 100644 --- a/ace/Sched_Params.h +++ b/ace/Sched_Params.h @@ -151,6 +151,46 @@ private: // setting the quantum (can that be done on Win32?). }; +class ACE_Export ACE_Sched_Priority_Iterator + // = TITLE + // An iterator over the OS defined priorities. + // + // = DESCRIPTION + // The order of priorities (numeric value vs. importance) is OS + // dependant, it can be the case that the priorities are not even + // contigous. + // This class permits iteration over priorities using the iterator + // pattern. +{ +public: + ACE_Sched_Priority_Iterator (const ACE_Sched_Params::Policy& policy, + int scope = ACE_SCOPE_THREAD); + // Initialize the iterator, the arguments define the scheduling + // policy and scope for the priorities (see ACE_Sched_Param). + + int more (void) const; + // Check if there are more priorities. + + int priority (void) const; + // Return the current priority + + void next (void); + // Move to the next priority. + // The iteration is from lowest to highest importance. + + const ACE_Sched_Params::Policy& policy (void) const; + // Accessor for the scheduling policy over which we are iterating. + + int scope (void) const; + // Accessor for the scheduling + +private: + ACE_Sched_Params::Policy policy_; + int scope_; + int priority_; + int done_; +}; + #if defined (__ACE_INLINE__) #include "ace/Sched_Params.i" #endif /* __ACE_INLINE__ */ diff --git a/ace/Sched_Params.i b/ace/Sched_Params.i index 08d4eb810b9..4008ccb5fe7 100644 --- a/ace/Sched_Params.i +++ b/ace/Sched_Params.i @@ -82,5 +82,55 @@ ACE_Sched_Params::quantum (const ACE_Time_Value &quant) this->quantum_ = quant; } +ACE_INLINE +ACE_Sched_Priority_Iterator::ACE_Sched_Priority_Iterator + (const ACE_Sched_Params::Policy& policy, int scope) + : policy_ (policy), + scope_ (scope) +{ + ACE_TRACE ("ACE_Sched_Priority_Iterator::ACE_Sched_Priority_Iterator"); + priority_ = ACE_Sched_Params::priority_min (this->policy (), this->scope ()); + this->done_ = 0; +} + +ACE_INLINE +int ACE_Sched_Priority_Iterator::more (void) const +{ + return !this->done_; +} + +ACE_INLINE +int ACE_Sched_Priority_Iterator::priority (void) const +{ + return this->priority_; +} + +ACE_INLINE +void ACE_Sched_Priority_Iterator::next (void) +{ + if (this->done_) + { + return; + } + + int old_priority = this->priority_; + priority_ = ACE_Sched_Params::next_priority (this->policy (), + this->priority (), + this->scope ()); + this->done_ = (old_priority == priority_); +} + +ACE_INLINE +const ACE_Sched_Params::Policy& + ACE_Sched_Priority_Iterator::policy (void) const +{ + return this->policy_; +} + +ACE_INLINE +int ACE_Sched_Priority_Iterator::scope (void) const +{ + return this->scope_; +} // EOF diff --git a/ace/Select_Reactor.cpp b/ace/Select_Reactor.cpp index 97a706799b2..a4c2fca700b 100644 --- a/ace/Select_Reactor.cpp +++ b/ace/Select_Reactor.cpp @@ -85,7 +85,7 @@ ACE_Select_Reactor_Handler_Repository::open (size_t size) #if defined (ACE_WIN32) // Try to allocate the memory. ACE_NEW_RETURN (this->event_handlers_, - ACE_NT_EH_Record[size], + ACE_Event_Tuple[size], -1); // Initialize the ACE_Event_Handler * to { ACE_INVALID_HANDLE, 0 }. @@ -1510,102 +1510,86 @@ ACE_Select_Reactor::dispatch_notification_handlers (int &number_of_active_handle } int -ACE_Select_Reactor::dispatch_io_handlers (int &number_of_active_handles, - ACE_Select_Reactor_Handle_Set &dispatch_set) +ACE_Select_Reactor::dispatch_io_set (int number_of_active_handles, + int& number_dispatched, + int mask, + ACE_Handle_Set& dispatch_mask, + ACE_Handle_Set& ready_mask, + ACE_EH_PTMF callback) { - int number_dispatched = 0; - - if (number_of_active_handles > 0) - { - ACE_HANDLE handle; + ACE_HANDLE handle; - // Handle output events (this code needs to come first to handle - // the obscure case of piggy-backed data coming along with the - // final handshake message of a nonblocking connection). - - ACE_Handle_Set_Iterator handle_iter_wr (dispatch_set.wr_mask_); - - while ((handle = handle_iter_wr ()) != ACE_INVALID_HANDLE - && number_dispatched < number_of_active_handles - && this->state_changed_ == 0) - { - number_dispatched++; - this->notify_handle (handle, - ACE_Event_Handler::WRITE_MASK, - this->ready_set_.wr_mask_, - this->handler_rep_.find (handle), - &ACE_Event_Handler::handle_output); - } - } + ACE_Handle_Set_Iterator handle_iter (dispatch_mask); - if (number_dispatched > 0) + while ((handle = handle_iter ()) != ACE_INVALID_HANDLE + && number_dispatched < number_of_active_handles + && this->state_changed_ == 0) { - number_of_active_handles -= number_dispatched; - if (this->state_changed_) - return -1; + // ACE_DEBUG ((LM_DEBUG, "ACE_Select_Reactor::dispatching\n")); + number_dispatched++; + this->notify_handle (handle, + mask, + ready_mask, + this->handler_rep_.find (handle), + callback); } - number_dispatched = 0; - - if (number_of_active_handles > 0) + if (number_dispatched > 0 && this->state_changed_) { - ACE_HANDLE handle; - - // Handle "exceptional" events. + return -1; + } - ACE_Handle_Set_Iterator handle_iter_ex (dispatch_set.ex_mask_); + return 0; +} - while ((handle = handle_iter_ex ()) != ACE_INVALID_HANDLE - && number_dispatched < number_of_active_handles - && this->state_changed_ == 0) - { - this->notify_handle (handle, - ACE_Event_Handler::EXCEPT_MASK, - this->ready_set_.ex_mask_, - this->handler_rep_.find (handle), - &ACE_Event_Handler::handle_exception); - number_dispatched++; - } - } +int +ACE_Select_Reactor::dispatch_io_handlers (int &number_of_active_handles, + ACE_Select_Reactor_Handle_Set &dispatch_set) +{ + int number_dispatched = 0; - if (number_dispatched > 0) + // Handle output events (this code needs to come first to handle + // the obscure case of piggy-backed data coming along with the + // final handshake message of a nonblocking connection). + + // ACE_DEBUG ((LM_DEBUG, "ACE_Select_Reactor::dispatch - WRITE\n")); + if (this->dispatch_io_set (number_of_active_handles, + number_dispatched, + ACE_Event_Handler::WRITE_MASK, + dispatch_set.wr_mask_, + this->ready_set_.wr_mask_, + &ACE_Event_Handler::handle_output) == -1) { number_of_active_handles -= number_dispatched; - if (this->state_changed_) - return -1; + return -1; } - number_dispatched = 0; - if (number_of_active_handles > 0) + // ACE_DEBUG ((LM_DEBUG, "ACE_Select_Reactor::dispatch - EXCEPT\n")); + if (this->dispatch_io_set (number_of_active_handles, + number_dispatched, + ACE_Event_Handler::EXCEPT_MASK, + dispatch_set.ex_mask_, + this->ready_set_.ex_mask_, + &ACE_Event_Handler::handle_exception) == -1) { - ACE_HANDLE handle; - - // Handle input, passive connection, and shutdown events. - - ACE_Handle_Set_Iterator handle_iter_rd (dispatch_set.rd_mask_); - - while ((handle = handle_iter_rd ()) != ACE_INVALID_HANDLE - && number_dispatched < number_of_active_handles - && this->state_changed_ == 0) - { - this->notify_handle (handle, - ACE_Event_Handler::READ_MASK, - this->ready_set_.rd_mask_, - this->handler_rep_.find (handle), - &ACE_Event_Handler::handle_input); - number_dispatched++; - } + number_of_active_handles -= number_dispatched; + return -1; } - if (number_dispatched > 0) + // ACE_DEBUG ((LM_DEBUG, "ACE_Select_Reactor::dispatch - READ\n")); + if (this->dispatch_io_set (number_of_active_handles, + number_dispatched, + ACE_Event_Handler::READ_MASK, + dispatch_set.rd_mask_, + this->ready_set_.rd_mask_, + &ACE_Event_Handler::handle_input) == -1) { number_of_active_handles -= number_dispatched; - - if (this->state_changed_) - return -1; + return -1; } + number_of_active_handles -= number_dispatched; return number_dispatched; } diff --git a/ace/Select_Reactor.h b/ace/Select_Reactor.h index e0ea91a3cd2..0b896b56afb 100644 --- a/ace/Select_Reactor.h +++ b/ace/Select_Reactor.h @@ -88,7 +88,26 @@ private: #else // If we're non-MT safe then this is just a no-op... typedef ACE_Null_Mutex ACE_Select_Reactor_Token; -#endif /* ACE_MT_SAFE */ +#endif /* ACE_MT_SAFE */ + +struct ACE_Event_Tuple + // = TITLE + // + // An ACE_Event_Handler and its associated ACE_HANDLE. + // + // = DESCRIPTION + // + // One ACE_Event_Handler is registered for one or more + // ACE_HANDLE, in some points this information must be stored + // explicitly. This structure provides a lightweight mechanism + // to do so. +{ + ACE_Event_Tuple (void); + ACE_Event_Tuple (ACE_Event_Handler* eh, ACE_HANDLE h); + + ACE_HANDLE handle_; + ACE_Event_Handler* event_handler_; +}; // The following two classes have to be moved out here to keep the SGI // C++ compiler happy (it doesn't like nested classes). @@ -235,16 +254,11 @@ private: // <max_size_>. #if defined (ACE_WIN32) - // = This structure maps <HANDLES> to <Event_Handlers>. - struct ACE_NT_EH_Record - { - ACE_HANDLE handle_; - ACE_Event_Handler *event_handler_; - }; + // = The mapping from <HANDLES> to <Event_Handlers>. - ACE_NT_EH_Record *event_handlers_; + ACE_Event_Tuple *event_handlers_; // The NT version implements this via a dynamically allocated - // array of <ACE_NT_EH_Record *>. Since NT implements ACE_HANDLE + // array of <ACE_Event_Tuple *>. Since NT implements ACE_HANDLE // as a void * we can't directly index into this array. Therefore, // we just do a linear search (for now). Next, we'll modify // things to use hashing or something faster... @@ -746,6 +760,19 @@ protected: // the <dispatch_set>. Returns -1 if the state of the <wait_set_> // has changed, else returns number of handlers dispatched. + virtual int dispatch_io_set (int number_of_active_handles, + int& number_dispatched, + int mask, + ACE_Handle_Set& dispatch_mask, + ACE_Handle_Set& ready_mask, + ACE_EH_PTMF callback); + // Factors the dispatching of an io handle set (each WRITE, EXCEPT + // or READ set of handles). + // It updates the number of handles already dispatched and + // invokes this->notify_handle for all the handles in <dispatch_set> + // using the <mask>, <ready_set> and <callback> parameters. + // Must return -1 if this->state_changed otherwise it must return 0. + virtual void notify_handle (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Handle_Set &, diff --git a/tests/Makefile b/tests/Makefile index 527f0e2338b..52e67ac7e45 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,5 +1,6 @@ #---------------------------------------------------------------------------- -# @(#)Makefile 1.1 10/18/96 +# +# $Id$ # # Makefile for all the ACE ``one-button' tests #---------------------------------------------------------------------------- @@ -31,6 +32,7 @@ BIN = Atomic_Op_Test \ Process_Strategy_Test \ Priority_Buffer_Test \ Priority_Task_Test \ + Priority_Reactor_Test \ Pipe_Test \ Reactors_Test \ Reactor_Exceptions_Test \ @@ -89,41 +91,6 @@ endif # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/Async_Timer_Queue_Test.o .shobj/Async_Timer_Queue_Test.: Async_Timer_Queue_Test.cpp test_config.h \ - $(ACE_ROOT)/ace/OS.h \ - $(ACE_ROOT)/ace/config.h \ - $(ACE_ROOT)/ace/stdcpp.h \ - $(ACE_ROOT)/ace/OS.i \ - $(ACE_ROOT)/ace/Trace.h \ - $(ACE_ROOT)/ace/Log_Msg.h \ - $(ACE_ROOT)/ace/Log_Record.h \ - $(ACE_ROOT)/ace/ACE.h \ - $(ACE_ROOT)/ace/ACE.i \ - $(ACE_ROOT)/ace/Log_Priority.h \ - $(ACE_ROOT)/ace/Log_Record.i \ - $(ACE_ROOT)/ace/Signal.h \ - $(ACE_ROOT)/ace/Synch.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ - $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ - $(ACE_ROOT)/ace/Synch.i \ - $(ACE_ROOT)/ace/Synch_T.h \ - $(ACE_ROOT)/ace/Event_Handler.h \ - $(ACE_ROOT)/ace/Event_Handler.i \ - $(ACE_ROOT)/ace/Synch_T.i \ - $(ACE_ROOT)/ace/Thread.h \ - $(ACE_ROOT)/ace/Thread.i \ - $(ACE_ROOT)/ace/Atomic_Op.i \ - $(ACE_ROOT)/ace/Containers.h \ - $(ACE_ROOT)/ace/Containers.i \ - $(ACE_ROOT)/ace/Signal.i \ - $(ACE_ROOT)/ace/Timer_List.h \ - $(ACE_ROOT)/ace/Timer_List_T.h \ - $(ACE_ROOT)/ace/Timer_Queue_T.h \ - $(ACE_ROOT)/ace/Free_List.h \ - $(ACE_ROOT)/ace/Free_List.i \ - $(ACE_ROOT)/ace/Timer_Queue_T.i .obj/Atomic_Op_Test.o .shobj/Atomic_Op_Test.: Atomic_Op_Test.cpp \ $(ACE_ROOT)/tests/test_config.h \ $(ACE_ROOT)/ace/OS.h \ @@ -710,7 +677,8 @@ endif $(ACE_ROOT)/ace/Handle_Set.h \ $(ACE_ROOT)/ace/Handle_Set.i .obj/Simple_Message_Block_Test.o .shobj/Simple_Message_Block_Test.: Simple_Message_Block_Test.cpp \ - test_config.h $(ACE_ROOT)/ace/OS.h \ + test_config.h \ + $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/config.h \ $(ACE_ROOT)/ace/stdcpp.h \ $(ACE_ROOT)/ace/OS.i \ @@ -1321,6 +1289,113 @@ endif $(ACE_ROOT)/ace/Sched_Params.h \ $(ACE_ROOT)/ace/Sched_Params.i \ test_config.h +.obj/Priority_Reactor_Test.o .shobj/Priority_Reactor_Test.: Priority_Reactor_Test.cpp test_config.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/config.h \ + $(ACE_ROOT)/ace/stdcpp.h \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Get_Opt.h \ + $(ACE_ROOT)/ace/Get_Opt.i \ + $(ACE_ROOT)/ace/SOCK_Connector.h \ + $(ACE_ROOT)/ace/SOCK_Stream.h \ + $(ACE_ROOT)/ace/SOCK_IO.h \ + $(ACE_ROOT)/ace/SOCK.h \ + $(ACE_ROOT)/ace/Addr.h \ + $(ACE_ROOT)/ace/Addr.i \ + $(ACE_ROOT)/ace/IPC_SAP.h \ + $(ACE_ROOT)/ace/IPC_SAP.i \ + $(ACE_ROOT)/ace/SOCK.i \ + $(ACE_ROOT)/ace/SOCK_IO.i \ + $(ACE_ROOT)/ace/INET_Addr.h \ + $(ACE_ROOT)/ace/INET_Addr.i \ + $(ACE_ROOT)/ace/SOCK_Stream.i \ + $(ACE_ROOT)/ace/Time_Value.h \ + $(ACE_ROOT)/ace/SOCK_Connector.i \ + $(ACE_ROOT)/ace/SOCK_Acceptor.h \ + $(ACE_ROOT)/ace/SOCK_Acceptor.i \ + $(ACE_ROOT)/ace/Acceptor.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Svc_Handler.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Svc_Handler.i \ + $(ACE_ROOT)/ace/Acceptor.i \ + $(ACE_ROOT)/ace/Connector.h \ + $(ACE_ROOT)/ace/Map_Manager.h \ + $(ACE_ROOT)/ace/Map_Manager.i \ + $(ACE_ROOT)/ace/Connector.i \ + $(ACE_ROOT)/ace/Auto_Ptr.h \ + $(ACE_ROOT)/ace/Auto_Ptr.i \ + $(ACE_ROOT)/ace/Priority_Reactor.h \ + $(ACE_ROOT)/ace/Select_Reactor.h \ + $(ACE_ROOT)/ace/Token.h \ + $(ACE_ROOT)/ace/Token.i \ + $(ACE_ROOT)/ace/Pipe.h \ + $(ACE_ROOT)/ace/Pipe.i \ + $(ACE_ROOT)/ace/Select_Reactor.i \ + $(ACE_ROOT)/ace/Priority_Reactor.i \ + Priority_Reactor_Test.h .obj/Pipe_Test.o .shobj/Pipe_Test.: Pipe_Test.cpp test_config.h \ $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/config.h \ @@ -2088,7 +2163,8 @@ endif $(ACE_ROOT)/ace/Process.i .obj/Time_Value_Test.o .shobj/Time_Value_Test.: Time_Value_Test.cpp \ $(ACE_ROOT)/ace/config.h \ - test_config.h $(ACE_ROOT)/ace/OS.h \ + test_config.h \ + $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/stdcpp.h \ $(ACE_ROOT)/ace/OS.i \ $(ACE_ROOT)/ace/Trace.h \ diff --git a/tests/Priority_Reactor_Test.cpp b/tests/Priority_Reactor_Test.cpp new file mode 100644 index 00000000000..93fac8f5594 --- /dev/null +++ b/tests/Priority_Reactor_Test.cpp @@ -0,0 +1,313 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Priority_Reactor_Test.cpp +// +// = DESCRIPTION +// This is a test of the <ACE_Priority_Reactor>. +// The test forks two processes (for a total of three processes) +// which connect to the main process and +// The clients send data to a connector, interestingly enough the +// acceptor will give more priority to the second connection, +// which should run always before the first one. +// +// = AUTHOR +// Carlos O'Ryan +// +// ============================================================================ + +#include "test_config.h" +#include "ace/Get_Opt.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Acceptor.h" +#include "ace/Handle_Set.h" +#include "ace/Connector.h" +#include "ace/Strategies.h" +#include "ace/Auto_Ptr.h" +#include "ace/Priority_Reactor.h" +#include "Priority_Reactor_Test.h" + +int opt_nchildren = 20; +int opt_nloops = 200; +int opt_priority_reactor = 1; + + + +class Read_Handler : public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_INET_Addr, ACE_SYNCH> +// = TITLE +// A Svc_Handler with a priority twist. +// +// = DESCRIPTION +// This Svc_Handler receives the data sent by the childs or writer +// threads; each one sets it own priority to a new level, in a +// cyclic manner. +// The main point is test and exercise the priority dispatching +// features of ACE_Priority_Reactor. +{ +public: + static void set_countdown (int nchildren); + // Set the number of children or writer threads we will be running, + // when they are all gone we terminate the reactor loop. + + virtual int open (void *); + virtual int handle_input (ACE_HANDLE h); + // The Svc_Handler callbacks. + +private: + static int waiting; + // How many writers are we waiting for. + + static int started; + // How many readers have started. +}; + +typedef ACE_Acceptor<Read_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR; + +int Read_Handler::waiting = 0; +int Read_Handler::started = 0; + +void Read_Handler::set_countdown (int nchildren) +{ + Read_Handler::waiting = nchildren; +} + +int Read_Handler::open (void *) +{ + if (this->peer ().enable (ACE_NONBLOCK) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) Read_Handler::open, " + "cannot set non blocking mode"), -1); + } + + if (reactor ()->register_handler (this, READ_MASK) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) Read_Handler::open, " + "cannot register handler"), -1); + } + + // A number larger than the actual number of priorities, so some + // clients are misbehaved, hence pusnished. + const int max_priority = 15; + + priority (ACE_Event_Handler::LO_PRIORITY + started % max_priority); + started++; + ACE_DEBUG((LM_DEBUG, "(%P|%t) created svc_handler for handle %d " + "with priority %d\n", get_handle (), priority ())); + + return 0; +} + +int Read_Handler::handle_input (ACE_HANDLE h) +{ + // ACE_DEBUG((LM_DEBUG, + // "(%P|%t) Read_Handler::handle_input(%d)\n", h)); + + char buf[BUFSIZ]; + + ssize_t result = this->peer ().recv (buf, sizeof(buf)); + if (result <= 0) + { + if (result < 0 && errno == EWOULDBLOCK) + return 0; + + if (result != 0) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %p\n", + "Read_Handler::handle_input")); + } + waiting--; + if (waiting == 0) + { + ACE_Reactor::instance()->end_event_loop(); + } + ACE_DEBUG ((LM_DEBUG, "(%P|%t) %p closing down\n", + "Read_Handler::handle_input")); + return -1; + } + + // ACE_DEBUG((LM_DEBUG, + // "(%P|%t) read %d bytes from handle %d, priority %d\n", + // result, h, priority ())); + return 0; +} + + + +class Write_Handler : public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_INET_Addr, ACE_SYNCH> +// = TITLE +// A simple writer. +// +// = DESCRIPTION +// This Svc_Handler simply connects to a server and sends some +// output to it. +// Its purpose is to feed the test. +{ +public: + virtual int open (void *); + virtual int svc (void); +}; + +typedef ACE_Connector<Write_Handler, ACE_SOCK_CONNECTOR> CONNECTOR; + +int Write_Handler::open (void *) +{ + return this->svc (); +} + +int Write_Handler::svc (void) +{ + ACE_Time_Value pause (0, 1000); + for (int i = 0; i < opt_nloops; ++i) + { + if (this->peer ().send_n (ACE_ALPHABET, + sizeof(ACE_ALPHABET) - 1) == -1) + ACE_ERROR ((LM_ERROR, "(%P|%t) %p\n", "send_n")); + ACE_OS::sleep (pause); + } + return -1; +} + +// Execute the client tests. +void* client (void* arg) +{ + ACE_INET_Addr* server_addr = (ACE_INET_Addr*)arg; + ACE_DEBUG ((LM_DEBUG, "(%P|%t) running client\n")); + CONNECTOR connector; + + Write_Handler* writer = 0; + + connector.connect (writer, *server_addr); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) finishing client\n")); + return 0; +} + +int main (int argc, char * argv[]) +{ + ACE_START_TEST ("Priority_Reactor_Test"); + + int disable_priorities = 0; + ACE_Get_Opt getopt (argc, argv, "dc:l:", 1); + for (int c; (c = getopt ()) != -1; ) + switch (c) + { + case 'd': + opt_priority_reactor = 0; + break; + case 'c': + opt_nchildren = atoi (getopt.optarg); + break; + case 'l': + opt_nloops = atoi (getopt.optarg); + break; + } + + // Manage memory automagically. + auto_ptr<ACE_Select_Reactor> impl; + auto_ptr<ACE_Reactor> reactor; + + if (opt_priority_reactor) + { + ACE_Select_Reactor* impl_ptr; + ACE_NEW_RETURN (impl_ptr, ACE_Priority_Reactor, -1); + impl = impl_ptr; + ACE_Reactor* reactor_ptr; + ACE_NEW_RETURN (reactor_ptr, ACE_Reactor (impl_ptr), -1); + reactor = reactor_ptr; + ACE_Reactor::instance (reactor_ptr); + } + + Read_Handler::set_countdown (opt_nchildren); + + // Acceptor + ACCEPTOR acceptor; + + acceptor.priority (ACE_Event_Handler::HI_PRIORITY); + ACE_INET_Addr server_addr; + + // Bind acceptor to any port and then find out what the port was. + if (acceptor.open ((const ACE_INET_Addr &) ACE_Addr::sap_any) == -1 + || acceptor.acceptor ().get_local_addr (server_addr) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) %p\n", "open"), -1); + } + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) starting server at port %d\n", + server_addr.get_port_number ())); + + int i; + +#if defined (ACE_HAS_THREADS) + for (i = 0; i < opt_nchildren; ++i) + { + if (ACE_Thread_Manager::instance ()->spawn + (ACE_THR_FUNC (client), + (void *) &server_addr, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR ((LM_ERROR, "(%P|%t) %p\n%a", "thread create failed")); + } +#elif !defined (ACE_WIN32) && !defined (VXWORKS) + for (i = 0; i < nchildren; ++i) + { + child_data[i].server_addr = server_addr; + + switch (ACE_OS::fork ("child")) + { + case -1: + ACE_ERROR ((LM_ERROR, "(%P|%t) %p\n%a", "fork failed")); + exit (-1); + /* NOTREACHED */ + case 0: + client (&child_data[i]); + exit (0); + break; + /* NOTREACHED */ + default: + break; + /* NOTREACHED */ + } + } +#else + ACE_ERROR ((LM_ERROR, + "(%P|%t) only one thread may be run in a process on this platform\n%a", 1)); +#endif /* ACE_HAS_THREADS */ + + ACE_Reactor::instance()->register_handler + (&acceptor, ACE_Event_Handler::READ_MASK); + ACE_Reactor::instance()->run_event_loop (); + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) waiting for the children...\n")); + +#if defined (ACE_HAS_THREADS) + ACE_Thread_Manager::instance ()->wait (); +#elif !defined (ACE_WIN32) && !defined (VXWORKS) + for (i = 0; i < nchildren; ++i) + { + pid_t pid = ACE_OS::wait(); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) child %d terminated\n", pid)); + } +#else + /* NOTREACHED */ + // We aborted on the previous #ifdef +#endif /* ACE_HAS_THREADS */ + + ACE_END_TEST; + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Connector<Write_Handler, ACE_SOCK_CONNECTOR>; +template class ACE_Acceptor<Read_Handler, ACE_SOCK_ACCEPTOR>; +template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Connector<Write_Handler, ACE_SOCK_CONNECTOR> +#pragma instantiate ACE_Acceptor<Read_Handler, ACE_SOCK_ACCEPTOR> +#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_SYNCH> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + diff --git a/tests/Priority_Reactor_Test.h b/tests/Priority_Reactor_Test.h new file mode 100644 index 00000000000..43c4d3b556d --- /dev/null +++ b/tests/Priority_Reactor_Test.h @@ -0,0 +1,56 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Priority_Reactor_Test.h +// +// = DESCRIPTION +// This class gets its own header file to work around AIX C++ +// compiler "features" related to template instantiation... It is +// only used by Priority_Reactor_Test.cpp. +// +// = AUTHOR +// Carlos O'Ryan +// +// ============================================================================ + +#ifndef ACE_TESTS_PRIORITY_REACTOR_TEST_H +#define ACE_TESTS_PRIORITY_REACTOR_TEST_H + +#include "ace/Service_Config.h" +#include "ace/Svc_Handler.h" + +class Svc_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> + // = TITLE + // This class is the product created by both <ACE_Connector> + // and <ACE_Acceptor> objects. + // + // = DESCRIPTION + // This class gets its own header file to work around AIX C++ + // compiler "features" related to template instantiation... It is + // only used by Conn_Test.cpp. + // Basically it will run as an active object on the clients, + // sending an stream of data to the server, where they will be + // demultiplexed using a reactive strategy. +{ +public: + Svc_Handler (ACE_Thread_Manager * = 0); + // Do-nothing constructor. + + virtual int open (void *); + // Initialization hook. + + void send_data (void); + // Send data to server. + + virtual int handle_input(ACE_HANDLE handle); + // Recv data from client. + +private: +}; + +#endif /* ACE_TESTS_PRIORITY_REACTOR_TEST_H */ diff --git a/tests/Priority_Task_Test.cpp b/tests/Priority_Task_Test.cpp index a06391c0799..ff72c86f1fd 100644 --- a/tests/Priority_Task_Test.cpp +++ b/tests/Priority_Task_Test.cpp @@ -24,12 +24,24 @@ #if defined (ACE_HAS_THREADS) class Priority_Task : public ACE_Task<ACE_MT_SYNCH> +// = TITLE +// A simple Task that runs itself a different priorities. +// +// = DESCRIPTION +// This task uses the void* argument on open to run the svc() method +// at a different priority. The point is execise the thread priority +// features of ACE. { public: Priority_Task (void); + // The constructor int open (void *); + // Receives the priority and run svc() on a separate thread at that + // priority. + int svc (void); + // Runs on a separate thread an checks the priority. private: int priority_; |