diff options
author | nanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-07-23 20:41:14 +0000 |
---|---|---|
committer | nanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-07-23 20:41:14 +0000 |
commit | 0d24e1dd36bb0f6d2d9d5f4a15a08e84829cf28b (patch) | |
tree | 5766137d365bd7373df5803479636e4b81211af3 | |
parent | bcd589b1b647b7595735af1fa76ab3fd900d494f (diff) | |
download | ATCD-0d24e1dd36bb0f6d2d9d5f4a15a08e84829cf28b.tar.gz |
Use semaphores on applicable platforms. Differentiate read/write token operations.
-rw-r--r-- | ace/Token.cpp | 158 | ||||
-rw-r--r-- | ace/Token.h | 56 | ||||
-rw-r--r-- | ace/Token.i | 40 |
3 files changed, 182 insertions, 72 deletions
diff --git a/ace/Token.cpp b/ace/Token.cpp index 4c6f80fe7b5..9ccbbb06135 100644 --- a/ace/Token.cpp +++ b/ace/Token.cpp @@ -37,41 +37,36 @@ ACE_Token::dump (void) const ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } -ACE_Token::ACE_Queue_Entry::ACE_Queue_Entry (ACE_Thread_Mutex &m, - ACE_thread_t t_id) +ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry (ACE_Thread_Mutex &m, + ACE_thread_t t_id) : next_ (0), thread_id_ (t_id), +#if defined (ACE_TOKEN_USES_SEMAPHORE) + cv_ (0), +#else cv_ (m), +#endif /* ACE_TOKEN_USES_SEMAPHORE */ runable_ (0) { - ACE_TRACE ("ACE_Token::ACE_Queue_Entry::ACE_Queue_Entry"); + ACE_TRACE ("ACE_Token::ACE_Token_Queue_Entry::ACE_Token_Queue_Entry"); } -ACE_Token::ACE_Token (LPCTSTR name, void *any) +ACE_Token::ACE_Token_Queue::ACE_Token_Queue (void) : head_ (0), - tail_ (0), - lock_ (name, any), - in_use_ (0), - waiters_ (0), - nesting_level_ (0) -{ -// ACE_TRACE ("ACE_Token::ACE_Token"); -} - -ACE_Token::~ACE_Token (void) + tail_ (0) { - ACE_TRACE ("ACE_Token::~ACE_Token"); + ACE_TRACE ("ACE_Token::ACE_Token_Queue::ACE_Token_Queue"); } // Remove an entry from the list. Must be // called with locks held. void -ACE_Token::remove_entry (ACE_Token::ACE_Queue_Entry *entry) +ACE_Token::ACE_Token_Queue::remove_entry (ACE_Token::ACE_Token_Queue_Entry *entry) { ACE_TRACE ("ACE_Token::remove_entry"); - ACE_Token::ACE_Queue_Entry *curr = 0; - ACE_Token::ACE_Queue_Entry *prev = 0; + ACE_Token_Queue_Entry *curr = 0; + ACE_Token_Queue_Entry *prev = 0; if (this->head_ == 0) return; @@ -95,16 +90,35 @@ ACE_Token::remove_entry (ACE_Token::ACE_Queue_Entry *entry) this->tail_ = curr; } +ACE_Token::ACE_Token (LPCTSTR name, void *any) + : lock_ (name, any), + in_use_ (0), + waiters_ (0), + nesting_level_ (0) +{ +// ACE_TRACE ("ACE_Token::ACE_Token"); +} + +ACE_Token::~ACE_Token (void) +{ + ACE_TRACE ("ACE_Token::~ACE_Token"); +} + int ACE_Token::shared_acquire (void (*sleep_hook_func)(void *), void *arg, - ACE_Time_Value *timeout) + ACE_Time_Value *timeout, + ACE_Token_Op_Type op_type) { ACE_TRACE ("ACE_Token::shared_acquire"); ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); ACE_thread_t thr_id = ACE_Thread::self (); + ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN ? + &this->readers_ : + &this->writers_); + #if defined (DEBUGGING) cerr << '(' << ACE_Thread::self () << ')' << " acquire: owner_ = " << this->owner_ @@ -132,18 +146,18 @@ ACE_Token::shared_acquire (void (*sleep_hook_func)(void *), // Allocate q entry on stack. This works since we don't // exit this method's activation record until we've got the // token. - ACE_Token::ACE_Queue_Entry my_entry (this->lock_, thr_id); + ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_, thr_id); int ret = 0; - if (this->head_ == 0) // I'm first and only waiter in line... + if (queue->head_ == 0) // I'm first and only waiter in line... { - this->head_ = &my_entry; - this->tail_ = &my_entry; + queue->head_ = &my_entry; + queue->tail_ = &my_entry; } else // I'm queued at the end of the list. { - this->tail_->next_ = &my_entry; - this->tail_ = &my_entry; + queue->tail_->next_ = &my_entry; + queue->tail_ = &my_entry; } this->waiters_++; @@ -164,8 +178,8 @@ ACE_Token::shared_acquire (void (*sleep_hook_func)(void *), } // Sleep until we've got the token (ignore signals). - - while (my_entry.cv_.wait (timeout) == -1) + + while (my_entry.wait (timeout, this->lock_) == -1) { // Note, this should obey whatever thread-specific // interrupt policy is currently in place... @@ -179,7 +193,7 @@ ACE_Token::shared_acquire (void (*sleep_hook_func)(void *), #endif /* DEBUGGING */ // We come here if a timeout occurs or some serious // ACE_Condition object error. - this->remove_entry (&my_entry); + queue->remove_entry (&my_entry); return -1; } @@ -193,7 +207,7 @@ ACE_Token::shared_acquire (void (*sleep_hook_func)(void *), } else { - this->in_use_ = 1; + this->in_use_ = op_type; this->owner_ = thr_id; // Its mine! return 0; } @@ -212,7 +226,7 @@ int ACE_Token::acquire (ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Token::acquire"); - return this->shared_acquire (0, 0, timeout); + return this->shared_acquire (0, 0, timeout, ACE_Token::WRITE_TOKEN); } // Acquire the token, sleeping until it is obtained or until @@ -224,7 +238,7 @@ ACE_Token::acquire (void (*sleep_hook_func)(void *), ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Token::acquire"); - return this->shared_acquire (sleep_hook_func, arg, timeout); + return this->shared_acquire (sleep_hook_func, arg, timeout, ACE_Token::WRITE_TOKEN); } // Try to renew the token. @@ -241,38 +255,57 @@ ACE_Token::renew (int requeue_position, ACE_Time_Value *timeout) // ACE_ASSERT (ACE_OS::thr_equal (ACE_Thread::self (), this->owner_)); // Check to see if there are any waiters. If not, we just keep the token. - if (this->head_ != 0) + if (this->writers_.head_ != 0 || + (this->in_use_ == ACE_Token::READ_TOKEN && this->readers_.head_ != 0)) { - ACE_Token::ACE_Queue_Entry my_entry (this->lock_, this->owner_); + // First we determine which queue should we the original threads + // back and then, from which queue to wake next thread. + ACE_Token::ACE_Token_Queue *old_q = (this->in_use_ == ACE_Token::READ_TOKEN ? + &this->readers_ : &this->writers_); + ACE_Token::ACE_Token_Queue *queue; + if (this->writers_.head_ != 0) + { + queue = &this->writers_; + this->in_use_ = ACE_Token::WRITE_TOKEN; + } + else + { + queue = &this->readers_; + this->in_use_ = ACE_Token::READ_TOKEN; + } + + ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_, this->owner_); int save_nesting_level_ = this->nesting_level_; - this->owner_ = this->head_->thread_id_; + this->owner_ = queue->head_->thread_id_; this->nesting_level_ = 0; // Wake up next waiter and make it runable. - this->head_->cv_.signal (); - this->head_->runable_ = 1; + queue->head_->runable_ = 1; + queue->head_->signal (); - this->head_ = this->head_->next_; + // And remove it from it's queue. + queue->remove_entry (queue->head_); - if (this->head_ == 0) // No other threads - just add me + // Now put the current thread back in its queue. + if (old_q->head_ == 0) // No other threads - just add me { - this->head_ = &my_entry; - this->tail_ = &my_entry; + old_q->head_ = &my_entry; + old_q->tail_ = &my_entry; } else if (requeue_position == -1) // Insert at the end of the queue. { - this->tail_->next_ = &my_entry; - this->tail_ = &my_entry; + old_q->tail_->next_ = &my_entry; + old_q->tail_ = &my_entry; } else if (requeue_position == 0) // Insert at head of queue. { - my_entry.next_ = this->head_; - this->head_ = &my_entry; + my_entry.next_ = old_q->head_; + old_q->head_ = &my_entry; } else // Insert in the middle of the queue somewhere. { - ACE_Token::ACE_Queue_Entry *insert_after = this->head_; + ACE_Token::ACE_Token_Queue_Entry *insert_after = old_q->head_; // Determine where our thread should go in the queue of // waiters. @@ -283,14 +316,13 @@ ACE_Token::renew (int requeue_position, ACE_Time_Value *timeout) my_entry.next_ = insert_after->next_; if (my_entry.next_ == 0) - this->tail_ = &my_entry; + old_q->tail_ = &my_entry; insert_after->next_ = &my_entry; } // Sleep until we've got the token (ignore signals). - - while (my_entry.cv_.wait (timeout) == -1) + while (my_entry.wait (0, this->lock_) == -1) { // Note, this should obey whatever thread-specific // interrupt policy is currently in place... @@ -303,7 +335,7 @@ ACE_Token::renew (int requeue_position, ACE_Time_Value *timeout) #endif /* DEBUGGING */ // We come here if a timeout occurs or // some serious ACE_Condition object error. - this->remove_entry (&my_entry); + old_q->remove_entry (&my_entry); return -1; } @@ -333,21 +365,35 @@ ACE_Token::release (void) --this->nesting_level_; else { - if (this->head_ == 0) + if (this->writers_.head_ == 0 && this->readers_.head_ == 0) this->in_use_ = 0; // No more waiters... else { - this->owner_ = this->head_->thread_id_; + ACE_Token_Queue *queue; + + if (this->writers_.head_ != 0) + // Writer threads get priority to run first. + { + queue = &this->writers_; + this->in_use_ = ACE_Token::WRITE_TOKEN; + } + else + { + queue = &this->readers_; + this->in_use_ = ACE_Token::READ_TOKEN; + } + + this->owner_ = queue->head_->thread_id_; --this->waiters_; // Wake up waiter and make it runable. - this->head_->cv_.signal (); - this->head_->runable_ = 1; + queue->head_->runable_ = 1; + queue->head_->signal (); - this->head_ = this->head_->next_; + queue->head_ = queue->head_->next_; - if (this->head_ == 0) - this->tail_ = 0; + if (queue->head_ == 0) + queue->tail_ = 0; } } return 0; diff --git a/ace/Token.h b/ace/Token.h index 0364eaf4082..653af1c6f4e 100644 --- a/ace/Token.h +++ b/ace/Token.h @@ -22,6 +22,11 @@ #if defined (ACE_HAS_THREADS) +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || defined (VXWORKS) || defined (ACE_PSOS) +// If platforms support semaphores with timed wait, then we use semaphores instead of c.v. +# define ACE_TOKEN_USES_SEMAPHORE +#endif /* (ACE_WIN32 && !ACE_HAS_WINCE) || VXWORKS || ACE_PSOS */ + class ACE_Export ACE_Token { // = TITLE @@ -125,39 +130,68 @@ public: // Declare the dynamic allocation hooks. private: + enum ACE_Token_Op_Type + { + READ_TOKEN = 1, + WRITE_TOKEN + }; + // = The following structure implements a ACE_FIFO of waiter threads // that are asleep waiting to obtain the token. - struct ACE_Queue_Entry + struct ACE_Token_Queue_Entry { - ACE_Queue_Entry (ACE_Thread_Mutex &m, ACE_thread_t t_id); + ACE_Token_Queue_Entry (ACE_Thread_Mutex &m, ACE_thread_t t_id); - ACE_Queue_Entry *next_; + int wait (ACE_Time_Value *timeout, ACE_Thread_Mutex &lock); + // Entry blocks on the token. + + int signal (void); + // Notify (unblock) the entry. + + ACE_Token_Queue_Entry *next_; // Pointer to next waiter. ACE_thread_t thread_id_; // ACE_Thread id of this waiter. +#if defined (ACE_TOKEN_USES_SEMAPHORE) + ACE_Semaphore cv_; + // ACE_Semaphore object used to wake up waiter when it can run again. +#else ACE_Condition_Thread_Mutex cv_; // ACE_Condition object used to wake up waiter when it can run again. +#endif /* ACE_TOKEN_USES_SEMAPHORE */ int runable_; // Ok to run. }; + struct ACE_Token_Queue + { + ACE_Token_Queue (void); + + void remove_entry (ACE_Token_Queue_Entry *); + // Remove a waiter from the queue (used when a timeout occurs). + + ACE_Token_Queue_Entry *head_; + // Head of the list of waiting threads. + + ACE_Token_Queue_Entry *tail_; + // Tail of the list of waiting threads. + }; + int shared_acquire (void (*sleep_hook_func)(void *), void *arg, - ACE_Time_Value *timeout); + ACE_Time_Value *timeout, + ACE_Token_Op_Type op_type); // Implements the <acquire> and <tryacquire> methods above. - void remove_entry (ACE_Queue_Entry *); - // Remove a waiter from the queue (used when a timeout occurs). - - ACE_Queue_Entry *head_; - // Head of the list of waiting threads. + ACE_Token_Queue writers_; + // A queue of writer threads. - ACE_Queue_Entry *tail_; - // Tail of the list of waiting threads. + ACE_Token_Queue readers_; + // A queue of reader threads. ACE_Thread_Mutex lock_; // ACE_Thread_Mutex used to lock internal data structures. diff --git a/ace/Token.i b/ace/Token.i index 44e7b506f5f..9a61b19f176 100644 --- a/ace/Token.i +++ b/ace/Token.i @@ -16,7 +16,7 @@ ACE_Token::tryacquire (void) { ACE_TRACE ("ACE_Token::tryacquire"); return this->shared_acquire - (0, 0, (ACE_Time_Value *) &ACE_Time_Value::zero); + (0, 0, (ACE_Time_Value *) &ACE_Time_Value::zero, ACE_Token::WRITE_TOKEN); } ACE_INLINE int @@ -42,27 +42,57 @@ ACE_INLINE int ACE_Token::acquire_read (void) { ACE_TRACE ("ACE_Token::acquire_read"); - return this->acquire (); + return this->shared_acquire + (0, 0, 0, ACE_Token::READ_TOKEN); } ACE_INLINE int ACE_Token::acquire_write (void) { ACE_TRACE ("ACE_Token::acquire_write"); - return this->acquire (); + return this->shared_acquire + (0, 0, 0, ACE_Token::WRITE_TOKEN); } ACE_INLINE int ACE_Token::tryacquire_read (void) { ACE_TRACE ("ACE_Token::tryacquire_read"); - return this->tryacquire (); + return this->shared_acquire + (0, 0, (ACE_Time_Value *) &ACE_Time_Value::zero, ACE_Token::READ_TOKEN); } ACE_INLINE int ACE_Token::tryacquire_write (void) { ACE_TRACE ("ACE_Token::tryacquire_write"); - return this->tryacquire (); + return this->shared_acquire + (0, 0, (ACE_Time_Value *) &ACE_Time_Value::zero, ACE_Token::WRITE_TOKEN); } +ACE_INLINE int +ACE_Token::ACE_Token_Queue_Entry::wait (ACE_Time_Value *timeout, ACE_Thread_Mutex &lock) +{ +#if defined (ACE_TOKEN_USES_SEMAPHORE) + lock.release (); + int retv = (timeout == 0 ? + this->cv_.acquire () : + this->cv_.acquire (*timeout)); + lock.acquire (); + return retv; +#else + ACE_UNUSED_ARG (lock); + return this->cv_.wait (timeout); +#endif /* ACE_TOKEN_USES_SEMAPHORE */ +} + +ACE_INLINE int +ACE_Token::ACE_Token_Queue_Entry::signal (void) +{ + return +#if defined (ACE_TOKEN_USES_SEMAPHORE) + this->cv_.release (); +#else + this->cv_.signal (); +#endif /* ACE_TOKEN_USES_SEMAPHORE */ +} |