summaryrefslogtreecommitdiff
path: root/ace/ReactorEx.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/ReactorEx.cpp')
-rw-r--r--ace/ReactorEx.cpp911
1 files changed, 0 insertions, 911 deletions
diff --git a/ace/ReactorEx.cpp b/ace/ReactorEx.cpp
deleted file mode 100644
index baf32c205d9..00000000000
--- a/ace/ReactorEx.cpp
+++ /dev/null
@@ -1,911 +0,0 @@
-// ReactorEx.cpp
-// $Id$
-
-#define ACE_BUILD_DLL
-#include "ace/ReactorEx.h"
-#include "ace/Timer_List.h"
-#include "ace/Thread.h"
-
-#if defined (ACE_WIN32)
-
-#if !defined (__ACE_INLINE__)
-#include "ace/ReactorEx.i"
-#endif /* __ACE_INLINE__ */
-
-int
-ACE_Wakeup_All_Threads_Handler::handle_signal (int signum,
- siginfo_t *siginfo,
- ucontext_t *)
-{
- // This will get called when <ReactorEx->wakeup_all_threads_> event
- // is signaled. There is nothing to be done here.
- // ACE_DEBUG ((LM_DEBUG, "(%t) waking up to get updated handle set info\n"));
- return 0;
-}
-
-ACE_ReactorEx_Handler_Repository::ACE_ReactorEx_Handler_Repository (ACE_ReactorEx &reactorEx)
- : reactorEx_ (reactorEx)
-{
-}
-
-int
-ACE_ReactorEx_Handler_Repository::open (size_t size)
-{
- // Dynamic allocation
- ACE_NEW_RETURN (this->current_handles_, ACE_HANDLE[size], -1);
- ACE_NEW_RETURN (this->current_event_handlers_, ACE_Event_Handler *[size], -1);
- ACE_NEW_RETURN (this->to_be_added_handles_, ACE_HANDLE[size], -1);
- ACE_NEW_RETURN (this->to_be_added_event_handlers_, ACE_Event_Handler *[size], -1);
- ACE_NEW_RETURN (this->to_be_deleted_set_, int[size], -1);
-
- // Initialization
- this->max_size_ = size;
- this->max_handlep1_ = 0;
- this->handles_to_be_added_ = 0;
- this->handles_to_be_deleted_ = 0;
- for (size_t i = 0; i < size; i++)
- {
- this->current_handles_[i] = ACE_INVALID_HANDLE;
- this->current_event_handlers_[i] = 0;
- this->to_be_added_handles_[i] = ACE_INVALID_HANDLE;
- this->to_be_added_event_handlers_[i] = 0;
- this->to_be_deleted_set_[i] = 0;
- }
-
- return 0;
-}
-
-int
-ACE_ReactorEx_Handler_Repository::close (void)
-{
- // Let all the handlers know that the <ReactorEx> is closing down
- this->unbind_all ();
-
- return 0;
-}
-
-
-ACE_ReactorEx_Handler_Repository::~ACE_ReactorEx_Handler_Repository (void)
-{
- // Free up dynamically allocated space
- delete[] this->current_handles_;
- delete[] this->current_event_handlers_;
- delete[] this->to_be_added_handles_;
- delete[] this->to_be_added_event_handlers_;
- delete[] this->to_be_deleted_set_;
-}
-
-int
-ACE_ReactorEx_Handler_Repository::remove_handler_i (size_t index,
- ACE_Reactor_Mask mask)
-{
- // Make sure that the DONT_CALL mask is not set and that we haven't
- // already called handle_close() on this handle.
- if (this->to_be_deleted_set_[index] == 0)
- {
- // Add to <to_be_deleted_set_>
- this->to_be_deleted_set_[index] = 1;
- this->handles_to_be_deleted_++;
-
- if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
- this->current_event_handlers_[index]->handle_close
- (this->current_handles_[index], mask);
- return 0;
- }
- else
- return -1;
-}
-
-int
-ACE_ReactorEx_Handler_Repository::unbind (ACE_HANDLE handle,
- ACE_Reactor_Mask mask)
-{
- int error = 0;
- {
- ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->reactorEx_.lock_, -1);
-
- // Go through all the handles looking for <handle>. Even if we find
- // it, we continue through the rest of the list since <handle> could
- // appear multiple times. All handles are checked.
- int result = 0;
- for (size_t i = 0; i < this->max_handlep1_ && error == 0; i++)
- {
- if (this->current_handles_[i] == handle)
- {
- result = this->remove_handler_i (i, mask);
- if (result == -1)
- error = 1;
- }
- }
- }
- // The guard is released here
-
- // Wake up all threads in WaitForMultipleObjects so that they can
- // reconsult the handle set
- this->reactorEx_.wakeup_all_threads ();
-
- return error == 0 ? 0 : -1;
-}
-
-void
-ACE_ReactorEx_Handler_Repository::unbind_all (void)
-{
- {
- ACE_GUARD (ACE_Process_Mutex, ace_mon, this->reactorEx_.lock_);
-
- // Remove all the handlers
- for (size_t i = 0; i < this->max_handlep1_; i++)
- this->remove_handler_i (i, ACE_Event_Handler::NULL_MASK);
- }
- // The guard is released here
-
- // Wake up all threads in WaitForMultipleObjects so that they can
- // reconsult the handle set
- this->reactorEx_.wakeup_all_threads ();
-}
-
-int
-ACE_ReactorEx_Handler_Repository::bind (ACE_HANDLE handle,
- ACE_Event_Handler *event_handler)
-{
- // Make sure that the <handle> is valid
- if (handle == ACE_INVALID_HANDLE)
- handle = event_handler->get_handle ();
- if (this->invalid_handle (handle))
- return -1;
-
- // This GUARD is necessary since we are updating shared state.
- ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->reactorEx_.lock_, -1);
-
- size_t current_size = this->max_handlep1_ +
- this->handles_to_be_added_ -
- this->handles_to_be_deleted_;
-
- // Make sure that there's room in the table.
- if (current_size < this->max_size_)
- {
- // Cache this set into the <to_be_added_*> arrays, till we come
- // around to actually add to the <current_*> arrays
- this->to_be_added_handles_[this->handles_to_be_added_] = handle;
- this->to_be_added_event_handlers_[this->handles_to_be_added_] = event_handler;
- this->handles_to_be_added_++;
-
- // Assign *this* <ReactorEx> to the <Event_Handler>.
- event_handler->reactorEx (&this->reactorEx_);
-
- // Wake up all threads in WaitForMultipleObjects so that they can
- // reconsult the handle set
- this->reactorEx_.wakeup_all_threads ();
- }
- else
- return -1;
-
- return 0;
-}
-
-int
-ACE_ReactorEx_Handler_Repository::changes_required (void)
-{
- // Check if handles have be scheduled for additions or removal
- return (this->handles_to_be_added_ > 0) || (this->handles_to_be_deleted_ > 0);
-}
-
-int
-ACE_ReactorEx_Handler_Repository::make_changes (void)
-{
- // This method must ONLY be called by the
- // <ReactorEx->change_state_thread_>. We therefore assume that there
- // will be no contention for this method and hence no guards are
- // neccessary.
-
- // DELETIONS first
- this->handle_deletions ();
-
- // ADDITIONS here
- this->handle_additions ();
-
- return 0;
-}
-
-int
-ACE_ReactorEx_Handler_Repository::handle_deletions (void)
-{
- // This will help us in keeping track of the last valid index in the
- // handle arrays
- int last_valid_index = this->max_handlep1_ - 1;
-
- // Go through the entire valid array and check for all handles that
- // have been schedule for deletion
- for (int i = last_valid_index; i > 0; i--)
- if (this->to_be_deleted_set_[i] == 1)
- {
- if (i == last_valid_index)
- // If this is the last handle in the set, no need to swap
- // places. Simply remove it.
- {
- this->current_handles_[i] = ACE_INVALID_HANDLE;
- this->current_event_handlers_[i] = 0;
- }
- else
- // Swap this handle with the last valid handle
- {
- this->current_handles_[i] = this->current_handles_[last_valid_index];
- this->current_event_handlers_[i] = this->current_event_handlers_[last_valid_index];
- }
- // Reset the last valid index and clean up the entry in the
- // <to_be_deleted_set_>
- last_valid_index--;
- this->to_be_deleted_set_[i] = 0;
- }
- // Since all to be deleted handles have been taken care of, reset
- // the flag
- this->handles_to_be_deleted_ = 0;
- // Reset <this->max_handlep1_>
- this->max_handlep1_ = last_valid_index + 1;
-
- return 0;
-}
-
-int
-ACE_ReactorEx_Handler_Repository::handle_additions (void)
-{
- // Go through the <to_be_added_*> arrays
- for (int i = 0; i < this->handles_to_be_added_; i++)
- {
- // Add to the end of the current handles set
- this->current_handles_[this->max_handlep1_] = this->to_be_added_handles_[i];
- this->current_event_handlers_[this->max_handlep1_] = this->to_be_added_event_handlers_[i];
- this->max_handlep1_++;
-
- // Reset the <to_be_added_*> arrays entries
- this->to_be_added_handles_[i] = ACE_INVALID_HANDLE;
- this->to_be_added_event_handlers_[i] = 0;
- }
- // Since all to be added handles have been taken care of, reset the
- // counter
- this->handles_to_be_added_ = 0;
-
- return 0;
-}
-
-
-ACE_ReactorEx::ACE_ReactorEx (ACE_Sig_Handler *sh,
- ACE_Timer_Queue *tq)
- : timer_queue_ (0),
- delete_timer_queue_ (0),
- handler_rep_ (*this),
- delete_handler_rep_ (0),
- // this event is initially signaled
- ok_to_wait_ (1),
- // this event is initially unsignaled
- wakeup_all_threads_ (0),
- // this event is initially unsignaled
- waiting_to_change_state_ (0),
- new_owner_ (0),
- active_threads_ (0),
- owner_ (ACE_Thread::self ()),
- change_state_thread_ (0),
- open_for_business_ (0)
-{
- if (this->open (ACE_ReactorEx::DEFAULT_SIZE, 0, sh, tq) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "ReactorEx"));
-}
-
-ACE_ReactorEx::ACE_ReactorEx (size_t size,
- int unused,
- ACE_Sig_Handler *sh,
- ACE_Timer_Queue *tq)
- : timer_queue_ (0),
- delete_timer_queue_ (0),
- handler_rep_ (*this),
- delete_handler_rep_ (0),
- // this event is initially signaled
- ok_to_wait_ (1),
- // this event is initially unsignaled
- wakeup_all_threads_ (0),
- // this event is initially unsignaled
- waiting_to_change_state_ (0),
- new_owner_ (0),
- active_threads_ (0),
- owner_ (ACE_Thread::self ()),
- change_state_thread_ (0),
- open_for_business_ (0)
-{
- if (this->open (size, 0, sh, tq) == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "ReactorEx"));
-}
-
-int
-ACE_ReactorEx::open (size_t size,
- int unused,
- ACE_Sig_Handler *sh,
- ACE_Timer_Queue *tq)
-{
- // This GUARD is necessary since we are updating shared state.
- ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
-
- // If we are already open, return -1
- if (this->open_for_business_)
- return -1;
-
- // Setup the atomic wait array (used later in <handle_events>)
- this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
- this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
-
- // This is to guard against reopens of ReactorEx
- if (this->delete_handler_rep_)
- this->handler_rep_.~ACE_ReactorEx_Handler_Repository ();
-
- // Open the handle repository
- // Two additional handles for internal purposes
- if (this->handler_rep_.open (size + 2) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
- "opening handler repository"),
- -1);
- else
- this->delete_handler_rep_ = 1;
-
- // Open the notification handler
- if (this->notify_handler_.open (*this, timer_queue_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
- "opening notify handler "),
- -1);
-
- // Register for <wakeup_all_threads> event
- if (this->register_handler (&this->wakeup_all_threads_handler_,
- this->wakeup_all_threads_.handle ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
- "registering thread wakeup handler"),
- -1);
-
- // Since we have added two handles into the handler repository,
- // update the <handler_repository_>
- if (this->handler_rep_.changes_required ())
- {
- // Make necessary changes to the handler repository
- this->handler_rep_.make_changes ();
- // Turn off <wakeup_all_threads_> since all necessary changes
- // have completed
- this->wakeup_all_threads_.reset ();
- }
-
- // Timer Queue
- if (this->delete_timer_queue_)
- delete this->timer_queue_;
-
- if (tq == 0)
- {
- ACE_NEW_RETURN (this->timer_queue_, ACE_Timer_List, -1);
- this->delete_timer_queue_ = 1;
- }
- else
- {
- this->timer_queue_ = tq;
- this->delete_timer_queue_ = 0;
- }
-
- // We are open for business
- this->open_for_business_ = 1;
-
- return 0;
-}
-
-int
-ACE_ReactorEx::close (void)
-{
- // This GUARD is necessary since we are updating shared state.
- ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
-
- // If we are already closed, return error
- if (!this->open_for_business_)
- return -1;
-
- // We are now closed
- this->open_for_business_ = 0;
- // This will unregister all handles
- this->handler_rep_.close ();
-
- return 0;
-}
-
-ACE_ReactorEx::~ACE_ReactorEx (void)
-{
- this->close ();
-
- if (this->delete_timer_queue_ == 1)
- {
- delete this->timer_queue_;
- this->timer_queue_ = 0;
- this->delete_timer_queue_ = 0;
- }
-}
-
-void
-ACE_ReactorEx::wakeup_all_threads (void)
-{
- this->wakeup_all_threads_.signal ();
-}
-
-int
-ACE_ReactorEx::notify (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask,
- ACE_Time_Value *timeout)
-{
- return this->notify_handler_.notify (eh, mask, timeout);
-}
-
-int
-ACE_ReactorEx::register_handler (ACE_Event_Handler *eh,
- ACE_HANDLE handle)
-{
- return this->handler_rep_.bind (handle, eh);
-}
-
-int
-ACE_ReactorEx::remove_handler (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask)
-{
- return this->handler_rep_.unbind (eh->get_handle (), mask);
-}
-
-int
-ACE_ReactorEx::remove_handler (ACE_HANDLE handle,
- ACE_Reactor_Mask mask)
-{
- return this->handler_rep_.unbind (handle, mask);
-}
-
-int
-ACE_ReactorEx::schedule_timer (ACE_Event_Handler *handler,
- const void *arg,
- const ACE_Time_Value &delta_time,
- const ACE_Time_Value &interval)
-{
- ACE_TRACE ("ACE_ReactorEx::schedule_timer");
-
- int result = this->timer_queue_->schedule
- (handler, arg, timer_queue_->gettimeofday () + delta_time, interval);
-
- // Wakeup the owner thread so that it gets the latest timer values
- this->notify ();
-
- return result;
-}
-
-// Waits for and dispatches all events. Returns -1 on error, 0 if
-// max_wait_time expired, or the number of events that were dispatched.
-int
-ACE_ReactorEx::handle_events (ACE_Time_Value *max_wait_time,
- int alertable)
-{
- ACE_TRACE ("ACE_ReactorEx::handle_events");
-
- // Make sure we are not closed
- if (!this->open_for_business_)
- return -1;
-
- // Stash the current time -- the destructor of this object will
- // automatically compute how much time elapsed since this method was
- // called.
- ACE_Countdown_Time countdown (max_wait_time);
-
- // Check to see if it is ok to enter ::WaitForMultipleObjects
- // This will acquire <this->lock_> on success
- // On failure, the lock will not be acquired
- int result = this->ok_to_wait (max_wait_time, alertable);
- if (result != 1)
- return result;
-
- // Increment the number of active threads
- this->active_threads_++;
-
- // Release the <lock_>
- this->lock_.release ();
-
- // Update the countdown to reflect time waiting to play with the
- // mutex and event.
- countdown.update ();
-
- int wait_status = this->wait_for_multiple_events (max_wait_time,
- alertable);
-
- result = this->dispatch (wait_status);
-
- this->update_state ();
-
- return result;
-}
-
-int
-ACE_ReactorEx::ok_to_wait (ACE_Time_Value *max_wait_time,
- int alertable)
-{
- // Calculate the max time we should spend here
- //
- // Note: There is really no need to involve the <timer_queue_> here
- // because even if a timeout in the <timer_queue_> does expire we
- // will not be able to dispatch it
- int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
-
- // Atomically wait for both the <lock_> and <ok_to_wait_> event
- int result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
- this->atomic_wait_array_,
- TRUE,
- timeout,
- alertable);
-
- switch (result)
- {
- case WAIT_TIMEOUT:
- errno = ETIME;
- return 0;
- case WAIT_FAILED:
- case WAIT_ABANDONED_0:
- errno = ::GetLastError ();
- return -1;
- default:
- break;
- }
-
- // It is ok to enter ::WaitForMultipleObjects
- return 1;
-}
-
-int
-ACE_ReactorEx::wait_for_multiple_events (ACE_Time_Value *max_wait_time,
- int alertable)
-{
- int timeout = this->calculate_timeout (max_wait_time);
-
- // Wait for any of handles_ to be active, or until timeout expires.
- // If <alertable> is enabled allow asynchronous completion of
- // ReadFileEx and WriteFileEx operations.
- return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
- this->handler_rep_.handles (),
- FALSE,
- timeout,
- alertable);
-}
-
-int
-ACE_ReactorEx::calculate_timeout (ACE_Time_Value *max_wait_time)
-{
- ACE_Time_Value *time = 0;
- if (this->owner_ == ACE_Thread::self ())
- time = this->timer_queue_->calculate_timeout (max_wait_time);
- else
- time = max_wait_time;
-
- if (time == 0)
- return INFINITE;
- else
- return time->msec ();
-}
-
-
-int
-ACE_ReactorEx::dispatch (int wait_status)
-{
- // If "owner" thread
- if (ACE_Thread::self () == this->owner_)
- // Expire all pending timers.
- this->timer_queue_->expire ();
-
- switch (wait_status)
- {
- case WAIT_FAILED: // Failure.
- errno = ::GetLastError ();
- return -1;
- case WAIT_TIMEOUT: // Timeout.
- errno = ETIME;
- return 0;
- case WAIT_ABANDONED_0:
- // We'll let dispatch worry about abandoned mutexes.
- default: // Dispatch.
- return this->dispatch_handles (wait_status - WAIT_OBJECT_0);
- }
-}
-
-// Dispatches any active handles from <handles_[index]> to
-// <handles_[max_handlep1_]> using <WaitForMultipleObjects> to poll
-// through our handle set looking for active handles.
-
-int
-ACE_ReactorEx::dispatch_handles (size_t index)
-{
- for (int number_of_handlers_dispatched = 1;
- ;
- number_of_handlers_dispatched++)
- {
- this->dispatch_handler (index++);
-
- // We're done.
- if (index >= this->handler_rep_.max_handlep1 ())
- return number_of_handlers_dispatched;
-
- DWORD wait_status =
- ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - index,
- this->handler_rep_.handles () + index,
- FALSE, 0); // We're polling.
- switch (wait_status)
- {
- case WAIT_FAILED: // Failure.
- errno = ::GetLastError ();
- /* FALLTHRU */
- case WAIT_TIMEOUT:
- // There are no more handles ready, we can return.
- return number_of_handlers_dispatched;
- default: // Dispatch.
- // Check if a handle successfully became signaled.
- if (wait_status >= WAIT_OBJECT_0 &&
- wait_status < WAIT_OBJECT_0 + this->handler_rep_.max_handlep1 ())
- index += wait_status - WAIT_OBJECT_0;
- else
- // Otherwise, a handle was abandoned.
- index += wait_status - WAIT_ABANDONED_0;
- }
- }
-}
-
-// Dispatches a single handler. Returns 0 on success, -1 if the
-// handler was removed.
-
-int
-ACE_ReactorEx::dispatch_handler (int index)
-{
- // Assign the ``signaled'' HANDLE so that callers can get
- // it.
- ACE_HANDLE handle = *(this->handler_rep_.handles () + index);
- siginfo_t sig (handle);
-
- // Dispatch the handler if it has not been scheduled for deletion.
- // Note that this is a very week test if there are multiple threads
- // dispatching this index as no locks are held here. Generally, you
- // do not want to do something like deleting the this pointer in
- // handle_close() if you have registered multiple times and there is
- // more than one thread in ReactorEx->handle_events().
- if (!this->handler_rep_.scheduled_for_deletion (index))
- if (this->handler_rep_.event_handlers ()[index]->handle_signal (0, &sig) == -1)
- this->handler_rep_.unbind (handle, ACE_Event_Handler::NULL_MASK);
- return 0;
-}
-
-int
-ACE_ReactorEx::update_state (void)
-{
- // This GUARD is necessary since we are updating shared state.
- ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
-
- // Decrement active threads
- this->active_threads_--;
-
- // Check if the state of the handler repository has changed or new
- // owner has to be set
- if (this->handler_rep_.changes_required () || this->new_owner ())
- {
- if (this->change_state_thread_ == 0)
- // Try to become the thread which will be responsible for the
- // changes
- {
- this->change_state_thread_ = ACE_Thread::self ();
- // Make sure no new threads are allowed to enter
- this->ok_to_wait_.reset ();
-
- if (this->active_threads_ > 0)
- // Check for other active threads
- {
- // Wake up all other threads
- this->wakeup_all_threads_.signal ();
- // Release <lock_>
- monitor.release ();
- // Go to sleep waiting for all other threads to get done
- this->waiting_to_change_state_.wait ();
- // Re-acquire <lock_> again
- monitor.acquire ();
- }
-
- if (this->handler_rep_.changes_required ())
- // Make necessary changes to the handler repository
- this->handler_rep_.make_changes ();
- if (this->new_owner ())
- // Update the owner
- this->change_owner ();
- // Turn off <wakeup_all_threads_>
- this->wakeup_all_threads_.reset ();
- // Let everyone know that it is ok to go ahead
- this->ok_to_wait_.signal ();
- // Reset this flag
- this->change_state_thread_ = 0;
- }
- else
- {
- if (this->active_threads_ == 0)
- // This thread did not get a chance to become the change
- // thread. If it is the last one out, it will wakeup the
- // change thread
- this->waiting_to_change_state_.signal ();
- }
- }
-
- return 0;
-}
-
-// ************************************************************
-
-ACE_ReactorEx_Notify::ACE_ReactorEx_Notify (void)
- : max_notify_iterations_ (-1),
- timer_queue_ (0)
-{
-}
-
-int
-ACE_ReactorEx_Notify::open (ACE_ReactorEx &reactorEx,
- ACE_Timer_Queue *timer_queue)
-{
- timer_queue_ = timer_queue;
- return reactorEx.register_handler (this);
-}
-
-ACE_HANDLE
-ACE_ReactorEx_Notify::get_handle (void) const
-{
- return this->wakeup_one_thread_.handle ();
-}
-
-// Handle all pending notifications.
-
-int
-ACE_ReactorEx_Notify::handle_signal (int signum,
- siginfo_t *siginfo,
- ucontext_t *)
-{
- ACE_UNUSED_ARG (signum);
-
- // Just check for sanity...
- if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
- return -1;
-
- // This will get called when <ReactorEx->wakeup_one_thread_> event
- // is signaled.
- // ACE_DEBUG ((LM_DEBUG, "(%t) waking up to handle internal notifications\n"));
-
- for (int i = 1; ; i++)
- {
- ACE_Message_Block *mb = 0;
-
- if (this->message_queue_.dequeue_head
- (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
- {
- if (errno == EWOULDBLOCK)
- // We've reached the end of the processing, return
- // normally.
- return 0;
- else
- return -1; // Something weird happened...
- }
- else
- {
- ACE_Notification_Buffer *buffer =
- (ACE_Notification_Buffer *) mb->base ();
-
- // If eh == 0 then we've got major problems! Otherwise, we
- // need to dispatch the appropriate handle_* method on the
- // ACE_Event_Handler pointer we've been passed.
-
- if (buffer->eh_ != 0)
- {
- int result = 0;
-
- switch (buffer->mask_)
- {
- case ACE_Event_Handler::READ_MASK:
- result = buffer->eh_->handle_input (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::WRITE_MASK:
- result = buffer->eh_->handle_output (ACE_INVALID_HANDLE);
- break;
- case ACE_Event_Handler::EXCEPT_MASK:
- result = buffer->eh_->handle_exception (ACE_INVALID_HANDLE);
- break;
- default:
- ACE_ERROR ((LM_ERROR, "invalid mask = %d\n", buffer->mask_));
- break;
- }
- if (result == -1)
- buffer->eh_->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::EXCEPT_MASK);
- }
-
- // Make sure to delete the memory regardless of success or
- // failure!
- mb->release ();
-
- // Bail out if we've reached the <notify_threshold_>. Note
- // that by default <notify_threshold_> is -1, so we'll loop
- // until we're done.
- if (i == this->max_notify_iterations_)
- break;
- }
- }
-}
-
-// Notify the ReactorEx, potentially enqueueing the
-// <ACE_Event_Handler> for subsequent processing in the ReactorEx
-// thread of control.
-
-int
-ACE_ReactorEx_Notify::notify (ACE_Event_Handler *eh,
- ACE_Reactor_Mask mask,
- ACE_Time_Value *timeout)
-{
- if (eh != 0)
- {
- ACE_Message_Block *mb = 0;
- ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof ACE_Notification_Buffer), -1);
-
- ACE_Notification_Buffer *buffer =
- (ACE_Notification_Buffer *) mb->base ();
- buffer->eh_ = eh;
- buffer->mask_ = mask;
-
- // Convert from relative time to absolute time by adding the
- // current time of day. This is what <ACE_Message_Queue>
- // expects.
- if (timeout != 0)
- *timeout += timer_queue_->gettimeofday ();
-
- if (this->message_queue_.enqueue_tail
- (mb, timeout) == -1)
- {
- mb->release ();
- return -1;
- }
- }
-
- return this->wakeup_one_thread_.signal ();
-}
-
-void
-ACE_ReactorEx_Notify::max_notify_iterations (int iterations)
-{
- ACE_TRACE ("ACE_ReactorEx_Notify::max_notify_iterations");
- // Must always be > 0 or < 0 to optimize the loop exit condition.
- if (iterations == 0)
- iterations = 1;
-
- this->max_notify_iterations_ = iterations;
-}
-
-int
-ACE_ReactorEx_Notify::max_notify_iterations (void)
-{
- ACE_TRACE ("ACE_ReactorEx_Notify::max_notify_iterations");
- return this->max_notify_iterations_;
-}
-
-void
-ACE_ReactorEx::max_notify_iterations (int iterations)
-{
- ACE_TRACE ("ACE_ReactorEx::max_notify_iterations");
- ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
-
- // Must always be > 0 or < 0 to optimize the loop exit condition.
- this->notify_handler_.max_notify_iterations (iterations);
-}
-
-int
-ACE_ReactorEx::max_notify_iterations (void)
-{
- ACE_TRACE ("ACE_ReactorEx::max_notify_iterations");
- ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
-
- return this->notify_handler_.max_notify_iterations ();
-}
-
-#endif /* ACE_WIN32 */
-
-
-
-
-
-