summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Huston <shuston@riverace.com>2002-06-06 22:21:56 +0000
committerSteve Huston <shuston@riverace.com>2002-06-06 22:21:56 +0000
commit021fd35768d4e53ea8fadd5eb44fad52335a4cd5 (patch)
tree48416e4198fe853708ab30565b2f49887a368ee8
parent49dccf135fbb933fde13413f413a9f227ea9db20 (diff)
downloadATCD-021fd35768d4e53ea8fadd5eb44fad52335a4cd5.tar.gz
Thu Jun 6 18:10:45 Steve Huston <shuston@riverace.com>
-rw-r--r--ChangeLog51
-rw-r--r--ChangeLogs/ChangeLog-02a51
-rw-r--r--ChangeLogs/ChangeLog-03a51
-rw-r--r--ace/Activation_Queue.cpp17
-rw-r--r--ace/Log_Msg.cpp21
-rw-r--r--ace/Message_Block.cpp1
-rw-r--r--ace/Message_Queue.h63
-rw-r--r--ace/Message_Queue_T.cpp84
-rw-r--r--ace/Message_Queue_T.h160
-rw-r--r--ace/Message_Queue_T.i37
-rw-r--r--ace/README5
-rw-r--r--ace/Read_Buffer.cpp2
-rw-r--r--ace/WFMO_Reactor.cpp14
-rw-r--r--ace/WFMO_Reactor.h8
-rw-r--r--examples/C++NPv2/AC_Client_Logging_Daemon.cpp12
15 files changed, 366 insertions, 211 deletions
diff --git a/ChangeLog b/ChangeLog
index fdfb5f5b71e..50996aeb8b7 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,50 @@
+Thu Jun 6 18:10:45 Steve Huston <shuston@riverace.com>
+
+ * ace/README: Removed ACE_HAS_OPTIMIZED_MESSAGE_QUEUE. No configs
+ use it, and it's functionality doesn't really support the
+ message queue deactivation/pulse semantics.
+
+ * ace/Message_Queue.h:
+ * ace/Message_Queue_T.{h cpp i}: Removed the "int pulse" argument
+ from deactivate() and add a pulse() method. States stay the
+ same.
+ Took out all the ACE_HAS_OPTIMIZED_MESSAGE_QUEUE stuff. It's
+ not used any longer.
+
+ * examples/C++NPv2/AC_Client_Logging_Daemon.cpp: Use the pulse()
+ method instead of deactivate (1).
+
+ * examples/C++NPv2/AIO_Client_Logging_Daemon.cpp: Removed answered
+ questions. Removed reimplemented handle_connect() method
+ because the needed things are now available via the framework.
+
+ * ace/Asynch_Acceptor.{h cpp}:
+ * ace/Asynch_Connector.{h cpp}: Added new hook method,
+ int validate_connection (const ACE_Asynch_Accept::Result&result,
+ const ACE_INET_Addr& remote_addr, const ACE_INET_Addr& local_addr)
+ that allows access to success/fail, handle, and addresses. The
+ validate_new_connection (const ACE_INET_Addr&) method is now
+ deprecated.
+
+ * ace/WFMO_Reactor.{h cpp} (ACE_WFMO_Reactor_Notify): Add an
+ optional argument to ACE_WFMO_Reactor_Notify constructor.
+ size_t max_notifies is used to specify a limit for how many
+ notifications can be queued. The value is used to calculate
+ new high and low watermarks to the message_queue_. Default 1024.
+
+ * ace/Activation_Queue.cpp:
+ * ace/Message_Block.cpp:
+ * ace/Message_Queue_T.cpp: Added #include "ace/Log_Msg.h" to get
+ the ACE_DEBUG, etc. definitions.
+
+ * ace/Read_Buffer.cpp: Added #include for ace/Log_Msg.h and
+ ace/Malloc_Base.h (for ACE_Allocator).
+
+ * ace/Log_Msg.cpp (log (ACE_Log_Record&)): If there's a
+ ACE_Log_Msg_Callback, do it before sending the log record
+ to any other logging sinks. Allows the callback to munge
+ the data.
+
Thu Jun 06 10:50:37 2002 Ossama Othman <ossama@uci.edu>
* ace/Dev_Poll_Reactor.cpp (cancel_timer):
@@ -24,8 +71,8 @@ Thu Jun 6 07:11:15 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
* ace/Malloc_T.cpp: Clarify that the memory backing store must
reside in a directory with the appropriate visibility and
- permissions. Thanks to Frank O. Flemisch <f.o.flemisch@larc.nasa.gov>
- for reporting this.
+ permissions. Thanks to Frank O. Flemisch
+ <f.o.flemisch@larc.nasa.gov> for reporting this.
Wed Jun 5 23:06:11 UTC 2002 Don Hinton <dhinton@ieee.org>
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index fdfb5f5b71e..50996aeb8b7 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,50 @@
+Thu Jun 6 18:10:45 Steve Huston <shuston@riverace.com>
+
+ * ace/README: Removed ACE_HAS_OPTIMIZED_MESSAGE_QUEUE. No configs
+ use it, and it's functionality doesn't really support the
+ message queue deactivation/pulse semantics.
+
+ * ace/Message_Queue.h:
+ * ace/Message_Queue_T.{h cpp i}: Removed the "int pulse" argument
+ from deactivate() and add a pulse() method. States stay the
+ same.
+ Took out all the ACE_HAS_OPTIMIZED_MESSAGE_QUEUE stuff. It's
+ not used any longer.
+
+ * examples/C++NPv2/AC_Client_Logging_Daemon.cpp: Use the pulse()
+ method instead of deactivate (1).
+
+ * examples/C++NPv2/AIO_Client_Logging_Daemon.cpp: Removed answered
+ questions. Removed reimplemented handle_connect() method
+ because the needed things are now available via the framework.
+
+ * ace/Asynch_Acceptor.{h cpp}:
+ * ace/Asynch_Connector.{h cpp}: Added new hook method,
+ int validate_connection (const ACE_Asynch_Accept::Result&result,
+ const ACE_INET_Addr& remote_addr, const ACE_INET_Addr& local_addr)
+ that allows access to success/fail, handle, and addresses. The
+ validate_new_connection (const ACE_INET_Addr&) method is now
+ deprecated.
+
+ * ace/WFMO_Reactor.{h cpp} (ACE_WFMO_Reactor_Notify): Add an
+ optional argument to ACE_WFMO_Reactor_Notify constructor.
+ size_t max_notifies is used to specify a limit for how many
+ notifications can be queued. The value is used to calculate
+ new high and low watermarks to the message_queue_. Default 1024.
+
+ * ace/Activation_Queue.cpp:
+ * ace/Message_Block.cpp:
+ * ace/Message_Queue_T.cpp: Added #include "ace/Log_Msg.h" to get
+ the ACE_DEBUG, etc. definitions.
+
+ * ace/Read_Buffer.cpp: Added #include for ace/Log_Msg.h and
+ ace/Malloc_Base.h (for ACE_Allocator).
+
+ * ace/Log_Msg.cpp (log (ACE_Log_Record&)): If there's a
+ ACE_Log_Msg_Callback, do it before sending the log record
+ to any other logging sinks. Allows the callback to munge
+ the data.
+
Thu Jun 06 10:50:37 2002 Ossama Othman <ossama@uci.edu>
* ace/Dev_Poll_Reactor.cpp (cancel_timer):
@@ -24,8 +71,8 @@ Thu Jun 6 07:11:15 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
* ace/Malloc_T.cpp: Clarify that the memory backing store must
reside in a directory with the appropriate visibility and
- permissions. Thanks to Frank O. Flemisch <f.o.flemisch@larc.nasa.gov>
- for reporting this.
+ permissions. Thanks to Frank O. Flemisch
+ <f.o.flemisch@larc.nasa.gov> for reporting this.
Wed Jun 5 23:06:11 UTC 2002 Don Hinton <dhinton@ieee.org>
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index fdfb5f5b71e..50996aeb8b7 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,50 @@
+Thu Jun 6 18:10:45 Steve Huston <shuston@riverace.com>
+
+ * ace/README: Removed ACE_HAS_OPTIMIZED_MESSAGE_QUEUE. No configs
+ use it, and it's functionality doesn't really support the
+ message queue deactivation/pulse semantics.
+
+ * ace/Message_Queue.h:
+ * ace/Message_Queue_T.{h cpp i}: Removed the "int pulse" argument
+ from deactivate() and add a pulse() method. States stay the
+ same.
+ Took out all the ACE_HAS_OPTIMIZED_MESSAGE_QUEUE stuff. It's
+ not used any longer.
+
+ * examples/C++NPv2/AC_Client_Logging_Daemon.cpp: Use the pulse()
+ method instead of deactivate (1).
+
+ * examples/C++NPv2/AIO_Client_Logging_Daemon.cpp: Removed answered
+ questions. Removed reimplemented handle_connect() method
+ because the needed things are now available via the framework.
+
+ * ace/Asynch_Acceptor.{h cpp}:
+ * ace/Asynch_Connector.{h cpp}: Added new hook method,
+ int validate_connection (const ACE_Asynch_Accept::Result&result,
+ const ACE_INET_Addr& remote_addr, const ACE_INET_Addr& local_addr)
+ that allows access to success/fail, handle, and addresses. The
+ validate_new_connection (const ACE_INET_Addr&) method is now
+ deprecated.
+
+ * ace/WFMO_Reactor.{h cpp} (ACE_WFMO_Reactor_Notify): Add an
+ optional argument to ACE_WFMO_Reactor_Notify constructor.
+ size_t max_notifies is used to specify a limit for how many
+ notifications can be queued. The value is used to calculate
+ new high and low watermarks to the message_queue_. Default 1024.
+
+ * ace/Activation_Queue.cpp:
+ * ace/Message_Block.cpp:
+ * ace/Message_Queue_T.cpp: Added #include "ace/Log_Msg.h" to get
+ the ACE_DEBUG, etc. definitions.
+
+ * ace/Read_Buffer.cpp: Added #include for ace/Log_Msg.h and
+ ace/Malloc_Base.h (for ACE_Allocator).
+
+ * ace/Log_Msg.cpp (log (ACE_Log_Record&)): If there's a
+ ACE_Log_Msg_Callback, do it before sending the log record
+ to any other logging sinks. Allows the callback to munge
+ the data.
+
Thu Jun 06 10:50:37 2002 Ossama Othman <ossama@uci.edu>
* ace/Dev_Poll_Reactor.cpp (cancel_timer):
@@ -24,8 +71,8 @@ Thu Jun 6 07:11:15 2002 Douglas C. Schmidt <schmidt@tango.doc.wustl.edu>
* ace/Malloc_T.cpp: Clarify that the memory backing store must
reside in a directory with the appropriate visibility and
- permissions. Thanks to Frank O. Flemisch <f.o.flemisch@larc.nasa.gov>
- for reporting this.
+ permissions. Thanks to Frank O. Flemisch
+ <f.o.flemisch@larc.nasa.gov> for reporting this.
Wed Jun 5 23:06:11 UTC 2002 Don Hinton <dhinton@ieee.org>
diff --git a/ace/Activation_Queue.cpp b/ace/Activation_Queue.cpp
index b1a523082a8..9a271c4d7ae 100644
--- a/ace/Activation_Queue.cpp
+++ b/ace/Activation_Queue.cpp
@@ -4,6 +4,7 @@
#include "ace/Activation_Queue.i"
#endif /* __ACE_INLINE__ */
+#include "ace/Log_Msg.h"
#include "ace/Malloc_Base.h"
ACE_RCSID (ace,
@@ -11,13 +12,13 @@ ACE_RCSID (ace,
"$Id$")
-void
+void
ACE_Activation_Queue::dump (void) const
{
ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("delete_queue_ = %d\n"),
- this->delete_queue_));
+ ACE_LIB_TEXT ("delete_queue_ = %d\n"),
+ this->delete_queue_));
ACE_DEBUG ((LM_INFO, ACE_LIB_TEXT ("queue_: \n")));
if (this->queue_)
this->queue_->dump();
@@ -28,7 +29,7 @@ ACE_Activation_Queue::dump (void) const
ACE_Activation_Queue::ACE_Activation_Queue (ACE_Message_Queue<ACE_SYNCH> *new_queue,
ACE_Allocator *alloc,
- ACE_Allocator *db_alloc)
+ ACE_Allocator *db_alloc)
: delete_queue_ (0)
, allocator_(alloc)
, data_block_allocator_(db_alloc)
@@ -46,7 +47,7 @@ ACE_Activation_Queue::ACE_Activation_Queue (ACE_Message_Queue<ACE_SYNCH> *new_qu
}
}
-ACE_Activation_Queue::~ACE_Activation_Queue (void)
+ACE_Activation_Queue::~ACE_Activation_Queue (void)
{
if (this->delete_queue_ != 0)
delete this->queue_;
@@ -72,9 +73,9 @@ ACE_Activation_Queue::dequeue (ACE_Time_Value *tv)
return 0;
}
-int
+int
ACE_Activation_Queue::enqueue (ACE_Method_Request *mr,
- ACE_Time_Value *tv)
+ ACE_Time_Value *tv)
{
ACE_Message_Block *mb;
@@ -106,5 +107,3 @@ ACE_Activation_Queue::enqueue (ACE_Method_Request *mr,
return result;
}
-
-
diff --git a/ace/Log_Msg.cpp b/ace/Log_Msg.cpp
index 5ad0c3abca3..bd42f091471 100644
--- a/ace/Log_Msg.cpp
+++ b/ace/Log_Msg.cpp
@@ -1704,6 +1704,14 @@ ACE_Log_Msg::log (ACE_Log_Record &log_record,
ACE_Log_Msg_Sig_Guard sb;
#endif /* !ACE_WIN32 && !ACE_PSOS */
+ // Do the callback, if needed, before acquiring the lock
+ // to avoid holding the lock during the callback so we don't
+ // have deadlock if the callback uses the logger.
+ if (ACE_BIT_ENABLED (ACE_Log_Msg::flags_,
+ ACE_Log_Msg::MSG_CALLBACK)
+ && this->msg_callback () != 0)
+ this->msg_callback ()->log (log_record);
+
// Make sure that the lock is held during all this.
ACE_MT (ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon,
*ACE_Log_Msg_Manager::get_lock (),
@@ -1754,19 +1762,6 @@ ACE_Log_Msg::log (ACE_Log_Record &log_record,
#endif /* ! ACE_LACKS_IOSTREAM_TOTALLY */
);
- if (ACE_BIT_ENABLED (ACE_Log_Msg::flags_,
- ACE_Log_Msg::MSG_CALLBACK)
- && this->msg_callback () != 0)
- {
- // Use a "reverse lock" to avoid holding the lock during the
- // callback so we don't have deadlock if the callback uses
- // the logger.
- ACE_MT (ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> reverse_lock
- (*ACE_Log_Msg_Manager::get_lock ()));
- ACE_MT (ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex>,
- ace_mon_1, reverse_lock, -1));
- this->msg_callback ()->log (log_record);
- }
if (tracing)
this->start_tracing ();
}
diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp
index 290c1a57256..f03575700df 100644
--- a/ace/Message_Block.cpp
+++ b/ace/Message_Block.cpp
@@ -1,4 +1,5 @@
#include "ace/Message_Block.h"
+#include "ace/Log_Msg.h"
#include "ace/Malloc_Base.h"
#include "ace/Synch_T.h"
diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h
index d864482f256..bcf4248abcb 100644
--- a/ace/Message_Queue.h
+++ b/ace/Message_Queue.h
@@ -47,29 +47,33 @@ template <ACE_SYNCH_DECL> class ACE_Message_Queue_Reverse_Iterator;
class ACE_Export ACE_Message_Queue_Base
{
public:
- // = Default high and low water marks.
enum
{
+ // Default high and low watermarks.
+
/// Default high watermark (16 K).
DEFAULT_HWM = 16 * 1024,
/// Default low watermark (same as high water mark).
DEFAULT_LWM = 16 * 1024,
- /// Message queue was activated before <activate> or <deactivate>.
+ // Queue states. Before PULSED state was added, the activate()
+ // and deactivate() methods returned WAS_INACTIVE or WAS_ACTIVE
+ // to indicate the previous condition. Now those methods
+ // return the state the queue was previously in. WAS_ACTIVE
+ // and WAS_INACTIVE are defined to match previous semantics for
+ // applications that don't use the PULSED state.
+
WAS_ACTIVE = 1, /* DEPRECATED */
- WAS_ACTIVATED = 1,
+ /// Message queue is active and processing normally
+ ACTIVATED = 1,
- /// Message queue was deactivated before <activate> or <deactivate>.
WAS_INACTIVE = 2, /* DEPRECATED */
- WAS_DEACTIVATED = 2,
+ /// Queue is deactivated; no enqueue or dequeue operations allowed.
+ DEACTIVATED = 2,
- /// Message queue was pulsed before <activate> or <deactivate>.
- WAS_PULSED = 3,
+ /// Message queue was pulsed; enqueue and dequeue may proceed normally.
+ PULSED = 3
- /// The following are the states that a queue can be in.
- ACTIVATED = 0,
- DEACTIVATED = 1,
- PULSED = 2
};
ACE_Message_Queue_Base (void);
@@ -153,27 +157,38 @@ public:
// = Activation control methods.
/**
- * Notify all waiting threads so they can wakeup and continue other
- * processing. If <pulse> is 0 the queue's state is changed to
- * deactivated and other operations called until the queue is
- * activated again will return -1 with <errno> == ESHUTDOWN. If <pulse> is
- * non-0 then only the waiting threads are notified and the queue's state
- * is not changed. In either case, however, no messages are removed
- * from the queue. Return the state of the queue before the call. */
- virtual int deactivate (int pulse = 0) = 0;
+ * Deactivate the queue and wake up all threads waiting on the queue
+ * so they can continue. No messages are removed from the queue,
+ * however. Any other operations called until the queue is
+ * activated again will immediately return -1 with @c errno
+ * ESHUTDOWN.
+ *
+ * @retval The queue's state before this call.
+ */
+ virtual int deactivate (void) = 0;
/**
* Reactivate the queue so that threads can enqueue and dequeue
- * messages again. Returns the state of the queue before the call.
+ * messages again.
+ *
+ * @retval The queue's state before this call.
*/
virtual int activate (void) = 0;
- /// Returns the current state of the queue, which can either
- /// be <ACTIVATED>, <DEACTIVATED>, or <PULSED>.
+ /**
+ * Pulse the queue to wake up any waiting threads. Changes the
+ * queue state to PULSED; future enqueue/dequeue operations proceed
+ * as in ACTIVATED state.
+ *
+ * @retval The queue's state before this call.
+ */
+ virtual int pulse (void) = 0;
+
+ /// Returns the current state of the queue.
virtual int state (void);
- /// Returns true if the state of the queue is <DEACTIVATED>,
- /// but false if the queue's is <ACTIVATED> or <PULSED>.
+ /// Returns 1 if the state of the queue is DEACTIVATED,
+ /// and 0 if the queue's state is ACTIVATED or PULSED.
virtual int deactivated (void) = 0;
/// Get the notification strategy for the <Message_Queue>
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp
index 7c6399a6374..e480a2c36a2 100644
--- a/ace/Message_Queue_T.cpp
+++ b/ace/Message_Queue_T.cpp
@@ -6,6 +6,7 @@
// #include Message_Queue.h instead of Message_Queue_T.h to avoid
// circular include problems.
#include "ace/Message_Queue.h"
+#include "ace/Log_Msg.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -428,18 +429,18 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
- switch (this->state_)
+ switch (this->state_)
{
case ACE_Message_Queue_Base::ACTIVATED:
- ACE_DEBUG ((LM_DEBUG,
+ ACE_DEBUG ((LM_DEBUG,
ACE_LIB_TEXT ("state = ACTIVATED\n")));
break;
case ACE_Message_Queue_Base::DEACTIVATED:
- ACE_DEBUG ((LM_DEBUG,
+ ACE_DEBUG ((LM_DEBUG,
ACE_LIB_TEXT ("state = DEACTIVATED\n")));
break;
case ACE_Message_Queue_Base::PULSED:
- ACE_DEBUG ((LM_DEBUG,
+ ACE_DEBUG ((LM_DEBUG,
ACE_LIB_TEXT ("state = PULSED\n")));
break;
}
@@ -487,15 +488,8 @@ template <ACE_SYNCH_DECL>
ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
size_t lwm,
ACE_Notification_Strategy *ns)
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- : not_empty_cond_ (0),
- not_full_cond_ (0),
- enqueue_waiters_ (0),
- dequeue_waiters_ (0)
-#else
: not_empty_cond_ (this->lock_),
not_full_cond_ (this->lock_)
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
@@ -578,10 +572,8 @@ ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse)
if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
{
// Wakeup all waiters.
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
this->not_empty_cond_.broadcast ();
this->not_full_cond_.broadcast ();
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
if (pulse)
this->state_ = ACE_Message_Queue_Base::PULSED;
@@ -629,33 +621,17 @@ ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
{
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
if (this->not_full_cond_.signal () != 0)
return -1;
-#else
- if (this->enqueue_waiters_ > 0)
- {
- --this->enqueue_waiters_;
- return this->not_full_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
return 0;
}
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
{
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
// Tell any blocked threads that the queue has a new item!
if (this->not_empty_cond_.signal () != 0)
return -1;
-#else
- if (this->dequeue_waiters_ > 0)
- {
- --this->dequeue_waiters_;
- return this->not_empty_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
return 0;
}
@@ -1126,30 +1102,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i
}
template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *timeout)
+ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond
+ (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
{
int result = 0;
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- while (this->is_full_i () && result != -1)
- {
- ++this->enqueue_waiters_;
- // @@ Need to add sanity checks for failure...
- mon.release ();
- result = this->not_full_cond_.acquire (timeout);
-
- if (result == -1 && errno == ETIME)
- {
- --this->enqueue_waiters_;
- errno = EWOULDBLOCK;
- }
-
- // Save/restore errno.
- ACE_Errno_Guard error (errno);
- mon.acquire ();
- }
-#else
- ACE_UNUSED_ARG (mon);
// Wait while the queue is full.
@@ -1169,35 +1125,14 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_
break;
}
}
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
return result;
}
template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *timeout)
+ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond
+ (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout)
{
int result = 0;
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- while (this->is_empty_i () && result != -1)
- {
- ++this->dequeue_waiters_;
- // @@ Need to add sanity checks for failure...
- mon.release ();
- result = this->not_empty_cond_.acquire (timeout);
-
- if (result == -1 && errno == ETIME)
- {
- --this->dequeue_waiters_;
- errno = EWOULDBLOCK;
- }
-
- // Save/restore errno.
- ACE_Errno_Guard error (errno);
- mon.acquire ();
- }
-#else
- ACE_UNUSED_ARG (mon);
// Wait while the queue is empty.
@@ -1217,7 +1152,6 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX
break;
}
}
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
return result;
}
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h
index e023abcbd43..63c9831563a 100644
--- a/ace/Message_Queue_T.h
+++ b/ace/Message_Queue_T.h
@@ -100,14 +100,23 @@ public:
/// Release all resources from the message queue and mark it as deactivated.
virtual ~ACE_Message_Queue (void);
- /// Release all resources from the message queue but do not mark it as deactivated.
- /// This method holds the queue lock during this operation. Returns the number of
- /// messages flushed.
+ /// Release all resources from the message queue but do not mark it
+ /// as deactivated.
+ /**
+ * This method holds the queue lock during this operation.
+ *
+ * @retval The number of messages flushed.
+ */
virtual int flush (void);
- /// Release all resources from the message queue but do not mark it as deactivated.
- /// This method does not hold the queue lock during this operation, i.e., it assume
- /// the lock is held externally. Returns the number of messages flushed.
+ /// Release all resources from the message queue but do not mark it
+ /// as deactivated.
+ /**
+ * The caller must be holding the queue lock before calling this
+ * method.
+ *
+ * @retval The number of messages flushed.
+ */
virtual int flush_i (void);
// = Enqueue and dequeue methods.
@@ -120,27 +129,41 @@ public:
// elapses, (in which case errno = EWOULDBLOCK).
/**
- * Retrieve the first <ACE_Message_Block> without removing it. Note
- * that <timeout> uses <{absolute}> time rather than <{relative}>
- * time. If the <timeout> elapses without receiving a message -1 is
- * returned and <errno> is set to <EWOULDBLOCK>. If the queue is
- * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>.
- * Otherwise, returns -1 on failure, else the number of items still
- * on the queue.
+ * Retrieve a poiner to the first ACE_Message_Block in the queue
+ * without removing it.
+ *
+ * @arg first_item Reference to an ACE_Message_Block * that will
+ * point to the first block on the queue. The block
+ * remains on the queue until this or another thread
+ * dequeues it.
+ * @arg timeout The absolute time the caller will wait until
+ * for a block to be queued.
+ *
+ * @retval The number of ACE_Message_Blocks on the queue.
+ * @return -1 on failure. errno holds the reason. If EWOULDBLOCK,
+ * the timeout elapsed. If ESHUTDOWN, the queue was
+ * deactivated or pulsed.
*/
virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0);
/**
- * Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
- * accordance with its <msg_priority> (0 is lowest priority). FIFO
+ * Enqueue an ACE_Message_Block into the queue in accordance with
+ * the ACE_Message_Block's priority (0 is lowest priority). FIFO
* order is maintained when messages of the same priority are
- * inserted consecutively. Note that <timeout> uses <{absolute}>
- * time rather than <{relative}> time. If the <timeout> elapses
- * without receiving a message -1 is returned and <errno> is set to
- * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and
- * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure,
- * else the number of items still on the queue.
+ * inserted consecutively.
+ *
+ * @arg new_item Pointer to an ACE_Message_Block that will be
+ * added to the queue. The block's @c msg_priority()
+ * method will be called to obtain the queueing priority.
+ * @arg timeout The absolute time the caller will wait until
+ * for the block to be queued.
+ *
+ * @retval The number of ACE_Message_Blocks on the queue after adding
+ * the specified block.
+ * @return -1 on failure. errno holds the reason. If EWOULDBLOCK,
+ * the timeout elapsed. If ESHUTDOWN, the queue was
+ * deactivated or pulsed.
*/
virtual int enqueue_prio (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0);
@@ -305,14 +328,14 @@ public:
// = Activation control methods.
/**
- * Notify all waiting threads so they can wakeup and continue other
- * processing. If <pulse> is 0 the queue's state is changed to
- * deactivated and other operations called until the queue is
- * activated again will return -1 with <errno> == ESHUTDOWN. If <pulse> is
- * non-0 then only the waiting threads are notified and the queue's state
- * is not changed. In either case, however, no messages are removed
- * from the queue. Returns the state of the queue before the call. */
- virtual int deactivate (int pulse = 0);
+ * Deactivate the queue and wakeup all threads waiting on the queue
+ * so they can continue. No messages are removed from the queue,
+ * however. Any other operations called until the queue is
+ * activated again will immediately return -1 with <errno> ==
+ * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
+ * call and WAS_ACTIVE if queue was active before the call.
+ */
+ virtual int deactivate (void);
/**
* Reactivate the queue so that threads can enqueue and dequeue
@@ -320,6 +343,19 @@ public:
*/
virtual int activate (void);
+ /**
+ * Pulse the queue to wake up any waiting threads. Changes the
+ * queue state to PULSED; future enqueue/dequeue operations proceed
+ * as in ACTIVATED state.
+ *
+ * @retval The queue's state before this call.
+ */
+ virtual int pulse (void);
+
+ /// Returns the current state of the queue, which can be one of
+ /// ACTIVATED, DEACTIVATED, or PULSED.
+ virtual int state (void);
+
/// Returns true if the state of the queue is <DEACTIVATED>,
/// but false if the queue's is <ACTIVATED> or <PULSED>.
virtual int deactivated (void);
@@ -405,13 +441,18 @@ protected:
/**
* Notifies all waiting threads that the queue has been deactivated
- * so they can wakeup and continue other processing. If <pulse> is
- * 0 then the queue's state is changed to deactivated and any other
- * operations called until the queue is activated again will
- * immediately return -1 with <errno> == ESHUTDOWN. If <pulse> is
- * non-0 then only the waiting threads are notified and the queue's state
- * is not changed. In either case, however, no messages are removed
- * from the queue. Returns the state of the queue before the call. */
+ * so they can wakeup and continue other processing.
+ * No messages are removed from the queue.
+ *
+ * @arg pulse If 0, the queue's state is changed to DEACTIVATED
+ * and any other operations called until the queue is
+ * reactivated will immediately return -1 with
+ * errno == ESHUTDOWN.
+ * If not zero, only the waiting threads are notified and
+ * the queue's state changes to PULSED.
+ *
+ * @retval The state of the queue before the call.
+ */
virtual int deactivate_i (int pulse = 0);
/// Activate the queue.
@@ -461,25 +502,11 @@ protected:
/// Protect queue from concurrent access.
ACE_SYNCH_MUTEX_T lock_;
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- /// Used to make threads sleep until the queue is no longer empty.
- ACE_SYNCH_SEMAPHORE_T not_empty_cond_;
-
- /// Used to make threads sleep until the queue is no longer full.
- ACE_SYNCH_SEMAPHORE_T not_full_cond_;
-
- /// Number of threads waiting to dequeue a <Message_Block>.
- size_t dequeue_waiters_;
-
- /// Number of threads waiting to enqueue a <Message_Block>.
- size_t enqueue_waiters_;
-#else
/// Used to make threads sleep until the queue is no longer empty.
ACE_SYNCH_CONDITION_T not_empty_cond_;
/// Used to make threads sleep until the queue is no longer full.
ACE_SYNCH_CONDITION_T not_full_cond_;
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
private:
@@ -1120,14 +1147,14 @@ public:
// = Activation control methods.
/**
- * Notify all waiting threads so they can wakeup and continue other
- * processing. If <pulse> is 0 the queue's state is changed to
- * deactivated and other operations called until the queue is
- * activated again will return -1 with <errno> == ESHUTDOWN. If <pulse> is
- * non-0 then only the waiting threads are notified and the queue's state
- * is not changed. In either case, however, no messages are removed
- * from the queue. Returns the state of the queue before the call. */
- virtual int deactivate (int pulse = 0);
+ * Deactivate the queue and wakeup all threads waiting on the queue
+ * so they can continue. No messages are removed from the queue,
+ * however. Any other operations called until the queue is
+ * activated again will immediately return -1 with <errno> ==
+ * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
+ * call and WAS_ACTIVE if queue was active before the call.
+ */
+ virtual int deactivate (void);
/**
* Reactivate the queue so that threads can enqueue and dequeue
@@ -1135,12 +1162,21 @@ public:
*/
virtual int activate (void);
- /// Returns the current state of the queue, which can either
- /// be <ACTIVATED>, <DEACTIVATED>, or <PULSED>.
+ /**
+ * Pulse the queue to wake up any waiting threads. Changes the
+ * queue state to PULSED; future enqueue/dequeue operations proceed
+ * as in ACTIVATED state.
+ *
+ * @retval The queue's state before this call.
+ */
+ virtual int pulse (void);
+
+ /// Returns the current state of the queue, which can be one of
+ /// ACTIVATED, DEACTIVATED, or PULSED.
virtual int state (void);
- /// Returns true if the state of the queue is <DEACTIVATED>,
- /// but false if the queue's is <ACTIVATED> or <PULSED>.
+ /// Returns true if the state of the queue is DEACTIVATED,
+ /// but false if the queue's state is ACTIVATED or PULSED.
virtual int deactivated (void);
// = Notification hook.
diff --git a/ace/Message_Queue_T.i b/ace/Message_Queue_T.i
index b9d89ec2805..d04fd9041a3 100644
--- a/ace/Message_Queue_T.i
+++ b/ace/Message_Queue_T.i
@@ -129,6 +129,15 @@ ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
}
template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::deactivate ()
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ return this->deactivate_i (0); // Not a pulse
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE int
ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
@@ -138,12 +147,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
}
template <ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue<ACE_SYNCH_USE>::deactivate (int pulse)
+ACE_Message_Queue<ACE_SYNCH_USE>::pulse ()
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
- return this->deactivate_i (pulse);
+ return this->deactivate_i (1); // Just a pulse
}
template <ACE_SYNCH_DECL> ACE_INLINE int
@@ -154,6 +163,14 @@ ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
}
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::state (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state");
+
+ return this->state_;
+}
+
#if 0
// The Sun Forte 6 (CC 5.1) compiler is only happy if this is in the
// header file (j.russell.noseworthy@objectsciences.com)
@@ -269,6 +286,14 @@ ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void)
}
template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
+
+ return this->queue_.deactivate ();
+}
+
+template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_INLINE int
ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
{
ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate");
@@ -277,11 +302,11 @@ ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void)
}
template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (int pulse)
+ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void)
{
- ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate");
+ ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse");
- return this->queue_.deactivate (pulse);
+ return this->queue_.pulse ();
}
template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_INLINE int
diff --git a/ace/README b/ace/README
index 325768e255a..c885ecd5f0a 100644
--- a/ace/README
+++ b/ace/README
@@ -403,11 +403,6 @@ ACE_HAS_ONLY_SCHED_OTHER Platform, e.g., Solaris 2.5,
ACE_HAS_2_PARAM_ASCTIME_R_AND_CTIME_R Uses ctime_r & asctime_r with
only two parameters
vs. three.
-ACE_HAS_OPTIMIZED_MESSAGE_QUEUE Use the semaphore
- implementation of
- ACE_Message_Queue rather than
- the emulated condition
- variable (NT and VxWorks).
ACE_HAS_OSF_TIMOD_H Platform supports the OSF TLI
timod STREAMS module
ACE_HAS_3_PARAM_WCSTOK Platform has 3-parameter version
diff --git a/ace/Read_Buffer.cpp b/ace/Read_Buffer.cpp
index e78315e0990..c3367902808 100644
--- a/ace/Read_Buffer.cpp
+++ b/ace/Read_Buffer.cpp
@@ -1,6 +1,8 @@
// $Id$
#include "ace/Read_Buffer.h"
+#include "ace/Log_Msg.h"
+#include "ace/Malloc_Base.h"
#include "ace/Service_Config.h"
#if !defined (__ACE_INLINE__)
diff --git a/ace/WFMO_Reactor.cpp b/ace/WFMO_Reactor.cpp
index 0e8c79ebe8d..303b18de723 100644
--- a/ace/WFMO_Reactor.cpp
+++ b/ace/WFMO_Reactor.cpp
@@ -1017,7 +1017,8 @@ ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
}
ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
- ACE_Timer_Queue *tq)
+ ACE_Timer_Queue *tq,
+ ACE_Reactor_Notify *notify)
: signal_handler_ (0),
delete_signal_handler_ (0),
timer_queue_ (0),
@@ -1039,7 +1040,7 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
open_for_business_ (0),
deactivated_ (0)
{
- if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq) == -1)
+ if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT ("%p\n"),
ACE_LIB_TEXT ("WFMO_Reactor")));
@@ -1048,7 +1049,8 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
int unused,
ACE_Sig_Handler *sh,
- ACE_Timer_Queue *tq)
+ ACE_Timer_Queue *tq,
+ ACE_Reactor_Notify *notify)
: signal_handler_ (0),
delete_signal_handler_ (0),
timer_queue_ (0),
@@ -1072,7 +1074,7 @@ ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
{
ACE_UNUSED_ARG (unused);
- if (this->open (size, 0, sh, tq) == -1)
+ if (this->open (size, 0, sh, tq, 0, notify) == -1)
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT ("%p\n"),
ACE_LIB_TEXT ("WFMO_Reactor")));
@@ -2228,8 +2230,10 @@ ACE_WFMO_Reactor_Notify::close (void)
return -1;
}
-ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (void)
+ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
: timer_queue_ (0),
+ message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
+ max_notifies * sizeof (ACE_Notification_Buffer)),
max_notify_iterations_ (-1)
{
}
diff --git a/ace/WFMO_Reactor.h b/ace/WFMO_Reactor.h
index b380aebe21a..0aeac50beef 100644
--- a/ace/WFMO_Reactor.h
+++ b/ace/WFMO_Reactor.h
@@ -476,7 +476,7 @@ class ACE_Export ACE_WFMO_Reactor_Notify : public ACE_Reactor_Notify
{
public:
/// Constructor
- ACE_WFMO_Reactor_Notify (void);
+ ACE_WFMO_Reactor_Notify (size_t max_notifies = 1024);
/// Initialization. <timer_queue> is stored to call <gettimeofday>.
virtual int open (ACE_Reactor_Impl *wfmo_reactor,
@@ -637,7 +637,8 @@ public:
/// Initialize <ACE_WFMO_Reactor> with the default size.
ACE_WFMO_Reactor (ACE_Sig_Handler * = 0,
- ACE_Timer_Queue * = 0);
+ ACE_Timer_Queue * = 0,
+ ACE_Reactor_Notify * = 0);
/**
* Initialize <ACE_WFMO_Reactor> with size <size>. <size> should
@@ -648,7 +649,8 @@ public:
ACE_WFMO_Reactor (size_t size,
int unused = 0,
ACE_Sig_Handler * = 0,
- ACE_Timer_Queue * = 0);
+ ACE_Timer_Queue * = 0,
+ ACE_Reactor_Notify * = 0);
/**
* Initialize <ACE_WFMO_Reactor> with size <size>. <size> should
diff --git a/examples/C++NPv2/AC_Client_Logging_Daemon.cpp b/examples/C++NPv2/AC_Client_Logging_Daemon.cpp
index 09e8bd9ee82..82dcdc73b1d 100644
--- a/examples/C++NPv2/AC_Client_Logging_Daemon.cpp
+++ b/examples/C++NPv2/AC_Client_Logging_Daemon.cpp
@@ -190,15 +190,21 @@ int AC_Output_Handler::open (void *connector) {
}
int AC_Output_Handler::put (ACE_Message_Block *mb,
- ACE_Time_Value *timeout)
-{ return putq (mb, timeout); }
+ ACE_Time_Value *timeout) {
+ int retval;
+ while ((retval = putq (mb, timeout)) == -1) {
+ if (msg_queue ()->state () != ACE_Message_Queue_Base::PULSED)
+ break;
+ }
+ return retval;
+}
int AC_Output_Handler::handle_input (ACE_HANDLE h) {
peer ().close ();
reactor ()->remove_handler
(h, ACE_Event_Handler::READ_MASK
| ACE_Event_Handler::DONT_CALL);
- msg_queue ()->deactivate (1);
+ msg_queue ()->pulse ();
return 0;
}