diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1996-12-18 14:06:36 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1996-12-18 14:06:36 +0000 |
commit | 2605c072815f9ccd60436bd6882080bbd832c962 (patch) | |
tree | 88350bbebbcd9d12bce5cf65d6dcdca991c5944b | |
parent | 65b0ecc7a8b242cb09e98aa49911f2b8fd087d3f (diff) | |
download | ATCD-2605c072815f9ccd60436bd6882080bbd832c962.tar.gz |
foo
-rw-r--r-- | ChangeLog-96b | 20 | ||||
-rw-r--r-- | ace/OS.h | 12 | ||||
-rw-r--r-- | ace/OS.i | 225 | ||||
-rw-r--r-- | ace/Proactor.cpp | 17 | ||||
-rw-r--r-- | ace/Proactor.h | 2 | ||||
-rw-r--r-- | ace/README | 3 | ||||
-rw-r--r-- | ace/Registry.h | 5 |
7 files changed, 194 insertions, 90 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b index 2751a707de8..75ae7d48792 100644 --- a/ChangeLog-96b +++ b/ChangeLog-96b @@ -1,3 +1,23 @@ +Wed Dec 18 06:37:22 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * ace/OS.i (cond_wait): Added the new algorithm for condition + variable emulation on Win32. (and VxWorks). This should fix the + nasty problems we had with earlier version (which weren't + "fair"). Thanks to James Mansion, Karlheinz, Detlef, and Irfan + for helping with this. + + * ace/Registry.h: Removed the "ACE_TURN_NOMINMAX_OFF" stuff + in order to simplify the code. Thanks to Irfan for this. + + * ace/OS.i (sema_post): Added a new overloaded version of + ACE_OS::sema_post(), which takes a "release count." This is the + number of times to release the semaphore. Note that Win32 + supports this natively, whereas on POSIX we need to loop... + + * ace/Proactor.cpp (handle_events): Changed the Proactor logic so + that it will correctly propagate any errors that occur to the + handle_{input,output}_complete callback. + Tue Dec 17 20:56:56 1996 David L. Levine <levine@cs.wustl.edu> * ace/OS.{h,i}: on VxWorks: implemented ACE_OS::gethostbyname (), @@ -756,8 +756,19 @@ struct ACE_cond_t long waiters_; // Number of waiting threads. + ACE_thread_mutex waiters_lock_; + // Serialize access to the waiters count. + ACE_sema_t sema_; // Queue up threads waiting for the condition to become signaled. + + ACE_event_t waiters_done_; + // An auto reset event used by the broadcast/signal thread to wait + // for the waiting thread(s) to wake up and get a chance at the + // semaphore. + + size_t was_broadcast_; + // Keeps track of whether we were broadcasting or just signaling. }; struct ACE_rwlock_t @@ -2180,6 +2191,7 @@ public: LPCTSTR name = 0, void *arg = 0, int max = 0x7fffffff); static int sema_post (ACE_sema_t *s); + static int sema_post (ACE_sema_t *s, size_t release_count); static int sema_trywait (ACE_sema_t *s); static int sema_wait (ACE_sema_t *s); @@ -1102,8 +1102,16 @@ ACE_OS::cond_init (ACE_cond_t *cv, int type, LPCTSTR name, void *arg) int, -1); #elif defined (ACE_HAS_WTHREADS) || defined (VXWORKS) cv->waiters_ = 0; + cv->was_broadcast_ = 0; - return ACE_OS::sema_init (&cv->sema_, 0, type, name, arg); + int result = 0; + if (ACE_OS::sema_init (&cv->sema_, 0, type, name, arg) == -1) + result = -1; + else if (ACE_OS::mutex_init (&cs->waiters_lock_) = -1) + result = -1; + else if (ACE_OS::event_init (&cv->waiters_done_) == -1) + result = -1; + return result; #endif /* ACE_HAS_STHREADS */ #else cv = cv; @@ -1131,7 +1139,7 @@ ACE_OS::cond_signal (ACE_cond_t *cv) if (cv->waiters_ > 0) return ACE_OS::sema_post (&cv->sema_); else - return 0; + return 0; // No-op #endif /* ACE_HAS_STHREADS */ #else cv = cv; @@ -1155,29 +1163,28 @@ ACE_OS::cond_broadcast (ACE_cond_t *cv) #elif defined (ACE_HAS_WTHREADS) || defined (VXWORKS) // The <external_mutex> must be locked before this call is made. - int result = 0; - int error = 0; + if (this->waiters_ == 0) + return 0; // No-op + else // We are broadcasting, even if there is just one waiter... + { + int result = 0; + // Record the fact that we are broadcasting. This helps the + // cond_wait() method know how to optimize itself. + this->was_broadcast_ = 1; - // Keep track of the number of waiters. - // cv->signaled_waiters_ = cv->waiters_; - // ACE_OS::sema_init (cv->signaled_counter_, cv->signaled_waiters_); + // Wake up all the waiters. - // Wake up all the waiters. + if (ACE_OS::sema_post (&sv->sema_, this->waiters) == -1) + result = -1; - for (int i = cv->waiters_; i > 0; i--) - if (ACE_OS::sema_post (&cv->sema_) != 0) - { - error = errno; + // Wait for all the awakened threads to acquire their part of the + // counting semaphore. + else if (ACE_OS::event_wait (&cv->waiters_done_) == -1) result = -1; - break; - } - // Wait for all the awakened threads to acquire their part of the - // counting semaphore. - // ::WaitForSingleObject (cv->waiters_done_, INFINITE); - // ACE_OS::sema_destroy (cv->signaled_counter_); - errno = error; - return result; + this->was_broadcast_ = 0; + return result; + } #endif /* ACE_HAS_STHREADS */ #else cv = cv; @@ -1198,39 +1205,59 @@ ACE_OS::cond_wait (ACE_cond_t *cv, ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::cond_wait (cv, external_mutex), ace_result_), int, -1); #elif defined (ACE_HAS_WTHREADS) || defined (VXWORKS) - // It's ok to increment this because the <external_mutex> is locked. + // It's ok to increment this because the <external_mutex> must be + // locked by the caller. cv->waiters_++; - if (ACE_OS::mutex_unlock (external_mutex) != 0) - return -1; - int result = 0; int error = 0; - // Wait to be awakened by a ACE_OS::signal() or ACE_OS::broadcast(). - if (ACE_OS::sema_wait (&cv->sema_) != 0) +#if defined (ACE_HAS_SIGNAL_OBJECT_AND_WAIT) + // This call will automatically release the mutex and wait on the semaphore. + result = ::SignalObjectAndWait (*external_mutex, cv->sema_, INFINITE, FALSE); +#else + // We keep the lock held just long enough to increment the count of + // waiters by one. Note that we can't keep it held across the call + // to ACE_OS::sema_wait() since that will deadlock other calls to + // ACE_OS::cond_signal(). + if (ACE_OS::mutex_unlock (external_mutex) != 0) + return -1; + + // Wait to be awakened by a ACE_OS::cond_signal() or + // ACE_OS::cond_broadcast(). + result = ::WaitForSingleObject (cv->sema_, INFINITE); +#endif /* ACE_HAS_SIGNAL_OBJECT_AND_WAIT */ + if (result != WAIT_OBJECT_0) + // This is a hack, we need to find an appropriate mapping... + error = result == WAIT_TIMEOUT ? ETIME : ::GetLastError (); + else { - result = -1; - error = errno; + // If we are broadcasting, then we need to be smarter about + // locking since there can now be multiple threadsd in the + // crtical section. If we are signaling, however, we don't have + // to worry since there will just be 1 thread here. + if (cv->was_broadcast_) + { + if (ACE_OS::mutex_lock (cv->waiters_lock_) != -1) + { + // By making the waiter responsible for decrementing its count we + // don't have to worry about having an internal mutex. Thanks to + // Karlheinz for recognizing this optimization. + cv->waiters_--; + // Release the signaler/broadcaster if we're the last waiter. + if (cv->waiters_ == 0) + ::SetEvent (cv->waiters_done_); + ACE_OS::mutex_unlock (cv->internal_mutex_); + } + } + else + cv->waiters_--; } - - // ACE_OS::sema_wait (cv->signaled_counter_); - // ACE_OS::mutex_lock (cv->internal_mutex_); - // cv->signaled_waiters_--; - // Release the signaler. - // if (cv->signaled_waiters_ == 0) - // ::SetEvent (cv->waiters_done_); - // ACE_OS::mutex_unlock (cv->internal_mutex_); - - // We must always regain the mutex, even when errors occur so that - // we can atomically decrement the count of the waiters. + // We must always regain the external mutex, even when errors + // occur because that's the guarantee that we give to our + // callers. ACE_OS::mutex_lock (external_mutex); - // By making the waiter responsible for decrementing its count we - // don't have to worry about having an internal mutex. Thanks to - // Karlheinz for recognizing this optimization. - cv->waiters_--; - // Reset errno in case mutex_lock() also fails... errno = error; return result; @@ -1248,55 +1275,81 @@ ACE_OS::cond_timedwait (ACE_cond_t *cv, // ACE_TRACE ("ACE_OS::cond_timedwait"); #if defined (ACE_HAS_THREADS) #if defined (ACE_HAS_WTHREADS) - // Note that it is ok to increment this because the <external_mutex> - // is locked! + + // Handle the easy case first. + if (timeout == 0) + return ACE_OS::cond_wait (cv, external_mutex); + + // It's ok to increment this because the <external_mutex> must be + // locked by the caller. cv->waiters_++; + + int result = 0; + int error = 0; + int msec_timeout; + + if (timeout->sec () == 0 && timeout->usec () == 0) + msec_timeout = 0; // Do a "poll." + else + { + // Note that we must convert between absolute time (which is + // passed as a parameter) and relative time (which is what + // WaitForSingleObjects() expects). + ACE_Time_Value relative_time (*timeout - ACE_OS::gettimeofday ()); + msec_timeout = relative_time.msec (); + } +#if defined (ACE_HAS_SIGNAL_OBJECT_AND_WAIT) + // This call will automatically release the mutex and wait on the semaphore. + result = ::SignalObjectAndWait (*external_mutex, cv->sema_, msec_timeout, FALSE); +#else // We keep the lock held just long enough to increment the count of // waiters by one. Note that we can't keep it held across the call // to WaitForSingleObject since that will deadlock other calls to // ACE_OS::cond_signal(). if (ACE_OS::mutex_unlock (external_mutex) != 0) return -1; - - DWORD result; - - if (timeout == 0) - // Wait forever. - result = ::WaitForSingleObject (cv->sema_, INFINITE); - else if (timeout->sec () == 0 && timeout->usec () == 0) - // Do a "poll". - result = ::WaitForSingleObject (cv->sema_, 0); - else - { - // Wait for upto <relative_time> number of milliseconds. Note - // that we must convert between absolute time (which is passed - // as a parameter) and relative time (which is what - // WaitForSingleObjects() expects). - ACE_Time_Value relative_time (*timeout - ACE_OS::gettimeofday ()); - result = ::WaitForSingleObject (cv->sema_, relative_time.msec ()); - } - - // Reacquire the lock before we decrement the count of waiters. - ACE_OS::mutex_lock (external_mutex); - - cv->waiters_--; - if (result == WAIT_OBJECT_0) - return 0; - else + // Wait to be awakened by a ACE_OS::signal() or ACE_OS::broadcast(). + result = ::WaitForSingleObject (cv->sema_, msec_timeout); +#endif /* ACE_HAS_SIGNAL_OBJECT_AND_WAIT */ + if (result != WAIT_OBJECT_0) + // This is a hack, we need to find an appropriate mapping... + error = result == WAIT_TIMEOUT ? ETIME : ::GetLastError (); + else { - errno = result == WAIT_TIMEOUT ? ETIME : ::GetLastError (); - // This is a hack, we need to find an appropriate mapping... - return -1; + // If we are broadcasting, then we need to be smarter about + // locking since there can now be multiple threadsd in the + // crtical section. If we are signaling, however, we don't have + // to worry since there will just be 1 thread here. + if (cv->was_broadcast_) + { + if (ACE_OS::mutex_lock (cv->waiters_lock_) != -1) + { + // By making the waiter responsible for decrementing its count we + // don't have to worry about having an internal mutex. Thanks to + // Karlheinz for recognizing this optimization. + cv->waiters_--; + // Release the signaler/broadcaster if we're the last waiter. + if (cv->waiters_ == 0) + ::SetEvent (cv->waiters_done_); + ACE_OS::mutex_unlock (cv->internal_mutex_); + } + } + else + cv->waiters_--; } - + // We must always regain the external mutex, even when errors + // occur because that's the guarantee that we give to our + // callers. + ACE_OS::mutex_lock (external_mutex); + errno = error; + return result; #elif defined (VXWORKS) // POSIX semaphores don't have a timed wait. Should implement conds with // VxWorks semaphores instead, they do have a timed wait. But all of the // other cond operations would have to be modified. ACE_NOTSUP_RETURN (-1); - #else /* PTHREADS or STHREADS or DCETHREADS */ int result; timestruc_t ts = *timeout; // Calls ACE_Time_Value::operator timestruc_t(). @@ -2901,6 +2954,24 @@ ACE_OS::sema_post (ACE_sema_t *s) #endif /* ACE_HAS_POSIX_SEM */ } +ACE_INLINE int +ACE_OS::sema_post (ACE_sema_t *s, size_t release_count) +{ +#if defined (ACE_WIN32) + // Win32 supports this natively. + ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::ReleaseSemaphore (*, release_count, 0), + ace_result_), + int, -1); +#else + // On POSIX platforms we need to emulate this ourselves. + for (size_t i = 0; i < release_count; i++) + if (ACE_OS::sema_post (s) == -1) + return -1; + + return 0; +#endif /* ACE_WIN32 */ +} + ACE_INLINE int ACE_OS::sema_trywait (ACE_sema_t *s) { diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index c2b640e1607..b2c9ed505d6 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -261,8 +261,13 @@ ACE_Proactor::handle_events (ACE_Time_Value *how_long) (ACE_OVERLAPPED **) &overlapped, timeout); - // Check for a failed dequeue. Stash the error value. - if (result == FALSE && overlapped == 0) + // Check for a failed dequeue. This can happen either because + // of problems with the IO completion port (in which case + // overlapped == 0) or due to problems with the completion + // operation (in which case overlapped != 0). In either case, + // we'll stash the error value so that we can update errno + // appropriate later on. + if (result == FALSE) error = ::GetLastError (); } @@ -276,7 +281,7 @@ ACE_Proactor::handle_events (ACE_Time_Value *how_long) // the caller! // GetQueued returned because of a error or timer. - if (error != 0) + if (error != 0 && overlapped == 0) { // @@ What's the WIN32 constant for 258?!?!?! if (error == ACE_TIMEOUT_OCCURRED) @@ -287,14 +292,14 @@ ACE_Proactor::handle_events (ACE_Time_Value *how_long) "%p GetQueuedCompletionStatus failed errno = %d.\n", "ACE_Proactor::handle_events", error), -1); } - #endif /* ACE_WIN32 */ // Dequeued a failed or successful operation. Dispatch the // Event_Handler. Note that GetQueuedCompletionStatus returns false // when operations fail, but they still need to be dispatched. - // Should we propogate this to the handler somehow? Maybe an extra - // failed/succeeded flag in the dispatch call? + // We propagate the error status to the callee by setting errno = + // error (which is the value returned by ::GetLastError(). + errno = error; int dispatch_result = this->dispatch (overlapped, bytes_transferred); // Return -1 (failure), or return 1. Remember that 0 is reserved diff --git a/ace/Proactor.h b/ace/Proactor.h index 45ba0f8be7f..602e4f64503 100644 --- a/ace/Proactor.h +++ b/ace/Proactor.h @@ -174,7 +174,7 @@ protected: ACE_Manual_Event shared_event_; // Win32 HANDLE associated with every operation that signals when // any operation completes (used to transparently integrate the - // <ACE_Proactor> with the <ACE_Dispatcher>). + // <ACE_Proactor> with the <ACE_ReactorEx>). }; class ACE_Export ACE_Overlapped_File diff --git a/ace/README b/ace/README index f7de1dd8489..01b65c546cb 100644 --- a/ace/README +++ b/ace/README @@ -71,6 +71,7 @@ ACE_HAS_RTLD_LAZY_V Explicit dynamic linking permits "lazy" symbol resolution ACE_HAS_SELECT_H Platform has special header for select(). ACE_HAS_SEMUN Compiler/platform defines a union semun for SysV shared memory ACE_HAS_SIGINFO_T Platform supports SVR4 extended signals +ACE_HAS_SIGNAL_OBJECT_AND_WAIT Platform supports the Win32 SignalObjectAndWait() function (WinNT 4.0 and beyond). ACE_HAS_SIGNAL_SAFE_OS_CALLS Automatically restart OS system calls when EINTR occurs ACE_HAS_SIGWAIT Platform/compiler has the sigwait(2) prototype ACE_HAS_SIG_ATOMIC_T Compiler/platform defines the sig_atomic_t typedef @@ -125,7 +126,7 @@ ACE_HAS_UNICODE Platform/compiler supports UNICODE ACE_HAS_USING_KEYWORD Compiler supports the new using keyword for C++ namespaces. ACE_HAS_VOIDPTR_MMAP Platform requires void * for mmap(). ACE_HAS_VOIDPTR_SOCKOPT OS/compiler uses void * arg 4 setsockopt() rather than const char * -ACE_HAS_WIN32_TRYLOCK The Win32 platform support TryEnterCriticalSection() +ACE_HAS_WIN32_TRYLOCK The Win32 platform support TryEnterCriticalSection() (WinNT 4.0 and beyond) ACE_HAS_WINSOCK2 The Win32 platform supports WinSock 2.0 ACE_HAS_XLI Platform has the XLI version of TLI ACE_HAS_XT Platform has Xt and Motif diff --git a/ace/Registry.h b/ace/Registry.h index 347c760f394..061c5e16547 100644 --- a/ace/Registry.h +++ b/ace/Registry.h @@ -503,10 +503,5 @@ private: }; -#if defined (ACE_TURN_NOMINMAX_OFF) -#undef NOMINMAX -#undef ACE_TURN_NOMINMAX_OFF -#endif - #endif /* ACE_WIN32 */ #endif /* ACE_REGISTRY_H */ |