diff options
author | Steve Huston <shuston@riverace.com> | 2000-10-24 23:10:35 +0000 |
---|---|---|
committer | Steve Huston <shuston@riverace.com> | 2000-10-24 23:10:35 +0000 |
commit | 0e06f85c67d45eef5d6611afc29ffd9cd3e596c0 (patch) | |
tree | 7576282a8397cba7a0c92c3115eddbac3542cdeb /ace | |
parent | c60b8374d370d76e1eed1d40dee9f44d658148c6 (diff) | |
download | ATCD-0e06f85c67d45eef5d6611afc29ffd9cd3e596c0.tar.gz |
ChangeLogTag:Tue Oct 24 12:30:47 2000 Steve Huston <shuston@riverace.com>
Diffstat (limited to 'ace')
-rw-r--r-- | ace/Event_Handler.cpp | 2 | ||||
-rw-r--r-- | ace/Reactor.h | 5 | ||||
-rw-r--r-- | ace/Reactor.i | 10 | ||||
-rw-r--r-- | ace/Reactor_Impl.h | 10 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.cpp | 81 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.h | 10 | ||||
-rw-r--r-- | ace/Select_Reactor_Base.i | 6 | ||||
-rw-r--r-- | ace/WFMO_Reactor.cpp | 81 | ||||
-rw-r--r-- | ace/WFMO_Reactor.h | 10 |
9 files changed, 215 insertions, 0 deletions
diff --git a/ace/Event_Handler.cpp b/ace/Event_Handler.cpp index 2d705fbc73b..ebbd72c43aa 100644 --- a/ace/Event_Handler.cpp +++ b/ace/Event_Handler.cpp @@ -26,6 +26,8 @@ ACE_Event_Handler::ACE_Event_Handler (ACE_Reactor *r, ACE_Event_Handler::~ACE_Event_Handler (void) { // ACE_TRACE ("ACE_Event_Handler::~ACE_Event_Handler"); + if (this->reactor_ != 0) + this->reactor_->purge_pending_notifications (this); } // Gets the file descriptor associated with this I/O device. diff --git a/ace/Reactor.h b/ace/Reactor.h index 1bde5178fc4..0462732b0e3 100644 --- a/ace/Reactor.h +++ b/ace/Reactor.h @@ -442,6 +442,11 @@ public: // via the notify queue before breaking out of its // <ACE_Message_Queue::dequeue> loop. + virtual int purge_pending_notifications (ACE_Event_Handler * = 0); + // Purge any notifications pending in this reactor for the specified + // <ACE_Event_Handler> object. Returns the number of notifications + // purged. Returns -1 on error. + // = Assorted helper methods. virtual int handler (ACE_HANDLE handle, diff --git a/ace/Reactor.i b/ace/Reactor.i index 12a306711d0..eb392a78df2 100644 --- a/ace/Reactor.i +++ b/ace/Reactor.i @@ -502,6 +502,10 @@ ACE_Reactor::notify (ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask, ACE_Time_Value *tv) { + // First, try to remember this reactor in the event handler, in case + // the event handler goes away before the notification is delivered. + if (event_handler != 0 && event_handler->reactor () == 0) + event_handler->reactor (this); return this->implementation ()->notify (event_handler, mask, tv); @@ -520,6 +524,12 @@ ACE_Reactor::max_notify_iterations (void) } ACE_INLINE int +ACE_Reactor::purge_pending_notifications (ACE_Event_Handler *eh) +{ + return this->implementation ()->purge_pending_notifications (eh); +} + +ACE_INLINE int ACE_Reactor::handler (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Event_Handler **event_handler) diff --git a/ace/Reactor_Impl.h b/ace/Reactor_Impl.h index 9a651a099c2..e6949630fad 100644 --- a/ace/Reactor_Impl.h +++ b/ace/Reactor_Impl.h @@ -82,6 +82,11 @@ public: // passed in via the notify queue before breaking out of its event // loop. + virtual int purge_pending_notifications (ACE_Event_Handler * = 0) = 0; + // Purge any notifications pending in this reactor for the specified + // <ACE_Event_Handler> object. Returns the number of notifications + // purged. Returns -1 on error. + virtual void dump (void) const = 0; // Dump the state of an object. }; @@ -389,6 +394,11 @@ public: // via the notify queue before breaking out of its // <ACE_Message_Queue::dequeue> loop. + virtual int purge_pending_notifications (ACE_Event_Handler * = 0) = 0; + // Purge any notifications pending in this reactor for the specified + // <ACE_Event_Handler> object. Returns the number of notifications + // purged. Returns -1 on error. + virtual int handler (ACE_HANDLE handle, ACE_Reactor_Mask mask, ACE_Event_Handler **event_handler = 0) = 0; diff --git a/ace/Select_Reactor_Base.cpp b/ace/Select_Reactor_Base.cpp index a6f441d4bed..6169ae6f931 100644 --- a/ace/Select_Reactor_Base.cpp +++ b/ace/Select_Reactor_Base.cpp @@ -472,6 +472,87 @@ ACE_Select_Reactor_Notify::max_notify_iterations (void) return this->max_notify_iterations_; } +// purge_pending_notifications +// Removes all entries from the notify_queue_ and each one that +// matches <eh> is put on the free_queue_. The rest are saved on a +// local queue and copied back to the notify_queue_ at the end. +// Returns the number of entries removed. Returns -1 on error. +// ACE_NOTSUP_RETURN if ACE_HAS_REACTOR_NOTIFICATION_QUEUE is not defined. +int +ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh) +{ + ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications"); + +#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, mon, this->notify_queue_lock_, -1); + + if (this->notify_queue_.is_empty ()) + return 0; + + ACE_Notification_Buffer *temp; + ACE_Unbounded_Queue <ACE_Notification_Buffer *> local_queue; + + size_t queue_size = this->notify_queue_.size (); + int number_purged = 0; + for (size_t index = 0; index < queue_size; ++index) + { + if (-1 == this->notify_queue_.dequeue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head")), + -1); + + // check + if (eh && (eh != temp->eh_)) + { + if (-1 == local_queue.enqueue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue_head")), + -1); + } + else + { // deallocate the space... + if (-1 == this->free_queue_.enqueue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue_head")), + -1); + ++number_purged; + } + } + + if (this->notify_queue_.size ()) + { // should be empty! + ACE_ASSERT (0); + return -1; + } + + // now put it back in the notify queue + queue_size = local_queue.size (); + for (index = 0; index < queue_size; ++index) + { + if (-1 == local_queue.dequeue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("dequeue_head")), + -1); + + if (-1 == this->notify_queue_.enqueue_head (temp)) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("enqueue_head")), + -1); + } + + return number_purged; + +#else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ + ACE_NOTSUP_RETURN (-1); +#endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */ +} + void ACE_Select_Reactor_Notify::dump (void) const { diff --git a/ace/Select_Reactor_Base.h b/ace/Select_Reactor_Base.h index f0e4b596ae6..0948ecce90e 100644 --- a/ace/Select_Reactor_Base.h +++ b/ace/Select_Reactor_Base.h @@ -155,6 +155,11 @@ public: // dispatch the <ACE_Event_Handlers> that are passed in via the // notify pipe before breaking out of its <recv> loop. + virtual int purge_pending_notifications (ACE_Event_Handler * = 0); + // Purge any notifications pending in this reactor for the specified + // <ACE_Event_Handler> object. Returns the number of notifications + // purged. Returns -1 on error. + virtual void dump (void) const; // Dump the state of an object. @@ -352,6 +357,11 @@ public: friend class ACE_Select_Reactor_Notify; friend class ACE_Select_Reactor_Handler_Repository; + virtual int purge_pending_notifications (ACE_Event_Handler * = 0); + // Purge any notifications pending in this reactor for the specified + // <ACE_Event_Handler> object. Returns the number of notifications + // purged. Returns -1 on error. + protected: virtual int bit_ops (ACE_HANDLE handle, ACE_Reactor_Mask mask, diff --git a/ace/Select_Reactor_Base.i b/ace/Select_Reactor_Base.i index 9cc272afe1f..0f2e26a04c4 100644 --- a/ace/Select_Reactor_Base.i +++ b/ace/Select_Reactor_Base.i @@ -71,6 +71,12 @@ ACE_Select_Reactor_Impl::ACE_Select_Reactor_Impl () } ACE_INLINE int +ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh) +{ + return this->notify_handler_->purge_pending_notifications (eh); +} + +ACE_INLINE int ACE_Select_Reactor_Impl::supress_notify_renew (void) { return this->supress_renew_; diff --git a/ace/WFMO_Reactor.cpp b/ace/WFMO_Reactor.cpp index 6c818e12239..43fffdcb476 100644 --- a/ace/WFMO_Reactor.cpp +++ b/ace/WFMO_Reactor.cpp @@ -2299,6 +2299,80 @@ ACE_WFMO_Reactor_Notify::max_notify_iterations (void) return this->max_notify_iterations_; } +int +ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh) +{ + ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications"); + + // go over message queue and take out all the matching event handlers + // if eh = 0, purge all.. + + if (this->message_queue_.is_empty ()) + return 0; + + // Guard against new and/or delivered notifications while purging. + // WARNING!!! The use of the notification queue's lock object for this + // guard makes use of the knowledge that on Win32, the mutex protecting + // the queue is really a CriticalSection, which is recursive. This is + // how we can get away with locking it down here and still calling + // member functions on the queue object. + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1); + + // first, copy all to our own local queue. Since we've locked everyone out + // of here, there's no need to use any synchronization on this queue. + ACE_Message_Queue<ACE_NULL_SYNCH> local_queue; + + size_t queue_size = this->message_queue_.message_count (); + int number_purged = 0; + for (size_t index = 0; index < queue_size; ++index) + { + ACE_Message_Block *mb; + if (-1 == this->message_queue_.dequeue_head (mb)) + return -1; // This shouldn't happen... + + ACE_Notification_Buffer *buffer = + ACE_reinterpret_cast (ACE_Notification_Buffer *, mb->base ()); + + if (eh && (eh != buffer->eh_)) + { // remove it by not copying it to the new queue + if (-1 == local_queue.enqueue_head (mb)) + return -1; + } + else + { + mb->release (); + ++number_purged; + } + } + + if (this->message_queue_.message_count ()) + { // Should be empty! + ACE_ASSERT (0); + return -1; + } + + // Now copy back from the local queue to the class queue, taking care to + // preserve the original order... + queue_size = local_queue.message_count (); + for (index = 0; index < queue_size; ++index) + { + ACE_Message_Block *mb; + if (-1 == local_queue.dequeue_head (mb)) + { + ACE_ASSERT (0); + return -1; + } + + if (-1 == this->message_queue_.enqueue_head (mb)) + { + ACE_ASSERT (0); + return -1; + } + } + + return number_purged; +} + void ACE_WFMO_Reactor_Notify::dump (void) const { @@ -2330,6 +2404,13 @@ ACE_WFMO_Reactor::max_notify_iterations (void) return this->notify_handler_->max_notify_iterations (); } +int +ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh) +{ + ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications"); + return this->notify_handler_->purge_pending_notifications (eh); +} + // No-op WinSOCK2 methods to help WFMO_Reactor compile #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0) int diff --git a/ace/WFMO_Reactor.h b/ace/WFMO_Reactor.h index b22016fec60..5980711f42d 100644 --- a/ace/WFMO_Reactor.h +++ b/ace/WFMO_Reactor.h @@ -502,6 +502,11 @@ public: // notify queue before breaking out of its // <ACE_Message_Queue::dequeue> loop. + virtual int purge_pending_notifications (ACE_Event_Handler * = 0); + // Purge any notifications pending in this reactor for the specified + // <ACE_Event_Handler> object. Returns the number of notifications + // purged. Returns -1 on error. + virtual void dump (void) const; // Dump the state of an object. @@ -915,6 +920,11 @@ public: // notify queue before breaking out of its // <ACE_Message_Queue::dequeue> loop. + virtual int purge_pending_notifications (ACE_Event_Handler * = 0); + // Purge any notifications pending in this reactor for the specified + // <ACE_Event_Handler> object. Returns the number of notifications + // purged. Returns -1 on error. + // = Assorted helper methods. virtual int handler (ACE_HANDLE handle, |