summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-18 14:06:36 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-18 14:06:36 +0000
commit2605c072815f9ccd60436bd6882080bbd832c962 (patch)
tree88350bbebbcd9d12bce5cf65d6dcdca991c5944b
parent65b0ecc7a8b242cb09e98aa49911f2b8fd087d3f (diff)
downloadATCD-2605c072815f9ccd60436bd6882080bbd832c962.tar.gz
foo
-rw-r--r--ChangeLog-96b20
-rw-r--r--ace/OS.h12
-rw-r--r--ace/OS.i225
-rw-r--r--ace/Proactor.cpp17
-rw-r--r--ace/Proactor.h2
-rw-r--r--ace/README3
-rw-r--r--ace/Registry.h5
7 files changed, 194 insertions, 90 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b
index 2751a707de8..75ae7d48792 100644
--- a/ChangeLog-96b
+++ b/ChangeLog-96b
@@ -1,3 +1,23 @@
+Wed Dec 18 06:37:22 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * ace/OS.i (cond_wait): Added the new algorithm for condition
+ variable emulation on Win32. (and VxWorks). This should fix the
+ nasty problems we had with earlier version (which weren't
+ "fair"). Thanks to James Mansion, Karlheinz, Detlef, and Irfan
+ for helping with this.
+
+ * ace/Registry.h: Removed the "ACE_TURN_NOMINMAX_OFF" stuff
+ in order to simplify the code. Thanks to Irfan for this.
+
+ * ace/OS.i (sema_post): Added a new overloaded version of
+ ACE_OS::sema_post(), which takes a "release count." This is the
+ number of times to release the semaphore. Note that Win32
+ supports this natively, whereas on POSIX we need to loop...
+
+ * ace/Proactor.cpp (handle_events): Changed the Proactor logic so
+ that it will correctly propagate any errors that occur to the
+ handle_{input,output}_complete callback.
+
Tue Dec 17 20:56:56 1996 David L. Levine <levine@cs.wustl.edu>
* ace/OS.{h,i}: on VxWorks: implemented ACE_OS::gethostbyname (),
diff --git a/ace/OS.h b/ace/OS.h
index fbd318a9853..d637e640fba 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -756,8 +756,19 @@ struct ACE_cond_t
long waiters_;
// Number of waiting threads.
+ ACE_thread_mutex waiters_lock_;
+ // Serialize access to the waiters count.
+
ACE_sema_t sema_;
// Queue up threads waiting for the condition to become signaled.
+
+ ACE_event_t waiters_done_;
+ // An auto reset event used by the broadcast/signal thread to wait
+ // for the waiting thread(s) to wake up and get a chance at the
+ // semaphore.
+
+ size_t was_broadcast_;
+ // Keeps track of whether we were broadcasting or just signaling.
};
struct ACE_rwlock_t
@@ -2180,6 +2191,7 @@ public:
LPCTSTR name = 0, void *arg = 0,
int max = 0x7fffffff);
static int sema_post (ACE_sema_t *s);
+ static int sema_post (ACE_sema_t *s, size_t release_count);
static int sema_trywait (ACE_sema_t *s);
static int sema_wait (ACE_sema_t *s);
diff --git a/ace/OS.i b/ace/OS.i
index 5840ac83fcf..62db79ff63a 100644
--- a/ace/OS.i
+++ b/ace/OS.i
@@ -1102,8 +1102,16 @@ ACE_OS::cond_init (ACE_cond_t *cv, int type, LPCTSTR name, void *arg)
int, -1);
#elif defined (ACE_HAS_WTHREADS) || defined (VXWORKS)
cv->waiters_ = 0;
+ cv->was_broadcast_ = 0;
- return ACE_OS::sema_init (&cv->sema_, 0, type, name, arg);
+ int result = 0;
+ if (ACE_OS::sema_init (&cv->sema_, 0, type, name, arg) == -1)
+ result = -1;
+ else if (ACE_OS::mutex_init (&cs->waiters_lock_) = -1)
+ result = -1;
+ else if (ACE_OS::event_init (&cv->waiters_done_) == -1)
+ result = -1;
+ return result;
#endif /* ACE_HAS_STHREADS */
#else
cv = cv;
@@ -1131,7 +1139,7 @@ ACE_OS::cond_signal (ACE_cond_t *cv)
if (cv->waiters_ > 0)
return ACE_OS::sema_post (&cv->sema_);
else
- return 0;
+ return 0; // No-op
#endif /* ACE_HAS_STHREADS */
#else
cv = cv;
@@ -1155,29 +1163,28 @@ ACE_OS::cond_broadcast (ACE_cond_t *cv)
#elif defined (ACE_HAS_WTHREADS) || defined (VXWORKS)
// The <external_mutex> must be locked before this call is made.
- int result = 0;
- int error = 0;
+ if (this->waiters_ == 0)
+ return 0; // No-op
+ else // We are broadcasting, even if there is just one waiter...
+ {
+ int result = 0;
+ // Record the fact that we are broadcasting. This helps the
+ // cond_wait() method know how to optimize itself.
+ this->was_broadcast_ = 1;
- // Keep track of the number of waiters.
- // cv->signaled_waiters_ = cv->waiters_;
- // ACE_OS::sema_init (cv->signaled_counter_, cv->signaled_waiters_);
+ // Wake up all the waiters.
- // Wake up all the waiters.
+ if (ACE_OS::sema_post (&sv->sema_, this->waiters) == -1)
+ result = -1;
- for (int i = cv->waiters_; i > 0; i--)
- if (ACE_OS::sema_post (&cv->sema_) != 0)
- {
- error = errno;
+ // Wait for all the awakened threads to acquire their part of the
+ // counting semaphore.
+ else if (ACE_OS::event_wait (&cv->waiters_done_) == -1)
result = -1;
- break;
- }
- // Wait for all the awakened threads to acquire their part of the
- // counting semaphore.
- // ::WaitForSingleObject (cv->waiters_done_, INFINITE);
- // ACE_OS::sema_destroy (cv->signaled_counter_);
- errno = error;
- return result;
+ this->was_broadcast_ = 0;
+ return result;
+ }
#endif /* ACE_HAS_STHREADS */
#else
cv = cv;
@@ -1198,39 +1205,59 @@ ACE_OS::cond_wait (ACE_cond_t *cv,
ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::cond_wait (cv, external_mutex), ace_result_),
int, -1);
#elif defined (ACE_HAS_WTHREADS) || defined (VXWORKS)
- // It's ok to increment this because the <external_mutex> is locked.
+ // It's ok to increment this because the <external_mutex> must be
+ // locked by the caller.
cv->waiters_++;
- if (ACE_OS::mutex_unlock (external_mutex) != 0)
- return -1;
-
int result = 0;
int error = 0;
- // Wait to be awakened by a ACE_OS::signal() or ACE_OS::broadcast().
- if (ACE_OS::sema_wait (&cv->sema_) != 0)
+#if defined (ACE_HAS_SIGNAL_OBJECT_AND_WAIT)
+ // This call will automatically release the mutex and wait on the semaphore.
+ result = ::SignalObjectAndWait (*external_mutex, cv->sema_, INFINITE, FALSE);
+#else
+ // We keep the lock held just long enough to increment the count of
+ // waiters by one. Note that we can't keep it held across the call
+ // to ACE_OS::sema_wait() since that will deadlock other calls to
+ // ACE_OS::cond_signal().
+ if (ACE_OS::mutex_unlock (external_mutex) != 0)
+ return -1;
+
+ // Wait to be awakened by a ACE_OS::cond_signal() or
+ // ACE_OS::cond_broadcast().
+ result = ::WaitForSingleObject (cv->sema_, INFINITE);
+#endif /* ACE_HAS_SIGNAL_OBJECT_AND_WAIT */
+ if (result != WAIT_OBJECT_0)
+ // This is a hack, we need to find an appropriate mapping...
+ error = result == WAIT_TIMEOUT ? ETIME : ::GetLastError ();
+ else
{
- result = -1;
- error = errno;
+ // If we are broadcasting, then we need to be smarter about
+ // locking since there can now be multiple threadsd in the
+ // crtical section. If we are signaling, however, we don't have
+ // to worry since there will just be 1 thread here.
+ if (cv->was_broadcast_)
+ {
+ if (ACE_OS::mutex_lock (cv->waiters_lock_) != -1)
+ {
+ // By making the waiter responsible for decrementing its count we
+ // don't have to worry about having an internal mutex. Thanks to
+ // Karlheinz for recognizing this optimization.
+ cv->waiters_--;
+ // Release the signaler/broadcaster if we're the last waiter.
+ if (cv->waiters_ == 0)
+ ::SetEvent (cv->waiters_done_);
+ ACE_OS::mutex_unlock (cv->internal_mutex_);
+ }
+ }
+ else
+ cv->waiters_--;
}
-
- // ACE_OS::sema_wait (cv->signaled_counter_);
- // ACE_OS::mutex_lock (cv->internal_mutex_);
- // cv->signaled_waiters_--;
- // Release the signaler.
- // if (cv->signaled_waiters_ == 0)
- // ::SetEvent (cv->waiters_done_);
- // ACE_OS::mutex_unlock (cv->internal_mutex_);
-
- // We must always regain the mutex, even when errors occur so that
- // we can atomically decrement the count of the waiters.
+ // We must always regain the external mutex, even when errors
+ // occur because that's the guarantee that we give to our
+ // callers.
ACE_OS::mutex_lock (external_mutex);
- // By making the waiter responsible for decrementing its count we
- // don't have to worry about having an internal mutex. Thanks to
- // Karlheinz for recognizing this optimization.
- cv->waiters_--;
-
// Reset errno in case mutex_lock() also fails...
errno = error;
return result;
@@ -1248,55 +1275,81 @@ ACE_OS::cond_timedwait (ACE_cond_t *cv,
// ACE_TRACE ("ACE_OS::cond_timedwait");
#if defined (ACE_HAS_THREADS)
#if defined (ACE_HAS_WTHREADS)
- // Note that it is ok to increment this because the <external_mutex>
- // is locked!
+
+ // Handle the easy case first.
+ if (timeout == 0)
+ return ACE_OS::cond_wait (cv, external_mutex);
+
+ // It's ok to increment this because the <external_mutex> must be
+ // locked by the caller.
cv->waiters_++;
+
+ int result = 0;
+ int error = 0;
+ int msec_timeout;
+
+ if (timeout->sec () == 0 && timeout->usec () == 0)
+ msec_timeout = 0; // Do a "poll."
+ else
+ {
+ // Note that we must convert between absolute time (which is
+ // passed as a parameter) and relative time (which is what
+ // WaitForSingleObjects() expects).
+ ACE_Time_Value relative_time (*timeout - ACE_OS::gettimeofday ());
+ msec_timeout = relative_time.msec ();
+ }
+#if defined (ACE_HAS_SIGNAL_OBJECT_AND_WAIT)
+ // This call will automatically release the mutex and wait on the semaphore.
+ result = ::SignalObjectAndWait (*external_mutex, cv->sema_, msec_timeout, FALSE);
+#else
// We keep the lock held just long enough to increment the count of
// waiters by one. Note that we can't keep it held across the call
// to WaitForSingleObject since that will deadlock other calls to
// ACE_OS::cond_signal().
if (ACE_OS::mutex_unlock (external_mutex) != 0)
return -1;
-
- DWORD result;
-
- if (timeout == 0)
- // Wait forever.
- result = ::WaitForSingleObject (cv->sema_, INFINITE);
- else if (timeout->sec () == 0 && timeout->usec () == 0)
- // Do a "poll".
- result = ::WaitForSingleObject (cv->sema_, 0);
- else
- {
- // Wait for upto <relative_time> number of milliseconds. Note
- // that we must convert between absolute time (which is passed
- // as a parameter) and relative time (which is what
- // WaitForSingleObjects() expects).
- ACE_Time_Value relative_time (*timeout - ACE_OS::gettimeofday ());
- result = ::WaitForSingleObject (cv->sema_, relative_time.msec ());
- }
-
- // Reacquire the lock before we decrement the count of waiters.
- ACE_OS::mutex_lock (external_mutex);
-
- cv->waiters_--;
- if (result == WAIT_OBJECT_0)
- return 0;
- else
+ // Wait to be awakened by a ACE_OS::signal() or ACE_OS::broadcast().
+ result = ::WaitForSingleObject (cv->sema_, msec_timeout);
+#endif /* ACE_HAS_SIGNAL_OBJECT_AND_WAIT */
+ if (result != WAIT_OBJECT_0)
+ // This is a hack, we need to find an appropriate mapping...
+ error = result == WAIT_TIMEOUT ? ETIME : ::GetLastError ();
+ else
{
- errno = result == WAIT_TIMEOUT ? ETIME : ::GetLastError ();
- // This is a hack, we need to find an appropriate mapping...
- return -1;
+ // If we are broadcasting, then we need to be smarter about
+ // locking since there can now be multiple threadsd in the
+ // crtical section. If we are signaling, however, we don't have
+ // to worry since there will just be 1 thread here.
+ if (cv->was_broadcast_)
+ {
+ if (ACE_OS::mutex_lock (cv->waiters_lock_) != -1)
+ {
+ // By making the waiter responsible for decrementing its count we
+ // don't have to worry about having an internal mutex. Thanks to
+ // Karlheinz for recognizing this optimization.
+ cv->waiters_--;
+ // Release the signaler/broadcaster if we're the last waiter.
+ if (cv->waiters_ == 0)
+ ::SetEvent (cv->waiters_done_);
+ ACE_OS::mutex_unlock (cv->internal_mutex_);
+ }
+ }
+ else
+ cv->waiters_--;
}
-
+ // We must always regain the external mutex, even when errors
+ // occur because that's the guarantee that we give to our
+ // callers.
+ ACE_OS::mutex_lock (external_mutex);
+ errno = error;
+ return result;
#elif defined (VXWORKS)
// POSIX semaphores don't have a timed wait. Should implement conds with
// VxWorks semaphores instead, they do have a timed wait. But all of the
// other cond operations would have to be modified.
ACE_NOTSUP_RETURN (-1);
-
#else /* PTHREADS or STHREADS or DCETHREADS */
int result;
timestruc_t ts = *timeout; // Calls ACE_Time_Value::operator timestruc_t().
@@ -2901,6 +2954,24 @@ ACE_OS::sema_post (ACE_sema_t *s)
#endif /* ACE_HAS_POSIX_SEM */
}
+ACE_INLINE int
+ACE_OS::sema_post (ACE_sema_t *s, size_t release_count)
+{
+#if defined (ACE_WIN32)
+ // Win32 supports this natively.
+ ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::ReleaseSemaphore (*, release_count, 0),
+ ace_result_),
+ int, -1);
+#else
+ // On POSIX platforms we need to emulate this ourselves.
+ for (size_t i = 0; i < release_count; i++)
+ if (ACE_OS::sema_post (s) == -1)
+ return -1;
+
+ return 0;
+#endif /* ACE_WIN32 */
+}
+
ACE_INLINE int
ACE_OS::sema_trywait (ACE_sema_t *s)
{
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index c2b640e1607..b2c9ed505d6 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -261,8 +261,13 @@ ACE_Proactor::handle_events (ACE_Time_Value *how_long)
(ACE_OVERLAPPED **) &overlapped,
timeout);
- // Check for a failed dequeue. Stash the error value.
- if (result == FALSE && overlapped == 0)
+ // Check for a failed dequeue. This can happen either because
+ // of problems with the IO completion port (in which case
+ // overlapped == 0) or due to problems with the completion
+ // operation (in which case overlapped != 0). In either case,
+ // we'll stash the error value so that we can update errno
+ // appropriate later on.
+ if (result == FALSE)
error = ::GetLastError ();
}
@@ -276,7 +281,7 @@ ACE_Proactor::handle_events (ACE_Time_Value *how_long)
// the caller!
// GetQueued returned because of a error or timer.
- if (error != 0)
+ if (error != 0 && overlapped == 0)
{
// @@ What's the WIN32 constant for 258?!?!?!
if (error == ACE_TIMEOUT_OCCURRED)
@@ -287,14 +292,14 @@ ACE_Proactor::handle_events (ACE_Time_Value *how_long)
"%p GetQueuedCompletionStatus failed errno = %d.\n",
"ACE_Proactor::handle_events", error), -1);
}
-
#endif /* ACE_WIN32 */
// Dequeued a failed or successful operation. Dispatch the
// Event_Handler. Note that GetQueuedCompletionStatus returns false
// when operations fail, but they still need to be dispatched.
- // Should we propogate this to the handler somehow? Maybe an extra
- // failed/succeeded flag in the dispatch call?
+ // We propagate the error status to the callee by setting errno =
+ // error (which is the value returned by ::GetLastError().
+ errno = error;
int dispatch_result = this->dispatch (overlapped, bytes_transferred);
// Return -1 (failure), or return 1. Remember that 0 is reserved
diff --git a/ace/Proactor.h b/ace/Proactor.h
index 45ba0f8be7f..602e4f64503 100644
--- a/ace/Proactor.h
+++ b/ace/Proactor.h
@@ -174,7 +174,7 @@ protected:
ACE_Manual_Event shared_event_;
// Win32 HANDLE associated with every operation that signals when
// any operation completes (used to transparently integrate the
- // <ACE_Proactor> with the <ACE_Dispatcher>).
+ // <ACE_Proactor> with the <ACE_ReactorEx>).
};
class ACE_Export ACE_Overlapped_File
diff --git a/ace/README b/ace/README
index f7de1dd8489..01b65c546cb 100644
--- a/ace/README
+++ b/ace/README
@@ -71,6 +71,7 @@ ACE_HAS_RTLD_LAZY_V Explicit dynamic linking permits "lazy" symbol resolution
ACE_HAS_SELECT_H Platform has special header for select().
ACE_HAS_SEMUN Compiler/platform defines a union semun for SysV shared memory
ACE_HAS_SIGINFO_T Platform supports SVR4 extended signals
+ACE_HAS_SIGNAL_OBJECT_AND_WAIT Platform supports the Win32 SignalObjectAndWait() function (WinNT 4.0 and beyond).
ACE_HAS_SIGNAL_SAFE_OS_CALLS Automatically restart OS system calls when EINTR occurs
ACE_HAS_SIGWAIT Platform/compiler has the sigwait(2) prototype
ACE_HAS_SIG_ATOMIC_T Compiler/platform defines the sig_atomic_t typedef
@@ -125,7 +126,7 @@ ACE_HAS_UNICODE Platform/compiler supports UNICODE
ACE_HAS_USING_KEYWORD Compiler supports the new using keyword for C++ namespaces.
ACE_HAS_VOIDPTR_MMAP Platform requires void * for mmap().
ACE_HAS_VOIDPTR_SOCKOPT OS/compiler uses void * arg 4 setsockopt() rather than const char *
-ACE_HAS_WIN32_TRYLOCK The Win32 platform support TryEnterCriticalSection()
+ACE_HAS_WIN32_TRYLOCK The Win32 platform support TryEnterCriticalSection() (WinNT 4.0 and beyond)
ACE_HAS_WINSOCK2 The Win32 platform supports WinSock 2.0
ACE_HAS_XLI Platform has the XLI version of TLI
ACE_HAS_XT Platform has Xt and Motif
diff --git a/ace/Registry.h b/ace/Registry.h
index 347c760f394..061c5e16547 100644
--- a/ace/Registry.h
+++ b/ace/Registry.h
@@ -503,10 +503,5 @@ private:
};
-#if defined (ACE_TURN_NOMINMAX_OFF)
-#undef NOMINMAX
-#undef ACE_TURN_NOMINMAX_OFF
-#endif
-
#endif /* ACE_WIN32 */
#endif /* ACE_REGISTRY_H */