summaryrefslogtreecommitdiff
path: root/ace/Message_Queue.cpp
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>2002-06-04 15:51:30 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>2002-06-04 15:51:30 +0000
commit2e8c2bfc686195b9a98d8cf412c3e762ab03b145 (patch)
tree3f3bc6c0e7e47f19324d8939bd468bd54414a461 /ace/Message_Queue.cpp
parent13176c7cfd3ae7d477316d579ecbf437964a44a2 (diff)
downloadATCD-2e8c2bfc686195b9a98d8cf412c3e762ab03b145.tar.gz
ChangeLogTag:Tue Jun 4 10:03:19 2002 Douglas C. Schmidt <schmidt@macarena.cs.wustl.edu>
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r--ace/Message_Queue.cpp129
1 files changed, 72 insertions, 57 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index 0456f6f3fb5..3e76849fc34 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -22,8 +22,22 @@ ACE_Message_Queue_Vx::dump (void) const
{
ACE_TRACE ("ACE_Message_Queue_Vx::dump");
ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ switch (this->state_)
+ {
+ case ACE_Message_Queue_Base::ACTIVATED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT ("state = ACTIVATED\n")));
+ break;
+ case ACE_Message_Queue_Base::DEACTIVATED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT ("state = DEACTIVATED\n")));
+ break;
+ case ACE_Message_Queue_Base::PULSED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT ("state = PULSED\n")));
+ break;
+ }
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("deactivated = %d\n")
ACE_LIB_TEXT ("low_water_mark = %d\n")
ACE_LIB_TEXT ("high_water_mark = %d\n")
ACE_LIB_TEXT ("cur_bytes = %d\n")
@@ -31,7 +45,6 @@ ACE_Message_Queue_Vx::dump (void) const
ACE_LIB_TEXT ("cur_count = %d\n")
ACE_LIB_TEXT ("head_ = %u\n")
ACE_LIB_TEXT ("MSG_Q_ID = %u\n"),
- this->deactivated_,
this->low_water_mark_,
this->high_water_mark_,
this->cur_bytes_,
@@ -75,7 +88,6 @@ ACE_Message_Queue_Vx::open (size_t max_messages,
ACE_TRACE ("ACE_Message_Queue_Vx::open");
this->high_water_mark_ = 0;
this->low_water_mark_ = 0;
- this->deactivated_ = 0;
this->cur_bytes_ = 0;
this->cur_length_ = 0;
this->cur_count_ = 0;
@@ -98,32 +110,6 @@ ACE_Message_Queue_Vx::open (size_t max_messages,
MSG_Q_FIFO))) == NULL ? -1 : 0;
}
-// Implementation of the public deactivate() method
-// (assumes locks are held).
-
-int
-ACE_Message_Queue_Vx::deactivate_i (void)
-{
- ACE_TRACE ("ACE_Message_Queue_Vx::deactivate_i");
-
- int current_status =
- this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
-
- this->deactivated_ = 1;
-
- return current_status;
-}
-
-int
-ACE_Message_Queue_Vx::activate_i (void)
-{
- ACE_TRACE ("ACE_Message_Queue_Vx::activate_i");
- int current_status =
- this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
- this->deactivated_ = 0;
- return current_status;
-}
-
// Clean up the queue if we have not already done so!
int
@@ -302,7 +288,6 @@ ACE_Message_Queue_NT::ACE_Message_Queue_NT (size_t max_threads)
cur_bytes_ (0),
cur_length_ (0),
cur_count_ (0),
- deactivated_ (0),
completion_port_ (ACE_INVALID_HANDLE)
{
ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
@@ -342,13 +327,17 @@ ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
{
ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
- if (!this->deactivated_)
+ if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
{
size_t msize = new_item->total_size ();
size_t mlength = new_item->total_length ();
if (::PostQueuedCompletionStatus (this->completion_port_,
msize,
- this->deactivated_,
+ // Irfan, can you please figure out what this should be? It used
+ // to be this->deactivated_, but that was removed... I don't
+ // see how that ever worked, however, since this seems to want
+ // to be a pointer!
+ this->state_,
ACE_reinterpret_cast (LPOVERLAPPED, new_item)))
{
// Update the states once I succeed.
@@ -372,8 +361,10 @@ ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
{
ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
- if (this->deactivated_) // Make sure the MQ is not deactivated before
- { // I proceed.
+
+ // Make sure the MQ is not deactivated before proceeding.
+ if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
+ {
errno = ESHUTDOWN; // Operation on deactivated MQ not allowed.
return -1;
}
@@ -409,26 +400,38 @@ ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
}
int
-ACE_Message_Queue_NT::deactivate (void)
+ACE_Message_Queue_NT::deactivate (int pulse)
{
ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
- if (this->deactivated_) // Check if I have been deactivated already.
- return ACE_Message_Queue_Base::WAS_INACTIVE;
-
- this->deactivated_ = 1;
-
- // Get the number of shutdown messages necessary to wake up
- // all waiting threads.
-
- for (size_t cntr = this->cur_thrs_ - this->cur_count_;
- cntr > 0; cntr++)
- ::PostQueuedCompletionStatus (this->completion_port_,
- 0,
- this->deactivated_,
- NULL);
- return ACE_Message_Queue_Base::WAS_ACTIVE;
+ int previous_status = this->state_;
+ if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
+ {
+ if (pulse)
+ this->state_ = ACE_Message_Queue_Base::PULSED;
+ else
+ this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
+
+ // Get the number of shutdown messages necessary to wake up all
+ // waiting threads.
+
+ for (size_t cntr = this->cur_thrs_ - this->cur_count_;
+ cntr > 0; cntr++)
+ ::PostQueuedCompletionStatus (this->completion_port_,
+ 0,
+ // Irfan, can you please figure
+ // out what this should be? It
+ // used to be
+ // this->deactivated_, but that
+ // was removed... I don't see
+ // how that ever worked,
+ // however, since this seems to
+ // want to be a pointer!
+ this->state_,
+ NULL);
+ }
+ return previous_state;
}
int
@@ -436,11 +439,9 @@ ACE_Message_Queue_NT::activate (void)
{
ACE_TRACE ("ACE_Message_Queue_NT::activate");
ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
- if (!this->deactivated_)
- return ACE_Message_Queue_Base::WAS_ACTIVE;
-
- this->deactivated_ = 0;
- return ACE_Message_Queue_Base::WAS_INACTIVE;
+ int previous_status = this->state_;
+ this->state_ = ACE_Message_Queue_Base::ACTIVATED;
+ return previous_status;
}
void
@@ -449,15 +450,29 @@ ACE_Message_Queue_NT::dump (void) const
ACE_TRACE ("ACE_Message_Queue_NT::dump");
ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ switch (this->state_)
+ {
+ case ACE_Message_Queue_Base::ACTIVATED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT ("state = ACTIVATED\n")));
+ break;
+ case ACE_Message_Queue_Base::DEACTIVATED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT ("state = DEACTIVATED\n")));
+ break;
+ case ACE_Message_Queue_Base::PULSED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT ("state = PULSED\n")));
+ break;
+ }
+
ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT ("deactivated = %d\n")
ACE_LIB_TEXT ("max_cthrs_ = %d\n")
ACE_LIB_TEXT ("cur_thrs_ = %d\n")
ACE_LIB_TEXT ("cur_bytes = %d\n")
ACE_LIB_TEXT ("cur_length = %d\n")
ACE_LIB_TEXT ("cur_count = %d\n")
ACE_LIB_TEXT ("completion_port_ = %x\n"),
- this->deactivated_,
this->max_cthrs_,
this->cur_thrs_,
this->cur_bytes_,