summaryrefslogtreecommitdiff
path: root/ace
diff options
context:
space:
mode:
Diffstat (limited to 'ace')
-rw-r--r--ace/Containers_T.cpp2
-rw-r--r--ace/Containers_T.h3
-rw-r--r--ace/Event_Handler.h2
-rw-r--r--ace/OS.h4
-rw-r--r--ace/Process_Manager.cpp1
-rw-r--r--ace/Select_Reactor_Base.cpp164
-rw-r--r--ace/Select_Reactor_Base.h15
7 files changed, 184 insertions, 7 deletions
diff --git a/ace/Containers_T.cpp b/ace/Containers_T.cpp
index 9956bdcdb52..c21860c5ff1 100644
--- a/ace/Containers_T.cpp
+++ b/ace/Containers_T.cpp
@@ -416,7 +416,7 @@ ACE_Unbounded_Queue<T>::dump (void) const
for (ACE_Unbounded_Queue_Iterator<T> iter (*(ACE_Unbounded_Queue<T> *) this);
iter.next (item) != 0;
iter.advance ())
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("count = %d\n"), count++));
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("count = %d\n"), count++));
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
diff --git a/ace/Containers_T.h b/ace/Containers_T.h
index 410c1d23306..222d3a4dc34 100644
--- a/ace/Containers_T.h
+++ b/ace/Containers_T.h
@@ -475,7 +475,8 @@ public:
// = Additional utility methods.
void reset (void);
- // Reset the <ACE_Unbounded_Queue> to be empty.
+ // Reset the <ACE_Unbounded_Queue> to be empty and release all its
+ // dynamically allocated resources.
int get (T *&item, size_t slot = 0) const;
// Get the <slot>th element in the set. Returns -1 if the element
diff --git a/ace/Event_Handler.h b/ace/Event_Handler.h
index 3ed97652830..1f33a298f36 100644
--- a/ace/Event_Handler.h
+++ b/ace/Event_Handler.h
@@ -10,7 +10,7 @@
// Event_Handler.h
//
// = AUTHOR
-// Doug Schmidt
+// Douglas C. Schmidt <schmidt@cs.wustl.edu>
//
// ============================================================================
diff --git a/ace/OS.h b/ace/OS.h
index 51ed6162553..cbeae015024 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -56,6 +56,10 @@ enum ACE_Recyclable_State
#define ACE_DEFAULT_SERVICE_REPOSITORY_SIZE 1024
#endif /* ACE_DEFAULT_SERVICE_REPOSITORY_SIZE */
+#if !defined (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE)
+#define ACE_REACTOR_NOTIFICATION_ARRAY_SIZE 1024
+#endif /* ACE_REACTOR_NOTIFICATION_ARRAY_SIZE */
+
// Do not change these values wantonly since GPERF depends on them..
#define ACE_ASCII_SIZE 128
#define ACE_EBCDIC_SIZE 256
diff --git a/ace/Process_Manager.cpp b/ace/Process_Manager.cpp
index 961d9b25345..111c2d19ef8 100644
--- a/ace/Process_Manager.cpp
+++ b/ace/Process_Manager.cpp
@@ -340,6 +340,7 @@ ACE_Process_Manager::handle_close (ACE_HANDLE handle,
ACE_Reactor_Mask)
{
ACE_TRACE ("ACE_Process_Manager::handle_close");
+ ACE_UNUSED_ARG (handle);
ACE_ASSERT (handle == this->dummy_handle_);
diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp
index 670de5d78b9..017262e78ac 100644
--- a/ace/Select_Reactor_Base.cpp
+++ b/ace/Select_Reactor_Base.cpp
@@ -517,6 +517,22 @@ ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
if (this->notification_pipe_.open () == -1)
return -1;
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Notification_Buffer *temp;
+
+ ACE_NEW_RETURN (temp,
+ ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_set_.enqueue_head (temp) == -1)
+ return -1;
+
+ for (size_t i = 0; i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE; i++)
+ if (free_set_.enqueue_head (temp + i) == -1)
+ return -1;
+
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
// There seems to be a Win32 bug with this... Set this into
// non-blocking mode.
if (ACE::set_flags (this->notification_pipe_.read_handle (),
@@ -539,6 +555,21 @@ int
ACE_Select_Reactor_Notify::close (void)
{
ACE_TRACE ("ACE_Select_Reactor_Notify::close");
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Free up the dynamically allocated resources.
+ ACE_Notification_Buffer **b;
+
+ for (ACE_Unbounded_Queue_Iterator<ACE_Notification_Buffer *> alloc_iter (this->alloc_set_);
+ alloc_iter.next (b) != 0;
+ alloc_iter.advance ())
+ delete [] *b;
+
+ this->alloc_set_.reset ();
+ this->notify_set_.reset ();
+ this->free_set_.reset ();
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+
return this->notification_pipe_.close ();
}
@@ -555,6 +586,58 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
return 0;
else
{
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Notification_Buffer buffer (eh, mask);
+ int notification_required = 0;
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ // No pending notifications.
+ if (this->notify_set_.is_empty ())
+ notification_required = 1;
+
+ ACE_Notification_Buffer *temp = 0;
+
+ if (free_set_.dequeue_head (temp) == -1)
+ {
+ // Grow the queue of available buffers.
+ ACE_Notification_Buffer *temp1;
+
+ ACE_NEW_RETURN (temp1,
+ ACE_Notification_Buffer[ACE_REACTOR_NOTIFICATION_ARRAY_SIZE],
+ -1);
+
+ if (this->alloc_set_.enqueue_head (temp1) == -1)
+ return -1;
+
+ // Start at 1 and enqueue only
+ // (ACE_REACTOR_NOTIFICATION_ARRAY_SIZE - 1) elements since
+ // the first one will be used right now.
+ for (size_t i = 1;
+ i < ACE_REACTOR_NOTIFICATION_ARRAY_SIZE;
+ i++)
+ this->free_set_.enqueue_head (temp1 + i);
+
+ temp = temp1;
+ }
+
+ ACE_ASSERT (temp != 0);
+ *temp = buffer;
+
+ if (notify_set_.enqueue_tail (temp) == -1)
+ return -1;
+
+ if (notification_required)
+ {
+ ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
+ (char *) &buffer,
+ sizeof buffer,
+ timeout);
+ if (n == -1)
+ return -1;
+ }
+ return 0;
+#else
ACE_Notification_Buffer buffer (eh, mask);
ssize_t n = ACE::send (this->notification_pipe_.write_handle (),
@@ -565,6 +648,7 @@ ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *eh,
return -1;
else
return 0;
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
}
}
@@ -624,11 +708,68 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
return -1;
}
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ // Dispatch all messages that are in the <notify_set_>.
+ for (;;)
+ {
+ {
+ // We acquire the lock in a block to make sure we're not
+ // holding the lock while delivering callbacks...
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1);
+
+ ACE_Notification_Buffer *temp;
+
+ if (notify_set_.is_empty ())
+ break;
+ else if (notify_set_.dequeue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ASYS_TEXT ("%p\n"),
+ ASYS_TEXT ("dequeue_head")),
+ -1);
+ buffer = *temp;
+ if (free_set_.enqueue_head (temp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ASYS_TEXT ("%p\n"),
+ ASYS_TEXT ("enqueue_head")),
+ -1);
+ }
+
+ // If eh == 0 then another thread is unblocking the
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
+ // internal structures. Otherwise, we need to dispatch the
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
+ if (buffer.eh_ != 0)
+ {
+ int result = 0;
+
+ switch (buffer.mask_)
+ {
+ case ACE_Event_Handler::READ_MASK:
+ case ACE_Event_Handler::ACCEPT_MASK:
+ result = buffer.eh_->handle_input (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::WRITE_MASK:
+ result = buffer.eh_->handle_output (ACE_INVALID_HANDLE);
+ break;
+ case ACE_Event_Handler::EXCEPT_MASK:
+ result = buffer.eh_->handle_exception (ACE_INVALID_HANDLE);
+ break;
+ default:
+ // Should we bail out if we get an invalid mask?
+ ACE_ERROR ((LM_ERROR, ASYS_TEXT ("invalid mask = %d\n"), buffer.mask_));
+ }
+ if (result == -1)
+ buffer.eh_->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::EXCEPT_MASK);
+ }
+ }
+#else
// If eh == 0 then another thread is unblocking the
- // ACE_Select_Reactor to update the ACE_Select_Reactor's
+ // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
// internal structures. Otherwise, we need to dispatch the
- // appropriate handle_* method on the ACE_Event_Handler pointer
- // we've been passed.
+ // appropriate handle_* method on the <ACE_Event_Handler>
+ // pointer we've been passed.
if (buffer.eh_ != 0)
{
int result = 0;
@@ -662,6 +803,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
ACE_Event_Handler::EXCEPT_MASK);
}
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
number_dispatched++;
// Bail out if we've reached the <notify_threshold_>. Note that
@@ -679,7 +821,7 @@ ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
// Enqueue ourselves into the list of waiting threads. When we
// reacquire the token we'll be off and running again with ownership
// of the token. The postcondition of this call is that
- // this->select_reactor_.token_.current_owner () == ACE_Thread::self ();
+ // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
this->select_reactor_->renew ();
return number_dispatched;
}
@@ -781,3 +923,17 @@ ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
}
return omask;
}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+template class ACE_Unbounded_Queue <ACE_Notification_Buffer *>;
+template class ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>;
+template class ACE_Node <ACE_Notification_Buffer *>;
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+#pragma instantiate ACE_Unbounded_Queue <ACE_Notification_Buffer *>
+#pragma instantiate ACE_Unbounded_Queue_Iterator <ACE_Notification_Buffer *>
+#pragma instantiate ACE_Node <ACE_Notification_Buffer *>
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h
index 10e333cb144..45e87f3be84 100644
--- a/ace/Select_Reactor_Base.h
+++ b/ace/Select_Reactor_Base.h
@@ -169,6 +169,21 @@ private:
// dispatch the <ACE_Event_Handlers> that are passed in via the
// notify pipe before breaking out of its <recv> loop. By default,
// this is set to -1, which means "iterate until the pipe is empty."
+
+#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> alloc_set_;
+ // Keeps track of allocated arrays of type
+ // <ACE_Notification_Buffer>.
+
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> notify_set_;
+ // Keeps track of all pending notifications.
+
+ ACE_Unbounded_Queue <ACE_Notification_Buffer *> free_set_;
+ // Keeps track of all free buffers.
+
+ ACE_SYNCH_MUTEX notify_queue_lock_;
+ // synchronization for handling of queues
+#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
};
class ACE_Export ACE_Select_Reactor_Handler_Repository