summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-05-23 16:34:11 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-05-23 16:34:11 +0000
commit5a7724c9534fdf46409f6c1edf8bb0b8408d6385 (patch)
tree0ad7c4784ff6671e0f16b19e1b899b950c03656c
parent35051f07c407b4e462960a0c86b97605f0ac3bbb (diff)
downloadATCD-5a7724c9534fdf46409f6c1edf8bb0b8408d6385.tar.gz
ChangeLog Entry: Sun May 23 11:33:07 1999 Alexander Babu Arulanthu <alex@cs.wustl.edu>
-rw-r--r--ace/Asynch_IO.cpp11
-rw-r--r--ace/Asynch_IO.h7
-rw-r--r--ace/Object_Manager.cpp4
-rw-r--r--ace/Object_Manager.h1
-rw-r--r--ace/POSIX_Asynch_IO.cpp9
-rw-r--r--ace/POSIX_Proactor.cpp80
-rw-r--r--ace/POSIX_Proactor.h10
-rw-r--r--ace/Proactor.cpp141
-rw-r--r--ace/Proactor_Impl.h9
-rw-r--r--ace/WIN32_Proactor.cpp76
-rw-r--r--ace/WIN32_Proactor.h8
11 files changed, 323 insertions, 33 deletions
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp
index 9a87f63c893..373356337eb 100644
--- a/ace/Asynch_IO.cpp
+++ b/ace/Asynch_IO.cpp
@@ -969,11 +969,14 @@ ACE_Handler::handle_write_file (const ACE_Asynch_Write_File::Result &result)
}
void
-ACE_Handler::handle_time_out (const ACE_Time_Value &tv,
- const void *act)
+ACE_Handler::handle_time_out (const ACE_Time_Value & /* tv */,
+ const void * /* act */)
+{
+}
+
+void
+ACE_Handler::handle_wakeup (void)
{
- ACE_UNUSED_ARG (tv);
- ACE_UNUSED_ARG (act);
}
ACE_Proactor *
diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h
index 4e1c72d18b6..fb555a90610 100644
--- a/ace/Asynch_IO.h
+++ b/ace/Asynch_IO.h
@@ -1002,7 +1002,12 @@ public:
virtual void handle_time_out (const ACE_Time_Value &tv,
const void *act = 0);
// Called when timer expires. <tv> was the requested time value and
- // <act> is the ACT passed when scheduling the timer
+ // <act> is the ACT passed when scheduling the timer.
+
+ virtual void handle_wakeup (void);
+ // This is method works with the <run_event_loop> of the
+ // ACE_Proactor. A special <Wake_Up_Completion> is used to wake up
+ // all the threads that are blocking for completions.
ACE_Proactor *proactor (void);
// Get the proactor associated with this handler.
diff --git a/ace/Object_Manager.cpp b/ace/Object_Manager.cpp
index 7f2c89f859d..c6f5e15b430 100644
--- a/ace/Object_Manager.cpp
+++ b/ace/Object_Manager.cpp
@@ -213,6 +213,8 @@ ACE_Object_Manager::init (void)
ACE_TOKEN_MANAGER_CREATION_LOCK)
ACE_PREALLOCATE_OBJECT (ACE_TOKEN_CONST::MUTEX,
ACE_TOKEN_INVARIANTS_CREATION_LOCK)
+ ACE_PREALLOCATE_OBJECT (ACE_Thread_Mutex,
+ ACE_PROACTOR_EVENT_LOOP_LOCK)
# endif /* ACE_MT_SAFE */
}
@@ -664,6 +666,8 @@ ACE_Object_Manager::fini (void)
ACE_TOKEN_MANAGER_CREATION_LOCK)
ACE_DELETE_PREALLOCATED_OBJECT (ACE_TOKEN_CONST::MUTEX,
ACE_TOKEN_INVARIANTS_CREATION_LOCK)
+ ACE_DELETE_PREALLOCATED_OBJECT (ACE_Thread_Mutex,
+ ACE_PROACTOR_EVENT_LOOP_LOCK)
# endif /* ACE_MT_SAFE */
#endif /* ! ACE_HAS_STATIC_PREALLOCATION */
diff --git a/ace/Object_Manager.h b/ace/Object_Manager.h
index 1dfc2cb20cc..96bd39daf79 100644
--- a/ace/Object_Manager.h
+++ b/ace/Object_Manager.h
@@ -256,6 +256,7 @@ public:
ACE_THREAD_EXIT_LOCK,
ACE_TOKEN_MANAGER_CREATION_LOCK,
ACE_TOKEN_INVARIANTS_CREATION_LOCK,
+ ACE_PROACTOR_EVENT_LOOP_LOCK,
#endif /* ACE_MT_SAFE */
// Hook for preallocated objects provided by application.
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp
index 573fd72ac3c..6c3683ed4fb 100644
--- a/ace/POSIX_Asynch_IO.cpp
+++ b/ace/POSIX_Asynch_IO.cpp
@@ -78,6 +78,7 @@ ACE_POSIX_Asynch_Result::signal_number (void) const
{
return this->aio_sigevent.sigev_signo;
}
+
int
ACE_POSIX_Asynch_Result::post_completion (ACE_Proactor_Impl *proactor_impl)
{
@@ -2105,8 +2106,6 @@ ACE_POSIX_AIOCB_Asynch_Accept::~ACE_POSIX_AIOCB_Asynch_Accept (void)
void*
ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor)
{
- ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::thread_function called\n"));
-
// Retrieve the reactor pointer from the argument.
ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *,
arg_reactor);
@@ -2126,13 +2125,7 @@ ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor)
while (result != -1)
{
result = reactor->handle_events ();
- ACE_DEBUG ((LM_DEBUG,
- "ACE_Asynch_Accept::Thread_Function : handle_events : result = [%d]\n",
- result));
}
-
- ACE_DEBUG ((LM_DEBUG, "Exiting ACE_Asynch_Accept::thread_function \n"));
-
return 0;
}
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index bf84e9ebff2..68c8aaf7be0 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -13,6 +13,38 @@
#include "ace/POSIX_Proactor.i"
#endif /* __ACE_INLINE__ */
+class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
+{
+ // = TITLE
+ //
+ // This is result object is used by the <end_event_loop> of the
+ // ACE_Proactor interface to wake up all the threads blocking
+ // for completions.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ ACE_POSIX_Wakeup_Completion (ACE_Handler &handler,
+ const void *act = 0,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
+ // Constructor.
+
+ virtual ~ACE_POSIX_Wakeup_Completion (void);
+ // Destructor.
+
+
+ virtual void complete (u_long bytes_transferred = 0,
+ int success = 1,
+ const void *completion_key = 0,
+ u_long error = 0);
+ // This method calls the <handler>'s <handle_wakeup> method.
+};
+
+// *********************************************************************
+
ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
{
this->close ();
@@ -308,6 +340,23 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r
}
}
+int
+ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
+{
+ ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
+ for (ssize_t ci = 0; ci < how_many; ci++)
+ {
+ ACE_NEW_RETURN (wakeup_completion,
+ ACE_POSIX_Wakeup_Completion (this->wakeup_handler_),
+ -1);
+
+ if (wakeup_completion->post_completion (this) == -1)
+ return -1;
+ }
+
+ return 0;
+}
+
// *********************************************************************
class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
@@ -982,9 +1031,9 @@ ACE_POSIX_SIG_Proactor::null_handler (int signal_number,
void * /* context */)
{
ACE_ERROR ((LM_ERROR,
- "Error:(%P | %t):%s:Signal number %d\n"
- "Mask all the RT signals for this thread",
- "ACE_POSIX_SIG_Proactor::null_handler called",
+ "Error:(%P | %t):ACE_POSIX_SIG_Proactor::null_handler called,"
+ "Signal number %d,"
+ "Mask all the RT signals for this thread\n",
signal_number));
}
@@ -1156,4 +1205,29 @@ ACE_POSIX_Asynch_Timer::complete (u_long bytes_transferred,
this->handler_.handle_time_out (this->time_, this->act ());
}
+// *********************************************************************
+
+ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion (ACE_Handler &handler,
+ const void *act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number)
+ : ACE_Asynch_Result_Impl (),
+ ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number)
+{
+}
+
+ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
+{
+}
+
+void
+ACE_POSIX_Wakeup_Completion::complete (u_long /* bytes_transferred */,
+ int /* success */,
+ const void * /* completion_key */,
+ u_long /* error */)
+{
+ this->handler_.handle_wakeup ();
+}
+
#endif /* ACE_HAS_AIO_CALLS */
diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h
index 5b84bfcaa44..ee3b4af9c01 100644
--- a/ace/POSIX_Proactor.h
+++ b/ace/POSIX_Proactor.h
@@ -174,6 +174,16 @@ protected:
// compared to <AST> that can be associated each asynchronous
// operation. <completion_key> is implemented right now for the
// POSIX Proators.
+
+ virtual int post_wakeup_completions (int how_many);
+ // Post <how_many> completions to the completion port so that all
+ // threads can wake up. This is used in conjunction with the
+ // <run_event_loop>.
+
+protected:
+ ACE_Handler wakeup_handler_;
+ // Handler to handle the wakeups. This works in conjunction with the
+ // <ACE_Proactor::run_event_loop>.
};
// Forward declarations.
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 82a8d62dc51..8801109ffef 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -34,6 +34,9 @@ int ACE_Proactor::delete_proactor_ = 0;
// Terminate the eventloop.
sig_atomic_t ACE_Proactor::end_event_loop_ = 0;
+// Number of threads in the event loop.
+sig_atomic_t ACE_Proactor::event_loop_thread_count_ = 0;
+
class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH>
{
// = TITLE
@@ -339,20 +342,60 @@ ACE_Proactor::close_singleton (void)
int
ACE_Proactor::run_event_loop (void)
{
- ACE_TRACE ("ACE_Proactor::run_event_loop");
+ int result = 0;
- while (ACE_Proactor::end_event_loop_ == 0)
+ // Declaring the lock variable.
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+ ACE_Thread_Mutex *lock =
+ ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object
+ (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK);
+#endif /* ACE_MT_SAFE */
+
+ // Early check. It is ok to do this without lock, since we care just
+ // whether it is zero or non-zero.
+ if (ACE_Proactor::end_event_loop_ != 0)
+ return 0;
+
+ // First time you are in. Increment the thread count.
+ {
+ // Obtain the lock in the MT environments.
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
+#endif /* ACE_MT_SAFE */
+
+ // Increment the thread count.
+ ACE_Proactor::event_loop_thread_count_ ++;
+ }
+
+ // Run the event loop.
+ while (1)
{
- int result = ACE_Proactor::instance ()->handle_events ();
+ // Check the end loop flag. It is ok to do this without lock,
+ // since we care just whether it is zero or non-zero.
+ if (ACE_Proactor::end_event_loop_ != 0)
+ break;
+
+ // <end_event_loop> is not set. Ready to do <handle_events>.
+ result = ACE_Proactor::instance ()->handle_events ();
if (ACE_Service_Config::reconfig_occurred ())
ACE_Service_Config::reconfigure ();
-
+
else if (result == -1)
- return -1;
+ break;
}
+
+ // Leaving the event loop. Decrement the thread count.
- return 0;
+ // Obtain the lock in the MT environments.
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
+#endif /* ACE_MT_SAFE */
+
+ // Decrement the thread count.
+ ACE_Proactor::event_loop_thread_count_ --;
+
+ return result;
}
// Handle events for -tv- time. handle_events updates -tv- to reflect
@@ -362,29 +405,93 @@ ACE_Proactor::run_event_loop (ACE_Time_Value &tv)
{
ACE_TRACE ("ACE_Proactor::run_event_loop");
- while (ACE_Proactor::end_event_loop_ == 0
- && tv != ACE_Time_Value::zero)
+ int result = 0;
+
+ // Declaring the lock variable.
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+ ACE_Thread_Mutex *lock =
+ ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object
+ (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK);
+#endif /* ACE_MT_SAFE */
+
+ // Early check. It is ok to do this without lock, since we care just
+ // whether it is zero or non-zero.
+ if (ACE_Proactor::end_event_loop_ != 0 ||
+ tv == ACE_Time_Value::zero)
+ return 0;
+
+ // First time you are in. Increment the thread count.
+ {
+ // Obtain the lock in the MT environments.
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
+#endif /* ACE_MT_SAFE */
+
+ // Increment the thread count.
+ ACE_Proactor::event_loop_thread_count_ ++;
+ }
+
+ // Run the event loop.
+ while (1)
{
- int result = ACE_Proactor::instance ()->handle_events (tv);
+ // Check for end of loop. It is ok to do this without lock,
+ // since we care just whether it is zero or non-zero.
+ if (ACE_Proactor::end_event_loop_ != 0 ||
+ tv == ACE_Time_Value::zero)
+ break;
+
+ // <end_event_loop> is not set. Ready to do <handle_events>.
+ result = ACE_Proactor::instance ()->handle_events (tv);
if (ACE_Service_Config::reconfig_occurred ())
ACE_Service_Config::reconfigure ();
-
+
// An error has occurred.
else if (result == -1)
- return result;
+ break;
}
- return 0;
+ // Leaving the event loop. Decrement the thread count.
+
+ // Obtain the lock in the MT environments.
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
+#endif /* ACE_MT_SAFE */
+
+ // Decrement the thread count.
+ ACE_Proactor::event_loop_thread_count_ --;
+
+ return result;
}
int
ACE_Proactor::end_event_loop (void)
{
ACE_TRACE ("ACE_Proactor::end_event_loop");
+
+ // Obtain the lock, set the end flag and post the wakeup
+ // completions.
+
+ // Obtain the lock in the MT environments.
+#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
+ ACE_Thread_Mutex *lock =
+ ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object
+ (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK);
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
+#endif /* ACE_MT_SAFE */
+
+ // Set the end flag.
ACE_Proactor::end_event_loop_ = 1;
- // ACE_Proactor::instance()->notify ();
- return 0;
+
+ // Number of completions to post.
+ int how_many = ACE_Proactor::event_loop_thread_count_;
+
+ // Reset the thread count.
+ ACE_Proactor::event_loop_thread_count_ = 0;
+
+ // Post completions to all the threads so that they will all wake
+ // up.
+ return ACE_Proactor::post_wakeup_completions (how_many);
}
int
@@ -802,6 +909,12 @@ ACE_Proactor::create_asynch_timer (ACE_Handler &handler,
signal_number);
}
+int
+ACE_Proactor::post_wakeup_completions (int how_many)
+{
+ return ACE_Proactor::instance ()->implementation ()->post_wakeup_completions (how_many);
+}
+
void
ACE_Proactor::implementation (ACE_Proactor_Impl *implementation)
{
diff --git a/ace/Proactor_Impl.h b/ace/Proactor_Impl.h
index edae3452e1a..16d5b65d08f 100644
--- a/ace/Proactor_Impl.h
+++ b/ace/Proactor_Impl.h
@@ -85,7 +85,7 @@ public:
// methods.
virtual ACE_Asynch_Read_Stream_Impl *create_asynch_read_stream (void) = 0;
- // Create the correct implementation class for doing Asynch_Read_Stream.
+ // Create the correct implementation class for doing Asynch_Read_Stream.
virtual ACE_Asynch_Write_Stream_Impl *create_asynch_write_stream (void) = 0;
// Create the correct implementation class for doing Asynch_Write_Stream.
@@ -186,7 +186,12 @@ public:
int signal_number = 0) = 0;
// Create the correct implementation object for the Timer
// result. POSIX_SIG_Proactor will create a Timer object with a
- // meaningful signal number, if you leave the signal number as 0.
+ // meaningful signal number, if you leave the signal number as 0.
+
+ virtual int post_wakeup_completions (int how_many) = 0;
+ // Post <how_many> completions to the completion port so that all
+ // threads can wake up. This is used in conjunction with the
+ // <run_event_loop>.
};
#endif /* (ACE_WIN32 && ACE_HAS_WINCE) || ACE_HAS_AIO_CALLS */
diff --git a/ace/WIN32_Proactor.cpp b/ace/WIN32_Proactor.cpp
index 9e94e4699ec..fc87933c28b 100644
--- a/ace/WIN32_Proactor.cpp
+++ b/ace/WIN32_Proactor.cpp
@@ -16,6 +16,38 @@
#include "ace/WIN32_Proactor.i"
#endif /* __ACE_INLINE__ */
+class ACE_Export ACE_WIN32_Wakeup_Completion : public ACE_WIN32_Asynch_Result
+{
+ // = TITLE
+ //
+ // This is result object is used by the <end_event_loop> of the
+ // ACE_Proactor interface to wake up all the threads blocking
+ // for completions.
+ //
+ // = DESCRIPTION
+ //
+
+public:
+ ACE_WIN32_Wakeup_Completion (ACE_Handler &handler,
+ const void *act = 0,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
+ // Constructor.
+
+ virtual ~ACE_WIN32_Wakeup_Completion (void);
+ // Destructor.
+
+
+ virtual void complete (u_long bytes_transferred = 0,
+ int success = 1,
+ const void *completion_key = 0,
+ u_long error = 0);
+ // This method calls the <handler>'s <handle_wakeup> method.
+};
+
+// *********************************************************************
+
ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads,
int used_with_reactor_event_loop)
: completion_port_ (0),
@@ -469,6 +501,23 @@ ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result *result)
}
int
+ACE_WIN32_Proactor::post_wakeup_completions (int how_many)
+{
+ ACE_WIN32_Wakeup_Completion *wakeup_completion = 0;
+ for (ssize_t ci = 0; ci < how_many; ci++)
+ {
+ ACE_NEW_RETURN (wakeup_completion,
+ ACE_WIN32_Wakeup_Completion (this->wakeup_handler_),
+ -1);
+
+ if (wakeup_completion->post_completion (this) == -1)
+ return -1;
+ }
+
+ return 0;
+}
+
+int
ACE_WIN32_Proactor::wake_up_dispatch_threads (void)
{
return 0;
@@ -513,7 +562,7 @@ ACE_WIN32_Asynch_Timer::complete (u_long bytes_transferred,
const void *completion_key,
u_long error)
{
- ACE_UNUSED_ARG (error);
+ ACE_UNUSED_ARG (error);
ACE_UNUSED_ARG (completion_key);
ACE_UNUSED_ARG (success);
ACE_UNUSED_ARG (bytes_transferred);
@@ -521,4 +570,29 @@ ACE_WIN32_Asynch_Timer::complete (u_long bytes_transferred,
this->handler_.handle_time_out (this->time_, this->act ());
}
+// *********************************************************************
+
+ACE_WIN32_Wakeup_Completion::ACE_WIN32_Wakeup_Completion (ACE_Handler &handler,
+ const void *act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number)
+ : ACE_Asynch_Result_Impl (),
+ ACE_WIN32_Asynch_Result (handler, act, event, 0, 0, priority, signal_number)
+{
+}
+
+ACE_WIN32_Wakeup_Completion::~ACE_WIN32_Wakeup_Completion (void)
+{
+}
+
+void
+ACE_WIN32_Wakeup_Completion::complete (u_long /* bytes_transferred */,
+ int /* success */,
+ const void * /* completion_key */,
+ u_long /* error */)
+{
+ this->handler_.handle_wakeup ();
+}
+
#endif /* ACE_WIN32 */
diff --git a/ace/WIN32_Proactor.h b/ace/WIN32_Proactor.h
index f46999dbf9e..cc7dec6e120 100644
--- a/ace/WIN32_Proactor.h
+++ b/ace/WIN32_Proactor.h
@@ -201,6 +201,10 @@ protected:
// Protect against structured exceptions caused by user code when
// dispatching handles.
+ virtual int post_wakeup_completions (int how_many);
+ // Post <how_many> completions to the completion port so that all
+ // threads can wake up. This is used in conjunction with the
+ // <run_event_loop>.
ACE_HANDLE completion_port_;
// Handle for the completion port. Unix doesnt have completion
@@ -217,6 +221,10 @@ protected:
int used_with_reactor_event_loop_;
// Flag that indicates whether we are used in conjunction with
// Reactor.
+
+ ACE_Handler wakeup_handler_;
+ // Handler to handle the wakeups. This works in conjunction with the
+ // <ACE_Proactor::run_event_loop>.
};
class ACE_Export ACE_WIN32_Asynch_Timer : public ACE_WIN32_Asynch_Result