diff options
author | Steve Huston <shuston@riverace.com> | 2002-06-06 22:21:56 +0000 |
---|---|---|
committer | Steve Huston <shuston@riverace.com> | 2002-06-06 22:21:56 +0000 |
commit | 021fd35768d4e53ea8fadd5eb44fad52335a4cd5 (patch) | |
tree | 48416e4198fe853708ab30565b2f49887a368ee8 | |
parent | 49dccf135fbb933fde13413f413a9f227ea9db20 (diff) | |
download | ATCD-021fd35768d4e53ea8fadd5eb44fad52335a4cd5.tar.gz |
Thu Jun 6 18:10:45 Steve Huston <shuston@riverace.com>
-rw-r--r-- | ChangeLog | 51 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 51 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 51 | ||||
-rw-r--r-- | ace/Activation_Queue.cpp | 17 | ||||
-rw-r--r-- | ace/Log_Msg.cpp | 21 | ||||
-rw-r--r-- | ace/Message_Block.cpp | 1 | ||||
-rw-r--r-- | ace/Message_Queue.h | 63 | ||||
-rw-r--r-- | ace/Message_Queue_T.cpp | 84 | ||||
-rw-r--r-- | ace/Message_Queue_T.h | 160 | ||||
-rw-r--r-- | ace/Message_Queue_T.i | 37 | ||||
-rw-r--r-- | ace/README | 5 | ||||
-rw-r--r-- | ace/Read_Buffer.cpp | 2 | ||||
-rw-r--r-- | ace/WFMO_Reactor.cpp | 14 | ||||
-rw-r--r-- | ace/WFMO_Reactor.h | 8 | ||||
-rw-r--r-- | examples/C++NPv2/AC_Client_Logging_Daemon.cpp | 12 |
15 files changed, 366 insertions, 211 deletions
diff --git a/ChangeLog b/ChangeLog index fdfb5f5b71e..50996aeb8b7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,50 @@ +Thu Jun 6 18:10:45 Steve Huston <shuston@riverace.com> + + * ace/README: Removed ACE_HAS_OPTIMIZED_MESSAGE_QUEUE. No configs + use it, and it's functionality doesn't really support the + message queue deactivation/pulse semantics. + + * ace/Message_Queue.h: + * ace/Message_Queue_T.{h cpp i}: Removed the "int pulse" argument + from deactivate() and add a pulse() method. States stay the + same. + Took out all the ACE_HAS_OPTIMIZED_MESSAGE_QUEUE stuff. It's + not used any longer. + + * examples/C++NPv2/AC_Client_Logging_Daemon.cpp: Use the pulse() + method instead of deactivate (1). + + * examples/C++NPv2/AIO_Client_Logging_Daemon.cpp: Removed answered + questions. Removed reimplemented handle_connect() method + because the needed things are now available via the framework. + + * ace/Asynch_Acceptor.{h cpp}: + * ace/Asynch_Connector.{h cpp}: Added new hook method, + int validate_connection (const ACE_Asynch_Accept::Result&result, + const ACE_INET_Addr& remote_addr, const ACE_INET_Addr& local_addr) + that allows access to success/fail, handle, and addresses. The + validate_new_connection (const ACE_INET_Addr&) method is now + deprecated. + + * ace/WFMO_Reactor.{h cpp} (ACE_WFMO_Reactor_Notify): Add an + optional argument to ACE_WFMO_Reactor_Notify constructor. + size_t max_notifies is used to specify a limit for how many + notifications can be queued. The value is used to calculate + new high and low watermarks to the message_queue_. Default 1024. + + * ace/Activation_Queue.cpp: + * ace/Message_Block.cpp: + * ace/Message_Queue_T.cpp: Added #include "ace/Log_Msg.h" to get + the ACE_DEBUG, etc. definitions. + + * ace/Read_Buffer.cpp: Added #include for ace/Log_Msg.h and + ace/Malloc_Base.h (for ACE_Allocator). + + * ace/Log_Msg.cpp (log (ACE_Log_Record&)): If there's a + ACE_Log_Msg_Callback, do it before sending the log record + to any other logging sinks. Allows the callback to munge + the data. + Thu Jun 06 10:50:37 2002 Ossama Othman <ossama@uci.edu> * ace/Dev_Poll_Reactor.cpp (cancel_timer): @@ -24,8 +71,8 @@ Thu Jun 6 07:11:15 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> * ace/Malloc_T.cpp: Clarify that the memory backing store must reside in a directory with the appropriate visibility and - permissions. Thanks to Frank O. Flemisch <f.o.flemisch@larc.nasa.gov> - for reporting this. + permissions. Thanks to Frank O. Flemisch + <f.o.flemisch@larc.nasa.gov> for reporting this. Wed Jun 5 23:06:11 UTC 2002 Don Hinton <dhinton@ieee.org> diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index fdfb5f5b71e..50996aeb8b7 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,50 @@ +Thu Jun 6 18:10:45 Steve Huston <shuston@riverace.com> + + * ace/README: Removed ACE_HAS_OPTIMIZED_MESSAGE_QUEUE. No configs + use it, and it's functionality doesn't really support the + message queue deactivation/pulse semantics. + + * ace/Message_Queue.h: + * ace/Message_Queue_T.{h cpp i}: Removed the "int pulse" argument + from deactivate() and add a pulse() method. States stay the + same. + Took out all the ACE_HAS_OPTIMIZED_MESSAGE_QUEUE stuff. It's + not used any longer. + + * examples/C++NPv2/AC_Client_Logging_Daemon.cpp: Use the pulse() + method instead of deactivate (1). + + * examples/C++NPv2/AIO_Client_Logging_Daemon.cpp: Removed answered + questions. Removed reimplemented handle_connect() method + because the needed things are now available via the framework. + + * ace/Asynch_Acceptor.{h cpp}: + * ace/Asynch_Connector.{h cpp}: Added new hook method, + int validate_connection (const ACE_Asynch_Accept::Result&result, + const ACE_INET_Addr& remote_addr, const ACE_INET_Addr& local_addr) + that allows access to success/fail, handle, and addresses. The + validate_new_connection (const ACE_INET_Addr&) method is now + deprecated. + + * ace/WFMO_Reactor.{h cpp} (ACE_WFMO_Reactor_Notify): Add an + optional argument to ACE_WFMO_Reactor_Notify constructor. + size_t max_notifies is used to specify a limit for how many + notifications can be queued. The value is used to calculate + new high and low watermarks to the message_queue_. Default 1024. + + * ace/Activation_Queue.cpp: + * ace/Message_Block.cpp: + * ace/Message_Queue_T.cpp: Added #include "ace/Log_Msg.h" to get + the ACE_DEBUG, etc. definitions. + + * ace/Read_Buffer.cpp: Added #include for ace/Log_Msg.h and + ace/Malloc_Base.h (for ACE_Allocator). + + * ace/Log_Msg.cpp (log (ACE_Log_Record&)): If there's a + ACE_Log_Msg_Callback, do it before sending the log record + to any other logging sinks. Allows the callback to munge + the data. + Thu Jun 06 10:50:37 2002 Ossama Othman <ossama@uci.edu> * ace/Dev_Poll_Reactor.cpp (cancel_timer): @@ -24,8 +71,8 @@ Thu Jun 6 07:11:15 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> * ace/Malloc_T.cpp: Clarify that the memory backing store must reside in a directory with the appropriate visibility and - permissions. Thanks to Frank O. Flemisch <f.o.flemisch@larc.nasa.gov> - for reporting this. + permissions. Thanks to Frank O. Flemisch + <f.o.flemisch@larc.nasa.gov> for reporting this. Wed Jun 5 23:06:11 UTC 2002 Don Hinton <dhinton@ieee.org> diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index fdfb5f5b71e..50996aeb8b7 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,50 @@ +Thu Jun 6 18:10:45 Steve Huston <shuston@riverace.com> + + * ace/README: Removed ACE_HAS_OPTIMIZED_MESSAGE_QUEUE. No configs + use it, and it's functionality doesn't really support the + message queue deactivation/pulse semantics. + + * ace/Message_Queue.h: + * ace/Message_Queue_T.{h cpp i}: Removed the "int pulse" argument + from deactivate() and add a pulse() method. States stay the + same. + Took out all the ACE_HAS_OPTIMIZED_MESSAGE_QUEUE stuff. It's + not used any longer. + + * examples/C++NPv2/AC_Client_Logging_Daemon.cpp: Use the pulse() + method instead of deactivate (1). + + * examples/C++NPv2/AIO_Client_Logging_Daemon.cpp: Removed answered + questions. Removed reimplemented handle_connect() method + because the needed things are now available via the framework. + + * ace/Asynch_Acceptor.{h cpp}: + * ace/Asynch_Connector.{h cpp}: Added new hook method, + int validate_connection (const ACE_Asynch_Accept::Result&result, + const ACE_INET_Addr& remote_addr, const ACE_INET_Addr& local_addr) + that allows access to success/fail, handle, and addresses. The + validate_new_connection (const ACE_INET_Addr&) method is now + deprecated. + + * ace/WFMO_Reactor.{h cpp} (ACE_WFMO_Reactor_Notify): Add an + optional argument to ACE_WFMO_Reactor_Notify constructor. + size_t max_notifies is used to specify a limit for how many + notifications can be queued. The value is used to calculate + new high and low watermarks to the message_queue_. Default 1024. + + * ace/Activation_Queue.cpp: + * ace/Message_Block.cpp: + * ace/Message_Queue_T.cpp: Added #include "ace/Log_Msg.h" to get + the ACE_DEBUG, etc. definitions. + + * ace/Read_Buffer.cpp: Added #include for ace/Log_Msg.h and + ace/Malloc_Base.h (for ACE_Allocator). + + * ace/Log_Msg.cpp (log (ACE_Log_Record&)): If there's a + ACE_Log_Msg_Callback, do it before sending the log record + to any other logging sinks. Allows the callback to munge + the data. + Thu Jun 06 10:50:37 2002 Ossama Othman <ossama@uci.edu> * ace/Dev_Poll_Reactor.cpp (cancel_timer): @@ -24,8 +71,8 @@ Thu Jun 6 07:11:15 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu> * ace/Malloc_T.cpp: Clarify that the memory backing store must reside in a directory with the appropriate visibility and - permissions. Thanks to Frank O. Flemisch <f.o.flemisch@larc.nasa.gov> - for reporting this. + permissions. Thanks to Frank O. Flemisch + <f.o.flemisch@larc.nasa.gov> for reporting this. Wed Jun 5 23:06:11 UTC 2002 Don Hinton <dhinton@ieee.org> diff --git a/ace/Activation_Queue.cpp b/ace/Activation_Queue.cpp index b1a523082a8..9a271c4d7ae 100644 --- a/ace/Activation_Queue.cpp +++ b/ace/Activation_Queue.cpp @@ -4,6 +4,7 @@ #include "ace/Activation_Queue.i" #endif /* __ACE_INLINE__ */ +#include "ace/Log_Msg.h" #include "ace/Malloc_Base.h" ACE_RCSID (ace, @@ -11,13 +12,13 @@ ACE_RCSID (ace, "$Id$") -void +void ACE_Activation_Queue::dump (void) const { ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("delete_queue_ = %d\n"), - this->delete_queue_)); + ACE_LIB_TEXT ("delete_queue_ = %d\n"), + this->delete_queue_)); ACE_DEBUG ((LM_INFO, ACE_LIB_TEXT ("queue_: \n"))); if (this->queue_) this->queue_->dump(); @@ -28,7 +29,7 @@ ACE_Activation_Queue::dump (void) const ACE_Activation_Queue::ACE_Activation_Queue (ACE_Message_Queue<ACE_SYNCH> *new_queue, ACE_Allocator *alloc, - ACE_Allocator *db_alloc) + ACE_Allocator *db_alloc) : delete_queue_ (0) , allocator_(alloc) , data_block_allocator_(db_alloc) @@ -46,7 +47,7 @@ ACE_Activation_Queue::ACE_Activation_Queue (ACE_Message_Queue<ACE_SYNCH> *new_qu } } -ACE_Activation_Queue::~ACE_Activation_Queue (void) +ACE_Activation_Queue::~ACE_Activation_Queue (void) { if (this->delete_queue_ != 0) delete this->queue_; @@ -72,9 +73,9 @@ ACE_Activation_Queue::dequeue (ACE_Time_Value *tv) return 0; } -int +int ACE_Activation_Queue::enqueue (ACE_Method_Request *mr, - ACE_Time_Value *tv) + ACE_Time_Value *tv) { ACE_Message_Block *mb; @@ -106,5 +107,3 @@ ACE_Activation_Queue::enqueue (ACE_Method_Request *mr, return result; } - - diff --git a/ace/Log_Msg.cpp b/ace/Log_Msg.cpp index 5ad0c3abca3..bd42f091471 100644 --- a/ace/Log_Msg.cpp +++ b/ace/Log_Msg.cpp @@ -1704,6 +1704,14 @@ ACE_Log_Msg::log (ACE_Log_Record &log_record, ACE_Log_Msg_Sig_Guard sb; #endif /* !ACE_WIN32 && !ACE_PSOS */ + // Do the callback, if needed, before acquiring the lock + // to avoid holding the lock during the callback so we don't + // have deadlock if the callback uses the logger. + if (ACE_BIT_ENABLED (ACE_Log_Msg::flags_, + ACE_Log_Msg::MSG_CALLBACK) + && this->msg_callback () != 0) + this->msg_callback ()->log (log_record); + // Make sure that the lock is held during all this. ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, *ACE_Log_Msg_Manager::get_lock (), @@ -1754,19 +1762,6 @@ ACE_Log_Msg::log (ACE_Log_Record &log_record, #endif /* ! ACE_LACKS_IOSTREAM_TOTALLY */ ); - if (ACE_BIT_ENABLED (ACE_Log_Msg::flags_, - ACE_Log_Msg::MSG_CALLBACK) - && this->msg_callback () != 0) - { - // Use a "reverse lock" to avoid holding the lock during the - // callback so we don't have deadlock if the callback uses - // the logger. - ACE_MT (ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> reverse_lock - (*ACE_Log_Msg_Manager::get_lock ())); - ACE_MT (ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex>, - ace_mon_1, reverse_lock, -1)); - this->msg_callback ()->log (log_record); - } if (tracing) this->start_tracing (); } diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp index 290c1a57256..f03575700df 100644 --- a/ace/Message_Block.cpp +++ b/ace/Message_Block.cpp @@ -1,4 +1,5 @@ #include "ace/Message_Block.h" +#include "ace/Log_Msg.h" #include "ace/Malloc_Base.h" #include "ace/Synch_T.h" diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h index d864482f256..bcf4248abcb 100644 --- a/ace/Message_Queue.h +++ b/ace/Message_Queue.h @@ -47,29 +47,33 @@ template <ACE_SYNCH_DECL> class ACE_Message_Queue_Reverse_Iterator; class ACE_Export ACE_Message_Queue_Base { public: - // = Default high and low water marks. enum { + // Default high and low watermarks. + /// Default high watermark (16 K). DEFAULT_HWM = 16 * 1024, /// Default low watermark (same as high water mark). DEFAULT_LWM = 16 * 1024, - /// Message queue was activated before <activate> or <deactivate>. + // Queue states. Before PULSED state was added, the activate() + // and deactivate() methods returned WAS_INACTIVE or WAS_ACTIVE + // to indicate the previous condition. Now those methods + // return the state the queue was previously in. WAS_ACTIVE + // and WAS_INACTIVE are defined to match previous semantics for + // applications that don't use the PULSED state. + WAS_ACTIVE = 1, /* DEPRECATED */ - WAS_ACTIVATED = 1, + /// Message queue is active and processing normally + ACTIVATED = 1, - /// Message queue was deactivated before <activate> or <deactivate>. WAS_INACTIVE = 2, /* DEPRECATED */ - WAS_DEACTIVATED = 2, + /// Queue is deactivated; no enqueue or dequeue operations allowed. + DEACTIVATED = 2, - /// Message queue was pulsed before <activate> or <deactivate>. - WAS_PULSED = 3, + /// Message queue was pulsed; enqueue and dequeue may proceed normally. + PULSED = 3 - /// The following are the states that a queue can be in. - ACTIVATED = 0, - DEACTIVATED = 1, - PULSED = 2 }; ACE_Message_Queue_Base (void); @@ -153,27 +157,38 @@ public: // = Activation control methods. /** - * Notify all waiting threads so they can wakeup and continue other - * processing. If <pulse> is 0 the queue's state is changed to - * deactivated and other operations called until the queue is - * activated again will return -1 with <errno> == ESHUTDOWN. If <pulse> is - * non-0 then only the waiting threads are notified and the queue's state - * is not changed. In either case, however, no messages are removed - * from the queue. Return the state of the queue before the call. */ - virtual int deactivate (int pulse = 0) = 0; + * Deactivate the queue and wake up all threads waiting on the queue + * so they can continue. No messages are removed from the queue, + * however. Any other operations called until the queue is + * activated again will immediately return -1 with @c errno + * ESHUTDOWN. + * + * @retval The queue's state before this call. + */ + virtual int deactivate (void) = 0; /** * Reactivate the queue so that threads can enqueue and dequeue - * messages again. Returns the state of the queue before the call. + * messages again. + * + * @retval The queue's state before this call. */ virtual int activate (void) = 0; - /// Returns the current state of the queue, which can either - /// be <ACTIVATED>, <DEACTIVATED>, or <PULSED>. + /** + * Pulse the queue to wake up any waiting threads. Changes the + * queue state to PULSED; future enqueue/dequeue operations proceed + * as in ACTIVATED state. + * + * @retval The queue's state before this call. + */ + virtual int pulse (void) = 0; + + /// Returns the current state of the queue. virtual int state (void); - /// Returns true if the state of the queue is <DEACTIVATED>, - /// but false if the queue's is <ACTIVATED> or <PULSED>. + /// Returns 1 if the state of the queue is DEACTIVATED, + /// and 0 if the queue's state is ACTIVATED or PULSED. virtual int deactivated (void) = 0; /// Get the notification strategy for the <Message_Queue> diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp index 7c6399a6374..e480a2c36a2 100644 --- a/ace/Message_Queue_T.cpp +++ b/ace/Message_Queue_T.cpp @@ -6,6 +6,7 @@ // #include Message_Queue.h instead of Message_Queue_T.h to avoid // circular include problems. #include "ace/Message_Queue.h" +#include "ace/Log_Msg.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -428,18 +429,18 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - switch (this->state_) + switch (this->state_) { case ACE_Message_Queue_Base::ACTIVATED: - ACE_DEBUG ((LM_DEBUG, + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("state = ACTIVATED\n"))); break; case ACE_Message_Queue_Base::DEACTIVATED: - ACE_DEBUG ((LM_DEBUG, + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("state = DEACTIVATED\n"))); break; case ACE_Message_Queue_Base::PULSED: - ACE_DEBUG ((LM_DEBUG, + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("state = PULSED\n"))); break; } @@ -487,15 +488,8 @@ template <ACE_SYNCH_DECL> ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm, size_t lwm, ACE_Notification_Strategy *ns) -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - : not_empty_cond_ (0), - not_full_cond_ (0), - enqueue_waiters_ (0), - dequeue_waiters_ (0) -#else : not_empty_cond_ (this->lock_), not_full_cond_ (this->lock_) -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue"); @@ -578,10 +572,8 @@ ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse) if (previous_state != ACE_Message_Queue_Base::DEACTIVATED) { // Wakeup all waiters. -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) this->not_empty_cond_.broadcast (); this->not_full_cond_.broadcast (); -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ if (pulse) this->state_ = ACE_Message_Queue_Base::PULSED; @@ -629,33 +621,17 @@ ACE_Message_Queue<ACE_SYNCH_USE>::close (void) template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void) { -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) if (this->not_full_cond_.signal () != 0) return -1; -#else - if (this->enqueue_waiters_ > 0) - { - --this->enqueue_waiters_; - return this->not_full_cond_.release (); - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return 0; } template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void) { -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) // Tell any blocked threads that the queue has a new item! if (this->not_empty_cond_.signal () != 0) return -1; -#else - if (this->dequeue_waiters_ > 0) - { - --this->dequeue_waiters_; - return this->not_empty_cond_.release (); - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return 0; } @@ -1126,30 +1102,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i } template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *timeout) +ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond + (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout) { int result = 0; -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - while (this->is_full_i () && result != -1) - { - ++this->enqueue_waiters_; - // @@ Need to add sanity checks for failure... - mon.release (); - result = this->not_full_cond_.acquire (timeout); - - if (result == -1 && errno == ETIME) - { - --this->enqueue_waiters_; - errno = EWOULDBLOCK; - } - - // Save/restore errno. - ACE_Errno_Guard error (errno); - mon.acquire (); - } -#else - ACE_UNUSED_ARG (mon); // Wait while the queue is full. @@ -1169,35 +1125,14 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_ break; } } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return result; } template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *timeout) +ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond + (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout) { int result = 0; -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - while (this->is_empty_i () && result != -1) - { - ++this->dequeue_waiters_; - // @@ Need to add sanity checks for failure... - mon.release (); - result = this->not_empty_cond_.acquire (timeout); - - if (result == -1 && errno == ETIME) - { - --this->dequeue_waiters_; - errno = EWOULDBLOCK; - } - - // Save/restore errno. - ACE_Errno_Guard error (errno); - mon.acquire (); - } -#else - ACE_UNUSED_ARG (mon); // Wait while the queue is empty. @@ -1217,7 +1152,6 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX break; } } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return result; } diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h index e023abcbd43..63c9831563a 100644 --- a/ace/Message_Queue_T.h +++ b/ace/Message_Queue_T.h @@ -100,14 +100,23 @@ public: /// Release all resources from the message queue and mark it as deactivated. virtual ~ACE_Message_Queue (void); - /// Release all resources from the message queue but do not mark it as deactivated. - /// This method holds the queue lock during this operation. Returns the number of - /// messages flushed. + /// Release all resources from the message queue but do not mark it + /// as deactivated. + /** + * This method holds the queue lock during this operation. + * + * @retval The number of messages flushed. + */ virtual int flush (void); - /// Release all resources from the message queue but do not mark it as deactivated. - /// This method does not hold the queue lock during this operation, i.e., it assume - /// the lock is held externally. Returns the number of messages flushed. + /// Release all resources from the message queue but do not mark it + /// as deactivated. + /** + * The caller must be holding the queue lock before calling this + * method. + * + * @retval The number of messages flushed. + */ virtual int flush_i (void); // = Enqueue and dequeue methods. @@ -120,27 +129,41 @@ public: // elapses, (in which case errno = EWOULDBLOCK). /** - * Retrieve the first <ACE_Message_Block> without removing it. Note - * that <timeout> uses <{absolute}> time rather than <{relative}> - * time. If the <timeout> elapses without receiving a message -1 is - * returned and <errno> is set to <EWOULDBLOCK>. If the queue is - * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. - * Otherwise, returns -1 on failure, else the number of items still - * on the queue. + * Retrieve a poiner to the first ACE_Message_Block in the queue + * without removing it. + * + * @arg first_item Reference to an ACE_Message_Block * that will + * point to the first block on the queue. The block + * remains on the queue until this or another thread + * dequeues it. + * @arg timeout The absolute time the caller will wait until + * for a block to be queued. + * + * @retval The number of ACE_Message_Blocks on the queue. + * @return -1 on failure. errno holds the reason. If EWOULDBLOCK, + * the timeout elapsed. If ESHUTDOWN, the queue was + * deactivated or pulsed. */ virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0); /** - * Enqueue an <ACE_Message_Block *> into the <Message_Queue> in - * accordance with its <msg_priority> (0 is lowest priority). FIFO + * Enqueue an ACE_Message_Block into the queue in accordance with + * the ACE_Message_Block's priority (0 is lowest priority). FIFO * order is maintained when messages of the same priority are - * inserted consecutively. Note that <timeout> uses <{absolute}> - * time rather than <{relative}> time. If the <timeout> elapses - * without receiving a message -1 is returned and <errno> is set to - * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and - * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, - * else the number of items still on the queue. + * inserted consecutively. + * + * @arg new_item Pointer to an ACE_Message_Block that will be + * added to the queue. The block's @c msg_priority() + * method will be called to obtain the queueing priority. + * @arg timeout The absolute time the caller will wait until + * for the block to be queued. + * + * @retval The number of ACE_Message_Blocks on the queue after adding + * the specified block. + * @return -1 on failure. errno holds the reason. If EWOULDBLOCK, + * the timeout elapsed. If ESHUTDOWN, the queue was + * deactivated or pulsed. */ virtual int enqueue_prio (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0); @@ -305,14 +328,14 @@ public: // = Activation control methods. /** - * Notify all waiting threads so they can wakeup and continue other - * processing. If <pulse> is 0 the queue's state is changed to - * deactivated and other operations called until the queue is - * activated again will return -1 with <errno> == ESHUTDOWN. If <pulse> is - * non-0 then only the waiting threads are notified and the queue's state - * is not changed. In either case, however, no messages are removed - * from the queue. Returns the state of the queue before the call. */ - virtual int deactivate (int pulse = 0); + * Deactivate the queue and wakeup all threads waiting on the queue + * so they can continue. No messages are removed from the queue, + * however. Any other operations called until the queue is + * activated again will immediately return -1 with <errno> == + * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the + * call and WAS_ACTIVE if queue was active before the call. + */ + virtual int deactivate (void); /** * Reactivate the queue so that threads can enqueue and dequeue @@ -320,6 +343,19 @@ public: */ virtual int activate (void); + /** + * Pulse the queue to wake up any waiting threads. Changes the + * queue state to PULSED; future enqueue/dequeue operations proceed + * as in ACTIVATED state. + * + * @retval The queue's state before this call. + */ + virtual int pulse (void); + + /// Returns the current state of the queue, which can be one of + /// ACTIVATED, DEACTIVATED, or PULSED. + virtual int state (void); + /// Returns true if the state of the queue is <DEACTIVATED>, /// but false if the queue's is <ACTIVATED> or <PULSED>. virtual int deactivated (void); @@ -405,13 +441,18 @@ protected: /** * Notifies all waiting threads that the queue has been deactivated - * so they can wakeup and continue other processing. If <pulse> is - * 0 then the queue's state is changed to deactivated and any other - * operations called until the queue is activated again will - * immediately return -1 with <errno> == ESHUTDOWN. If <pulse> is - * non-0 then only the waiting threads are notified and the queue's state - * is not changed. In either case, however, no messages are removed - * from the queue. Returns the state of the queue before the call. */ + * so they can wakeup and continue other processing. + * No messages are removed from the queue. + * + * @arg pulse If 0, the queue's state is changed to DEACTIVATED + * and any other operations called until the queue is + * reactivated will immediately return -1 with + * errno == ESHUTDOWN. + * If not zero, only the waiting threads are notified and + * the queue's state changes to PULSED. + * + * @retval The state of the queue before the call. + */ virtual int deactivate_i (int pulse = 0); /// Activate the queue. @@ -461,25 +502,11 @@ protected: /// Protect queue from concurrent access. ACE_SYNCH_MUTEX_T lock_; -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - /// Used to make threads sleep until the queue is no longer empty. - ACE_SYNCH_SEMAPHORE_T not_empty_cond_; - - /// Used to make threads sleep until the queue is no longer full. - ACE_SYNCH_SEMAPHORE_T not_full_cond_; - - /// Number of threads waiting to dequeue a <Message_Block>. - size_t dequeue_waiters_; - - /// Number of threads waiting to enqueue a <Message_Block>. - size_t enqueue_waiters_; -#else /// Used to make threads sleep until the queue is no longer empty. ACE_SYNCH_CONDITION_T not_empty_cond_; /// Used to make threads sleep until the queue is no longer full. ACE_SYNCH_CONDITION_T not_full_cond_; -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ private: @@ -1120,14 +1147,14 @@ public: // = Activation control methods. /** - * Notify all waiting threads so they can wakeup and continue other - * processing. If <pulse> is 0 the queue's state is changed to - * deactivated and other operations called until the queue is - * activated again will return -1 with <errno> == ESHUTDOWN. If <pulse> is - * non-0 then only the waiting threads are notified and the queue's state - * is not changed. In either case, however, no messages are removed - * from the queue. Returns the state of the queue before the call. */ - virtual int deactivate (int pulse = 0); + * Deactivate the queue and wakeup all threads waiting on the queue + * so they can continue. No messages are removed from the queue, + * however. Any other operations called until the queue is + * activated again will immediately return -1 with <errno> == + * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the + * call and WAS_ACTIVE if queue was active before the call. + */ + virtual int deactivate (void); /** * Reactivate the queue so that threads can enqueue and dequeue @@ -1135,12 +1162,21 @@ public: */ virtual int activate (void); - /// Returns the current state of the queue, which can either - /// be <ACTIVATED>, <DEACTIVATED>, or <PULSED>. + /** + * Pulse the queue to wake up any waiting threads. Changes the + * queue state to PULSED; future enqueue/dequeue operations proceed + * as in ACTIVATED state. + * + * @retval The queue's state before this call. + */ + virtual int pulse (void); + + /// Returns the current state of the queue, which can be one of + /// ACTIVATED, DEACTIVATED, or PULSED. virtual int state (void); - /// Returns true if the state of the queue is <DEACTIVATED>, - /// but false if the queue's is <ACTIVATED> or <PULSED>. + /// Returns true if the state of the queue is DEACTIVATED, + /// but false if the queue's state is ACTIVATED or PULSED. virtual int deactivated (void); // = Notification hook. diff --git a/ace/Message_Queue_T.i b/ace/Message_Queue_T.i index b9d89ec2805..d04fd9041a3 100644 --- a/ace/Message_Queue_T.i +++ b/ace/Message_Queue_T.i @@ -129,6 +129,15 @@ ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void) } template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::deactivate () +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->deactivate_i (0); // Not a pulse +} + +template <ACE_SYNCH_DECL> ACE_INLINE int ACE_Message_Queue<ACE_SYNCH_USE>::activate (void) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate"); @@ -138,12 +147,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::activate (void) } template <ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue<ACE_SYNCH_USE>::deactivate (int pulse) +ACE_Message_Queue<ACE_SYNCH_USE>::pulse () { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate"); + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - return this->deactivate_i (pulse); + return this->deactivate_i (1); // Just a pulse } template <ACE_SYNCH_DECL> ACE_INLINE int @@ -154,6 +163,14 @@ ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void) return this->state_ == ACE_Message_Queue_Base::DEACTIVATED; } +template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::state (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state"); + + return this->state_; +} + #if 0 // The Sun Forte 6 (CC 5.1) compiler is only happy if this is in the // header file (j.russell.noseworthy@objectsciences.com) @@ -269,6 +286,14 @@ ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void) } template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate"); + + return this->queue_.deactivate (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_INLINE int ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void) { ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate"); @@ -277,11 +302,11 @@ ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void) } template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (int pulse) +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void) { - ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate"); + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse"); - return this->queue_.deactivate (pulse); + return this->queue_.pulse (); } template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_INLINE int diff --git a/ace/README b/ace/README index 325768e255a..c885ecd5f0a 100644 --- a/ace/README +++ b/ace/README @@ -403,11 +403,6 @@ ACE_HAS_ONLY_SCHED_OTHER Platform, e.g., Solaris 2.5, ACE_HAS_2_PARAM_ASCTIME_R_AND_CTIME_R Uses ctime_r & asctime_r with only two parameters vs. three. -ACE_HAS_OPTIMIZED_MESSAGE_QUEUE Use the semaphore - implementation of - ACE_Message_Queue rather than - the emulated condition - variable (NT and VxWorks). ACE_HAS_OSF_TIMOD_H Platform supports the OSF TLI timod STREAMS module ACE_HAS_3_PARAM_WCSTOK Platform has 3-parameter version diff --git a/ace/Read_Buffer.cpp b/ace/Read_Buffer.cpp index e78315e0990..c3367902808 100644 --- a/ace/Read_Buffer.cpp +++ b/ace/Read_Buffer.cpp @@ -1,6 +1,8 @@ // $Id$ #include "ace/Read_Buffer.h" +#include "ace/Log_Msg.h" +#include "ace/Malloc_Base.h" #include "ace/Service_Config.h" #if !defined (__ACE_INLINE__) diff --git a/ace/WFMO_Reactor.cpp b/ace/WFMO_Reactor.cpp index 0e8c79ebe8d..303b18de723 100644 --- a/ace/WFMO_Reactor.cpp +++ b/ace/WFMO_Reactor.cpp @@ -1017,7 +1017,8 @@ ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &) } ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh, - ACE_Timer_Queue *tq) + ACE_Timer_Queue *tq, + ACE_Reactor_Notify *notify) : signal_handler_ (0), delete_signal_handler_ (0), timer_queue_ (0), @@ -1039,7 +1040,7 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh, open_for_business_ (0), deactivated_ (0) { - if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq) == -1) + if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1) ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("WFMO_Reactor"))); @@ -1048,7 +1049,8 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh, ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size, int unused, ACE_Sig_Handler *sh, - ACE_Timer_Queue *tq) + ACE_Timer_Queue *tq, + ACE_Reactor_Notify *notify) : signal_handler_ (0), delete_signal_handler_ (0), timer_queue_ (0), @@ -1072,7 +1074,7 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size, { ACE_UNUSED_ARG (unused); - if (this->open (size, 0, sh, tq) == -1) + if (this->open (size, 0, sh, tq, 0, notify) == -1) ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("WFMO_Reactor"))); @@ -2228,8 +2230,10 @@ ACE_WFMO_Reactor_Notify::close (void) return -1; } -ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (void) +ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies) : timer_queue_ (0), + message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer), + max_notifies * sizeof (ACE_Notification_Buffer)), max_notify_iterations_ (-1) { } diff --git a/ace/WFMO_Reactor.h b/ace/WFMO_Reactor.h index b380aebe21a..0aeac50beef 100644 --- a/ace/WFMO_Reactor.h +++ b/ace/WFMO_Reactor.h @@ -476,7 +476,7 @@ class ACE_Export ACE_WFMO_Reactor_Notify : public ACE_Reactor_Notify { public: /// Constructor - ACE_WFMO_Reactor_Notify (void); + ACE_WFMO_Reactor_Notify (size_t max_notifies = 1024); /// Initialization. <timer_queue> is stored to call <gettimeofday>. virtual int open (ACE_Reactor_Impl *wfmo_reactor, @@ -637,7 +637,8 @@ public: /// Initialize <ACE_WFMO_Reactor> with the default size. ACE_WFMO_Reactor (ACE_Sig_Handler * = 0, - ACE_Timer_Queue * = 0); + ACE_Timer_Queue * = 0, + ACE_Reactor_Notify * = 0); /** * Initialize <ACE_WFMO_Reactor> with size <size>. <size> should @@ -648,7 +649,8 @@ public: ACE_WFMO_Reactor (size_t size, int unused = 0, ACE_Sig_Handler * = 0, - ACE_Timer_Queue * = 0); + ACE_Timer_Queue * = 0, + ACE_Reactor_Notify * = 0); /** * Initialize <ACE_WFMO_Reactor> with size <size>. <size> should diff --git a/examples/C++NPv2/AC_Client_Logging_Daemon.cpp b/examples/C++NPv2/AC_Client_Logging_Daemon.cpp index 09e8bd9ee82..82dcdc73b1d 100644 --- a/examples/C++NPv2/AC_Client_Logging_Daemon.cpp +++ b/examples/C++NPv2/AC_Client_Logging_Daemon.cpp @@ -190,15 +190,21 @@ int AC_Output_Handler::open (void *connector) { } int AC_Output_Handler::put (ACE_Message_Block *mb, - ACE_Time_Value *timeout) -{ return putq (mb, timeout); } + ACE_Time_Value *timeout) { + int retval; + while ((retval = putq (mb, timeout)) == -1) { + if (msg_queue ()->state () != ACE_Message_Queue_Base::PULSED) + break; + } + return retval; +} int AC_Output_Handler::handle_input (ACE_HANDLE h) { peer ().close (); reactor ()->remove_handler (h, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); - msg_queue ()->deactivate (1); + msg_queue ()->pulse (); return 0; } |