summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-07-23 20:41:14 +0000
committernanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-07-23 20:41:14 +0000
commit0d24e1dd36bb0f6d2d9d5f4a15a08e84829cf28b (patch)
tree5766137d365bd7373df5803479636e4b81211af3
parentbcd589b1b647b7595735af1fa76ab3fd900d494f (diff)
downloadATCD-0d24e1dd36bb0f6d2d9d5f4a15a08e84829cf28b.tar.gz
Use semaphores on applicable platforms. Differentiate read/write token operations.
-rw-r--r--ace/Token.cpp158
-rw-r--r--ace/Token.h56
-rw-r--r--ace/Token.i40
3 files changed, 182 insertions, 72 deletions
diff --git a/ace/Token.cpp b/ace/Token.cpp
index 4c6f80fe7b5..9ccbbb06135 100644
--- a/ace/Token.cpp
+++ b/ace/Token.cpp
@@ -37,41 +37,36 @@ ACE_Token::dump (void) const
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
-ACE_Token::ACE_Queue_Entry::ACE_Queue_Entry (ACE_Thread_Mutex &m,
- ACE_thread_t t_id)
+ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m,
+ ACE_thread_t t_id)
: next_ (0),
thread_id_ (t_id),
+#if defined (ACE_TOKEN_USES_SEMAPHORE)
+ cv_ (0),
+#else
cv_ (m),
+#endif /* ACE_TOKEN_USES_SEMAPHORE */
runable_ (0)
{
- ACE_TRACE ("ACE_Token::ACE_Queue_Entry::ACE_Queue_Entry");
+ ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry");
}
-ACE_Token::ACE_Token (LPCTSTR name, void *any)
+ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void)
: head_ (0),
- tail_ (0),
- lock_ (name, any),
- in_use_ (0),
- waiters_ (0),
- nesting_level_ (0)
-{
-// ACE_TRACE ("ACE_Token::ACE_Token");
-}
-
-ACE_Token::~ACE_Token (void)
+ tail_ (0)
{
- ACE_TRACE ("ACE_Token::~ACE_Token");
+ ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue");
}
// Remove an entry from the list. Must be
// called with locks held.
void
-ACE_Token::remove_entry (ACE_Token::ACE_Queue_Entry *entry)
+ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry)
{
ACE_TRACE ("ACE_Token::remove_entry");
- ACE_Token::ACE_Queue_Entry *curr = 0;
- ACE_Token::ACE_Queue_Entry *prev = 0;
+ ACE_Token_Queue_Entry *curr = 0;
+ ACE_Token_Queue_Entry *prev = 0;
if (this->head_ == 0)
return;
@@ -95,16 +90,35 @@ ACE_Token::remove_entry (ACE_Token::ACE_Queue_Entry *entry)
this->tail_ = curr;
}
+ACE_Token::ACE_Token (LPCTSTR name, void *any)
+ : lock_ (name, any),
+ in_use_ (0),
+ waiters_ (0),
+ nesting_level_ (0)
+{
+// ACE_TRACE ("ACE_Token::ACE_Token");
+}
+
+ACE_Token::~ACE_Token (void)
+{
+ ACE_TRACE ("ACE_Token::~ACE_Token");
+}
+
int
ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
void *arg,
- ACE_Time_Value *timeout)
+ ACE_Time_Value *timeout,
+ ACE_Token_Op_Type op_type)
{
ACE_TRACE ("ACE_Token::shared_acquire");
ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
ACE_thread_t thr_id = ACE_Thread::self ();
+ ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN ?
+ &this->readers_ :
+ &this->writers_);
+
#if defined (DEBUGGING)
cerr << '(' << ACE_Thread::self () << ')'
<< " acquire: owner_ = " << this->owner_
@@ -132,18 +146,18 @@ ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
// Allocate q entry on stack. This works since we don't
// exit this method's activation record until we've got the
// token.
- ACE_Token::ACE_Queue_Entry my_entry (this->lock_, thr_id);
+ ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_, thr_id);
int ret = 0;
- if (this->head_ == 0) // I'm first and only waiter in line...
+ if (queue->head_ == 0) // I'm first and only waiter in line...
{
- this->head_ = &my_entry;
- this->tail_ = &my_entry;
+ queue->head_ = &my_entry;
+ queue->tail_ = &my_entry;
}
else // I'm queued at the end of the list.
{
- this->tail_->next_ = &my_entry;
- this->tail_ = &my_entry;
+ queue->tail_->next_ = &my_entry;
+ queue->tail_ = &my_entry;
}
this->waiters_++;
@@ -164,8 +178,8 @@ ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
}
// Sleep until we've got the token (ignore signals).
-
- while (my_entry.cv_.wait (timeout) == -1)
+
+ while (my_entry.wait (timeout, this->lock_) == -1)
{
// Note, this should obey whatever thread-specific
// interrupt policy is currently in place...
@@ -179,7 +193,7 @@ ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
#endif /* DEBUGGING */
// We come here if a timeout occurs or some serious
// ACE_Condition object error.
- this->remove_entry (&my_entry);
+ queue->remove_entry (&my_entry);
return -1;
}
@@ -193,7 +207,7 @@ ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),
}
else
{
- this->in_use_ = 1;
+ this->in_use_ = op_type;
this->owner_ = thr_id; // Its mine!
return 0;
}
@@ -212,7 +226,7 @@ int
ACE_Token::acquire (ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Token::acquire");
- return this->shared_acquire (0, 0, timeout);
+ return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN);
}
// Acquire the token, sleeping until it is obtained or until
@@ -224,7 +238,7 @@ ACE_Token::acquire (void (*sleep_hook_func)(void *),
ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Token::acquire");
- return this->shared_acquire (sleep_hook_func, arg, timeout);
+ return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN);
}
// Try to renew the token.
@@ -241,38 +255,57 @@ ACE_Token::renew (int requeue_position, ACE_Time_Value *timeout)
// ACE_ASSERT (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_));
// Check to see if there are any waiters. If not, we just keep the token.
- if (this->head_ != 0)
+ if (this->writers_.head_ != 0 ||
+ (this->in_use_ == ACE_Token::READ_TOKEN && this->readers_.head_ != 0))
{
- ACE_Token::ACE_Queue_Entry my_entry (this->lock_, this->owner_);
+ // First we determine which queue should we the original threads
+ // back and then, from which queue to wake next thread.
+ ACE_Token::ACE_Token_Queue *old_q = (this->in_use_ == ACE_Token::READ_TOKEN ?
+ &this->readers_ : &this->writers_);
+ ACE_Token::ACE_Token_Queue *queue;
+ if (this->writers_.head_ != 0)
+ {
+ queue = &this->writers_;
+ this->in_use_ = ACE_Token::WRITE_TOKEN;
+ }
+ else
+ {
+ queue = &this->readers_;
+ this->in_use_ = ACE_Token::READ_TOKEN;
+ }
+
+ ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_, this->owner_);
int save_nesting_level_ = this->nesting_level_;
- this->owner_ = this->head_->thread_id_;
+ this->owner_ = queue->head_->thread_id_;
this->nesting_level_ = 0;
// Wake up next waiter and make it runable.
- this->head_->cv_.signal ();
- this->head_->runable_ = 1;
+ queue->head_->runable_ = 1;
+ queue->head_->signal ();
- this->head_ = this->head_->next_;
+ // And remove it from it's queue.
+ queue->remove_entry (queue->head_);
- if (this->head_ == 0) // No other threads - just add me
+ // Now put the current thread back in its queue.
+ if (old_q->head_ == 0) // No other threads - just add me
{
- this->head_ = &my_entry;
- this->tail_ = &my_entry;
+ old_q->head_ = &my_entry;
+ old_q->tail_ = &my_entry;
}
else if (requeue_position == -1) // Insert at the end of the queue.
{
- this->tail_->next_ = &my_entry;
- this->tail_ = &my_entry;
+ old_q->tail_->next_ = &my_entry;
+ old_q->tail_ = &my_entry;
}
else if (requeue_position == 0) // Insert at head of queue.
{
- my_entry.next_ = this->head_;
- this->head_ = &my_entry;
+ my_entry.next_ = old_q->head_;
+ old_q->head_ = &my_entry;
}
else // Insert in the middle of the queue somewhere.
{
- ACE_Token::ACE_Queue_Entry *insert_after = this->head_;
+ ACE_Token::ACE_Token_Queue_Entry *insert_after = old_q->head_;
// Determine where our thread should go in the queue of
// waiters.
@@ -283,14 +316,13 @@ ACE_Token::renew (int requeue_position, ACE_Time_Value *timeout)
my_entry.next_ = insert_after->next_;
if (my_entry.next_ == 0)
- this->tail_ = &my_entry;
+ old_q->tail_ = &my_entry;
insert_after->next_ = &my_entry;
}
// Sleep until we've got the token (ignore signals).
-
- while (my_entry.cv_.wait (timeout) == -1)
+ while (my_entry.wait (0, this->lock_) == -1)
{
// Note, this should obey whatever thread-specific
// interrupt policy is currently in place...
@@ -303,7 +335,7 @@ ACE_Token::renew (int requeue_position, ACE_Time_Value *timeout)
#endif /* DEBUGGING */
// We come here if a timeout occurs or
// some serious ACE_Condition object error.
- this->remove_entry (&my_entry);
+ old_q->remove_entry (&my_entry);
return -1;
}
@@ -333,21 +365,35 @@ ACE_Token::release (void)
--this->nesting_level_;
else
{
- if (this->head_ == 0)
+ if (this->writers_.head_ == 0 && this->readers_.head_ == 0)
this->in_use_ = 0; // No more waiters...
else
{
- this->owner_ = this->head_->thread_id_;
+ ACE_Token_Queue *queue;
+
+ if (this->writers_.head_ != 0)
+ // Writer threads get priority to run first.
+ {
+ queue = &this->writers_;
+ this->in_use_ = ACE_Token::WRITE_TOKEN;
+ }
+ else
+ {
+ queue = &this->readers_;
+ this->in_use_ = ACE_Token::READ_TOKEN;
+ }
+
+ this->owner_ = queue->head_->thread_id_;
--this->waiters_;
// Wake up waiter and make it runable.
- this->head_->cv_.signal ();
- this->head_->runable_ = 1;
+ queue->head_->runable_ = 1;
+ queue->head_->signal ();
- this->head_ = this->head_->next_;
+ queue->head_ = queue->head_->next_;
- if (this->head_ == 0)
- this->tail_ = 0;
+ if (queue->head_ == 0)
+ queue->tail_ = 0;
}
}
return 0;
diff --git a/ace/Token.h b/ace/Token.h
index 0364eaf4082..653af1c6f4e 100644
--- a/ace/Token.h
+++ b/ace/Token.h
@@ -22,6 +22,11 @@
#if defined (ACE_HAS_THREADS)
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || defined (VXWORKS) || defined (ACE_PSOS)
+// If platforms support semaphores with timed wait, then we use semaphores instead of c.v.
+# define ACE_TOKEN_USES_SEMAPHORE
+#endif /* (ACE_WIN32 && !ACE_HAS_WINCE) || VXWORKS || ACE_PSOS */
+
class ACE_Export ACE_Token
{
// = TITLE
@@ -125,39 +130,68 @@ public:
// Declare the dynamic allocation hooks.
private:
+ enum ACE_Token_Op_Type
+ {
+ READ_TOKEN = 1,
+ WRITE_TOKEN
+ };
+
// = The following structure implements a ACE_FIFO of waiter threads
// that are asleep waiting to obtain the token.
- struct ACE_Queue_Entry
+ struct ACE_Token_Queue_Entry
{
- ACE_Queue_Entry (ACE_Thread_Mutex &m, ACE_thread_t t_id);
+ ACE_Token_Queue_Entry (ACE_Thread_Mutex &m, ACE_thread_t t_id);
- ACE_Queue_Entry *next_;
+ int wait (ACE_Time_Value *timeout, ACE_Thread_Mutex &lock);
+ // Entry blocks on the token.
+
+ int signal (void);
+ // Notify (unblock) the entry.
+
+ ACE_Token_Queue_Entry *next_;
// Pointer to next waiter.
ACE_thread_t thread_id_;
// ACE_Thread id of this waiter.
+#if defined (ACE_TOKEN_USES_SEMAPHORE)
+ ACE_Semaphore cv_;
+ // ACE_Semaphore object used to wake up waiter when it can run again.
+#else
ACE_Condition_Thread_Mutex cv_;
// ACE_Condition object used to wake up waiter when it can run again.
+#endif /* ACE_TOKEN_USES_SEMAPHORE */
int runable_;
// Ok to run.
};
+ struct ACE_Token_Queue
+ {
+ ACE_Token_Queue (void);
+
+ void remove_entry (ACE_Token_Queue_Entry *);
+ // Remove a waiter from the queue (used when a timeout occurs).
+
+ ACE_Token_Queue_Entry *head_;
+ // Head of the list of waiting threads.
+
+ ACE_Token_Queue_Entry *tail_;
+ // Tail of the list of waiting threads.
+ };
+
int shared_acquire (void (*sleep_hook_func)(void *),
void *arg,
- ACE_Time_Value *timeout);
+ ACE_Time_Value *timeout,
+ ACE_Token_Op_Type op_type);
// Implements the <acquire> and <tryacquire> methods above.
- void remove_entry (ACE_Queue_Entry *);
- // Remove a waiter from the queue (used when a timeout occurs).
-
- ACE_Queue_Entry *head_;
- // Head of the list of waiting threads.
+ ACE_Token_Queue writers_;
+ // A queue of writer threads.
- ACE_Queue_Entry *tail_;
- // Tail of the list of waiting threads.
+ ACE_Token_Queue readers_;
+ // A queue of reader threads.
ACE_Thread_Mutex lock_;
// ACE_Thread_Mutex used to lock internal data structures.
diff --git a/ace/Token.i b/ace/Token.i
index 44e7b506f5f..9a61b19f176 100644
--- a/ace/Token.i
+++ b/ace/Token.i
@@ -16,7 +16,7 @@ ACE_Token::tryacquire (void)
{
ACE_TRACE ("ACE_Token::tryacquire");
return this->shared_acquire
- (0, 0, (ACE_Time_Value *) &ACE_Time_Value::zero);
+ (0, 0, (ACE_Time_Value *) &ACE_Time_Value::zero, ACE_Token::WRITE_TOKEN);
}
ACE_INLINE int
@@ -42,27 +42,57 @@ ACE_INLINE int
ACE_Token::acquire_read (void)
{
ACE_TRACE ("ACE_Token::acquire_read");
- return this->acquire ();
+ return this->shared_acquire
+ (0, 0, 0, ACE_Token::READ_TOKEN);
}
ACE_INLINE int
ACE_Token::acquire_write (void)
{
ACE_TRACE ("ACE_Token::acquire_write");
- return this->acquire ();
+ return this->shared_acquire
+ (0, 0, 0, ACE_Token::WRITE_TOKEN);
}
ACE_INLINE int
ACE_Token::tryacquire_read (void)
{
ACE_TRACE ("ACE_Token::tryacquire_read");
- return this->tryacquire ();
+ return this->shared_acquire
+ (0, 0, (ACE_Time_Value *) &ACE_Time_Value::zero, ACE_Token::READ_TOKEN);
}
ACE_INLINE int
ACE_Token::tryacquire_write (void)
{
ACE_TRACE ("ACE_Token::tryacquire_write");
- return this->tryacquire ();
+ return this->shared_acquire
+ (0, 0, (ACE_Time_Value *) &ACE_Time_Value::zero, ACE_Token::WRITE_TOKEN);
}
+ACE_INLINE int
+ACE_Token::ACE_Token_Queue_Entry::wait (ACE_Time_Value *timeout, ACE_Thread_Mutex &lock)
+{
+#if defined (ACE_TOKEN_USES_SEMAPHORE)
+ lock.release ();
+ int retv = (timeout == 0 ?
+ this->cv_.acquire () :
+ this->cv_.acquire (*timeout));
+ lock.acquire ();
+ return retv;
+#else
+ ACE_UNUSED_ARG (lock);
+ return this->cv_.wait (timeout);
+#endif /* ACE_TOKEN_USES_SEMAPHORE */
+}
+
+ACE_INLINE int
+ACE_Token::ACE_Token_Queue_Entry::signal (void)
+{
+ return
+#if defined (ACE_TOKEN_USES_SEMAPHORE)
+ this->cv_.release ();
+#else
+ this->cv_.signal ();
+#endif /* ACE_TOKEN_USES_SEMAPHORE */
+}