diff options
author | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-10-21 21:41:34 +0000 |
---|---|---|
committer | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-10-21 21:41:34 +0000 |
commit | a5fdebc5f6375078ec1763850a4ca23ec7fe6458 (patch) | |
tree | bcf0a25c3d45a209a6e3ac37b233a4812f29c732 /ace/Local_Tokens.cpp | |
download | ATCD-a5fdebc5f6375078ec1763850a4ca23ec7fe6458.tar.gz |
Initial revision
Diffstat (limited to 'ace/Local_Tokens.cpp')
-rw-r--r-- | ace/Local_Tokens.cpp | 1408 |
1 files changed, 1408 insertions, 0 deletions
diff --git a/ace/Local_Tokens.cpp b/ace/Local_Tokens.cpp new file mode 100644 index 00000000000..9b39c22f451 --- /dev/null +++ b/ace/Local_Tokens.cpp @@ -0,0 +1,1408 @@ +// Local_Tokens.cpp +// $Id$ + +#define ACE_BUILD_DLL +#include "ace/Log_Msg.h" +#include "ace/Thread.h" +#include "ace/Local_Tokens.h" +#include "ace/Token_Manager.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Local_Tokens.i" +#endif /* __ACE_INLINE__ */ + +void +ACE_Tokens::dump (void) const +{ + ACE_TRACE ("ACE_Tokens::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, "ACE_Tokens::dump:\n" + " reference_cont_ = %d\n" + " token_name_ = %s\n", + reference_count_, token_name_)); + ACE_DEBUG ((LM_DEBUG, "waiters_\n")); + this->waiters_.dump (); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_Tokens::ACE_Tokens (void) + : reference_count_ (0), + visited_ (0) +{ + ACE_TRACE ("ACE_Tokens::ACE_Tokens"); +} + +void +ACE_Tokens::make_owner (ACE_TPQ_Entry *caller) +{ + this->waiters_.remove (caller); + this->waiters_.enqueue (caller, 0); +} + +// ************************************************************ +// ************************************************************ +// ************************************************************ + + +void +ACE_TPQ_Entry::dump (void) const +{ + ACE_TRACE ("ACE_TPQ_Entry::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, + "ACE_TPQ_Entry::dump:\n" + " nesting_level_ = %d\n" + " client_id_ = %s\n", + nesting_level_, + client_id_)); + + if (next_ != 0) + { + ACE_DEBUG ((LM_DEBUG, "next:.\n")); + next_->dump (); + } + + ACE_DEBUG ((LM_DEBUG, "ACE_TPQ_Entry::dump end.\n")); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_TPQ_Entry::ACE_TPQ_Entry (const ACE_Token_Proxy *new_proxy, + const char *client_id) + : cond_var_ (lock_), + // This const typecast is safe. + proxy_ ((ACE_Token_Proxy *) new_proxy), + nesting_level_ (0), + sleep_hook_ (0), + next_ (0) +{ + ACE_TRACE ("ACE_TPQ_Entry::ACE_TPQ_Entry"); + + if (client_id != 0) + this->client_id (client_id); + else + { + // Just make sure we have enough space. + char host_name[MAXHOSTNAMELEN]; + char name[sizeof host_name + 256]; + ACE_OS::hostname (host_name, sizeof host_name); + + ACE_OS::sprintf (name, + "/%s/%u/%u", + host_name, + ACE_OS::getpid (), + ACE_Thread::self ()); + + this->client_id (name); + } +} + +ACE_TPQ_Entry::ACE_TPQ_Entry (void) + : proxy_ (0), + nesting_level_ (0), + sleep_hook_ (0), + cond_var_ (lock_) +{ + ACE_TRACE ("ACE_TPQ_Entry::ACE_TPQ_Entry null const."); +} + +ACE_TPQ_Entry::ACE_TPQ_Entry (const ACE_TPQ_Entry &rhs) +: cond_var_ (lock_) +{ + ACE_TRACE ("ACE_TPQ_Entry::ACE_TPQ_Entry copy const."); + *this = rhs; +} + +ACE_TPQ_Entry::~ACE_TPQ_Entry (void) +{ + ACE_TRACE ("ACE_TPQ_Entry::~ACE_TPQ_Entry"); +} + +void +ACE_TPQ_Entry::operator= (const ACE_TPQ_Entry& rhs) +{ + ACE_TRACE ("ACE_TPQ_Entry::operator="); + if (&rhs == this) + return; + this->proxy_ = rhs.proxy (); + this->nesting_level_ = rhs.nesting_level (); + this->client_id (rhs.client_id ()); + this->sleep_hook_ = rhs.sleep_hook (); +} + +void +ACE_TPQ_Entry::client_id (const char *id) +{ + ACE_TRACE ("ACE_TPQ_Entry::client_id"); + + if (id == 0) + return; + + int n = ACE_OS::strlen (id) + 1; + + if (n >= ACE_MAXCLIENTIDLEN) + n = ACE_MAXCLIENTIDLEN - 1; + + ACE_OS::strncpy (this->client_id_, (char *) id, n); + this->client_id_[ACE_MAXCLIENTIDLEN - 1] = '\0'; +} + +// ************************************************************ +// ************************************************************ +// ************************************************************ + +void +ACE_TSS_TPQ_Entry::dump (void) const +{ + ACE_TRACE ("ACE_TSS_TPQ_Entry::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, "ACE_TSS_TPQ_Entry::dump:\n" + " client_id_ = %s\n", + client_id_ == 0 ? "0" : client_id_)); + ACE_DEBUG ((LM_DEBUG, "base:\n")); + ACE_TSS<ACE_TPQ_Entry>::dump (); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_TSS_TPQ_Entry::ACE_TSS_TPQ_Entry (const ACE_Token_Proxy *proxy, + const char *client_id) +: proxy_ (proxy), + client_id_ (client_id) +{ + ACE_TRACE ("ACE_TSS_TPQ_Entry::ACE_TSS_TPQ_Entry"); +} + +ACE_TPQ_Entry * +ACE_TSS_TPQ_Entry::make_TSS_TYPE (void) const +{ + ACE_TRACE ("ACE_TSS_TPQ_Entry::make_TSS_TYPE"); + ACE_TPQ_Entry *temp; + + ACE_NEW_RETURN (temp, ACE_TPQ_Entry (this->proxy_, this->client_id_), 0); + return temp; +} + +ACE_TSS_TPQ_Entry::operator ACE_TPQ_Entry * (void) +{ + ACE_TRACE ("ACE_TSS_TPQ_Entry::operator"); + return (ACE_TPQ_Entry *) (*((ACE_TSS<ACE_TPQ_Entry> *) this)); +} + +// ************************************************************ +// ************************************************************ +// ************************************************************ + +ACE_TPQ_Iterator::ACE_TPQ_Iterator (ACE_Token_Proxy_Queue &q) +: current_ (q.head_) +{ + ACE_TRACE ("ACE_TPQ_Iterator::ACE_TPQ_Iterator"); +} + +int +ACE_TPQ_Iterator::next (ACE_TPQ_Entry *&next_item) +{ + ACE_TRACE ("ACE_TPQ_Iterator::next"); + + next_item = current_; + + if (current_ == 0) + return 0; + + return 1; +} + +void +ACE_TPQ_Iterator::advance (void) +{ + ACE_TRACE ("ACE_TPQ_Iterator::advance"); + if (current_ != 0) + current_ = current_->next_; +} + +void +ACE_TPQ_Iterator::dump (void) const +{ + ACE_TRACE ("ACE_TPQ_Iterator::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, "ACE_TPQ_Iterator::dump:\n" + " current_ = %d\n", + (long) current_)); + ACE_DEBUG ((LM_DEBUG, "head_ and tail_\n")); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +// ************************************************************ +// ************************************************************ +// ************************************************************ + +void +ACE_Token_Proxy_Queue::dump (void) const +{ + ACE_TRACE ("ACE_Token_Proxy_Queue::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, "ACE_Token_Proxy_Queue::dump:\n" + " size_ = %d\n", + size_)); + ACE_DEBUG ((LM_DEBUG, "head_ and tail_\n")); + if (this->head_ != 0) + this->head_->dump (); + + ACE_DEBUG ((LM_DEBUG, "ACE_Token_Proxy_Queue::dump end.\n")); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_Token_Proxy_Queue::ACE_Token_Proxy_Queue (void) + : head_ (0), + tail_ (0), + size_ (0) +{ + ACE_TRACE ("ACE_Token_Proxy_Queue::ACE_Token_Proxy_Queue"); +} + +void +ACE_Token_Proxy_Queue::enqueue (ACE_TPQ_Entry *tpq, + int position) +{ + ACE_TRACE ("ACE_Token_Proxy_Queue::enqueue"); + tpq->next_ = 0; + + ++this->size_; + + if (this->head_ == 0) + { + // make tpq the entire list + this->head_ = this->tail_ = tpq; + return; + } + + if (position == 0) + { + // make head of list + tpq->next_ = this->head_; + this->head_ = tpq; + return; + } + + if (position == -1) + { + // stick at back of list + this->tail_->next_ = tpq; + this->tail_ = tpq; + return; + } + + // walk through list to insertion point + ACE_TPQ_Entry *temp = head_; + + for (int x = position; x > 1; --x) + { + // end of queue? + if (temp->next_ == 0) + break; + // advance pointer + else + temp = temp->next_; + } + + // insert new tpq after temp + tpq->next_ = temp->next_; + temp->next_ = tpq; +} + +void +ACE_Token_Proxy_Queue::dequeue (void) +{ + ACE_TRACE ("ACE_Token_Proxy_Queue::dequeue"); + + if (head_ == 0) + return; + + ACE_TPQ_Entry *temp = this->head_; + + this->head_ = this->head_->next_; + + temp->next_ = 0; + + --this->size_; + + if (this->head_ == 0 && this->size_ != 0) + ACE_ERROR ((LM_ERROR, "incorrect size = %d\n", this->size_)); +} + +/* +int +ACE_Token_Proxy_Queue::member (const char *id) +{ + ACE_TRACE ("ACE_Token_Proxy_Queue::member"); + + for (ACE_TPQ_Entry *temp = this->head_; temp != 0; temp = temp->next_) + if (ACE_OS::strcmp (temp->client_id (), id) == 0) + // We found it! + return 1; + + // We didn't find it :-( + return 0; +} +*/ + +void +ACE_Token_Proxy_Queue::remove (const ACE_TPQ_Entry *remove_me) +{ + ACE_TRACE ("ACE_Token_Proxy_Queue::remove"); + // sanity + if ((remove_me == 0) || (this->head_ == 0)) + return; + + // is it the head? + if (this->head_ == remove_me) // pointer comparison. + { + this->head_ = this->head_->next_; + if (this->head_ == 0) + this->tail_ = 0; + + --this->size_; + return; + } + + ACE_TPQ_Entry *temp = this->head_; + ACE_TPQ_Entry *previous = 0; + + // is it in the middle or tail? + while (temp != 0) + { + if (temp == remove_me) + { + // previous should never be null since the first if + // conditional should always be false + previous->next_ = temp->next_; + // is it the tail? + if (this->tail_ == temp) + this->tail_ = previous; + + --this->size_; + return; + } + + previous = temp; + temp = temp->next_; + } + + // it wasn't in the list. + return; +} + +// ************************************************************ +// ************************************************************ +// ************************************************************ + +void +ACE_Mutex_Token::dump (void) const +{ + ACE_TRACE ("ACE_Mutex_Token::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, "ACE_Mutex_Token::dump:\n")); + ACE_DEBUG ((LM_DEBUG, "lock_\n")); + lock_.dump (); + ACE_DEBUG ((LM_DEBUG, "base:\n")); + ACE_Tokens::dump (); + ACE_DEBUG ((LM_DEBUG, "ACE_Mutex_Token::dump end.\n")); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_Mutex_Token::ACE_Mutex_Token (const char *name) +{ + ACE_TRACE ("ACE_Mutex_Token::ACE_Mutex_Token"); + + int n = ACE_OS::strlen (name) + 1; // + 1 for \0 + + if (n > ACE_MAXTOKENNAMELEN) + n = ACE_MAXTOKENNAMELEN - 1; + + ACE_OS::strncpy (this->token_name_, name, n); + this->token_name_[ACE_MAXTOKENNAMELEN - 1] = '\0'; +} + +ACE_Mutex_Token::~ACE_Mutex_Token (void) +{ + ACE_TRACE ("ACE_Mutex_Token::~ACE_Mutex_Token"); +} + +int +ACE_Mutex_Token::acquire (ACE_TPQ_Entry *caller, + int ignore_deadlock, + int notify) +{ + ACE_TRACE ("ACE_Mutex_Token::acquire"); + // We need to acquire two locks. This one to ensure that only one + // thread uses this token at a time. + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon1, this->lock_, -1); + // This one to ensure an atomic transaction across all tokens. Note + // that this order is crucial too. It's resource coloring for other + // threads which may be calling this same token. + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon2, ACE_Token_Manager::instance ()->mutex (), -1); + + // Does _anyone_ own the token? + if (this->owner () == 0) + { + // there are no waiters, so queue as the first waiter (the owner.) + this->waiters_.enqueue (caller, -1); + return 0; // success + } + + // Does the caller already own it? + if (this->is_owner (caller->client_id ())) + { + // Recursive acquisition. + caller->nesting_level (1); + return 0; // success + } + + // Check for deadlock. + if (!ignore_deadlock + && ACE_Token_Manager::instance ()->check_deadlock (caller->proxy ()) == 1) + { + errno = EDEADLK; + ACE_RETURN (-1); + } + + // Someone owns it. Sorry, you're getting queued up at the end of + // the waiter queue. + this->waiters_.enqueue (caller, -1); + + if (notify) + this->owner ()->call_sleep_hook (); + + errno = EWOULDBLOCK; + ACE_RETURN (-1); +} + +int +ACE_Mutex_Token::tryacquire (ACE_TPQ_Entry *caller) +{ + ACE_TRACE ("ACE_Mutex_Token::tryacquire"); + // We need to acquire two locks. This one to ensure that only one + // thread uses this token at a time. + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon1, this->lock_, -1); + // This one to ensure an atomic transaction across all tokens. Note + // that this order is crucial too. It's resource coloring for other + // threads which may be calling this same token. + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon2, ACE_Token_Manager::instance ()->mutex (), -1); + + // Does _anyone_ own the token? + if (this->owner () == 0) + { + this->waiters_.enqueue (caller, -1); + return 0; // success + } + // Does the caller already own it? + if (this->is_owner (caller->client_id ())) + { + // recursive acquisition + caller->nesting_level (1); + return 0; // success + } + else + // Someone owns it. Fail. + { + errno = EWOULDBLOCK; + ACE_RETURN (-1); + } +} + +int +ACE_Mutex_Token::renew (ACE_TPQ_Entry *caller, + int requeue_position) +{ + ACE_TRACE ("ACE_Mutex_Token::renew"); + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon, this->lock_, -1); + + // Verify that the caller is the owner. + if (this->is_owner (caller->client_id ()) == 0) + { + errno = EACCES; + ACE_RETURN (-1); + } + + // The caller is the owner, so check to see if there are any + // waiters. If not, we just keep the token. == 1 means that there + // is only the owner. + if (this->waiters_.size () == 1 || requeue_position == 0) + return 0; + + // Requeue the caller. + this->waiters_.dequeue (); + + this->waiters_.enqueue (caller, requeue_position); + + // Notify new owner. + if (this->owner () != 0) + this->owner ()->proxy ()->token_acquired (this->owner ()); + + // Tell the caller that the operation would block. + errno = EWOULDBLOCK; + ACE_RETURN (-1); +} + +// Release the current holder of the token (which had +// better be the caller's thread!). + +int +ACE_Mutex_Token::release (ACE_TPQ_Entry *caller) +{ + ACE_TRACE ("ACE_Mutex_Token::release"); + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon, this->lock_, -1); + + // Does anyone own the token? + if (this->owner () == 0) + { + errno = EACCES; + ACE_RETURN (-1); + } + + // Is the caller the owner. + if (this->is_owner (caller->client_id ())) + { + // Check the nesting level. + if (caller->nesting_level () > 0) + caller->nesting_level (-1); + else + { + this->waiters_.dequeue (); + // Notify new owner. + if (this->owner () != 0) + this->owner ()->proxy ()->token_acquired (this->owner ()); + } + } + else + this->remove (caller); + + return 0; +} + +int +ACE_Mutex_Token::owners (OWNER_STACK &stack, + const char *id) +{ + ACE_TRACE ("ACE_Mutex_Token::owners"); + if (this->owner () != 0) + { + stack.push (this->owner ()); + // If an <id> is specified, return whether it is the owner being + // returned. + if (id != 0) + return this->owner ()->equal_client_id (id); + } + + return 0; +} + +int +ACE_Mutex_Token::is_waiting_for (const char *id) +{ + ACE_TRACE ("ACE_Mutex_Token::is_waiting_for"); + // If there is no owner, or <id> is the owner, return false. + if ((this->owner () == 0) || this->is_owner (id)) + return 0; + + // Step through each waiter looking for <id>. + ACE_TPQ_Iterator iterator (waiters_); + iterator.advance (); + for (ACE_TPQ_Entry *temp = 0; + iterator.next (temp) != 0; + iterator.advance ()) + { + if (temp->equal_client_id (id)) + return 1; + } + + return 0; +} + +int +ACE_Mutex_Token::is_owner (const char *id) +{ + ACE_TRACE ("ACE_Mutex_Token::is_owner"); + // If there is an owner, return whether it is <id>. + if ((this->owner () != 0) && + this->owner ()->equal_client_id (id)) + return 1; + else + return 0; +} + +// ************************************************************ +// ************************************************************ +// ************************************************************ + +void +ACE_RW_Token::dump (void) const +{ + ACE_TRACE ("ACE_RW_Token::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, "ACE_RW_Token::dump:\n" + "num_writers_ = %d\n", num_writers_)); + ACE_DEBUG ((LM_DEBUG, "lock_\n")); + this->lock_.dump (); + ACE_DEBUG ((LM_DEBUG, "base:\n")); + ACE_Tokens::dump (); + ACE_DEBUG ((LM_DEBUG, "ACE_RW_Token::dump end.\n")); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +ACE_RW_Token::ACE_RW_Token (const char *name) +: num_writers_ (0) +{ + ACE_TRACE ("ACE_RW_Token::ACE_RW_Token"); + + int n = ACE_OS::strlen (name) + 1; // + 1 for \0 + + if (n > ACE_MAXTOKENNAMELEN) + n = ACE_MAXTOKENNAMELEN; + + ACE_OS::strncpy (this->token_name_, name, n); + this->token_name_[ACE_MAXTOKENNAMELEN - 1] = '\0'; +} + +ACE_RW_Token::~ACE_RW_Token (void) +{ + ACE_TRACE ("ACE_RW_Token::~ACE_RW_Token"); +} + +int +ACE_RW_Token::acquire (ACE_TPQ_Entry *caller, + int ignore_deadlock, + int notify) +{ + ACE_TRACE ("ACE_RW_Token::acquire"); + // We need to acquire two locks. This one to ensure that only one + // thread uses this token at a time. + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon1, this->lock_, -1); + // This one to ensure an atomic transaction across all tokens. Note + // that this order is crucial too. It's resource coloring for other + // threads which may be calling this same token. + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon2, ACE_Token_Manager::instance ()->mutex (), -1); + + if (caller->proxy ()->type () == ACE_RW_Token::WRITER) + this->num_writers_++; + + // Does _anyone_ own the token? + if (this->owner () == 0) + { + // There are no waiters, so queue as the first waiter (the owner). + this->waiters_.enqueue (caller, -1); + return 0; + } + + // Check for recursive acquisition. + if (this->is_owner (caller->client_id ())) + { + caller->nesting_level (1); + return 0; // Success. + } + + // Reader. + if (caller->proxy ()->type () == ACE_RW_Token::READER) + { + // Are there any writers? + if (this->num_writers_ == 0) + { + // Queue the caller at the end of the queue. + this->waiters_.enqueue (caller, -1); + return 0; + } + // Else failure. + } + + // Failure code. + + // Check for deadlock. + if (!ignore_deadlock && + ACE_Token_Manager::instance ()->check_deadlock (caller->proxy ()) == 1) + { + if (caller->proxy ()->type () == ACE_RW_Token::WRITER) + this->num_writers_--; + errno = EDEADLK; + ACE_RETURN (-1); + } + + // Queue the caller at the end of the queue. + this->waiters_.enqueue (caller, -1); + + if (notify) + { + // If it's a writer, just notify it. + if (this->owner ()->proxy ()->type () == ACE_RW_Token::WRITER) + this->owner ()->call_sleep_hook (); + else + { + // Call back all reader owners. + ACE_TPQ_Entry *temp = this->owner (); + do + { + temp->call_sleep_hook (); + temp = temp->next_; + } + while (temp != 0 && + temp->proxy ()->type () == ACE_RW_Token::READER); + } + } + + errno = EWOULDBLOCK; + ACE_RETURN (-1); +} + + +int +ACE_RW_Token::tryacquire (ACE_TPQ_Entry *caller) +{ + ACE_TRACE ("ACE_RW_Token::tryacquire"); + // We need to acquire two locks. This one to ensure that only one + // thread uses this token at a time. + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon1, this->lock_, -1); + // This one to ensure an atomic transaction across all tokens. Note + // that this order is crucial too. It's resource coloring for other + // threads which may be calling this same token. + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon2, ACE_Token_Manager::instance ()->mutex (), -1); + + if (caller->proxy ()->type () == ACE_RW_Token::WRITER) + { + this->num_writers_++; + } + + // Does _anyone_ own the token? + if (this->owner () == 0) + { + // There are no waiters, so queue as the first waiter (the owner). + this->waiters_.enqueue (caller, -1); + return 0; + } + + // Check for recursive acquisition. + if (this->is_owner (caller->client_id ())) + { + caller->nesting_level (1); + return 0; // Success. + } + + // Reader. + if (caller->proxy ()->type () == ACE_RW_Token::READER) + { + // Are there any writers? + if (this->num_writers_ == 0) + { + // queue the caller at the end of the queue. + this->waiters_.enqueue (caller, -1); + return 0; + } + // Else, fail. + } + else // Writer. + // We're going to fail, so decrement the num_writers. + { + this->num_writers_--; + } + + + errno = EWOULDBLOCK; + ACE_RETURN (-1); +} + +int +ACE_RW_Token::renew (ACE_TPQ_Entry *caller, + int requeue_position) +{ + ACE_TRACE ("ACE_RW_Token::renew"); + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon, this->lock_, -1); + + // Werify that the caller is the owner + if (this->is_owner (caller->client_id ()) == 0) + { + errno = EACCES; + ACE_RETURN (-1); + } + + // The caller is the owner, so check to see if there are any + // waiters. If not, we just keep the token. + if (this->waiters_.size () == 1 || requeue_position == 0) + return 0; + + // There are waiters, so remove the caller. + this->remove (caller); + + // Requeue the caller. + this->waiters_.enqueue (caller, requeue_position); + + if (caller->proxy ()->type () == ACE_RW_Token::READER) + { + // If the caller got queued before any writers, the caller is + // still the owner. + if (this->is_owner (caller->client_id ())) + return 0; // success + // else fallthrough and return would block. + } + // Writers will always have to block since waiters_.size () == 1 or + // requeue_position == 0. + + // Get a new owner. + this->notify_new_owner (caller); + + // Tell the caller that the operation would block. + errno = EWOULDBLOCK; + ACE_RETURN (-1); +} + +int +ACE_RW_Token::release (ACE_TPQ_Entry *caller) +{ + ACE_TRACE ("ACE_RW_Token::release"); + ACE_GUARD_RETURN (ACE_TOKEN_CONST::MUTEX, ace_mon, this->lock_, -1); + + // Check for errors. + if ((this->owner () == 0) || + (this->is_owner (caller->client_id ()) == 0)) + { + errno = EACCES; + ACE_RETURN (-1); + } + + if (caller->proxy ()->type () == ACE_RW_Token::WRITER) + num_writers_--; + + // Recursive release. + if (caller->nesting_level () > 0) + { + caller->nesting_level (-1); + return 0; + } + + // Remove the caller and notify the new owner(s). + this->remove (caller); + this->notify_new_owner (caller); + + return 0; +} + +void +ACE_RW_Token::notify_new_owner (ACE_TPQ_Entry *old_owner) +{ + ACE_TRACE ("ACE_RW_Token::new_owner"); + + if (this->owner () == 0) + return; + + if (this->owner ()->proxy ()->type () == ACE_RW_Token::READER) + { + if (old_owner->proxy ()->type () == ACE_RW_Token::READER) + // the owners already know that they're owners + return; + + // The current owner is a reader and the previous owner was a + // writer, so notify all waiting readers up to the first writer. + // call back all reader owners. + ACE_TPQ_Iterator iterator (waiters_); + for (ACE_TPQ_Entry *temp = 0; + iterator.next (temp) != 0; + iterator.advance ()) + { + if (temp->proxy ()->type () == WRITER) + // We've gone through all the readers. + break; + + temp->proxy ()->token_acquired (temp); + } + } + else // writer + this->owner ()->proxy ()->token_acquired (this->owner ()); +} + + +int +ACE_RW_Token::owners (OWNER_STACK &stack, + const char *id) +{ + ACE_TRACE ("ACE_RW_Token::owners"); + + if (this->owner () == 0) + return 0; + + int id_is_owner = 0; + + // The first waiter is a writer, so there is only one owner. + if (this->owner ()->proxy ()->type () == WRITER) + { + stack.push (this->owner ()); + // If an <id> is specified, return whether it is the owner being + // returned. + if ((id != 0) && + (ACE_OS::strcmp (id, this->owner ()->client_id ()) == 0)) + id_is_owner = 1; + } + // The first waiter is a reader, so there can be multiple owning + // readers. + else + { + ACE_TPQ_Iterator iterator (waiters_); + for (ACE_TPQ_Entry *temp = 0; + iterator.next (temp) != 0; + iterator.advance ()) + { + if (temp->proxy ()->type () == WRITER) + // We've gone through all the readers. + break; + + stack.push (temp); + + if (!id_is_owner && (id != 0) && + (ACE_OS::strcmp (id, temp->client_id ()) == 0)) + id_is_owner = 1; + } + } + + return id_is_owner; +} + +int +ACE_RW_Token::is_waiting_for (const char *id) +{ + ACE_TRACE ("ACE_RW_Token::is_waiting_for"); + // If there is no owner, or <id> is the owner, return false. + if ((this->owner () == 0) || + this->is_owner (id)) + return 0; + + // Step through each waiter looking for <id>. + ACE_TPQ_Iterator iterator (waiters_); + iterator.advance (); + for (ACE_TPQ_Entry *temp = 0; + iterator.next (temp) != 0; + iterator.advance ()) + { + if (temp->equal_client_id (id)) + return 1; + } + + return 0; +} + +int +ACE_RW_Token::is_owner (const char *id) +{ + ACE_TRACE ("ACE_Mutex_Token::is_owner"); + // If there is no owner, return false. + if (this->owner () == 0) + return 0; + + // A writer owns us. + if (this->owner ()->proxy ()->type () == ACE_RW_Token::WRITER) + return this->owner ()->equal_client_id (id); + + // Readers own us. + // Step through each owning reader looking for <id>. + ACE_TPQ_Iterator iterator (waiters_); + for (ACE_TPQ_Entry *temp = 0; + iterator.next (temp) != 0; + iterator.advance ()) + { + if (temp->proxy ()->type () != ACE_RW_Token::READER) + break; + + if (temp->equal_client_id (id)) + return 1; + } + + return 0; +} + +// ************************************************************ +// ************************************************************ +// ************************************************************ +// 7.. + +void +ACE_Token_Proxy::dump (void) const +{ + ACE_TRACE ("ACE_Token_Proxy::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, "ACE_Token_Proxy::dump:\n" + " type = %d\n" + " ignore_deadlock_ = %d\n" + " debug_ = %d\n", + (int) this->type (), ignore_deadlock_, debug_)); + ACE_DEBUG ((LM_DEBUG, "mutex_, and waiter_\n")); + + if (this->token_ != 0) + this->token_->dump (); + + this->waiter_.dump (); + ACE_DEBUG ((LM_DEBUG, "ACE_Token_Proxy::dump end.\n")); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +const char* +ACE_Token_Proxy::client_id (void) const +{ + ACE_TRACE ("ACE_Token_Proxy::client_id"); + // Thread-specific. + const char *id = this->waiter_->client_id (); + + if (id == 0) + return "ERROR NO CLIENT ID"; + else + return id; +} + +void +ACE_Token_Proxy::client_id (const char *client_id) +{ + ACE_TRACE ("ACE_Token_Proxy::client_id"); + this->waiter_->client_id (client_id); +} + +const char * +ACE_Token_Proxy::owner_id (void) +{ + ACE_TRACE ("ACE_Token_Proxy::owner_id"); + return this->token_->owner_id (); +} + +const char * +ACE_Token_Proxy::name (void) const +{ + ACE_TRACE ("ACE_Token_Proxy::owner_id"); + return this->token_->name (); +} + +ACE_Token_Proxy::ACE_Token_Proxy (void) +: token_ (0), + waiter_ (this, 0) +{ + ACE_TRACE ("ACE_Token_Proxy::ACE_Token_Proxy"); +} + +// Notice the token_ (0). Do *not* copy the token pointer. This must +// be obtained through the token manager. Also, we don't copy any +// waiter info. A copied Proxy does *not* inherit client_id. +ACE_Token_Proxy::ACE_Token_Proxy (const ACE_Token_Proxy &p) +: token_ (0), + waiter_ (this, 0) +{ + ACE_TRACE ("ACE_Token_Proxy::ACE_Token_Proxy"); +} + +// @@ should I do a mutex_->release ()? +ACE_Token_Proxy::~ACE_Token_Proxy (void) +{ + ACE_TRACE ("ACE_Local_Mutex::~ACE_Local_Mutex"); + + if (token_ != 0) + // notify token manager that we are done with it so it can + // free it if necessary + ACE_Token_Manager::instance ()->release_token (token_); +} + +int +ACE_Token_Proxy::open (const char *token_name, + int ignore_deadlock, + int debug) +{ + ACE_TRACE ("ACE_Token_Proxy::open"); + + // Store some parameters. + this->ignore_deadlock_ = ignore_deadlock; + this->debug_ = debug; + + // Used in case a name was not specified. + char name[BUFSIZ]; + + // We must have a name. + if (token_name == 0) + { + ACE_OS::sprintf (name, "token %d", this); + token_name = name; + } + + // Get or create the underlying token. The Token Manager will call + // us back to set token_. + ACE_Token_Manager::instance ()->get_token (this, token_name); + + // Check for failed get or failed new. + if (this->token_ == 0) + { + errno = ENOMEM; + ACE_ERROR_RETURN ((LM_ERROR, "Can't allocate mutex"), -1); + } + + return 0; +} + +int +ACE_Token_Proxy::acquire (int notify, + void (*sleep_hook)(void *), + ACE_Synch_Options &options) +{ + ACE_TRACE ("ACE_Token_Proxy::acquire"); + if (this->token_ == 0) + { + errno = ENOENT; + ACE_ERROR_RETURN ((LM_ERROR, "Not open.\n"), -1); + } + + // Make sure no one calls our token_acquired until we have a chance + // to sleep first! If after we call an EWOULDBLOCK + // mutex_->acquire() below, but before we enter handle_options to + // wait on the cond_var, a thread tries to give take us off the + // waiter queue and signal us, IT WILL FIRST HAVE TO ACQUIRE THIS + // cond_var.mutex (). _This_ is why we acquire it. + this->waiter_->cond_var_.mutex ().acquire (); + + this->waiter_->sleep_hook (sleep_hook); + + if (this->token_->acquire (this->waiter_, this->ignore_deadlock_, notify) == -1) + // acquire failed + { + switch (errno) + { + case EDEADLK : + if (!ignore_deadlock_) + { + waiter_->cond_var_.mutex ().release (); + errno = EDEADLK; + ACE_RETURN (-1); + } + // Else, fallthrough and block! + + case EWOULDBLOCK : + if (this->debug_) + ACE_DEBUG ((LM_DEBUG, + "(%t) waiting for %s, owner is %s, " + "total waiters == %d\n", + this->name (), + this->token_->owner_id (), + token_->no_of_waiters ())); + + // no error, but would block, + // if error, return error (-1), otherwise, return whether we + // called the holder or not. + int return_value; + if (this->handle_options (options, waiter_->cond_var_) == -1) + return_value = -1; + else + return_value = notify == 1; + + errno = EWOULDBLOCK; + ACE_RETURN (return_value); + + default : + waiter_->cond_var_.mutex ().release (); + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n","Token Proxy acquire."), -1); + } + } + else + // we have the token + { + if (debug_) + ACE_DEBUG ((LM_DEBUG, "(%t) acquired %s\n", + this->name ())); + waiter_->cond_var_.mutex ().release (); + } + + return 0; +} + +int +ACE_Token_Proxy::tryacquire (void (*sleep_hook)(void *)) +{ + ACE_TRACE ("ACE_Token_Proxy::tryacquire"); + if (this->token_ == 0) + { + errno = ENOENT; + ACE_ERROR_RETURN ((LM_ERROR, "Not open.\n"), -1); + } + + this->waiter_->sleep_hook (sleep_hook); + + return this->token_->tryacquire (waiter_); +} + +int +ACE_Token_Proxy::renew (int requeue_position, + ACE_Synch_Options &options) +{ + ACE_TRACE ("ACE_Token_Proxy::renew"); + if (this->token_ == 0) + { + errno = ENOENT; + ACE_ERROR_RETURN ((LM_ERROR, "Not open.\n"), -1); + } + + // Make sure no one calls our token_acquired until we have a chance + // to sleep first! + this->waiter_->cond_var_.mutex ().acquire (); + + if (this->token_->renew (this->waiter_, requeue_position) == -1) + { + // check for error + if (errno != EWOULDBLOCK) + ACE_ERROR_RETURN ((LM_ERROR, + "%p renew failed\n", "ACE_Token_Proxy"), -1); + + if (this->debug_) + ACE_DEBUG ((LM_DEBUG, "(%t) renew blocking for %s, owner is %s\n", + this->name (), + token_->owner_id ())); + + // no error, but would block, so block or return + return this->handle_options (options, waiter_->cond_var_); + } + else + // we have the token + { + if (this->debug_) + ACE_DEBUG ((LM_DEBUG, "(%t) renewed %s\n", + this->name ())); + waiter_->cond_var_.mutex ().release (); + return 0; + } +} + +int +ACE_Token_Proxy::handle_options (ACE_Synch_Options &options, + ACE_TOKEN_CONST::COND_VAR &cv) +{ + // Some operation failed with EWOULDBLOCK. + ACE_TRACE ("ACE_Token_Proxy::handle_options"); + + if (options[ACE_Synch_Options::USE_REACTOR] == 1) + // Asynchronous. + { + int error = errno; + // if (options[ACE_Synch_Options::USE_TIMEOUT] == 1) + // ACE_ERROR_RETURN ((LM_ERROR, "Timeouts not yet supported" " + // with asynchronous operations."), -1); + cv.mutex ().release (); + errno = error; + ACE_RETURN (-1); + } + else + // Synchronous. + { + // Block on condition variable. + while (cv.wait ((ACE_Time_Value *) options.time_value ()) == -1) + { + // Note, this should obey whatever thread-specific + // interrupt policy is currently in place... + if (errno == EINTR) + continue; + // We come here if a timeout occurs or some serious + // ACE_Condition object error. + cv.mutex ().release (); + ACE_ERROR_RETURN ((LM_ERROR, "condition variable wait" + " bombed."), -1); + } + + if (this->debug_) + ACE_DEBUG ((LM_DEBUG, "(%t) unblocking.\n", + this->client_id ())); + cv.mutex ().release (); + return 0; // operation succeeded + } +} + +int +ACE_Token_Proxy::release (ACE_Synch_Options &options) +{ + ACE_TRACE ("ACE_Token_Proxy::release"); + if (this->token_ == 0) + { + errno = ENOENT; + if (debug_) + ACE_DEBUG ((LM_DEBUG, "Must open before releasing.\n")); + ACE_RETURN (-1); + } + + if (this->token_->release (waiter_) != 0) + { + // Release failed. + this->token_->remove (this->waiter_); + if (debug_) + ACE_DEBUG ((LM_DEBUG, "(%t) %p.\n", "release failed")); + return -1; + } + else + { + if (this->debug_) + ACE_DEBUG ((LM_DEBUG, "(%t) released %s, owner is %s\n", + this->name (), + token_->owner_id ())); + + return 0; + } +} + +int +ACE_Token_Proxy::remove (ACE_Synch_Options &) +{ + ACE_TRACE ("ACE_Token_Proxy::remove"); + return 0; +} + +void +ACE_Token_Proxy::sleep_hook (void) +{ + ACE_TRACE ("ACE_Token_Proxy::sleep_hook"); + // Somebody wants our token! (Let'em wait...) + return; +} + +void +ACE_Token_Proxy::token_acquired (ACE_TPQ_Entry *e) +{ + ACE_TRACE ("ACE_Token_Proxy::token_acquired"); + e->cond_var_.mutex ().acquire (); + // We've been taken off the waiters list and given the token! + // This implementation signals the internal condition + // variable. Thus, if asynchronous acquires are used, this must be + // overriden to do something more useful! + e->cond_var_.signal (); + e->cond_var_.mutex ().release (); + + return; +} + +// ************************************************************ + +ACE_Token_Name::ACE_Token_Name (const char *token_name) +{ + ACE_TRACE ("ACE_Token_Name::ACE_Token_Name"); + this->name (token_name); +} + +ACE_Token_Name::ACE_Token_Name (const ACE_Token_Name &rhs) +{ + ACE_TRACE ("ACE_Token_Name::ACE_Token_Name"); + this->name (rhs.name ()); +} + +ACE_Token_Name::~ACE_Token_Name () +{ + ACE_TRACE ("ACE_Token_Name::~ACE_Token_Name"); +} + +void +ACE_Token_Name::dump (void) const +{ + ACE_TRACE ("ACE_Token_Name::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, "ACE_Token_Name::dump:\n" + " token_name_ = %s\n", + token_name_ == 0 ? "no name" : token_name_)); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +#if !defined (ACE_NO_TSS_TOKENS) +template class ACE_TSS <ACE_TPQ_Entry>; +#endif /* ACE_NO_TSS_TOKENS */ +template class ACE_Unbounded_Stack <ACE_TPQ_Entry *>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ |