diff options
Diffstat (limited to 'ace')
-rw-r--r-- | ace/Containers_T.cpp | 2 | ||||
-rw-r--r-- | ace/Containers_T.h | 3 | ||||
-rw-r--r-- | ace/Event_Handler.h | 2 | ||||
-rw-r--r-- | ace/OS.h | 4 | ||||
-rw-r--r-- | ace/Process_Manager.cpp | 1 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 164 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.h | 15 |
7 files changed, 184 insertions, 7 deletions
diff --git a/ace/Containers_T.cpp b/ace/Containers_T.cpp index 9956bdcdb52..c21860c5ff1 100644 --- a/ace/Containers_T.cpp +++ b/ace/Containers_T.cpp @@ -416,7 +416,7 @@ ACE_Unbounded_Queue<T>::dump (void) const for (ACE_Unbounded_Queue_Iterator<T> iter (*(ACE_Unbounded_Queue<T> *) this); iter.next (item) != 0; iter.advance ()) - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("count = %d\n"), count++)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("count = %d\n"), count++)); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } diff --git a/ace/Containers_T.h b/ace/Containers_T.h index 410c1d23306..222d3a4dc34 100644 --- a/ace/Containers_T.h +++ b/ace/Containers_T.h @@ -475,7 +475,8 @@ public: // = Additional utility methods. void reset (void); - // Reset the <ACE_Unbounded_Queue> to be empty. + // Reset the <ACE_Unbounded_Queue> to be empty and release all its + // dynamically allocated resources. int get (T *&item, size_t slot = 0) const; // Get the <slot>th element in the set. Returns -1 if the element diff --git a/ace/Event_Handler.h b/ace/Event_Handler.h index 3ed97652830..1f33a298f36 100644 --- a/ace/Event_Handler.h +++ b/ace/Event_Handler.h @@ -10,7 +10,7 @@ // Event_Handler.h // // = AUTHOR -// Doug Schmidt +// Douglas C. Schmidt <schmidt@cs.wustl.edu> // // ============================================================================ @@ -56,6 +56,10 @@ enum ACE_Recyclable_State #define ACE_DEFAULT_SERVICE_REPOSITORY_SIZE 1024 #endif /* ACE_DEFAULT_SERVICE_REPOSITORY_SIZE */ +#if !defined (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE) +#define ACE_REACTOR_NOTIFICATION_ARRAY_SIZE 1024 +#endif /* ACE_REACTOR_NOTIFICATION_ARRAY_SIZE */ + // Do not change these values wantonly since GPERF depends on them.. #define ACE_ASCII_SIZE 128 #define ACE_EBCDIC_SIZE 256 diff --git a/ace/Process_Manager.cpp b/ace/Process_Manager.cpp index 961d9b25345..111c2d19ef8 100644 --- a/ace/Process_Manager.cpp +++ b/ace/Process_Manager.cpp @@ -340,6 +340,7 @@ ACE_Process_Manager::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask) { ACE_TRACE ("ACE_Process_Manager::handle_close"); + ACE_UNUSED_ARG (handle); ACE_ASSERT (handle == this->dummy_handle_); diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index 670de5d78b9..017262e78ac 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -517,6 +517,22 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r, if (this->notification_pipe_.open () == -1) return -1; +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Notification_Buffer *temp; + + ACE_NEW_RETURN (temp, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_set_.enqueue_head (temp) == -1) + return -1; + + for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++) + if (free_set_.enqueue_head (temp + i) == -1) + return -1; + +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + // There seems to be a Win32 bug with this... Set this into // non-blocking mode. if (ACE::set_flags (this->notification_pipe_.read_handle (), @@ -539,6 +555,21 @@ int ACE_Select_Reactor_Notify::close (void) { ACE_TRACE ("ACE_Select_Reactor_Notify::close"); + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Free up the dynamically allocated resources. + ACE_Notification_Buffer **b; + + for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_set_); + alloc_iter.next (b) != 0; + alloc_iter.advance ()) + delete [] *b; + + this->alloc_set_.reset (); + this->notify_set_.reset (); + this->free_set_.reset (); +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ + return this->notification_pipe_.close (); } @@ -555,6 +586,58 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, return 0; else { +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Notification_Buffer buffer (eh, mask); + int notification_required = 0; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + // No pending notifications. + if (this->notify_set_.is_empty ()) + notification_required = 1; + + ACE_Notification_Buffer *temp = 0; + + if (free_set_.dequeue_head (temp) == -1) + { + // Grow the queue of available buffers. + ACE_Notification_Buffer *temp1; + + ACE_NEW_RETURN (temp1, + ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE], + -1); + + if (this->alloc_set_.enqueue_head (temp1) == -1) + return -1; + + // Start at 1 and enqueue only + // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since + // the first one will be used right now. + for (size_t i = 1; + i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; + i++) + this->free_set_.enqueue_head (temp1 + i); + + temp = temp1; + } + + ACE_ASSERT (temp != 0); + *temp = buffer; + + if (notify_set_.enqueue_tail (temp) == -1) + return -1; + + if (notification_required) + { + ssize_t n = ACE::send (this->notification_pipe_.write_handle (), + (char *) &buffer, + sizeof buffer, + timeout); + if (n == -1) + return -1; + } + return 0; +#else ACE_Notification_Buffer buffer (eh, mask); ssize_t n = ACE::send (this->notification_pipe_.write_handle (), @@ -565,6 +648,7 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh, return -1; else return 0; +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ } } @@ -624,11 +708,68 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) return -1; } +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + // Dispatch all messages that are in the <notify_set_>. + for (;;) + { + { + // We acquire the lock in a block to make sure we're not + // holding the lock while delivering callbacks... + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + ACE_Notification_Buffer *temp; + + if (notify_set_.is_empty ()) + break; + else if (notify_set_.dequeue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("dequeue_head")), + -1); + buffer = *temp; + if (free_set_.enqueue_head (temp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%p\n"), + ASYS_TEXT ("enqueue_head")), + -1); + } + + // If eh == 0 then another thread is unblocking the + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s + // internal structures. Otherwise, we need to dispatch the + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. + if (buffer.eh_ != 0) + { + int result = 0; + + switch (buffer.mask_) + { + case ACE_Event_Handler::READ_MASK: + case ACE_Event_Handler::ACCEPT_MASK: + result = buffer.eh_->handle_input (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::WRITE_MASK: + result = buffer.eh_->handle_output (ACE_INVALID_HANDLE); + break; + case ACE_Event_Handler::EXCEPT_MASK: + result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE); + break; + default: + // Should we bail out if we get an invalid mask? + ACE_ERROR ((LM_ERROR, ASYS_TEXT ("invalid mask = %d\n"), buffer.mask_)); + } + if (result == -1) + buffer.eh_->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::EXCEPT_MASK); + } + } +#else // If eh == 0 then another thread is unblocking the - // ACE_Select_Reactor to update the ACE_Select_Reactor's + // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s // internal structures. Otherwise, we need to dispatch the - // appropriate handle_* method on the ACE_Event_Handler pointer - // we've been passed. + // appropriate handle_* method on the <ACE_Event_Handler> + // pointer we've been passed. if (buffer.eh_ != 0) { int result = 0; @@ -662,6 +803,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) ACE_Event_Handler::EXCEPT_MASK); } +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ number_dispatched++; // Bail out if we've reached the <notify_threshold_>. Note that @@ -679,7 +821,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) // Enqueue ourselves into the list of waiting threads. When we // reacquire the token we'll be off and running again with ownership // of the token. The postcondition of this call is that - // this->select_reactor_.token_.current_owner () == ACE_Thread::self (); + // <select_reactor_.token_.current_owner> == <ACE_Thread::self>. this->select_reactor_->renew (); return number_dispatched; } @@ -781,3 +923,17 @@ ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle, } return omask; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>; +template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>; +template class ACE_Node <ACE_Notification_Buffer *>; +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) +#pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *> +#pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *> +#pragma instantiate ACE_Node <ACE_Notification_Buffer *> +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h index 10e333cb144..45e87f3be84 100644 --- a/ace/Select_Reactor_Base.h +++ b/ace/Select_Reactor_Base.h @@ -169,6 +169,21 @@ private: // dispatch the <ACE_Event_Handlers> that are passed in via the // notify pipe before breaking out of its <recv> loop. By default, // this is set to -1, which means "iterate until the pipe is empty." + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_set_; + // Keeps track of allocated arrays of type + // <ACE_Notification_Buffer>. + + ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_set_; + // Keeps track of all pending notifications. + + ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_set_; + // Keeps track of all free buffers. + + ACE_SYNCH_MUTEX notify_queue_lock_; + // synchronization for handling of queues +#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */ }; class ACE_Export ACE_Select_Reactor_Handler_Repository |