diff options
63 files changed, 4041 insertions, 222 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b index 241ce377e59..3f71dc57132 100644 --- a/ChangeLog-96b +++ b/ChangeLog-96b @@ -1,12 +1,12 @@ -Sun Dec 8 14:39:53 1996 Tim H. Harrison <harrison@lambada.cs.wustl.edu> +Sun Dec 8 10:27:19 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> - * ace/Timer_Queue.i: Added a timer_skew_ data member and accessor - methods to ACE_Timer_Queue. Also added an expire method that - calls gettimeofday and adds the timer_skew_ for you. These - changes will help simplify Reactor, Proactor, and ReactorEx - code. + * netsvcs/servers/main.cpp (main): Fixed several small bugs in the + netsvcs main test program. Thanks to Alexandre Karev + <Alexandre.Karev@cern.ch> for finding this. -Sun Dec 8 10:27:19 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + * ace/Reactor: Removed the timer_skew_ data member from the + ACE_Reactor since this is now covered by the + ACE_Timer_Queue. * ace/Reactor.cpp (handle_events): Added the new version of the ACE_Countdown_Time to simplify the code. @@ -59,6 +59,14 @@ Sun Dec 8 10:27:19 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> find_thread() and find_hthread(). Thanks to Hamutal Yanay <Hamutal_Yanay@mail.icomverse.com> for suggesting this. +Sun Dec 8 14:39:53 1996 Tim H. Harrison <harrison@lambada.cs.wustl.edu> + + * ace/Timer_Queue: Added a timer_skew_ data member and accessor + methods to ACE_Timer_Queue. Also added an expire method + that calls gettimeofday and adds the timer_skew_ for you. These + changes will help simplify Reactor, Proactor, and ReactorEx + code. + Sat Dec 7 16:55:37 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> * ace/Thread_Manager.cpp: Because thread_descriptor_i() only returns diff --git a/ace/Module.cpp b/ace/Module.cpp index 09b562497ab..eefac725ae7 100644 --- a/ace/Module.cpp +++ b/ace/Module.cpp @@ -144,6 +144,8 @@ ACE_Module<ACE_SYNCH_2>::ACE_Module (void) ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::ACE_Module"); this->name ("<unknown>"); // Do nothing... + this->q_pair_[0] = 0; + this->q_pair_[1] = 0; } template <ACE_SYNCH_1> ACE_INLINE diff --git a/ace/OS.cpp b/ace/OS.cpp index 1bf65517830..fcaf0342a2a 100644 --- a/ace/OS.cpp +++ b/ace/OS.cpp @@ -53,7 +53,7 @@ ACE_OS::mutex_lock_cleanup (void *mutex) // as structures... ACE_thread_t ACE_OS::NULL_thread; ACE_hthread_t ACE_OS::NULL_hthread; -ACE_thread_key_t ACE_OS::NULL_key +ACE_thread_key_t ACE_OS::NULL_key; ACE_OS::ACE_OS (void) { @@ -3368,10 +3368,10 @@ ACE_OS::thr_self (ACE_hthread_t &self) { // ACE_TRACE ("ACE_OS::thr_self"); #if defined (ACE_HAS_THREADS) -#if defined (ACE_HAS_PTHREADS) || defined (ACE_HAS_SETKIND_NP) || defined (ACE_HAS_IRIX62_THREADS) - self = ::pthread_self (); -#elif defined (ACE_HAS_DCETHREADS) +#if defined (ACE_HAS_THREAD_SELF) self = ::thread_self (); +#elif defined (ACE_HAS_PTHREADS) || defined (ACE_HAS_SETKIND_NP) + self = ::pthread_self (); #elif defined (ACE_HAS_STHREADS) self = ::thr_self (); #elif defined (ACE_HAS_WTHREADS) @@ -3380,7 +3380,7 @@ ACE_OS::thr_self (ACE_hthread_t &self) self = ::taskIdSelf (); #endif /* ACE_HAS_STHREADS */ #else - self = 1; // Might as well make it the first thread ;-) + self = 1; // Might as well make it the main thread ;-) #endif /* ACE_HAS_THREADS */ } diff --git a/ace/README b/ace/README index 195a38f4691..1ac29929215 100644 --- a/ace/README +++ b/ace/README @@ -107,6 +107,7 @@ ACE_HAS_SYS_SIGLIST Compiler/platform supports sys_siglist array ACE_HAS_TEMPLATE_TYPEDEFS Compiler implements templates that support typedefs inside of classes used as formal arguments to a template class. ACE_HAS_TERM_IOCTLS Platform has terminal ioctl flags like TCGETS and TCSETS. ACE_HAS_THREADS Platform supports threads +ACE_HAS_THREAD_SELF Platform has thread_self() rather than pthread_self() (e.g., DCETHREADS and AIX) ACE_HAS_THREAD_SPECIFIC_STORAGE Compiler/platform has thread-specific storage ACE_HAS_THR_C_DEST The pthread_keycreate() routine *must* take extern C functions. ACE_HAS_THR_C_FUNC The pthread_create() routine *must* take extern C functions. diff --git a/ace/Reactor.cpp b/ace/Reactor.cpp index 6055275c362..d3873ffbb57 100644 --- a/ace/Reactor.cpp +++ b/ace/Reactor.cpp @@ -417,7 +417,6 @@ ACE_Reactor::dump (void) const this->token_.dump (); #endif /* ACE_MT_SAFE */ - this->timer_skew_.dump (); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } @@ -881,9 +880,8 @@ ACE_Reactor::ACE_Reactor (ACE_Sig_Handler *sh, requeue_position_ (-1), // Requeue at end of waiters by default. initialized_ (0), #if defined (ACE_MT_SAFE) - token_ (*this), + token_ (*this) #endif /* ACE_MT_SAFE */ - timer_skew_ (0, ACE_TIMER_SKEW) { ACE_TRACE ("ACE_Reactor::ACE_Reactor"); if (this->open (ACE_Reactor::DEFAULT_SIZE, 0, sh, tq)) @@ -901,9 +899,8 @@ ACE_Reactor::ACE_Reactor (size_t size, requeue_position_ (-1), // Requeue at end of waiters by default. initialized_ (0), #if defined (ACE_MT_SAFE) - token_ (*this), + token_ (*this) #endif /* ACE_MT_SAFE */ - timer_skew_ (0, ACE_TIMER_SKEW) { ACE_TRACE ("ACE_Reactor::ACE_Reactor"); @@ -1028,30 +1025,15 @@ ACE_Reactor::schedule_timer (ACE_Event_Handler *handler, (handler, arg, ACE_OS::gettimeofday () + delta_time, interval); } -// Main event loop driver that blocks for <how_long> before returning -// (will return earlier if I/O or signal events occur). +// Main event loop driver that blocks for <max_wait_time> before +// returning (will return earlier if I/O or signal events occur). int ACE_Reactor::handle_events (ACE_Time_Value &max_wait_time) { ACE_TRACE ("ACE_Reactor::handle_events"); - // Stash the current time. - ACE_Time_Value prev_time = ACE_OS::gettimeofday (); - - int result = this->handle_events (&max_wait_time); - - // Compute the time while the Reactor is processing. - ACE_Time_Value elapsed_time = ACE_OS::gettimeofday () - prev_time; - - if (max_wait_time > elapsed_time) - max_wait_time = max_wait_time - elapsed_time; - else - { - max_wait_time = ACE_Time_Value::zero; // Used all of timeout. - errno = ETIME; - } - return result; + return this->handle_events (&max_wait_time); } int @@ -1447,9 +1429,7 @@ ACE_Reactor::dispatch (int nfound, // Handle timers first since they may have higher latency // constraints... - if (!this->timer_queue_->is_empty ()) - // Fudge factor accounts for problems with Solaris timers... - this->timer_queue_->expire (ACE_OS::gettimeofday () + this->timer_skew_); + this->timer_queue_->expire (); #if defined (ACE_MT_SAFE) // Check to see if the notify ACE_HANDLE is enabled. If so, it @@ -1532,6 +1512,11 @@ ACE_Reactor::handle_events (ACE_Time_Value *max_wait_time) return -1; #endif /* ACE_MT_SAFE */ + // Stash the current time -- the destructor of this object will + // automatically compute how much time elpased since this method was + // called. + ACE_Countdown_Time countdown (max_wait_time); + ACE_Handle_Set rmask; ACE_Handle_Set wmask; ACE_Handle_Set emask; diff --git a/ace/Reactor.h b/ace/Reactor.h index 913419a8dbb..c3d4280a9cf 100644 --- a/ace/Reactor.h +++ b/ace/Reactor.h @@ -304,14 +304,14 @@ public: // makes it possible to free up the memory and avoid memory leaks. // = Event loop drivers. - virtual int handle_events (ACE_Time_Value *how_long = 0); - // Main event loop driver that blocks for <how_long> before + virtual int handle_events (ACE_Time_Value *max_wait_time = 0); + // Main event loop driver that blocks for <max_wait_time> before // returning (will return earlier if I/O or signal events occur). - // Note that <how_long> can be 0, in which case this method blocks + // Note that <max_wait_time> can be 0, in which case this method blocks // until I/O events or signals occur. - virtual int handle_events (ACE_Time_Value &how_long); - // Main event loop driver that blocks for <how_long> before + virtual int handle_events (ACE_Time_Value &max_wait_time); + // Main event loop driver that blocks for <max_wait_time> before // returning (will return earlier if I/O or signal events occur). // = Register and remove Handlers. @@ -639,10 +639,6 @@ protected: #endif /* ACE_USE_POLL */ private: - ACE_Time_Value timer_skew_; - // Adjusts for skew that occurs in certain OS timers (e.g., - // Solaris). - // Deny access since member-wise won't work... ACE_Reactor (const ACE_Reactor &); ACE_Reactor &operator = (const ACE_Reactor &); diff --git a/ace/Thread_Manager.cpp b/ace/Thread_Manager.cpp index fa2267a443c..f67498db3ce 100644 --- a/ace/Thread_Manager.cpp +++ b/ace/Thread_Manager.cpp @@ -25,23 +25,10 @@ ACE_Thread_Descriptor::ACE_Thread_Descriptor (void) ACE_TRACE ("ACE_Thread_Descriptor::ACE_Thread_Descriptor"); } -// Return the thread descriptor (indexed by ACE_thread_t). - -int -ACE_Thread_Manager::thread_descriptor_i (ACE_thread_t thr_id, - ACE_Thread_Descriptor &descriptor) -{ - ACE_TRACE ("ACE_Thread_Descriptor::thread_descriptor_i"); - - for (size_t i = 0; i < this->current_count_; i++) - if (ACE_OS::thr_equal (thr_id, this->thr_table_[i].thr_id_)) - { - descriptor = this->thr_table_[i]; - return 0; - } - - return -1; -} +// The following macro simplifies subsequence code. +#define ACE_FIND(OP,INDEX) \ + int INDEX = OP; \ + if (INDEX == -1) return -1 int ACE_Thread_Manager::thread_descriptor (ACE_thread_t thr_id, @@ -50,26 +37,11 @@ ACE_Thread_Manager::thread_descriptor (ACE_thread_t thr_id, ACE_TRACE ("ACE_Thread_Descriptor::thread_descriptor"); ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1)); - return this->thread_descriptor_i (thr_id, descriptor); + ACE_FIND (this->find_thread (thr_id), index); + descriptor = this->thr_table_[index]; + return 0; } -// Return the thread descriptor (indexed by ACE_hthread_t). - -int -ACE_Thread_Manager::hthread_descriptor_i (ACE_hthread_t thr_handle, - ACE_Thread_Descriptor &descriptor) -{ - ACE_TRACE ("ACE_Thread_Descriptor::hthread_descriptor_i"); - - for (size_t i = 0; i < this->current_count_; i++) - if (ACE_OS::thr_cmp (thr_handle, this->thr_table_[i].thr_handle_)) - { - descriptor = this->thr_table_[i]; - return 0; - } - - return -1; -} int ACE_Thread_Manager::hthread_descriptor (ACE_hthread_t thr_handle, @@ -78,7 +50,9 @@ ACE_Thread_Manager::hthread_descriptor (ACE_hthread_t thr_handle, ACE_TRACE ("ACE_Thread_Descriptor::hthread_descriptor"); ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1)); - return this->hthread_descriptor_i (thr_handle, descriptor); + ACE_FIND (this->find_hthread (thr_handle), index); + descriptor = this->thr_table_[index]; + return 0; } // Return the thread descriptor (indexed by ACE_hthread_t). @@ -95,13 +69,10 @@ ACE_Thread_Manager::thr_self (ACE_hthread_t &self) // Wasn't in the cache, so we'll have to look it up and cache it. if (handle == 0) { - ACE_Thread_Descriptor td; ACE_thread_t id = ACE_OS::thr_self (); - if (this->thread_descriptor_i (id, td) == -1) - return -1; - - handle = &td.thr_handle_; + ACE_FIND (this->find_thread (id), index); + handle = &this->thr_table_[index].thr_handle_; // Update the TSS cache. ACE_LOG_MSG->thr_handle (handle); @@ -346,6 +317,35 @@ ACE_Thread_Manager::append_thr (ACE_thread_t t_id, } } +// Return the thread descriptor (indexed by ACE_hthread_t). + +int +ACE_Thread_Manager::find_hthread (ACE_hthread_t h_id) +{ + ACE_TRACE ("ACE_Thread_Descriptor::find_hthread"); + + for (size_t i = 0; i < this->current_count_; i++) + if (ACE_OS::thr_cmp (h_id, this->thr_table_[i].thr_handle_)) + return i; + + return -1; +} + +// Locate the index in the table associated with <t_id>. Must be +// called with the lock held. + +int +ACE_Thread_Manager::find_thread (ACE_thread_t t_id) +{ + ACE_TRACE ("ACE_Thread_Manager::find_thread"); + + for (size_t i = 0; i < this->current_count_; i++) + if (ACE_OS::thr_equal (t_id, this->thr_table_[i].thr_id_)) + return i; + + return -1; +} + // Insert a thread into the pool (checks for duplicates and doesn't // allow them to be inserted twice). @@ -359,7 +359,7 @@ ACE_Thread_Manager::insert_thr (ACE_thread_t t_id, // Check for duplicates and bail out if they're already // registered... - if (this->find (t_id) != -1) + if (this->find_thread (t_id) != -1) return -1; if (grp_id == -1) @@ -451,26 +451,10 @@ ACE_Thread_Manager::kill_thr (int i, int signum) return 0; } -// Locate the index in the table associated with <t_id>. Must be -// called with the lock held. - -int -ACE_Thread_Manager::find (ACE_thread_t t_id) -{ - ACE_TRACE ("ACE_Thread_Manager::find"); - - for (size_t i = 0; i < this->current_count_; i++) - if (ACE_OS::thr_equal (t_id, this->thr_table_[i].thr_id_)) - return i; - - return -1; -} - #define ACE_EXECUTE_OP(OP) \ ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1)); \ - int i = this->find (t_id); \ - if (i == -1) return -1; \ - return OP (i); + ACE_FIND (this->find_thread (t_id), index); \ + return OP (index); // Suspend a single thread. @@ -527,11 +511,8 @@ ACE_Thread_Manager::check_state (ACE_Thread_State state, // Wasn't in the cache, so we'll have to look it up. if (thr_state == 0) { - int i = this->find (id); - - if (i == -1) - return -1; - thr_state = &this->thr_table_[i].thr_state_; + ACE_FIND (this->find_thread (id), index); + thr_state = &this->thr_table_[index].thr_state_; if (self_check) // Update the TSS cache. ACE_LOG_MSG->thr_state (thr_state); @@ -539,11 +520,8 @@ ACE_Thread_Manager::check_state (ACE_Thread_State state, #else // Turn off caching for the time being until we figure out // how to do it correctly in the face of deletions... - int i = this->find (id); - - if (i == -1) - return -1; - thr_state = &this->thr_table_[i].thr_state_; + ACE_FIND (this->find_thread (id), index); + thr_state = &this->thr_table_[index].thr_state_; #endif /* 0 */ return *thr_state == state; } @@ -583,10 +561,8 @@ ACE_Thread_Manager::get_grp (ACE_thread_t t_id, int &grp_id) ACE_TRACE ("ACE_Thread_Manager::get_grp"); ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1)); - int i = this->find (t_id); - if (i == -1) - return -1; - grp_id = this->thr_table_[i].grp_id_; + ACE_FIND (this->find_thread (t_id), index); + grp_id = this->thr_table_[index].grp_id_; return 0; } @@ -598,10 +574,8 @@ ACE_Thread_Manager::set_grp (ACE_thread_t t_id, int grp_id) ACE_TRACE ("ACE_Thread_Manager::set_grp"); ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1)); - int i = this->find (t_id); - if (i == -1) - return -1; - this->thr_table_[i].grp_id_ = grp_id; + ACE_FIND (this->find_thread (t_id), index); + this->thr_table_[index].grp_id_ = grp_id; return 0; } @@ -717,7 +691,7 @@ ACE_Thread_Manager::exit (void *status, int do_thr_exit) ACE_TRACE ("ACE_Thread_Manager::exit"); ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0)); - int i = this->find (ACE_Thread::self ()); + int i = this->find_thread (ACE_Thread::self ()); // Locate thread id. if (i != -1) @@ -993,12 +967,8 @@ ACE_Thread_Manager::get_grp (ACE_Task_Base *task, int &grp_id) ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1)); - int i = this->find_task (task); - - if (i == -1) - return -1; - - grp_id = this->thr_table_[i].grp_id_; + ACE_FIND (this->find_task (task), index); + grp_id = this->thr_table_[index].grp_id_; return 0; } diff --git a/ace/Thread_Manager.h b/ace/Thread_Manager.h index 547fa0285b1..74595235013 100644 --- a/ace/Thread_Manager.h +++ b/ace/Thread_Manager.h @@ -286,10 +286,14 @@ private: ACE_Task_Base *task = 0); // Create a new thread (must be called with locks held). - int find (ACE_thread_t t_id); + int find_thread (ACE_thread_t t_id); // Locate the index of the table slot occupied by <t_id>. Returns // -1 if <t_id> is not in the table doesn't contain <t_id>. + int find_hthread (ACE_hthread_t h_id); + // Locate the index of the table slot occupied by <h_id>. Returns + // -1 if <h_id> is not in the table doesn't contain <h_id>. + int find_task (ACE_Task_Base *task, int index = -1); // Locate the index of the table slot occupied by <task>. Returns // -1 if <task> is not in the table doesn't contain <task>. @@ -307,18 +311,6 @@ private: void remove_thr (int i); // Remove thread from the table. - int thread_descriptor_i (ACE_thread_t, ACE_Thread_Descriptor &); - // Implements the lookup function for the <thread_descriptor>. Note - // that this version assumes that the lock is held. We need this to - // avoid intra-class method deadlock on systems that lack recursive - // mutexes. - - int hthread_descriptor_i (ACE_hthread_t, ACE_Thread_Descriptor &); - // Implements the lookup function for the <hthread_descriptor>. - // Note that this version assumes that the lock is held. We need - // this to avoid intra-class method deadlock on systems that lack - // recursive mutexes. - // = The following four methods implement a simple scheme for operating on a collection of threads atomically. typedef int (ACE_Thread_Manager::*THR_FUNC)(int, int); diff --git a/ace/Time_Value.cpp b/ace/Time_Value.cpp index 75406542c14..d265e4513fe 100644 --- a/ace/Time_Value.cpp +++ b/ace/Time_Value.cpp @@ -14,7 +14,7 @@ const ACE_Time_Value ACE_Time_Value::zero; ACE_ALLOC_HOOK_DEFINE(ACE_Time_Value) -/* Initializes the ACE_Time_Value object from a timeval. */ +// Initializes the ACE_Time_Value object from a timeval. ACE_Time_Value::ACE_Time_Value (const timeval &tv) { @@ -229,3 +229,46 @@ ACE_Time_Value::normalize (void) } #endif } + +int +ACE_Countdown_Time::start (void) +{ + this->start_time_ = ACE_OS::gettimeofday (); + this->stopped_ = 0; + return 0; +} + +int +ACE_Countdown_Time::stop (void) +{ + if (this->max_wait_time_ != 0 && this->stopped_ == 0) + { + ACE_Time_Value elapsed_time = + ACE_OS::gettimeofday () - this->start_time_; + + if (*this->max_wait_time_ > elapsed_time) + *this->max_wait_time_ -= elapsed_time; + else + { + // Used all of timeout. + *this->max_wait_time_ = ACE_Time_Value::zero; + errno = ETIME; + } + this->stopped_ = 1; + } + return 0; +} + +ACE_Countdown_Time::ACE_Countdown_Time (ACE_Time_Value *max_wait_time) + : max_wait_time_ (max_wait_time), + stopped_ (0) +{ + if (max_wait_time != 0) + this->start (); +} + +ACE_Countdown_Time::~ACE_Countdown_Time (void) +{ + this->stop (); +} + diff --git a/ace/Time_Value.h b/ace/Time_Value.h index ec2d5aebabf..823d87d94c5 100644 --- a/ace/Time_Value.h +++ b/ace/Time_Value.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY @@ -248,6 +247,40 @@ private: // Microseconds. }; +class ACE_Export ACE_Countdown_Time + // = TITLE + // Keeps track of the amount of elapsed time. + // + // = DESCRIPTION + // This class has a side-effect on the <max_wait_time> -- every + // time the <stop> method is called the <max_wait_time> is + // updated. +{ +public: + // = Initialization and termination methods. + ACE_Countdown_Time (ACE_Time_Value *max_wait_time); + // Cache the <max_wait_time> and call <start>. + + ~ACE_Countdown_Time (void); + // Call <stop>. + + int start (void); + // Cache the start value. + + int stop (void); + // Compute the elapsed time. + +private: + ACE_Time_Value *max_wait_time_; + // Maximum time we were willing to wait. + + ACE_Time_Value start_time_; + // Beginning of the start time. + + int stopped_; + // Keeps track of whether we've already been stopped. +}; + #if defined (__ACE_INLINE__) #include "ace/Time_Value.i" #endif /* __ACE_INLINE__ */ diff --git a/ace/config-aix-4.1.x.h b/ace/config-aix-4.1.x.h index 3c3bc4c866d..fd7801cbaea 100644 --- a/ace/config-aix-4.1.x.h +++ b/ace/config-aix-4.1.x.h @@ -30,6 +30,7 @@ #define ACE_HAS_TIUSER_H #define ACE_TEMPLATES_REQUIRE_PRAGMA #define ACE_HAS_THREAD_SPECIFIC_STORAGE +#define ACE_HAS_THREAD_SELF #define ACE_HAS_AUTOMATIC_INIT_FINI #define ACE_HAS_CHARPTR_DL #define ACE_HAS_SVR4_DYNAMIC_LINKING @@ -92,7 +93,7 @@ // EYE assume it does for now. #define ACE_LACKS_PTHREAD_THR_SIGSETMASK -#define ACE_HAS_DCETHREADS +#define ACE_HAS_PTHREADS #define ACE_PTHREADS_MAP // include there diff --git a/ace/config-osf1-3.2.h b/ace/config-osf1-3.2.h index 13e627d3b7a..776b85d17b5 100644 --- a/ace/config-osf1-3.2.h +++ b/ace/config-osf1-3.2.h @@ -100,6 +100,7 @@ #define ACE_LACKS_PTHREAD_THR_SIGSETMASK // ACE supports POSIX Pthreads. #define ACE_HAS_DCETHREADS +#define ACE_HAS_THREAD_SELF // Explicit dynamic linking permits "lazy" symbol resolution #define ACE_HAS_RTLD_LAZY_V diff --git a/ace/config-win32-msvc4.0.h b/ace/config-win32-msvc4.0.h index 3661671180c..cd76b85a3d2 100644 --- a/ace/config-win32-msvc4.0.h +++ b/ace/config-win32-msvc4.0.h @@ -80,12 +80,18 @@ #if defined (ACE_HAS_WINSOCK2) #if !defined (_WINSOCK2API_) #include /**/ <winsock2.h> // will also include windows.h, if not present +#if defined (_MSC_VER) +#pragma comment(lib, "ws2_32.lib") +#endif /* _MSC_VER */ #endif /* _WINSOCK2API */ #define ACE_WSOCK_VERSION 2, 0 #else #if !defined (_WINSOCKAPI_) #include /**/ <winsock.h> // will also include windows.h, if not present +#if defined (_MSC_VER) +#pragma comment(lib, "wsock32.lib") +#endif /* _MSC_VER */ #endif /* _WINSOCKAPI */ // Version 1.1 of WinSock diff --git a/examples/Threads/Makefile b/examples/Threads/Makefile index 98019d0accd..3ac8e6ac171 100644 --- a/examples/Threads/Makefile +++ b/examples/Threads/Makefile @@ -8,26 +8,26 @@ # Local macros #---------------------------------------------------------------------------- -BIN = test_auto_event \ - test_barrier1 \ - test_barrier2 \ - test_future1 \ - test_future2 \ - test_manual_event \ - test_process_mutex \ - test_process_semaphore \ - test_reader_writer \ - test_recursive_mutex \ - test_task_one \ - test_task_two \ - test_task_three \ - test_task_four \ - test_thread_manager \ - test_thread_pool \ - test_thread_specific \ - test_tss1 \ - test_tss2 \ - test_token +BIN = auto_event \ + barrier1 \ + barrier2 \ + future1 \ + future2 \ + manual_event \ + process_mutex \ + process_semaphore \ + reader_writer \ + recursive_mutex \ + task_one \ + task_two \ + task_three \ + task_four \ + thread_manager \ + thread_pool \ + thread_specific \ + tss1 \ + tss2 \ + token LSRC = $(addsuffix .cpp,$(BIN)) VLDLIBS = $(LDLIBS:%=%$(VAR)) @@ -57,7 +57,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/test_auto_event.o .shobj/test_auto_event.so: test_auto_event.cpp \ +.obj/auto_event.o .shobj/auto_event.so: auto_event.cpp \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -110,7 +110,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Singleton.h -.obj/test_barrier1.o .shobj/test_barrier1.so: test_barrier1.cpp \ +.obj/barrier1.o .shobj/barrier1.so: barrier1.cpp \ $(WRAPPER_ROOT)/ace/Synch.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -162,7 +162,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_barrier2.o .shobj/test_barrier2.so: test_barrier2.cpp \ +.obj/barrier2.o .shobj/barrier2.so: barrier2.cpp \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -218,7 +218,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_future1.o .shobj/test_future1.so: test_future1.cpp \ +.obj/future1.o .shobj/future1.so: future1.cpp \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -256,7 +256,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Method_Object.h \ $(WRAPPER_ROOT)/ace/Activation_Queue.h \ $(WRAPPER_ROOT)/ace/Auto_Ptr.h -.obj/test_future2.o .shobj/test_future2.so: test_future2.cpp \ +.obj/future2.o .shobj/future2.so: future2.cpp \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -294,7 +294,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Method_Object.h \ $(WRAPPER_ROOT)/ace/Activation_Queue.h \ $(WRAPPER_ROOT)/ace/Auto_Ptr.h -.obj/test_manual_event.o .shobj/test_manual_event.so: test_manual_event.cpp \ +.obj/manual_event.o .shobj/manual_event.so: manual_event.cpp \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -346,7 +346,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_process_mutex.o .shobj/test_process_mutex.so: test_process_mutex.cpp \ +.obj/process_mutex.o .shobj/process_mutex.so: process_mutex.cpp \ $(WRAPPER_ROOT)/ace/Synch.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -367,7 +367,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Set.h -.obj/test_process_semaphore.o .shobj/test_process_semaphore.so: test_process_semaphore.cpp \ +.obj/process_semaphore.o .shobj/process_semaphore.so: process_semaphore.cpp \ $(WRAPPER_ROOT)/ace/Synch.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -388,7 +388,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Set.h -.obj/test_reader_writer.o .shobj/test_reader_writer.so: test_reader_writer.cpp \ +.obj/reader_writer.o .shobj/reader_writer.so: reader_writer.cpp \ $(WRAPPER_ROOT)/ace/Synch.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -410,7 +410,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Thread.h \ $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Get_Opt.h -.obj/test_recursive_mutex.o .shobj/test_recursive_mutex.so: test_recursive_mutex.cpp \ +.obj/recursive_mutex.o .shobj/recursive_mutex.so: recursive_mutex.cpp \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -463,7 +463,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Get_Opt.h -.obj/test_task_one.o .shobj/test_task_one.so: test_task_one.cpp \ +.obj/task_one.o .shobj/task_one.so: task_one.cpp \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -519,7 +519,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_task_two.o .shobj/test_task_two.so: test_task_two.cpp \ +.obj/task_two.o .shobj/task_two.so: task_two.cpp \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -575,7 +575,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_task_three.o .shobj/test_task_three.so: test_task_three.cpp \ +.obj/task_three.o .shobj/task_three.so: task_three.cpp \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ $(WRAPPER_ROOT)/ace/ACE.h \ @@ -631,7 +631,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Message_Queue.h \ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h -.obj/test_task_four.o .shobj/test_task_four.so: test_task_four.cpp \ +.obj/task_four.o .shobj/task_four.so: task_four.cpp \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -687,7 +687,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_thread_manager.o .shobj/test_thread_manager.so: test_thread_manager.cpp \ +.obj/thread_manager.o .shobj/thread_manager.so: thread_manager.cpp \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -739,7 +739,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_thread_pool.o .shobj/test_thread_pool.so: test_thread_pool.cpp \ +.obj/thread_pool.o .shobj/thread_pool.so: thread_pool.cpp \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -795,7 +795,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_thread_specific.o .shobj/test_thread_specific.so: test_thread_specific.cpp \ +.obj/thread_specific.o .shobj/thread_specific.so: thread_specific.cpp \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -847,7 +847,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h -.obj/test_tss1.o .shobj/test_tss1.so: test_tss1.cpp \ +.obj/tss1.o .shobj/tss1.so: tss1.cpp \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -903,7 +903,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Message_Queue.h \ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h -.obj/test_tss2.o .shobj/test_tss2.so: test_tss2.cpp \ +.obj/tss2.o .shobj/tss2.so: tss2.cpp \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -938,7 +938,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Mem_Map.h \ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ $(WRAPPER_ROOT)/ace/Token.h -.obj/test_token.o .shobj/test_token.so: test_token.cpp \ +.obj/token.o .shobj/token.so: token.cpp \ $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Synch.h \ $(WRAPPER_ROOT)/ace/ACE.h \ diff --git a/examples/Threads/auto_event.cpp b/examples/Threads/auto_event.cpp new file mode 100644 index 00000000000..4f83d50db71 --- /dev/null +++ b/examples/Threads/auto_event.cpp @@ -0,0 +1,113 @@ +// $Id$ + +// This test shows the use of an ACE_Auto_Event as a signaling +// mechanism. Two threads are created (one a reader, the other a +// writer). The reader waits till the writer has completed +// calculations. Upon waking up the reader prints the data calculated +// by the writer. The writer thread calculates the value and signals +// the reader when the calculation completes. + +#include "ace/Service_Config.h" +#include "ace/Synch.h" +#include "ace/Singleton.h" +#include "ace/Thread_Manager.h" + +#if defined (ACE_HAS_THREADS) +// Shared event between reader and writer. The ACE_Thread_Mutex is +// necessary to make sure that only one ACE_Auto_Event is created. +// The default constructor for ACE_Auto_Event sets it initially into +// the non-signaled state. + +typedef ACE_Singleton <ACE_Auto_Event, ACE_Thread_Mutex> EVENT; + +// work time for writer +static int work_time; + +// Reader thread. +static void * +reader (void *arg) +{ + // Shared data via a reference. + int& data = *(int *) arg; + + ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ()); + + // Wait for writer to complete. + + ACE_DEBUG ((LM_DEBUG, "(%t) reader: waiting...... \n")); + + if (EVENT::instance ()->wait () == -1) + { + ACE_ERROR ((LM_ERROR, "thread wait failed")); + ACE_OS::exit (0); + } + + // Read shared data. + ACE_DEBUG ((LM_DEBUG, "(%t) reader: value of data is: %d \n", data)); + + return 0; +} + +// Writer thread. +static void * +writer (void *arg) +{ + int& data = *(int *) arg; + + ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ()); + + // Calculate (work). + ACE_DEBUG ((LM_DEBUG, "(%t) writer: working for %d secs\n", work_time)); + ACE_OS::sleep (work_time); + + // Write shared data. + data = 42; + + // Wake up reader. + ACE_DEBUG ((LM_DEBUG, "(%t) writer: calculation complete, waking reader\n")); + + if (EVENT::instance ()->signal () == -1) + { + ACE_ERROR ((LM_ERROR, "thread wait failed")); + ACE_OS::exit (0); + } + + return 0; +} + +int +main (int argc, char **argv) +{ + // Shared data: set by writer, read by reader. + int data; + + // Work time for writer. + work_time = argc == 2 ? atoi (argv[1]) : 5; + + // threads manager + ACE_Thread_Manager& tm = *ACE_Service_Config::thr_mgr (); + + // Create reader thread. + if (tm.spawn (reader, (void *) &data) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "thread create for reader failed"), -1); + + // Create writer thread. + if (tm.spawn (writer, (void *) &data) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "thread create for writer failed"), -1); + + // Wait for both. + if (tm.wait () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "thread wait failed"), -1); + else + ACE_DEBUG ((LM_ERROR, "graceful exit\n")); + + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/barrier1.cpp b/examples/Threads/barrier1.cpp new file mode 100644 index 00000000000..6b213819ca0 --- /dev/null +++ b/examples/Threads/barrier1.cpp @@ -0,0 +1,84 @@ +// This test program illustrates how the ACE barrier synchronization +// $Id$ + +// mechanisms work. + + +#include "ace/Synch.h" +#include "ace/Thread_Manager.h" +#include "ace/Service_Config.h" + +#if defined (ACE_HAS_THREADS) + +struct Tester_Args + // = TITLE + // These arguments are passed into each test thread. +{ + Tester_Args (ACE_Barrier &tb, int i) + : tester_barrier_ (tb), + n_iterations_ (i) {} + + ACE_Barrier &tester_barrier_; + // Reference to the tester barrier. This controls each miteration of + // the tester function running in every thread. + + int n_iterations_; + // Number of iterations to run. +}; + +// Iterate <n_iterations> time printing off a message and "waiting" +// for all other threads to complete this iteration. + +static void * +tester (Tester_Args *args) +{ + // Keeps track of thread exit. + ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ()); + + for (int iterations = 1; + iterations <= args->n_iterations_; + iterations++) + { + ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d\n", iterations)); + + // Block until all other threads have waited, then continue. + args->tester_barrier_.wait (); + } + + return 0; +} + +// Default number of threads to spawn. +static const int DEFAULT_ITERATIONS = 5; + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config daemon (argv[0]); + + int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS; + int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS; + + ACE_Barrier tester_barrier (n_threads); + + Tester_Args args (tester_barrier, n_iterations); + + if (ACE_Service_Config::thr_mgr ()->spawn_n + (n_threads, ACE_THR_FUNC (tester), + (void *) &args, THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn_n"), 1); + + // Wait for all the threads to reach their exit point. + ACE_Service_Config::thr_mgr ()->wait (); + + ACE_DEBUG ((LM_DEBUG, "(%t) done\n")); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/barrier2.cpp b/examples/Threads/barrier2.cpp new file mode 100644 index 00000000000..30190ace443 --- /dev/null +++ b/examples/Threads/barrier2.cpp @@ -0,0 +1,269 @@ +// $Id$ + +// generic_worker_task.cpp +// +// This test program illustrates how the ACE task workers/barrier +// synchronization mechanisms work in conjunction with the ACE_Task +// and the ACE_Thread_Manager. The manual flag not set simulates +// user input, if set input comes from stdin until RETURN only is +// entered which stops all workers via a message block of length +// 0. This is an alernative shutdown of workers compared to queue +// deactivate. The delay_put flag simulates a delay between the +// shutdown puts. All should work with this flag disabled! The +// BARRIER_TYPE is supposed to enable/disable barrier sync on each svc +// a worker has done. + +#include <iostream.h> +#include "ace/Task.h" +#include "ace/Service_Config.h" + +#if defined (ACE_HAS_THREADS) + +#define BARRIER_TYPE ACE_Null_Barrier +//#define BARRIER_TYPE ACE_Barrier +//#ifdef delay_put +//#define manual + +template <class BARRIER> +class Worker_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + + Worker_Task (ACE_Thread_Manager *thr_mgr, + int n_threads, + int inp_serialize = 1); + + virtual int Producer (void); + // produce input for workers + + virtual int input (ACE_Message_Block *mb); + // Fill one message block via a certain input strategy. + + virtual int output (ACE_Message_Block *mb); + // Forward one message block via a certain output strategy to the + // next task if any. + + virtual int service (ACE_Message_Block *mb, int iter); + // Perform one message block dependant service. + +private: + virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0); + + virtual int svc (void); + // Iterate <n_iterations> time printing off a message and "waiting" + // for all other threads to complete this iteration. + + // = Not needed for this test. + virtual int open (void *) { return 0; } + virtual int close (u_long) {ACE_DEBUG ((LM_DEBUG,"(%t) in close of worker\n")); return 0; } + + int nt_; + // Number of worker threads to run. + int inp_serialize_; + + BARRIER barrier_; +}; + +template <class BARRIER> +Worker_Task<BARRIER>::Worker_Task (ACE_Thread_Manager *thr_mgr, + int n_threads, + int inp_serialize) + : ACE_Task<ACE_MT_SYNCH> (thr_mgr), + barrier_ (n_threads) +{ + nt_ = n_threads; + // Create worker threads. + inp_serialize_ = inp_serialize; + + // Use the task's message queue for serialization (default) or run + // service in the context of the caller thread. + + if (nt_ > 0 && inp_serialize == 1) + if (this->activate (THR_NEW_LWP, n_threads) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "activate failed")); +} + +// Simply enqueue the Message_Block into the end of the queue. + +template <class BARRIER> int +Worker_Task<BARRIER>::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +{ + int result; + if (this->inp_serialize_) + result = this->putq (mb, tv); + else + { + static int iter = 0; + result = this->service (mb, iter++); + + if (this->output (mb) < 0) + ACE_DEBUG ((LM_DEBUG, "(%t) output not connected!\n")); + + delete mb; + } + return result; +} + +template <class BARRIER> int +Worker_Task<BARRIER>::service (ACE_Message_Block *mb, int iter) +{ + int length = mb->length (); + + if (length > 0) + { + ACE_DEBUG ((LM_DEBUG,"(%t) in iteration %d len=%d text got:\n",iter,length)); + ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length); + ACE_DEBUG ((LM_DEBUG,"\n")); + } + return 0; +} + +// Iterate <n_iterations> time printing off a message and "waiting" +// for all other threads to complete this iteration. + +template <class BARRIER> int +Worker_Task<BARRIER>::svc (void) +{ + // Note that the ACE_Task::svc_run () method automatically adds us + // to the Thread_Manager when the thread begins. + + // Keep looping, reading a message out of the queue, until we get a + // message with a length == 0, which signals us to quit. + + for (int iter = 1; ;iter++) + { + ACE_Message_Block *mb = 0; + + int result = this->getq (mb); + + if (result == -1) + { + ACE_ERROR ((LM_ERROR, + "(%t) in iteration %d\n", "error waiting for message in iteration", iter)); + break; + } + + int length = mb->length (); + this->service (mb,iter); + + if (length == 0) + { + ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d got quit, exit!\n", iter)); + delete mb; + break; + } + + this->barrier_.wait (); + this->output (mb); + + delete mb; + } + + // Note that the ACE_Task::svc_run () method automatically removes + // us from the Thread_Manager when the thread exits. + + return 0; +} + +template <class BARRIER> int +Worker_Task<BARRIER>::Producer (void) +{ + // Keep reading stdin, until we reach EOF. + + for (;;) + { + // Allocate a new message. + ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); + + if (this->input (mb) == -1) + return -1; + } + + return 0; +} + +template <class BARRIER>int +Worker_Task<BARRIER>::output (ACE_Message_Block *mb) +{ + return this->put_next (mb); +} + +template <class BARRIER>int +Worker_Task<BARRIER>::input (ACE_Message_Block *mb) +{ + ACE_Message_Block *mb1; + +#ifndef manual + static int l= 0; + char str[]="kalle"; + strcpy (mb->rd_ptr (),str); + int n=strlen (str); + if (l==1000) + n=1; + l++; + if (l==0 || (l%100 == 0)) ACE_OS::sleep (5); + if (n <= 1) +#else + ACE_DEBUG ((LM_DEBUG,"(%t) press chars and enter to put a new message into task queue ...\n")); + if ((n = read (0, mb->rd_ptr (), mb->size ())) <= 1) +#endif // manual + { + // Send a shutdown message to the waiting threads and exit. + // cout << "\nvor loop, dump of task msg queue:\n" << endl; + // this->msg_queue ()->dump (); + for (int i=0;i<nt_;i++) + { + ACE_DEBUG ((LM_DEBUG,"(%t) eof, sending block for thread=%d\n",i+1)); + mb1 = new ACE_Message_Block (2); + mb1->length (0); + if (this->put (mb1) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put")); +#ifdef delay_put + ACE_OS::sleep (1); // this sleep helps to shutdown correctly -> was an error! +#endif /* delay_put */ + } + // cout << "\nnach loop, dump of task msg queue:\n" << endl; + // this->msg_queue ()->dump (); + return (-1); + } + else + { + // Send a normal message to the waiting threads and continue producing. + mb->wr_ptr (n); + if (this->put (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put")); + } + return 0; +} + +int +main (int argc, char *argv[]) +{ + int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS; + + ACE_DEBUG ((LM_DEBUG,"(%t) worker threads running=%d\n",n_threads)); + + + Worker_Task<BARRIER_TYPE> *worker_task = + new Worker_Task<BARRIER_TYPE> (ACE_Service_Config::thr_mgr (), + /*n_threads*/ 0,0); + + worker_task->Producer (); + + // Wait for all the threads to reach their exit point. + ACE_DEBUG ((LM_DEBUG,"(%t) waiting with thread manager ...\n")); + ACE_Service_Config::thr_mgr ()->wait (); + ACE_DEBUG ((LM_DEBUG,"(%t) delete worker task ...\n")); + + delete worker_task; + ACE_DEBUG ((LM_DEBUG,"(%t) done correct!\n")); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/cancel.cpp b/examples/Threads/cancel.cpp new file mode 100644 index 00000000000..a9d12bea579 --- /dev/null +++ b/examples/Threads/cancel.cpp @@ -0,0 +1,72 @@ +// Test out the cooperative thread cancellation mechanisms provided by +// $Id$ + +// the ACE_Thread_Manager. + +#include "ace/Service_Config.h" +#include "ace/Thread_Manager.h" + +#if defined (ACE_HAS_THREADS) + +static void * +worker (int iterations) +{ + for (int i = 0; i < iterations; i++) + { + if ((i % 10) == 0 + && (ACE_Service_Config::thr_mgr ()->testcancel (ACE_Thread::self ()) != 0)) + { + ACE_DEBUG ((LM_DEBUG, "(%t) has been cancelled before iteration!\n", i)); + break; + } + } + + return 0; +} + +static const int DEFAULT_THREADS = ACE_DEFAULT_THREADS; +static const int DEFAULT_ITERATIONS = 100000; + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config daemon; + + daemon.open (argv[0]); + + int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : DEFAULT_THREADS; + int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS; + + ACE_Thread_Manager *thr_mgr = ACE_Service_Config::thr_mgr (); + + int grp_id = thr_mgr->spawn_n (n_threads, ACE_THR_FUNC (worker), + (void *) n_iterations, + THR_NEW_LWP | THR_DETACHED); + + // Wait for 2 seconds and then suspend every thread in the group. + ACE_OS::sleep (2); + thr_mgr->suspend_grp (grp_id); + + // Wait for 2 more seconds and then resume every thread in the + // group. + ACE_OS::sleep (ACE_Time_Value (2)); + thr_mgr->resume_grp (grp_id); + + // Wait for 2 more seconds and then send a SIGINT to every thread in + // the group. + ACE_OS::sleep (ACE_Time_Value (2)); + thr_mgr->kill_grp (grp_id, SIGINT); + + // Wait for 2 more seconds and then exit (which should kill all the + // threads)! + ACE_OS::sleep (ACE_Time_Value (2)); + + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, "threads not supported on this platform\n"), -1); +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/future1.cpp b/examples/Threads/future1.cpp new file mode 100644 index 00000000000..ea295e487e1 --- /dev/null +++ b/examples/Threads/future1.cpp @@ -0,0 +1,420 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Test_Future.cpp +// +// = DESCRIPTION +// This example tests the ACE Future. +// +// = AUTHOR +// Andres Kruse <Andres.Kruse@cern.ch> and Douglas C. Schmidt +// <schmidt@cs.wustl.edu> +// +// ============================================================================ + +#include <math.h> +#include "ace/Task.h" + +#include "ace/Synch.h" +#include "ace/Message_Queue.h" +#include "ace/Future.h" +#include "ace/Method_Object.h" +#include "ace/Activation_Queue.h" +#include "ace/Auto_Ptr.h" + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Atomic_Op<ACE_Thread_Mutex, u_long> ATOMIC_INT; + +// a counter for the tasks.. +static ATOMIC_INT task_count (0); + +// a counter for the futures.. +static ATOMIC_INT future_count (0); +static ATOMIC_INT future_no (0); + +// a counter for the capsules.. +static ATOMIC_INT capsule_count (0); +static ATOMIC_INT capsule_no (0); + +// a counter for the method objects... +static ATOMIC_INT methodobject_count (0); +static ATOMIC_INT methodobject_no (0); + +class Scheduler : public ACE_Task<ACE_MT_SYNCH> + // = TITLE + // Active Object Scheduler. +{ + friend class Method_ObjectWork; +public: + Scheduler (const char *, Scheduler * = 0); + ~Scheduler (void); + + virtual int open (void *args = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); + virtual int svc (void); + + ACE_Future<double> work (double param, int count); + ACE_Future<const char*> name (void); + void end (void); + + double work_i (double, int); + const char *name_i (void); + +private: + char *name_; + ACE_Activation_Queue activation_queue_; + Scheduler *scheduler_; + +}; + +class Method_Object_work : public ACE_Method_Object + // = TITLE + // Reification of the <work> method. +{ +public: + Method_Object_work (Scheduler *, double, int, ACE_Future<double> &); + ~Method_Object_work (void); + virtual int call (void); + +private: + Scheduler *scheduler_; + double param_; + int count_; + ACE_Future<double> future_result_; +}; + +Method_Object_work::Method_Object_work (Scheduler* new_Scheduler, + double new_param, + int new_count, + ACE_Future<double> &new_result) + : scheduler_ (new_Scheduler), + param_ (new_param), + count_ (new_count), + future_result_ (new_result) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) Method_Object_work created\n")); +} + +Method_Object_work::~Method_Object_work (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Method_Object_work will be deleted.\n")); +} + + +int +Method_Object_work::call (void) +{ + return this->future_result_.set (this->scheduler_->work_i (this->param_, this->count_)); +} + +class Method_Object_name : public ACE_Method_Object + // = TITLE + // Reification of the <name> method. +{ +public: + Method_Object_name (Scheduler *, ACE_Future<const char*> &); + ~Method_Object_name (void); + virtual int call (void); + +private: + Scheduler *scheduler_; + ACE_Future<const char*> future_result_; +}; + +Method_Object_name::Method_Object_name (Scheduler *new_scheduler, + ACE_Future<const char*> &new_result) + : scheduler_ (new_scheduler), + future_result_ (new_result) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) Method_Object_name created\n")); +}; + +Method_Object_name::~Method_Object_name (void) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) Method_Object_name will be deleted.\n")); +} + +int +Method_Object_name::call (void) +{ + return future_result_.set (scheduler_->name_i ()); +} + +class Method_Object_end : public ACE_Method_Object + // = TITLE + // Reification of the <end> method. +{ +public: + Method_Object_end (Scheduler *new_Scheduler): scheduler_ (new_Scheduler) {} + ~Method_Object_end (void) {} + virtual int call (void) { this->scheduler_->close (); return -1; } + +private: + Scheduler *scheduler_; +}; + +// constructor +Scheduler::Scheduler (const char *newname, Scheduler *new_Scheduler) +{ + ACE_NEW (this->name_, char[ACE_OS::strlen (newname) + 1]); + ACE_OS::strcpy ((char *) this->name_, newname); + this->scheduler_ = new_Scheduler; + ACE_DEBUG ((LM_DEBUG, "(%t) Scheduler %s created\n", this->name_)); +} + +// Destructor +Scheduler::~Scheduler (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Scheduler %s will be destroyed\n", this->name_)); +} + +// open +int +Scheduler::open (void *) +{ + task_count++; + ACE_DEBUG ((LM_DEBUG, "(%t) Scheduler %s open\n", this->name_)); + return this->activate (THR_BOUND); +} + +// close +int +Scheduler::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Scheduler %s close\n", this->name_)); + task_count--; + return 0; +} + +// put... ?? +int +Scheduler::put (ACE_Message_Block *, ACE_Time_Value *) +{ + return 0; +} + +// service.. +int +Scheduler::svc (void) +{ + for (;;) + { + // Dequeue the next method object (we use an auto pointer in + // case an exception is thrown in the <call>). + auto_ptr<ACE_Method_Object> mo (this->activation_queue_.dequeue ()); + + ACE_DEBUG ((LM_DEBUG, "(%t) calling method object\n")); + // Call it. + if (mo->call () == -1) + break; + // Destructor automatically deletes it. + } + + /* NOTREACHED */ + return 0; +} + +void +Scheduler::end (void) +{ + this->activation_queue_.enqueue (new Method_Object_end (this)); +} + + +// Here's where the Work takes place. +double +Scheduler::work_i (double param, + int count) +{ + double x = 0.0, y = 0.0; + + // @@ We should probably do something fun here, like compute the + // Fibonacci sequence or something. + + for (int j = 0; j < count; j++) + { + x = x + param; + y = y + double(::sin (x)); + } + + return y; +} + +const char * +Scheduler::name_i (void) +{ + char *the_name; + + the_name = new char[ACE_OS::strlen (this->name_) + 1]; + ACE_OS::strcpy (the_name, this->name_); + + return the_name; +} + +ACE_Future<const char *> +Scheduler::name (void) +{ + if (this->scheduler_) + // Delegate to the Scheduler. + return this->scheduler_->name (); + else + { + ACE_Future<const char*> new_future; + + // @@ What happens if new fails here? + this->activation_queue_.enqueue + (new Method_Object_name (this, new_future)); + + return new_future; + } +} + +ACE_Future<double> +Scheduler::work (double newparam, int newcount) +{ + if (this->scheduler_) { + return this->scheduler_->work (newparam, newcount); + } + else { + ACE_Future<double> new_future; + + this->activation_queue_.enqueue + (new Method_Object_work (this, newparam, newcount, new_future)); + return new_future; + } +} + +// @@ These values should be set by the command line options! + +// Total number of iterations to <work> +static int n_iterations = 50000; + +// Total number of loops. +static int n_loops = 100; + +int +main (int, char *[]) +{ + Scheduler *andres, *peter, *helmut, *matias; + + // Create active objects.. + // @@ Should "open" be subsumed within the constructor of + // Scheduler()? + andres = new Scheduler ("andres"); + andres->open (); + peter = new Scheduler ("peter"); + peter->open (); + helmut = new Scheduler ("helmut"); + helmut->open (); + + // Matias passes all asynchronous method calls on to Andres... + matias = new Scheduler ("matias", andres); + matias->open (); + + for (int i = 0; i < n_loops; i++) + { + { + ACE_Future<double> fresulta, fresultb, fresultc, fresultd, fresulte; + ACE_Future<const char*> fname; + + ACE_DEBUG ((LM_DEBUG, "(%t) going to do a non-blocking call\n")); + + fresulta = andres->work (0.01, 100 + (n_iterations * (i % 2))); + fresultb = peter->work (0.01, 100 + (n_iterations * (i % 2))); + fresultc = helmut->work (0.01, 100 + (n_iterations * (i % 2))); + fresultd = matias->work (0.02, 100 + (n_iterations * (i % 2))); + fname = andres->name (); + + // see if the result is available... + if (fresulta.ready ()) + ACE_DEBUG ((LM_DEBUG, "(%t) wow.. work is ready.....\n")); + + ACE_DEBUG ((LM_DEBUG, "(%t) non-blocking call done... now blocking...\n")); + + // Save the result of fresulta. + + fresulte = fresulta; + + if (i % 3 == 0) + { + // Every 3rd time... disconnect the futures... + // but "fresulte" should still contain the result... + fresulta.cancel (10.0); + fresultb.cancel (20.0); + fresultc.cancel (30.0); + fresultd.cancel (40.0); + } + + double resulta = 0, resultb = 0, resultc = 0, resultd = 0, resulte = 0; + + fresulta.get (resulta); + fresultb.get (resultb); + fresultc.get (resultc); + fresultd.get (resultd); + fresulte.get (resulte); + + ACE_DEBUG ((LM_DEBUG, "(%t) result a %f\n", resulte)); + ACE_DEBUG ((LM_DEBUG, "(%t) result b %f\n", resulta)); + ACE_DEBUG ((LM_DEBUG, "(%t) result c %f\n", resultb)); + ACE_DEBUG ((LM_DEBUG, "(%t) result d %f\n", resultc)); + ACE_DEBUG ((LM_DEBUG, "(%t) result e %f\n", resultd)); + + const char *name; + + fname.get (name); + + ACE_DEBUG ((LM_DEBUG, "(%t) name %s\n", name)); + + } + + ACE_DEBUG ((LM_DEBUG, + "(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n", + (u_long) task_count, + (u_long) future_count, + (u_long) capsule_count, + (u_long) methodobject_count)); + } + + // Close things down. + andres->end (); + peter->end (); + helmut->end (); + matias->end (); + + ACE_OS::sleep (2); + + ACE_DEBUG ((LM_DEBUG, + "(%t) task_count %d future_count %d capsule_count %d methodobject_count %d\n", + (u_long) task_count, + (u_long) future_count, + (u_long) capsule_count, + (u_long) methodobject_count)); + + ACE_DEBUG ((LM_DEBUG,"(%t) th' that's all folks!\n")); + + ACE_OS::sleep (5); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Atomic_Op<ACE_Thread_Mutex, u_long>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ + +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/future2.cpp b/examples/Threads/future2.cpp new file mode 100644 index 00000000000..55ce8c05a40 --- /dev/null +++ b/examples/Threads/future2.cpp @@ -0,0 +1,524 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Test_Future.cpp +// +// = DESCRIPTION +// This example tests the ACE Future. +// +// = AUTHOR +// Andres Kruse <Andres.Kruse@cern.ch> and Douglas C. Schmidt +// <schmidt@cs.wustl.edu> +// +// Modification History +// Aug. 96; A.Kruse; dev. +// Aug. 96; D.Schmidt; complete workover +// 08/27/96; A.Kruse; - the friends of Scheduler are "Method_Object_name" +// and "Method_Object_work". +// - make the methods "work_i" and "name_i" private +// 09/2/96; D.Schmidt; Integrate with new ACE_Future API and rearrange +// the tests so they are more modular. +// ============================================================================ + +#include <math.h> +#include "ace/Task.h" + +#include "ace/Synch.h" +#include "ace/Message_Queue.h" +#include "ace/Future.h" +#include "ace/Method_Object.h" +#include "ace/Activation_Queue.h" +#include "ace/Auto_Ptr.h" + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Atomic_Op<ACE_Thread_Mutex, u_long> ATOMIC_INT; + +// a counter for the tasks.. +static ATOMIC_INT scheduler_open_count (0); + +// forward declarations +class Method_Object_work; +class Method_Object_name; + +class Scheduler : public ACE_Task<ACE_MT_SYNCH> + // = TITLE + // Active Object Scheduler. +{ + // Every method object has to be able to access the private methods. + + friend class Method_Object_work; + friend class Method_Object_name; + friend class Method_Object_end; +public: + + Scheduler (const char *, Scheduler * = 0); + ~Scheduler (void); + + virtual int open (void *args = 0); + // The method that is used to start the active object. + + // = Here are the methods exported by the class. They return an + // <ACE_Future>. + ACE_Future<double> work (double param, int count); + ACE_Future<char*> name (void); + void end (void); + +private: + virtual int close (u_long flags = 0); + // Should not be accessible from outside... (use end () instead). + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) + { return 0; }; + // Doesn't have any use for this example. + + virtual int svc (void); + // Here the actual servicing of all requests is happening.. + + // = Implementation methods. + double work_i (double, int); + char *name_i (void); + + char *name_; + ACE_Activation_Queue activation_queue_; + Scheduler *scheduler_; +}; + +class Method_Object_work : public ACE_Method_Object + // = TITLE + // Reification of the <work> method. +{ +public: + Method_Object_work (Scheduler *, double, int, ACE_Future<double> &); + ~Method_Object_work (void); + virtual int call (void); + +private: + Scheduler *scheduler_; + double param_; + int count_; + ACE_Future<double> future_result_; +}; + +Method_Object_work::Method_Object_work (Scheduler* new_Scheduler, + double new_param, + int new_count, + ACE_Future<double> &new_result) + : scheduler_ (new_Scheduler), + param_ (new_param), + count_ (new_count), + future_result_ (new_result) +{ +} + +Method_Object_work::~Method_Object_work (void) +{ +} + +int +Method_Object_work::call (void) +{ + return this->future_result_.set (this->scheduler_->work_i (this->param_, this->count_)); +} + +class Method_Object_name : public ACE_Method_Object + // = TITLE + // Reification of the <name> method. +{ +public: + Method_Object_name (Scheduler *, ACE_Future<char*> &); + ~Method_Object_name (void); + virtual int call (void); + +private: + Scheduler *scheduler_; + ACE_Future<char*> future_result_; +}; + + +Method_Object_name::Method_Object_name (Scheduler *new_scheduler, + ACE_Future<char*> &new_result) + : scheduler_ (new_scheduler), + future_result_ (new_result) +{ + ACE_DEBUG ((LM_DEBUG, + " (%t) Method_Object_name created\n")); +}; + +Method_Object_name::~Method_Object_name (void) +{ + ACE_DEBUG ((LM_DEBUG, + " (%t) Method_Object_name will be deleted.\n")); +} + +int +Method_Object_name::call (void) +{ + return future_result_.set (scheduler_->name_i ()); +} + +class Method_Object_end : public ACE_Method_Object + // = TITLE + // Reification of the <end> method. +{ +public: + Method_Object_end (Scheduler *new_Scheduler): scheduler_ (new_Scheduler) {} + ~Method_Object_end (void) {} + virtual int call (void) { this->scheduler_->close (); return -1; } + +private: + Scheduler *scheduler_; +}; + +// constructor +Scheduler::Scheduler (const char *newname, Scheduler *new_Scheduler) +{ + ACE_NEW (this->name_, char[ACE_OS::strlen (newname) + 1]); + ACE_OS::strcpy ((char *) this->name_, newname); + this->scheduler_ = new_Scheduler; + ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s created\n", this->name_)); +} + +// Destructor +Scheduler::~Scheduler (void) +{ + ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s will be destroyed\n", this->name_)); +} + +int +Scheduler::open (void *) +{ + scheduler_open_count++; + ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s open\n", this->name_)); + return this->activate (THR_BOUND); +} + +int +Scheduler::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s close\n", this->name_)); + scheduler_open_count--; + return 0; +} + +int +Scheduler::svc (void) +{ + // Main event loop for this active object. + for (;;) + { + // Dequeue the next method object (we use an auto pointer in + // case an exception is thrown in the <call>). + auto_ptr<ACE_Method_Object> mo (this->activation_queue_.dequeue ()); + + ACE_DEBUG ((LM_DEBUG, " (%t) calling method object\n")); + // Call it. + if (mo->call () == -1) + break; + // Smart pointer destructor automatically deletes mo. + } + + /* NOTREACHED */ + return 0; +} + +void +Scheduler::end (void) +{ + this->activation_queue_.enqueue (new Method_Object_end (this)); +} + +// Here's where the Work takes place. +double +Scheduler::work_i (double param, + int count) +{ + double x = 0, y = 0; + + for (int j = 0; j < count; j++) + { + x = x + param; + y = y + ::sin (x); + } + + return y; +} + +char * +Scheduler::name_i (void) +{ + char *the_name; + + the_name = new char[ACE_OS::strlen (this->name_) + 1]; + ACE_OS::strcpy (the_name, this->name_); + + return the_name; +} + +ACE_Future<char *> +Scheduler::name (void) +{ + if (this->scheduler_) + // Delegate to the other scheduler + return this->scheduler_->name (); + else + { + ACE_Future<char*> new_future; + + if (this->thr_count () == 0) + { + // This scheduler is inactive... so we execute the user + // request right away... + + auto_ptr<ACE_Method_Object> mo (new Method_Object_name (this, new_future)); + + mo->call (); + // Smart pointer destructor automatically deletes mo. + } + else + // @@ What happens if new fails here? + this->activation_queue_.enqueue + (new Method_Object_name (this, new_future)); + + return new_future; + } +} + +ACE_Future<double> +Scheduler::work (double newparam, int newcount) +{ + if (this->scheduler_) + return this->scheduler_->work (newparam, newcount); + else + { + ACE_Future<double> new_future; + + if (this->thr_count () == 0) + { + auto_ptr<ACE_Method_Object> mo + (new Method_Object_work (this, newparam, newcount, new_future)); + mo->call (); + // Smart pointer destructor automatically deletes it. + } + else + this->activation_queue_.enqueue + (new Method_Object_work (this, newparam, newcount, new_future)); + + return new_future; + } +} + +static int +determine_iterations (void) +{ + int n_iterations; + + ACE_DEBUG ((LM_DEBUG," (%t) determining the number of iterations...\n")); + Scheduler *worker_a = new Scheduler ("worker A"); + + ACE_Time_Value tstart (ACE_OS::gettimeofday ()); + ACE_Time_Value tend (ACE_OS::gettimeofday ()); + + // Determine the number of iterations... we want so many that the + // work () takes about 1 second... + + for (n_iterations = 1; + (tend.sec () - tstart.sec ()) < 1; + n_iterations *= 2) + { + tstart = ACE_OS::gettimeofday (); + + worker_a->work (0.1, n_iterations); + + tend = ACE_OS::gettimeofday (); + } + + ACE_DEBUG ((LM_DEBUG," (%t) n_iterations %d\n", + (u_long) n_iterations)); + + worker_a->end (); + // @@ Can we safely delete worker_a here? + return n_iterations; +} + +static void +test_active_object (int n_iterations) +{ + ACE_DEBUG ((LM_DEBUG," (%t) testing active object pattern...\n")); + // A simple example for the use of the active object pattern and + // futures to return values from an active object. + + Scheduler *worker_a = new Scheduler ("worker A"); + Scheduler *worker_b = new Scheduler ("worker B"); + + // Have worker_c delegate his work to worker_a. + Scheduler *worker_c = new Scheduler ("worker C", worker_a); + + // loop 0: + // test the Schedulers when they are not active. + // now the method objects will be created but since + // there is no active thread they will also be + // immediately executed, in the "main" thread. + // loop 1: + // do the same test but with the schedulers + // activated + for (int i = 0; i < 2; i++) + { + if (i == 1) + { + worker_a->open (); + worker_b->open (); + worker_c->open (); + } + + ACE_Future<double> fresulta = worker_a->work (0.01, n_iterations); + ACE_Future<double> fresultb = worker_b->work (0.02, n_iterations); + ACE_Future<double> fresultc = worker_c->work (0.03, n_iterations); + + if (i == 0) + { + if (!fresulta.ready ()) + ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker A is should be ready!!!\n")); + if (!fresultb.ready ()) + ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker B is should be ready!!!\n")); + if (!fresultc.ready ()) + ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker C is should be ready!!!\n")); + } + + // When the workers are active we will block here until the + // results are available. + + double resulta = fresulta; + double resultb = fresultb; + double resultc = fresultc; + + ACE_Future<char *> fnamea = worker_a->name (); + ACE_Future<char *> fnameb = worker_b->name (); + ACE_Future<char *> fnamec = worker_c->name (); + + char *namea = fnamea; + char *nameb = fnameb; + char *namec = fnamec; + + ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %f\n", + namea, resulta)); + ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %f\n", + nameb, resultb)); + ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %f\n", + namec, resultc)); + } + + ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d before end ()\n", + (u_long) scheduler_open_count)); + + worker_a->end (); + worker_b->end (); + worker_c->end (); + + ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d immediately after end ()\n", + (u_long) scheduler_open_count)); + + ACE_OS::sleep (2); + + ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d after waiting\n", + (u_long) scheduler_open_count)); + // @@ Can we safely delete worker_a, worker_b, and worker_c? +} + +static void +test_cancellation (int n_iterations) +{ + ACE_DEBUG ((LM_DEBUG," (%t) testing cancellation of a future...\n")); + + // Now test the cancelling a future. + + Scheduler *worker_a = new Scheduler ("worker A"); + worker_a->open (); + + ACE_Future<double> fresulta = worker_a->work (0.01, n_iterations); + + // save the result by copying the future + ACE_Future<double> fresultb = fresulta; + + // now we cancel the first future.. but the + // calculation will still go on... + fresulta.cancel (10.0); + + if (!fresulta.ready ()) + ACE_DEBUG ((LM_DEBUG," (%t) ERROR: future A is should be ready!!!\n")); + + double resulta = fresulta; + + ACE_DEBUG ((LM_DEBUG, " (%t) cancelled result %f\n", resulta)); + + if (resulta != 10.0) + ACE_DEBUG ((LM_DEBUG, " (%t) cancelled result should be 10.0!!\n", resulta)); + + resulta = fresultb; + + ACE_DEBUG ((LM_DEBUG, " (%t) true result %f\n", resulta)); + + worker_a->end (); + // @@ Can we safely delete worker_a here? +} + +static void +test_timeout (int n_iterations) +{ + ACE_DEBUG ((LM_DEBUG," (%t) testing timeout on waiting for the result...\n")); + Scheduler *worker_a = new Scheduler ("worker A"); + worker_a->open (); + + ACE_Future<double> fresulta = worker_a->work (0.01, 2 * n_iterations); + + // Should immediately return... and we should see an error... + ACE_Time_Value *delay = new ACE_Time_Value (1); + + double resulta; + fresulta.get (resulta, delay); + + if (fresulta.ready ()) + ACE_DEBUG ((LM_DEBUG," (%t) ERROR: future A is should not be ready!!!\n")); + else + ACE_DEBUG ((LM_DEBUG," (%t) timed out on future A\n")); + + // now we wait until we are done... + fresulta.get (resulta); + ACE_DEBUG ((LM_DEBUG, " (%t) result %f\n", resulta)); + + worker_a->end (); + // @@ Can we safely delete worker_a here? +} + +int +main (int, char *[]) +{ + int n_iterations = determine_iterations (); + + test_active_object (n_iterations); + test_cancellation (n_iterations); + test_timeout (n_iterations); + + ACE_DEBUG ((LM_DEBUG," (%t) that's all folks!\n")); + + ACE_OS::sleep (5); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Atomic_Op<ACE_Thread_Mutex, u_long>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ + +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/manual_event.cpp b/examples/Threads/manual_event.cpp new file mode 100644 index 00000000000..26d477fabc0 --- /dev/null +++ b/examples/Threads/manual_event.cpp @@ -0,0 +1,108 @@ +// The test shows the use of an ACE_Manual_Event to create a +// $Id$ + +// Pseudo_Barrier. Multiple threads are created which do the +// following: +// +// 1. work +// 2. synch with other threads +// 3. more work +// +// ACE_Manual_Event is use to synch with other +// threads. ACE_Manual_Event::signal() is used for broadcasting. + +#include "ace/Service_Config.h" +#include "ace/Synch.h" +#include "ace/Thread_Manager.h" + +#if defined (ACE_HAS_THREADS) +static ACE_Atomic_Op <ACE_Thread_Mutex, u_long> amount_of_work = (u_long) 0; + +class Pseudo_Barrier + // = TITLE + // A barrier class using ACE manual-reset events. + // + // = DESCRIPTION + // This is *not* a real barrier. + // Pseudo_Barrier is more like a ``one shot'' barrier. + // All waiters after the Nth waiter are allowed to go. + // The barrier does not reset after the Nth waiter. + // For an example of a real barrier, please see class ACE_Barrier. +{ +public: + Pseudo_Barrier (u_long count); + + int wait (void); + +private: + ACE_Atomic_Op <ACE_Thread_Mutex, u_long> counter_; + ACE_Manual_Event event_; +}; + +Pseudo_Barrier::Pseudo_Barrier (u_long count) + : counter_ (count) +{ +} + +int +Pseudo_Barrier::wait (void) +{ + if (--this->counter_ == 0) + return this->event_.signal (); + else + return this->event_.wait (); +} + +static void * +worker (void *arg) +{ + ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ()); + Pseudo_Barrier &barrier = *(Pseudo_Barrier *) arg; + + // work + ACE_DEBUG ((LM_DEBUG, "(%t) working (%d secs)\n", ++::amount_of_work)); + ACE_OS::sleep (::amount_of_work); + + // synch with everybody else + ACE_DEBUG ((LM_DEBUG, "(%t) waiting to synch with others \n")); + barrier.wait (); + + // more work + ACE_DEBUG ((LM_DEBUG, "(%t) more work (%d secs)\n", ++::amount_of_work)); + ACE_OS::sleep (amount_of_work); + + ACE_DEBUG ((LM_DEBUG, "(%t) dying \n")); + + return 0; +} + +int +main (int argc, char **argv) +{ + int n_threads = argc == 2 ? atoi (argv[1]) : 5; + + ACE_Thread_Manager &tm = *ACE_Service_Config::thr_mgr (); + + // synch object shared by all threads + Pseudo_Barrier barrier (n_threads); + + // create workers + if (tm.spawn_n (n_threads, worker, &barrier) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "thread creates for worker failed"), -1); + + // wait for all workers to exit + if (tm.wait () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "thread wait failed"), -1); + else + ACE_DEBUG ((LM_ERROR, "graceful exit\n")); + + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/process_mutex.cpp b/examples/Threads/process_mutex.cpp new file mode 100644 index 00000000000..fb23c8b73be --- /dev/null +++ b/examples/Threads/process_mutex.cpp @@ -0,0 +1,68 @@ +// $Id$ + +// This program tests ACE_Process_Mutexes. To run it, open 3 or 4 +// windows and run this program in each window... + +#include "ace/Synch.h" +#include "ace/Signal.h" + +#if defined (ACE_HAS_THREADS) + +static sig_atomic_t done; + +extern "C" void +handler (int) +{ + done = 1; +} + +int +main (int argc, char *argv[]) +{ + char *name = argc > 1 ? argv[1] : "hello"; + int iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : 100; + + ACE_Process_Mutex pm (name); + + // Register a signal handler. + ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT); + + for (int i = 0; i < iterations && !done; i++) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = acquiring\n")); + if (pm.acquire () == -1) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "acquire failed")); + else + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = acquired\n")); + + ACE_OS::sleep (3); + + if (pm.release () == -1) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "release failed")); + else + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = released\n")); + + if (pm.tryacquire () == -1) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "tryacquire failed")); + else + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = tryacquire\n")); + + if (pm.release () == -1) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "release failed")); + else + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = released\n")); + } + + if (argc > 2) + pm.remove (); + return 0; +} +#else +int +main (void) +{ + ACE_ERROR_RETURN ((LM_ERROR, + "ACE doesn't support support threads on this platform (yet)\n"), + -1); +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/process_semaphore.cpp b/examples/Threads/process_semaphore.cpp new file mode 100644 index 00000000000..d7933897f8e --- /dev/null +++ b/examples/Threads/process_semaphore.cpp @@ -0,0 +1,56 @@ +// $Id$ + +// This program tests ACE_Process_Semaphore. To run it, open 3 or 4 +// windows and run this program in each window... + +#include "ace/Synch.h" +#include "ace/Signal.h" + +static sig_atomic_t done; + +extern "C" void +handler (int) +{ + done = 1; +} + +int +main (int argc, char *argv[]) +{ + char *name = argc == 1 ? "hello" : argv[1]; + + ACE_Process_Semaphore pm (1, name); + + ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT); + + for (int i = 0; i < 100 && !done; i++) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = acquiring\n")); + if (pm.acquire () == -1) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "acquire failed")); + else + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = acquired\n")); + + ACE_OS::sleep (3); + + if (pm.release () == -1) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "release failed")); + else + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = released\n")); + + if (pm.tryacquire () == -1) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "tryacquire failed")); + else + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = tryacquire\n")); + + if (pm.release () == -1) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = %p\n", "release failed")); + else + ACE_DEBUG ((LM_DEBUG, "(%P|%t) = released\n")); + } + + if (argc > 2) + pm.remove (); + return 0; +} + diff --git a/examples/Threads/reader_writer.cpp b/examples/Threads/reader_writer.cpp new file mode 100644 index 00000000000..32ef262c67e --- /dev/null +++ b/examples/Threads/reader_writer.cpp @@ -0,0 +1,187 @@ +// This test program verifies the functionality of the ACE_OS +// $Id$ + +// implementation of readers/writer locks on Win32 and Posix pthreads. + + +#include "ace/Synch.h" +#include "ace/Thread.h" +#include "ace/Thread_Manager.h" +#include "ace/Get_Opt.h" + +#if defined (ACE_HAS_THREADS) + +// Default number of iterations. +static int n_iterations = 1000; + +// Default number of loops. +static int n_loops = 100; + +// Default number of readers. +static int n_readers = 6; + +// Default number of writers. +static int n_writers = 2; + +// Thread id of last writer. +volatile static int shared_data; + +// Lock for shared_data. +static ACE_RW_Mutex rw_mutex; + +// Count of the number of readers and writers. +ACE_Atomic_Op<ACE_Thread_Mutex, int> current_readers, current_writers; + +// Thread manager +static ACE_Thread_Manager thr_mgr; + +// Explain usage and exit. +static void +print_usage_and_die (void) +{ + ACE_DEBUG ((LM_DEBUG, + "usage: %n [-r n_readers] [-w n_writers] [-n iteration_count]\n")); + ACE_OS::exit (1); +} + +// Parse the command-line arguments and set options. +static void +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, "r:w:n:l:"); + + int c; + + while ((c = get_opt ()) != -1) + switch (c) + { + case 'r': + n_readers = ACE_OS::atoi (get_opt.optarg); + break; + case 'w': + n_writers = ACE_OS::atoi (get_opt.optarg); + break; + case 'n': + n_iterations = ACE_OS::atoi (get_opt.optarg); + break; + case 'l': + n_loops = ACE_OS::atoi (get_opt.optarg); + break; + default: + print_usage_and_die (); + break; + } +} + +// Iterate <n_iterations> each time checking that nobody modifies the data +// while we have a read lock. + +static void * +reader (void *) +{ + ACE_Thread_Control tc (&thr_mgr); + ACE_DEBUG ((LM_DEBUG, "(%t) reader starting\n")); + + for (int iterations = 1; iterations <= n_iterations; iterations++) + { + ACE_Read_Guard<ACE_RW_Mutex> g(rw_mutex); + int n = ++current_readers; + //ACE_DEBUG ((LM_DEBUG, "(%t) I'm reader number %d\n", n)); + + if (current_writers > 0) + ACE_DEBUG ((LM_DEBUG, "(%t) writers found!!!\n")); + + int data = shared_data; + + for (int loop = 1; loop <= n_loops; loop++) + { + ACE_Thread::yield(); + if (shared_data != data) + ACE_DEBUG ((LM_DEBUG, + "(%t) somebody changed %d to %d\n", + data, shared_data)); + } + + --current_readers; + //ACE_DEBUG ((LM_DEBUG, "(%t) done with reading guarded data\n")); + + ACE_Thread::yield (); + } + return 0; +} + +// Iterate <n_iterations> each time modifying the global data +// and checking that nobody steps on it while we can write it. + +static void * +writer (void *) +{ + ACE_Thread_Control tc (&thr_mgr); + ACE_DEBUG ((LM_DEBUG, "(%t) writer starting\n")); + + for (int iterations = 1; iterations <= n_iterations; iterations++) + { + ACE_Write_Guard<ACE_RW_Mutex> g(rw_mutex); + + ++current_writers; + //ACE_DEBUG ((LM_DEBUG, "(%t) writing to guarded data\n")); + + if (current_writers > 1) + ACE_DEBUG ((LM_DEBUG, "(%t) other writers found!!!\n")); + + if (current_readers > 0) + ACE_DEBUG ((LM_DEBUG, "(%t) readers found!!!\n")); + + int self = (int) ACE_Thread::self (); + shared_data = self; + + for (int loop = 1; loop <= n_loops; loop++) + { + ACE_Thread::yield(); + if (shared_data != self) + ACE_DEBUG ((LM_DEBUG, "(%t) somebody wrote on my data %d\n", shared_data)); + } + + --current_writers; + + //ACE_DEBUG ((LM_DEBUG, "(%t) done with guarded data\n")); + ACE_Thread::yield (); + } + return 0; +} + +// Spawn off threads. + +int main (int argc, char *argv[]) +{ + ACE_LOG_MSG->open (argv[0]); + parse_args (argc, argv); + + current_readers = 0; // Possibly already done + current_writers = 0; // Possibly already done + + ACE_DEBUG ((LM_DEBUG, "(%t) main thread starting\n")); + + if (thr_mgr.spawn_n (n_readers, reader, 0, THR_NEW_LWP) == -1 || + thr_mgr.spawn_n (n_writers, writer, 0, THR_NEW_LWP) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn_n"), 1); + + thr_mgr.wait (); + + ACE_DEBUG ((LM_DEBUG, "(%t) exiting main thread\n")); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Atomic_Op<ACE_Thread_Mutex, int>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ + +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ + diff --git a/examples/Threads/recursive_mutex.cpp b/examples/Threads/recursive_mutex.cpp new file mode 100644 index 00000000000..1cc2892b2a8 --- /dev/null +++ b/examples/Threads/recursive_mutex.cpp @@ -0,0 +1,108 @@ +// $Id$ + +// This test program verifies the functionality of the ACE_OS +// implementation of recursive mutexes on Win32 and Posix pthreads. + +#include "ace/Service_Config.h" +#include "ace/Get_Opt.h" +#include "ace/Synch.h" + +#if defined (ACE_HAS_THREADS) + +// Total number of iterations. +static size_t n_iterations = 1000; +static size_t n_threads = 4; + +// Explain usage and exit. +static void +print_usage_and_die (void) +{ + ACE_DEBUG ((LM_DEBUG, + "usage: %n [-t n_threads] [-n iteration_count]\n")); + ACE_OS::exit (1); +} + +// Parse the command-line arguments and set options. + +static void +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, "n:t:"); + + int c; + + while ((c = get_opt ()) != -1) + switch (c) + { + case 'n': + n_iterations = ACE_OS::atoi (get_opt.optarg); + break; + case 't': + n_threads = ACE_OS::atoi (get_opt.optarg); + break; + default: + print_usage_and_die (); + break; + } +} + +static void +recursive_worker (size_t nesting_level, + ACE_Recursive_Thread_Mutex *rm) +{ + if (nesting_level < n_iterations) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) = trying to acquire, nesting = %d, thread id = %u\n", + rm->get_nesting_level (), rm->get_thread_id ())); + { + // This illustrates the use of the ACE_Guard<LOCK> with an + // ACE_Recursive_Thread_Mutex. + ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon, *rm); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) = acquired, nesting = %d, thread id = %u\n", + rm->get_nesting_level (), rm->get_thread_id ())); + + recursive_worker (nesting_level + 1, rm); + } + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) = released, nesting = %d, thread id = %u\n", + rm->get_nesting_level (), rm->get_thread_id ())); + } +} + +static void * +worker (void *arg) +{ + ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ()); + + ACE_Recursive_Thread_Mutex *rm = (ACE_Recursive_Thread_Mutex *) arg; + + recursive_worker (0, rm); + return 0; +} + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config daemon (argv[0]); + + parse_args (argc, argv); + ACE_Recursive_Thread_Mutex rm; + + ACE_Service_Config::thr_mgr ()->spawn_n (n_threads, + ACE_THR_FUNC (worker), + (void *) &rm); + + ACE_Service_Config::thr_mgr ()->wait (); + return 0; +} +#else +int +main (void) +{ + ACE_ERROR_RETURN ((LM_ERROR, + "ACE doesn't support support process mutexes on this platform (yet)\n"), + -1); +} +#endif /* ACE_WIN32 */ diff --git a/examples/Threads/task_four.cpp b/examples/Threads/task_four.cpp new file mode 100644 index 00000000000..64209cb3430 --- /dev/null +++ b/examples/Threads/task_four.cpp @@ -0,0 +1,248 @@ +// $Id$ + +// The following test was written by Hamutal Yanay & Ari Erev's +// (Ari_Erev@comverse.com). +// +// This test program test enhancements to the thread_manager and task +// classes. The purpose of these enhancements was to allow the +// thread_manager to recognize the concept of an ACE_Task and to be +// able to group ACE_Tasks in groups. +// +// There are two main ACE_Tasks in this sample: +// +// Invoker_Task - is run from main (). It's purpose is to run a number of +// ACE_Tasks of type Worker_Task. The number can be specified +// on the command line. +// After starting the tasks, the Invoker_Task groups all the tasks +// in one group and then uses the +// num_tasks_in_group () to find out if the real number of tasks +// that are now running (should be the same as the number of tasks +// started). +// It also, suspends and resumes all the threads in the group to +// test the suspend_grp () and resume_grp () methods. +// Then it waits for all the tasks to end. +// Worker_Task - ACE_Tasks that are started by the Invoker_Task. +// Each Worker_Task can start a number of threads. +// The Worker_Task threads perform some work (iteration). The number +// of the iterations can be specified on the command line. +// +// The command line syntax is: +// +// test_task [num_tasks] [num_threads] [num_iterations] + +#include "ace/Task.h" +#include "ace/Service_Config.h" + + +#if defined (ACE_HAS_THREADS) + +#include "ace/Task.h" + +class Invoker_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Invoker_Task (ACE_Thread_Manager *thr_mgr, + int n_tasks, + int n_threads, + int n_iterations); + virtual int svc (void); + // creats <n_tasks> and wait for them to finish + +private: + int n_tasks_; + // Number of tasks to start. + int n_threads_; + // Number of threads per task. + int n_iterations_; + // Number of iterations per thread. + + // = Not needed for this test. + virtual int open (void *) { return 0; } + virtual int close (u_long) { return 0; } + virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } +}; + +class Worker_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Worker_Task (ACE_Thread_Manager *thr_mgr, + int n_threads, + int n_iterations); + virtual int svc (void); + // Does a small work... + virtual int open (void * = NULL); +private: + static int workers_count_; + int index_; + int n_threads_; + int n_iterations_; + + // = Not needed for this test. + virtual int close (u_long) { return 0; } + virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } +}; + +int Worker_Task::workers_count_ = 1; + +Worker_Task::Worker_Task (ACE_Thread_Manager *thr_mgr, + int n_threads, + int n_iterations) + : n_threads_ (n_threads), + n_iterations_ (n_iterations), + ACE_Task<ACE_MT_SYNCH> (thr_mgr) +{ + index_ = workers_count_++; +} + +int +Worker_Task::open (void *) +{ + // Create worker threads. + int rc = this->activate (THR_NEW_LWP, n_threads_, 0, 0, -1, this); + + if (rc == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "activate failed")); + + return rc; +} + +int +Worker_Task::svc (void) +{ + ACE_DEBUG ((LM_DEBUG, " (%t) in worker %d\n", index_)); + + for (int iterations = 1; + iterations <= this->n_iterations_; + iterations++) + { + ACE_DEBUG ((LM_DEBUG, " (%t) in iteration %d\n", iterations)); + ACE_OS::sleep (0); + } + + ACE_DEBUG ((LM_DEBUG, " (%t) worker %d ends\n", index_)); + + return 0; +} + +Invoker_Task::Invoker_Task (ACE_Thread_Manager *thr_mgr, + int n_tasks, + int n_threads, + int n_iterations) + : n_tasks_ (n_tasks), + n_threads_ (n_threads), + n_iterations_ (n_iterations), + ACE_Task<ACE_MT_SYNCH> (thr_mgr) +{ + // Create worker threads. + if (this->activate (THR_NEW_LWP, 1, 0, 0, -1, this) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "activate failed")); +} + +// Iterate <n_iterations> time printing off a message and "waiting" +// for all other threads to complete this iteration. + +int +Invoker_Task::svc (void) +{ + // Note that the ACE_Task::svc_run () method automatically adds us to + // the Thread_Manager when the thread begins. + + ACE_Thread_Manager *thr_mgr = ACE_Service_Config::thr_mgr (); + Worker_Task **pTask = new Worker_Task* [n_tasks_]; + + for (int task = 0; + task < this->n_tasks_; + task++) + { + ACE_DEBUG ((LM_DEBUG, " (%t) in task %d\n", task+1)); + pTask[task] = new Worker_Task (thr_mgr, n_threads_, n_iterations_); + pTask[task]->open (); + } + + // Set all tasks to be one group + ACE_DEBUG ((LM_DEBUG, " (%t) setting tasks group id\n")); + for (task = 0; + task < this->n_tasks_; + task++) + if (thr_mgr->set_grp (pTask[task], 1) == -1) + ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "set_grp")); + + int nTasks = thr_mgr->num_tasks_in_group (1); + cout << "Number of tasks in group 1: " << nTasks << endl; + + // Wait for 1 second and then suspend every thread in the group. + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, " (%t) suspending group\n")); + if (thr_mgr->suspend_grp (1) == -1) + ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "suspend_grp")); + + // Wait for 5 more second and then resume every thread in the + // group. + ACE_OS::sleep (ACE_Time_Value (5)); + + // @QTSK This ACE_DEBUG statement blows us away! can't understand why + ACE_DEBUG ((LM_DEBUG, " (%t) resuming group\n")); + if (thr_mgr->resume_grp (1) == -1) + ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "resume_grp")); + + + // Wait for all the tasks to reach their exit point. + thr_mgr->wait (); + + // Note that the ACE_Task::svc_run () method automatically removes us + // from the Thread_Manager when the thread exits. + + return 0; +} + +// Default number of tasks and iterations. +static const int DEFAULT_TASKS = 4; +static const int DEFAULT_ITERATIONS = 5; + +int +main (int argc, char *argv[]) +{ + int n_tasks = argc > 1 ? ACE_OS::atoi (argv[1]) : DEFAULT_TASKS; + int n_threads = argc > 2 ? ACE_OS::atoi (argv[2]) : ACE_DEFAULT_THREADS; + int n_iterations = argc > 3 ? ACE_OS::atoi (argv[3]) : DEFAULT_ITERATIONS; + + // Since ACE_Thread_Manager can only wait for all threads, we'll have + // special manager for the Invoker_Task. + ACE_Thread_Manager invoker_manager; + + Invoker_Task invoker (&invoker_manager, + n_tasks, + n_threads, + n_iterations); + + // Wait for 1 second and then suspend the invoker task + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, " (%t) suspending invoker task\n")); + + if (invoker_manager.suspend_task (&invoker) == -1) + ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "suspend_task")); + + // Wait for 5 more second and then resume the invoker task. + ACE_OS::sleep (ACE_Time_Value (5)); + + // @QTSK This ACE_DEBUG statement blows us away! can't understand why + ACE_DEBUG ((LM_DEBUG, " (%t) resuming invoker task\n")); + if (invoker_manager.resume_task (&invoker) == -1) + ACE_ERROR ((LM_DEBUG, " (%t) %p\n", "resume_task")); + + + // Wait for all the threads to reach their exit point. + invoker_manager.wait (); + + // @QTSK This ACE_DEBUG statement blows us away! can't understand why + ACE_DEBUG ((LM_DEBUG, " (%t) done\n")); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/task_one.cpp b/examples/Threads/task_one.cpp new file mode 100644 index 00000000000..d0a8a12e6c4 --- /dev/null +++ b/examples/Threads/task_one.cpp @@ -0,0 +1,104 @@ +// This test program illustrates how the ACE barrier synchronization +// $Id$ + +// mechanisms work in conjunction with the ACE_Task and the +// ACE_Thread_Manager. It is instructive to compare this with the +// test_barrier.cpp test to see how they differ. + +#include "ace/Task.h" +#include "ace/Service_Config.h" + + +#if defined (ACE_HAS_THREADS) + +#include "ace/Task.h" + +class Barrier_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Barrier_Task (ACE_Thread_Manager *thr_mgr, + int n_threads, + int n_iterations); + + virtual int svc (void); + // Iterate <n_iterations> time printing off a message and "waiting" + // for all other threads to complete this iteration. + +private: + ACE_Barrier barrier_; + // Reference to the tester barrier. This controls each + // iteration of the tester function running in every thread. + + int n_iterations_; + // Number of iterations to run. + + // = Not needed for this test. + virtual int open (void *) { return 0; } + virtual int close (u_long) { return 0; } + virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } +}; + +Barrier_Task::Barrier_Task (ACE_Thread_Manager *thr_mgr, + int n_threads, + int n_iterations) + : ACE_Task<ACE_MT_SYNCH> (thr_mgr), + barrier_ (n_threads), + n_iterations_ (n_iterations) +{ + // Create worker threads. + if (this->activate (THR_NEW_LWP, n_threads) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "activate failed")); +} + +// Iterate <n_iterations> time printing off a message and "waiting" +// for all other threads to complete this iteration. + +int +Barrier_Task::svc (void) +{ + // Note that the ACE_Task::svc_run() method automatically adds us to + // the Thread_Manager when the thread begins. + + for (int iterations = 1; + iterations <= this->n_iterations_; + iterations++) + { + ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d\n", iterations)); + + // Block until all other threads have waited, then continue. + this->barrier_.wait (); + } + + // Note that the ACE_Task::svc_run() method automatically removes us + // from the Thread_Manager when the thread exits. + + return 0; +} + +// Default number of threads to spawn. +static const int DEFAULT_ITERATIONS = 5; + +int +main (int argc, char *argv[]) +{ + int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS; + int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS; + + Barrier_Task barrier_task (ACE_Service_Config::thr_mgr (), + n_threads, + n_iterations); + + // Wait for all the threads to reach their exit point. + ACE_Service_Config::thr_mgr ()->wait (); + + ACE_DEBUG ((LM_DEBUG, "(%t) done\n")); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/task_three.cpp b/examples/Threads/task_three.cpp new file mode 100644 index 00000000000..0214ac10ddf --- /dev/null +++ b/examples/Threads/task_three.cpp @@ -0,0 +1,230 @@ +// $Id$ + +// Exercise more tests for the ACE Tasks. This also shows off some +// Interesting uses of the ACE Log_Msg's ability to print to ostreams. +// BTW, make sure that you set the out_stream in *every* thread that +// you want to have write to the output file, i.e.: +// +// +// if (out_stream) +// { +// ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM); +// ACE_LOG_MSG->msg_ostream (out_stream); +// } + +#include <fstream.h> +#include "ace/Reactor.h" +#include "ace/Service_Config.h" +#include "ace/Task.h" + + +#if defined (ACE_HAS_THREADS) + +static ofstream *out_stream = 0; + +static const int NUM_INVOCATIONS = 100; +static const int TASK_COUNT = 130; + +class Test_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Test_Task (void); + ~Test_Task (void); + + virtual int open (void *args = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); + virtual int svc (void); + + virtual int handle_input (ACE_HANDLE fd); + + ACE_Reactor *r_; + int handled_; + static int current_count_; + static int done_cnt_; +}; + +int Test_Task::current_count_ = 0; +int Test_Task::done_cnt_ = 0; + +static ACE_Thread_Mutex lock_; + +Test_Task::Test_Task (void) +{ + ACE_GUARD (ACE_Thread_Mutex, ace_mon, lock_); + + this->handled_ = 0; + Test_Task::current_count_++; + ACE_DEBUG ((LM_DEBUG, + "Test_Task constructed, current_count_ = %d\n", + Test_Task::current_count_)); +} + +Test_Task::~Test_Task (void) +{ + ACE_GUARD (ACE_Thread_Mutex, ace_mon, lock_); + + ACE_DEBUG ((LM_DEBUG, "Test_Task destroyed, current_count_ = %d\n", + Test_Task::current_count_)); +} + +int +Test_Task::open (void *args) +{ + r_ = (ACE_Reactor *) args; + return ACE_Task<ACE_MT_SYNCH>::activate (THR_NEW_LWP); +} + +int +Test_Task::close (u_long) +{ + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, lock_, -1); + + Test_Task::current_count_--; + ACE_DEBUG ((LM_DEBUG, "Test_Task::close () current_count_ = %d.\n", + Test_Task::current_count_)); + return 0; +} + +int +Test_Task::put (ACE_Message_Block *, ACE_Time_Value *) +{ + return 0; +} + +Test_Task::svc (void) +{ + // Every thread must register the same stream to write to file. + if (out_stream) + { + ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM); + ACE_LOG_MSG->msg_ostream (out_stream); + } + + for (int index = 0; index < NUM_INVOCATIONS; index++) + { + ACE_OS::thr_yield (); + + if (r_->notify (this, ACE_Event_Handler::READ_MASK)) + { + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, lock_, -1); + + ACE_DEBUG ((LM_DEBUG, "Test_Task: error notifying reactor!\n")); + } + } + + ACE_DEBUG ((LM_DEBUG, " (%t) returning from svc ()\n")); + return 0; +} + +int +Test_Task::handle_input (ACE_HANDLE) +{ + this->handled_++; + + if (this->handled_ == NUM_INVOCATIONS) + { + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, lock_, -1); + Test_Task::done_cnt_++; + ACE_DEBUG ((LM_DEBUG, + " (%t) Test_Task: handle_input! done_cnt_ = %d.\n", + Test_Task::done_cnt_)); + } + + ACE_OS::thr_yield (); + return -1; +} + +static void * +dispatch (void *arg) +{ + // every thread must register the same stream to write to file + if (out_stream) + { + ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM); + ACE_LOG_MSG->msg_ostream (out_stream); + } + + ACE_DEBUG ((LM_DEBUG, " (%t) Dispatcher Thread started!\n")); + ACE_Reactor *r = (ACE_Reactor *) arg; + int result; + + r->owner (ACE_OS::thr_self ()); + + while (1) + { + result = r->handle_events (); + if (result <= 0) + ACE_DEBUG ((LM_DEBUG, "Dispatch: handle_events (): %d", result)); + } + + return 0; +} + +extern "C" void +handler (int) +{ + *out_stream << flush; + out_stream->close (); + ACE_OS::exit (42); +} + +int +main (int argc, char **) +{ + if (argc > 1) + { + // Send output to file. + out_stream = new ofstream ("test_task_three.out", ios::trunc|ios::out); + ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM); + ACE_LOG_MSG->msg_ostream (out_stream); + } + + // Register a signal handler. + ACE_Sig_Action sa (ACE_SignalHandler (handler), SIGINT); + + ACE_Reactor *reactor1 = ACE_Service_Config::reactor (); + ACE_Reactor *reactor2 = new ACE_Reactor (); + + Test_Task t1[TASK_COUNT]; + Test_Task t2[TASK_COUNT]; + + ACE_Thread::spawn (ACE_THR_FUNC (dispatch), reactor2); + + reactor1->owner (ACE_OS::thr_self ()); + + for (int index = 0; index < TASK_COUNT; index++) + { + t1[index].open (reactor1); + t2[index].open (reactor2); + } + + ACE_OS::sleep (3); + + for (;;) + { + ACE_Time_Value timeout (2); + + if (reactor1->handle_events (timeout) <= 0) + { + if (errno == ETIME) + { + ACE_DEBUG ((LM_DEBUG, "no activity within 2 seconds, shutting down\n")); + break; + } + else + ACE_ERROR ((LM_ERROR, "%p error handling events\n", "main")); + } + } + + return 0; +} + +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/task_two.cpp b/examples/Threads/task_two.cpp new file mode 100644 index 00000000000..1c6366c4b12 --- /dev/null +++ b/examples/Threads/task_two.cpp @@ -0,0 +1,156 @@ +// $Id$ + +// Exercise more tests for the ACE Tasks. This test can spawn off +// zillions of tasks and then wait for them using both polling and the +// ACE Thread Manager. + +#include "ace/Task.h" + +#include "ace/Service_Config.h" +#include "ace/Synch.h" + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Atomic_Op<ACE_Thread_Mutex, int> ATOMIC_INT; + +static u_long zero = 0; +static ATOMIC_INT task_count (zero); +static ATOMIC_INT max_count (zero); +static ATOMIC_INT wait_count (zero); + +static int n_threads = 0; + +// Default number of tasks. +static const int default_threads = ACE_DEFAULT_THREADS; + +// Default number of times to run the test. +static const int default_iterations = 1000; + +class Task_Test : public ACE_Task<ACE_MT_SYNCH> +{ +public: + virtual int open (void *args = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); + virtual int svc (void); + +private: + static ACE_Thread_Mutex lock_; +}; + +ACE_Thread_Mutex Task_Test::lock_; + +int +Task_Test::open (void *) +{ + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Task_Test::lock_, -1); + + task_count++; + ACE_DEBUG ((LM_DEBUG, "(%t) creating Task_Test, task count = %d\n", + (u_long) task_count)); + + return this->activate (THR_BOUND); +} + +int +Task_Test::close (u_long) +{ + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Task_Test::lock_, -1); + + task_count--; + ACE_DEBUG ((LM_DEBUG, "(%t) destroying Task_Test, task count = %d\n", + (u_long) task_count)); + wait_count--; +// delete this; + return 0; +} + +int +Task_Test::put (ACE_Message_Block *, + ACE_Time_Value *) +{ + return 0; +} + +int +Task_Test::svc (void) +{ + wait_count++; + max_count++; + + ACE_DEBUG ((LM_DEBUG, "(%t) svc: waiting\n")); + + for (;;) + if (max_count >= n_threads) + break; + else + ACE_Thread::yield (); + + ACE_DEBUG ((LM_DEBUG, "(%t) svc: finished waiting\n")); + return 0; +} + +int +main (int argc, char *argv[]) +{ + n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : default_threads; + int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : default_iterations; + + Task_Test **task_array = new Task_Test *[n_threads]; + + for (int i = 1; i <= n_iterations; i++) + { + ACE_DEBUG ((LM_DEBUG, "(%t) iteration = %d, max_count %d\n", + i, (u_long) max_count)); + max_count = 0; + + ACE_DEBUG ((LM_DEBUG, "(%t) starting %d task%s\n", + n_threads, n_threads == 1 ? "" : "s")); + + // Launch the new tasks. + for (int j = 0; j < n_threads; j++) + { + task_array[j] = new Task_Test; + // Activate the task, i.e., make it an active object. + task_array[j]->open (); + } + + // Wait for initialization to kick in. + while (max_count == 0) + ACE_Thread::yield (); + + ACE_DEBUG ((LM_DEBUG, "(%t) waiting for threads to finish\n")); + + // Wait for the threads to finish this iteration. + while (max_count != n_threads && wait_count != 0) + ACE_Thread::yield (); + + ACE_DEBUG ((LM_DEBUG, "(%t) iteration %d finished, max_count %d, wait_count %d, waiting for tasks to exit\n", + i, (u_long) max_count, (u_long) wait_count)); + + // Wait for all the tasks to exit. + ACE_Service_Config::thr_mgr ()->wait (); + + // Delete the existing tasks. + for (int k = 0; k < n_threads; k++) + delete task_array[k]; + } + + delete [] task_array; + + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down the test\n")); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Atomic_Op<ACE_Thread_Mutex, u_long>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ + +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/thread_manager.cpp b/examples/Threads/thread_manager.cpp new file mode 100644 index 00000000000..73029d70d88 --- /dev/null +++ b/examples/Threads/thread_manager.cpp @@ -0,0 +1,104 @@ +// $Id$ + +// Test out the group management mechanisms provided by the +// ACE_Thread_Manager, including the group signal handling, group +// suspension and resumption, and cooperative thread cancellation +// mechanisms. + +#include "ace/Service_Config.h" +#include "ace/Thread_Manager.h" + +#if defined (ACE_HAS_THREADS) + +extern "C" void +handler (int signum) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) received signal %d\n", signum)); +} + +static void * +worker (int iterations) +{ + ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ()); + + for (int i = 0; i < iterations; i++) + { + if ((i % 1000) == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) checking cancellation before iteration %d!\n", + i)); + + if (ACE_Service_Config::thr_mgr ()->testcancel (ACE_Thread::self ()) != 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) has been cancelled before iteration %d!\n", + i)); + break; + } + } + } + + // Destructor removes thread from Thread_Manager. + return 0; +} + +static const int DEFAULT_THREADS = ACE_DEFAULT_THREADS; +static const int DEFAULT_ITERATIONS = 100000; + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config daemon; + + daemon.open (argv[0]); + + // Register a signal handler. + ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT); + + int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : DEFAULT_THREADS; + int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS; + + ACE_Thread_Manager *thr_mgr = ACE_Service_Config::thr_mgr (); + + int grp_id = thr_mgr->spawn_n (n_threads, ACE_THR_FUNC (worker), + (void *) n_iterations, + THR_NEW_LWP | THR_DETACHED); + + // Wait for 1 second and then suspend every thread in the group. + ACE_OS::sleep (1); + ACE_DEBUG ((LM_DEBUG, "(%t) suspending group\n")); + if (thr_mgr->suspend_grp (grp_id) == -1) + ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "suspend_grp")); + + // Wait for 1 more second and then resume every thread in the + // group. + ACE_OS::sleep (ACE_Time_Value (1)); + ACE_DEBUG ((LM_DEBUG, "(%t) resuming group\n")); + if (thr_mgr->resume_grp (grp_id) == -1) + ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "resume_grp")); + + // Wait for 1 more second and then send a SIGINT to every thread in + // the group. + ACE_OS::sleep (ACE_Time_Value (1)); + ACE_DEBUG ((LM_DEBUG, "(%t) signaling group\n")); + if (thr_mgr->kill_grp (grp_id, SIGINT) == -1) + ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "kill_grp")); + + // Wait for 1 more second and then cancel all the threads. + ACE_OS::sleep (ACE_Time_Value (1)); + ACE_DEBUG ((LM_DEBUG, "(%t) cancelling group\n")); + if (thr_mgr->cancel_grp (grp_id) == -1) + ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "cancel_grp")); + + // Perform a barrier wait until all the threads have shut down. + thr_mgr->wait (); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, "threads not supported on this platform\n"), -1); +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp new file mode 100644 index 00000000000..9478ed0883d --- /dev/null +++ b/examples/Threads/thread_pool.cpp @@ -0,0 +1,214 @@ +// This test program illustrates how the ACE task synchronization +// $Id$ + +// mechanisms work in conjunction with the ACE_Task and the +// ACE_Thread_Manager. If the manual flag is not set input comes from +// stdin until the user enters a return only. This stops all workers +// via a message block of length 0. This is an alternative shutdown of +// workers compared to queue deactivate. +// +// This code is original based on a test program written by Karlheinz +// Dorn. It was modified to utilize more "ACE" features by Doug Schmidt. + +#include "ace/Task.h" +#include "ace/Service_Config.h" + +#include "ace/Task.h" + +#if defined (ACE_HAS_THREADS) + +// Number of iterations to run the test. +static int n_iterations = 100; + +class Thread_Pool : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Thread_Pool (ACE_Thread_Manager *thr_mgr, int n_threads); + + virtual int svc (void); + // Iterate <n_iterations> time printing off a message and "waiting" + // for all other threads to complete this iteration. + + virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0); + // This allows the producer to pass messages to the <Thread_Pool>. + +private: + virtual int close (u_long); + + // = Not needed for this test. + virtual int open (void *) { return 0; } +}; + +int +Thread_Pool::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) close of worker\n")); + return 0; +} + +Thread_Pool::Thread_Pool (ACE_Thread_Manager *thr_mgr, + int n_threads) + : ACE_Task<ACE_MT_SYNCH> (thr_mgr) +{ + // Create worker threads. + if (this->activate (THR_NEW_LWP, n_threads) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "activate failed")); +} + +// Simply enqueue the Message_Block into the end of the queue. + +int +Thread_Pool::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +{ + return this->putq (mb, tv); +} + +// Iterate <n_iterations> time printing off a message and "waiting" +// for all other threads to complete this iteration. + +int +Thread_Pool::svc (void) +{ + // Note that the ACE_Task::svc_run () method automatically adds us to + // the Thread_Manager when the thread begins. + + int result = 0; + int count = 1; + + // Keep looping, reading a message out of the queue, until we get a + // message with a length == 0, which signals us to quit. + + for (;; count++) + { + ACE_Message_Block *mb; + + ACE_DEBUG ((LM_DEBUG, "(%t) in iteration %d before getq ()\n", count)); + + if (this->getq (mb) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%t) in iteration %d, got result -1, exiting\n", count)); + break; + } + + int length = mb->length (); + + if (length > 0) + ACE_DEBUG ((LM_DEBUG, + "(%t) in iteration %d, length = %d, text = \"%*s\"\n", + count, length, length - 1, mb->rd_ptr ())); + + // We're responsible for deallocating this. + delete mb; + + if (length == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) in iteration %d, got NULL message, exiting\n", + count)); + break; + } + } + + // Note that the ACE_Task::svc_run () method automatically removes + // us from the Thread_Manager when the thread exits. + return 0; +} + +static void +produce (Thread_Pool &thread_pool) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) producer start, dumping the Thread_Pool\n")); + thread_pool.dump (); + + for (int n;;) + { + // Allocate a new message. + ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); + +#if defined (manual) + ACE_DEBUG ((LM_DEBUG, + "(%t) press chars and enter to put a new message into task queue...")); + n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ()); +#else // Automatically generate messages. + static int count = 0; + + ACE_OS::sprintf (mb->rd_ptr (), "%d\n", count); + + n = ACE_OS::strlen (mb->rd_ptr ()); + + if (count == n_iterations) + n = 1; // Indicate that we need to shut down. + else + count++; + + if (count == 0 || (count % 20 == 0)) + ACE_OS::sleep (1); +#endif /* manual */ + if (n > 1) + { + // Send a normal message to the waiting threads and continue + // producing. + mb->wr_ptr (n); + + // Pass the message to the Thread_Pool. + if (thread_pool.put (mb) == -1) + ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); + } + else + { + // Send a shutdown message to the waiting threads and exit. + ACE_DEBUG ((LM_DEBUG, "\n(%t) start loop, dump of task:\n")); + thread_pool.dump (); + + for (int i = thread_pool.thr_count (); i > 0; i--) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) EOF, enqueueing NULL block for thread = %d\n", + i)); + + // Enqueue a NULL message to flag each consumer to + // shutdown. + if (thread_pool.put (new ACE_Message_Block) == -1) + ACE_ERROR ((LM_ERROR, " (%t) %p\n", "put")); + } + + ACE_DEBUG ((LM_DEBUG, "\n(%t) end loop, dump of task:\n")); + thread_pool.dump (); + break; + } + } +} + +int +main (int argc, char *argv[]) +{ + int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : ACE_DEFAULT_THREADS; + n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : n_iterations; + + ACE_DEBUG ((LM_DEBUG, "(%t) argc = %d, threads = %d\n", + argc, n_threads)); + + // Create the worker tasks. + Thread_Pool thread_pool (ACE_Service_Config::thr_mgr (), + n_threads); + + // Create work for the worker tasks to process in their own threads. + produce (thread_pool); + + // Wait for all the threads to reach their exit point. + + ACE_DEBUG ((LM_DEBUG, "(%t) waiting with thread manager...\n")); + ACE_Service_Config::thr_mgr ()->wait (); + + ACE_DEBUG ((LM_DEBUG, "(%t) destroying worker tasks and exiting...\n")); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/thread_specific.cpp b/examples/Threads/thread_specific.cpp new file mode 100644 index 00000000000..f7a4f6dccf3 --- /dev/null +++ b/examples/Threads/thread_specific.cpp @@ -0,0 +1,219 @@ +#include "ace/Service_Config.h" +// $Id$ + +#include "ace/Synch.h" + +#if defined (ACE_HAS_THREADS) + +// Define a class that will be stored in thread-specific data. Note +// that as far as this class is concerned it's just a regular C++ +// class. The ACE_TSS wrapper transparently ensures that +// objects of this class will be placed in thread-specific storage. +// All calls on ACE_TSS::operator->() are delegated to the +// appropriate method in the Errno class. + +class Errno +{ +public: + int error (void) { return this->errno_; } + void error (int i) { this->errno_ = i; } + + int line (void) { return this->lineno_; } + void line (int l) { this->lineno_ = l; } + + // Errno::flags_ is a static variable, so we've got to protect it + // with a mutex since it isn't kept in thread-specific storage. + int flags (void) + { + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Errno::lock_, -1); + + return Errno::flags_; + } + + void flags (int f) + { + ACE_GUARD (ACE_Thread_Mutex, ace_mon, Errno::lock_); + + Errno::flags_ = f; + } + +private: + // = errno_ and lineno_ will be thread-specific data so they don't + // need a lock. + int errno_; + int lineno_; + + static int flags_; +#if defined (ACE_HAS_THREADS) + // flags_ needs a lock. + static ACE_Thread_Mutex lock_; +#endif /* ACE_HAS_THREADS */ +}; + +// Static variables. +ACE_MT (ACE_Thread_Mutex Errno::lock_); +int Errno::flags_; + +// This is our thread-specific error handler... +static ACE_TSS<Errno> TSS_Error; + +#if defined (ACE_HAS_THREADS) +// Serializes output via cout. +static ACE_Thread_Mutex lock; + +typedef ACE_TSS_Guard<ACE_Thread_Mutex> GUARD; +#else +// Serializes output via cout. +static ACE_Null_Mutex lock; + +typedef ACE_Guard<ACE_Null_Mutex> GUARD; +#endif /* ACE_HAS_THREADS */ + +static void +cleanup (void *ptr) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) in cleanup, ptr = %x\n", ptr)); + + delete ptr; +} + +// This worker function is the entry point for each thread. + +static void * +worker (void *c) +{ + ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ()); + int count = int (c); + + ACE_thread_key_t key = 0; + int *ip = 0; + + // Make one key that will be available when the thread exits so that + // we'll have something to cleanup! + + if (ACE_OS::thr_keycreate (&key, cleanup) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keycreate")); + + ip = new int; + + if (ACE_OS::thr_setspecific (key, (void *) ip) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific")); + + for (int i = 0; i < count; i++) + { + if (ACE_OS::thr_keycreate (&key, cleanup) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keycreate")); + + ip = new int; + + ACE_DEBUG ((LM_DEBUG, "(%t) in worker 1, key = %d, ip = %x\n", key, ip)); + + if (ACE_OS::thr_setspecific (key, (void *) ip) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific")); + + if (ACE_OS::thr_getspecific (key, (void **) &ip) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific")); + + if (ACE_OS::thr_setspecific (key, (void *) 0) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific")); + + delete ip; + + if (ACE_OS::thr_keyfree (key) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keyfree")); + + // Cause an error. + ACE_OS::read (ACE_INVALID_HANDLE, 0, 0); + + // The following two lines set the thread-specific state. + TSS_Error->error (errno); + TSS_Error->line (__LINE__); + + // This sets the static state (note how C++ makes it easy to do + // both). + TSS_Error->flags (count); + + { + // Use the guard to serialize access to cout... + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, lock, 0); + + cout << "(" << ACE_Thread::self () + << ") errno = " << TSS_Error->error () + << ", lineno = " << TSS_Error->line () + << ", flags = " << TSS_Error->flags () + << endl; + } + key = 0; + + if (ACE_OS::thr_keycreate (&key, cleanup) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keycreate")); + + ip = new int; + + ACE_DEBUG ((LM_DEBUG, "(%t) in worker 2, key = %d, ip = %x\n", key, ip)); + + if (ACE_OS::thr_setspecific (key, (void *) ip) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific")); + + if (ACE_OS::thr_getspecific (key, (void **) &ip) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific")); + + if (ACE_OS::thr_setspecific (key, (void *) 0) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_setspecific")); + + delete ip; + + if (ACE_OS::thr_keyfree (key) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "ACE_OS::thr_keyfree")); + } + + ACE_DEBUG ((LM_DEBUG, "(%t) exiting\n")); + return 0; +} + +extern "C" void +handler (int signum) +{ + ACE_DEBUG ((LM_DEBUG, "signal = %S\n", signum)); + ACE_Service_Config::thr_mgr ()->exit (0); +} + +int +main (int argc, char *argv[]) +{ + // The Service_Config must be the first object defined in main... + ACE_Service_Config daemon (argv[0]); + ACE_Thread_Control tc (ACE_Service_Config::thr_mgr ()); + int threads = argc > 1 ? ACE_OS::atoi (argv[1]) : 4; + int count = argc > 2 ? ACE_OS::atoi (argv[2]) : 10000; + + // Register a signal handler. + ACE_Sig_Action sa ((ACE_SignalHandler) (handler), SIGINT); + +#if defined (ACE_HAS_THREADS) + if (ACE_Service_Config::thr_mgr ()->spawn_n (threads, + ACE_THR_FUNC (&worker), + (void *) count, + THR_BOUND | THR_DETACHED) == -1) + ACE_OS::perror ("ACE_Thread_Manager::spawn_n"); + + ACE_Service_Config::thr_mgr ()->wait (); +#else + worker ((void *) count); +#endif /* ACE_HAS_THREADS */ + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_TSS<Errno>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ + +#else +int +main (void) +{ + ACE_ERROR_RETURN ((LM_ERROR, + "ACE doesn't support support threads on this platform (yet)\n"), + -1); +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/token.cpp b/examples/Threads/token.cpp new file mode 100644 index 00000000000..5a51496d011 --- /dev/null +++ b/examples/Threads/token.cpp @@ -0,0 +1,76 @@ +// Test out the ACE Token class. +// $Id$ + +#include "ace/Token.h" +#include "ace/Task.h" + +#if defined (ACE_HAS_THREADS) + +class My_Task : public ACE_Task<ACE_MT_SYNCH> +{ +public: + My_Task (int n); + virtual int open (void *) { return 0; } + virtual int close (u_long) { return 0; } + virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } + virtual int svc (void); + + static void sleep_hook (void *); + +private: + ACE_Token token_; +}; + +My_Task::My_Task (int n) +{ + // Make this Task into an Active Object. + this->activate (THR_BOUND | THR_DETACHED, n); + + // Wait for all the threads to exit. + this->thr_mgr ()->wait (); +} + +void +My_Task::sleep_hook (void *) +{ + cerr << '(' << ACE_Thread::self () << ')' + << " blocking, My_Task::sleep_hook () called" << endl; +} + +// Test out the behavior of the ACE_Token class. + +int +My_Task::svc (void) +{ + for (int i = 0; i < 10000; i++) + { + // Wait for up to 1 millisecond past the current time to get the token. + ACE_Time_Value timeout (ACE_OS::time (0), 1000); + + if (this->token_.acquire (&My_Task::sleep_hook, 0, &timeout) == 1) + { + this->token_.acquire (); + this->token_.renew (); + this->token_.release (); + this->token_.release (); + } + else + ACE_Thread::yield (); + } + return 0; +} + +int +main (int argc, char *argv[]) +{ + My_Task tasks (argc > 1 ? atoi (argv[1]) : 4); + + return 0; +} +#else +int +main (void) +{ + ACE_ERROR_RETURN ((LM_ERROR, "your platform doesn't support threads\n"), -1); +} +#endif /* */ diff --git a/examples/Threads/tss1.cpp b/examples/Threads/tss1.cpp new file mode 100644 index 00000000000..7efdc9dc3ef --- /dev/null +++ b/examples/Threads/tss1.cpp @@ -0,0 +1,164 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// TSS_Test.cpp +// +// = DESCRIPTION +// This program tests thread specific storage of data. The ACE_TSS +// wrapper transparently ensures that the objects of this class +// will be placed in thread-specific storage. All calls on +// ACE_TSS::operator->() are delegated to the appropriate method +// in the Errno class. +// +// = AUTHOR +// Detlef Becker +// +// ============================================================================ + +#include "ace/Service_Config.h" +#include "ace/Synch.h" +#include "ace/Task.h" + +#if defined (ACE_HAS_THREADS) + +static int iterations = 100; + +class Errno +{ +public: + int error (void) { return this->errno_; } + void error (int i) { this->errno_ = i; } + + int line (void) { return this->lineno_; } + void line (int l) { this->lineno_ = l; } + + // Errno::flags_ is a static variable, so we've got to protect it + // with a mutex since it isn't kept in thread-specific storage. + int flags (void) { + ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_Mon, Errno::lock_, -1)); + + return Errno::flags_; + } + int flags (int f) + { + ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Errno::lock_, -1)); + + Errno::flags_ = f; + return 0; + } + +private: + // = errno_ and lineno_ will be thread-specific data so they don't + // need a lock. + int errno_; + int lineno_; + + static int flags_; +#if defined (ACE_HAS_THREADS) + // flags_ needs a lock. + static ACE_Thread_Mutex lock_; +#endif /* ACE_HAS_THREADS */ +}; + +// Static variables. +ACE_MT (ACE_Thread_Mutex Errno::lock_); +int Errno::flags_; + +// This is our thread-specific error handler... +static ACE_TSS<Errno> TSS_Error; + +#if defined (ACE_HAS_THREADS) +// Serializes output via cout. +static ACE_Thread_Mutex lock; + +typedef ACE_TSS_Guard<ACE_Thread_Mutex> GUARD; +#else +// Serializes output via cout. +static ACE_Null_Mutex lock; + +typedef ACE_Guard<ACE_Null_Mutex> GUARD; +#endif /* ACE_HAS_THREADS */ + +// Keeps track of whether Tester::close () has started. +static int close_started = 0; + +template <ACE_SYNCH_1> +class Tester: public ACE_Task<ACE_SYNCH_2> +{ +public: + Tester (void) {} + ~Tester (void) {} + + virtual int open (void *theArgs = 0); + virtual int close (u_long theArg = 0); + virtual int put (ACE_Message_Block *theMsgBlock, + ACE_Time_Value *theTimeVal = 0); + virtual int svc (void); +}; + +template <ACE_SYNCH_1> int +Tester<ACE_SYNCH_2>::open (void *) +{ + return this->activate (); +} + +template <ACE_SYNCH_1> +int Tester<ACE_SYNCH_2>::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, "close running\n!")); + close_started = 1; + ACE_OS::sleep (2); + ACE_DEBUG ((LM_DEBUG, "close: trying to log error code 7!\n")); + TSS_Error->error (7); + ACE_DEBUG ((LM_DEBUG, "close: logging succeeded!\n")); + return 0; +} + +template <ACE_SYNCH_1> int +Tester<ACE_SYNCH_2>::put (ACE_Message_Block *, ACE_Time_Value *) +{ + return 0; +} + +template <ACE_SYNCH_1> int +Tester<ACE_SYNCH_2>::svc (void) +{ + return 0; +} + +int +main (int, char *[]) +{ + Tester<ACE_MT_SYNCH> tester; + + tester.open (); + + while (!close_started) + continue; + + ACE_DEBUG ((LM_DEBUG, "main: trying to log error code 7!\n")); + + TSS_Error->error (3); + + ACE_DEBUG ((LM_DEBUG, "main: logging succeeded!\n")); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_TSS<Errno>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ + +#else +int +main (void) +{ + ACE_ERROR_RETURN ((LM_ERROR, + "ACE doesn't support support threads on this platform (yet)\n"), + -1); +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/Threads/tss2.cpp b/examples/Threads/tss2.cpp new file mode 100644 index 00000000000..24a8d958e91 --- /dev/null +++ b/examples/Threads/tss2.cpp @@ -0,0 +1,252 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// TSS_Test.cpp +// +// = DESCRIPTION +// This program tests thread specific storage of data. The ACE_TSS +// wrapper transparently ensures that the objects of this class +// will be placed in thread-specific storage. All calls on +// ACE_TSS::operator->() are delegated to the appropriate method +// in the Errno class. +// +// = AUTHOR +// Prashant Jain and Doug Schmidt +// +// ============================================================================ + +#include "ace/Task.h" +#include "ace/Token.h" + +#if defined (ACE_HAS_THREADS) + +class TSS_Obj +{ +public: + + TSS_Obj (void); + ~TSS_Obj (void); + +private: + static int count_; + static ACE_Thread_Mutex lock_; +}; + +int TSS_Obj::count_ = 0; +ACE_Thread_Mutex TSS_Obj::lock_; + +TSS_Obj::TSS_Obj (void) +{ + ACE_GUARD (ACE_Thread_Mutex, ace_mon, lock_); + + count_++; + cout << "TO+ : " << count_ << endl; +} + +TSS_Obj::~TSS_Obj (void) +{ + ACE_GUARD (ACE_Thread_Mutex, ace_mon, lock_); + + count_--; + cout << "TO- : " << count_ << endl; +} + +class Test_Task +{ +public: + + Test_Task (void); + ~Test_Task (void); + + int open (void *arg); + + static void *svc (void *arg); + static int wait_count_; + static int max_count_; + +private: + static int count_; +}; + +int Test_Task::count_ = 0; +int Test_Task::wait_count_ = 0; +int Test_Task::max_count_ = 0; +int num_threads_ = 0; + +ACE_Token token; + +Test_Task::Test_Task (void) +{ + ACE_GUARD (ACE_Token, ace_mon, token); + + count_++; + cout << "Test_Task+ : " + << count_ << " (" + << ACE_OS::thr_self () + << ")" << endl; +} + +Test_Task::~Test_Task (void) +{ + ACE_GUARD (ACE_Token, ace_mon, token); + + count_--; + cout << "Test_Task- : " + << count_ << " (" + << ACE_OS::thr_self () + << ")" << endl; + + wait_count_--; +} + +void * +Test_Task::svc (void *arg) +{ + ACE_TSS<TSS_Obj> tss (new TSS_Obj); + + { + ACE_GUARD_RETURN (ACE_Token, ace_mon, token, 0); + + wait_count_++; + max_count_++; + cout << "svc: waiting (" << ACE_OS::thr_self () << ")" << endl; + } + + while (1) + { + { + ACE_GUARD_RETURN (ACE_Token, ace_mon, token, 0); + + if (max_count_ >= num_threads_) + break; + else + { + ace_mon.release (); + ACE_Thread::yield (); + ace_mon.acquire (); + } + } + + { + ACE_GUARD_RETURN (ACE_Token, ace_mon, token, 0); + + cout << "svc: waiting (" << ACE_OS::thr_self () << ") finished" << endl; + } + } + + delete (Test_Task *) arg; + + return 0; +} + +int +Test_Task::open (void *arg) +{ + if (ACE_Thread::spawn (Test_Task::svc, arg) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Thread::spawn"), 0); + + return 0; +} + +int +main (int argc, char **argv) +{ + if (argc != 2) + { + cout << "Missing parameters!" << endl; + return 1; + } + + int num_Tasks = atoi (argv[1]); + + num_threads_ = num_Tasks; + + Test_Task **task_arr = (Test_Task**) new char[sizeof (Test_Task*) * num_Tasks]; + + while (1) + { + { + ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1); + + cout << "ReseTest_Tasking Test_Task::max_count_ from: " + << Test_Task::max_count_ << endl; + + Test_Task::max_count_ = 0; + } + + for (int i = 0; i < num_Tasks; i++) + { + task_arr[i] = new Test_Task; + task_arr[i]->open (task_arr[i]); + } + + cout << "Waiting for first thread started..." << endl; + + for (;;) + { + ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1); + + if (Test_Task::max_count_ != 0 ) + { + ace_mon.release (); + ACE_Thread::yield (); + ace_mon.acquire (); + break; + } + ace_mon.release (); + ACE_Thread::yield (); + ace_mon.acquire (); + } + + { + ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1); + + cout << "First thread started!" << endl + << "Waiting for all threads finished..." << endl; + } + + for (;;) + { + ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1); + + if (!(Test_Task::max_count_ == num_threads_ + && Test_Task::wait_count_ == 0)) + { + ace_mon.release (); + ACE_Thread::yield (); + ace_mon.acquire (); + continue; + } + + cout << "Test_Task::max_count_ = " + << Test_Task::max_count_ + << " Test_Task::wait_count_ = " + << Test_Task::wait_count_ + << endl; + break; + } + + { + ACE_GUARD_RETURN (ACE_Token, ace_mon, token, -1); + cout << "All threads finished..." << endl; + } + + ACE_OS::sleep (2); + } + + return 0; +} + +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/netsvcs/servers/main.cpp b/netsvcs/servers/main.cpp index e645866b34c..c072d012211 100644 --- a/netsvcs/servers/main.cpp +++ b/netsvcs/servers/main.cpp @@ -35,11 +35,12 @@ main (int argc, char *argv[]) l_argv[1] = 0; so = ACE_SVC_INVOKE (ACE_TS_Server_Acceptor); - if (so->init (2, l_argv) == -1) + if (so->init (1, l_argv) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "ACE_TS_Server_Acceptor", 1)); l_argv[0] = argv[0]; l_argv[1] = "-p 10011"; + l_argv[2] = 0; so = ACE_SVC_INVOKE (ACE_TS_Clerk_Processor); if (so->init (2, l_argv) == -1) diff --git a/tests/Barrier_Test.cpp b/tests/Barrier_Test.cpp index 37e779181a7..be44b998f84 100644 --- a/tests/Barrier_Test.cpp +++ b/tests/Barrier_Test.cpp @@ -66,7 +66,7 @@ tester (Tester_Args *args) #endif /* ACE_HAS_THREADS */ int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Barrier_Test"); diff --git a/tests/Buffer_Stream_Test.cpp b/tests/Buffer_Stream_Test.cpp index f08b3c4691e..bce8c31b266 100644 --- a/tests/Buffer_Stream_Test.cpp +++ b/tests/Buffer_Stream_Test.cpp @@ -197,7 +197,7 @@ Consumer::svc (void) // Main driver function. int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Buffer_Stream_Test"); diff --git a/tests/CPP_Test.cpp b/tests/CPP_Test.cpp index fed43aaa5b2..4c0bd291a50 100644 --- a/tests/CPP_Test.cpp +++ b/tests/CPP_Test.cpp @@ -250,7 +250,7 @@ spawn (void) } int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("CPP_Test"); diff --git a/tests/Future_Test.cpp b/tests/Future_Test.cpp index 5357ee597bc..00ed154ebbf 100644 --- a/tests/Future_Test.cpp +++ b/tests/Future_Test.cpp @@ -311,7 +311,7 @@ template class ACE_Atomic_Op<ACE_Thread_Mutex, u_long>; #endif /* ACE_HAS_THREADS */ int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Future_Test"); diff --git a/tests/Handle_Set_Test.cpp b/tests/Handle_Set_Test.cpp index d8803dfb137..ef0ec5ffe17 100644 --- a/tests/Handle_Set_Test.cpp +++ b/tests/Handle_Set_Test.cpp @@ -65,7 +65,7 @@ run_test (int count) } int -main (int argc, char *argv[]) +main (int argc, char *[]) { ACE_START_TEST ("Handle_Set_Test"); diff --git a/tests/MM_Shared_Memory_Test.cpp b/tests/MM_Shared_Memory_Test.cpp index 0f8e38a9da5..aacbfd3137b 100644 --- a/tests/MM_Shared_Memory_Test.cpp +++ b/tests/MM_Shared_Memory_Test.cpp @@ -120,7 +120,7 @@ spawn (void) } int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("MM_Shared_Memory_Test"); diff --git a/tests/Map_Manager_Test.cpp b/tests/Map_Manager_Test.cpp index 51f4f8dc193..e059b2bcb9b 100644 --- a/tests/Map_Manager_Test.cpp +++ b/tests/Map_Manager_Test.cpp @@ -31,7 +31,7 @@ typedef ACE_Map_Reverse_Iterator <KEY, VALUE, MUTEX> REVERSE_ITERATOR; typedef ACE_Map_Entry <KEY, VALUE> ENTRY; int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Map_Manager_Test"); diff --git a/tests/Mem_Map_Test.cpp b/tests/Mem_Map_Test.cpp index 4f5216c4d97..3054d4208ad 100644 --- a/tests/Mem_Map_Test.cpp +++ b/tests/Mem_Map_Test.cpp @@ -91,7 +91,7 @@ create_test_file (int size, int num_lines) } int -main (int, char **argv) +main (int, char *[]) { ACE_START_TEST ("Mem_Map_Test"); diff --git a/tests/Message_Queue_Test.cpp b/tests/Message_Queue_Test.cpp index 4daee69cb9a..fcff279346d 100644 --- a/tests/Message_Queue_Test.cpp +++ b/tests/Message_Queue_Test.cpp @@ -26,7 +26,7 @@ typedef ACE_Message_Queue_Iterator <ACE_NULL_SYNCH> ITERATOR; typedef ACE_Message_Queue_Reverse_Iterator <ACE_NULL_SYNCH> REVERSE_ITERATOR; int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Message_Queue_Test"); diff --git a/tests/Mutex_Test.cpp b/tests/Mutex_Test.cpp index 518b957fb40..7bdc4acbe84 100644 --- a/tests/Mutex_Test.cpp +++ b/tests/Mutex_Test.cpp @@ -103,7 +103,7 @@ spawn (void) } int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Mutex_Test"); diff --git a/tests/Pipe_Test.cpp b/tests/Pipe_Test.cpp index 80a04f8c474..efd9edfd906 100644 --- a/tests/Pipe_Test.cpp +++ b/tests/Pipe_Test.cpp @@ -45,10 +45,10 @@ parse_args (int argc, char *argv[]) switch (c) { case 'd': - ::close_pipe = 0; + close_pipe = 0; break; case 'c': - ::child_process = 1; + child_process = 1; break; default: print_usage_and_die (); @@ -65,15 +65,15 @@ open (ACE_Pipe &pipe, ACE_ASSERT (pipe.read_handle () != ACE_INVALID_HANDLE && pipe.write_handle () != ACE_INVALID_HANDLE); - if (::close_pipe) + if (close_pipe) pipe.close (); } int main (int argc, char *argv[]) { - ::parse_args (argc, argv); - if (::child_process) + parse_args (argc, argv); + if (child_process) { ACE_APPEND_LOG ("Pipe_Test-children"); ACE_Pipe a, b, c, d, e; @@ -94,7 +94,7 @@ main (int argc, char *argv[]) char *s_argv[4]; s_argv[0] = "Pipe_Test" ACE_PLATFORM_EXE_SUFFIX; s_argv[1] = "-c"; // child/slave process - if (::close_pipe == 0) + if (close_pipe == 0) s_argv[2] = "-d"; else s_argv[2] = 0; diff --git a/tests/Priority_Buffer_Test.cpp b/tests/Priority_Buffer_Test.cpp index fd20df7937c..d947b0c7da6 100644 --- a/tests/Priority_Buffer_Test.cpp +++ b/tests/Priority_Buffer_Test.cpp @@ -135,7 +135,7 @@ producer (void *args) // size of each line. int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Priority_Buffer_Test"); diff --git a/tests/Reactor_Timer_Test.cpp b/tests/Reactor_Timer_Test.cpp index ff5d6b9938e..4f843c77c37 100644 --- a/tests/Reactor_Timer_Test.cpp +++ b/tests/Reactor_Timer_Test.cpp @@ -50,7 +50,7 @@ public: }; int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Reactor_Timer_Test"); diff --git a/tests/Reactors_Test.cpp b/tests/Reactors_Test.cpp index b2780d94a04..8bad943910d 100644 --- a/tests/Reactors_Test.cpp +++ b/tests/Reactors_Test.cpp @@ -177,7 +177,7 @@ template class ACE_Atomic_Op<ACE_Thread_Mutex, u_long>; #endif /* ACE_HAS_THREADS */ int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Reactors_Test"); diff --git a/tests/Recursive_Mutex_Test.cpp b/tests/Recursive_Mutex_Test.cpp index d626899ff29..14975dc3c99 100644 --- a/tests/Recursive_Mutex_Test.cpp +++ b/tests/Recursive_Mutex_Test.cpp @@ -64,7 +64,7 @@ worker (void *arg) #endif /* ACE_HAS_THREADS */ int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Recursive_Mutex_Test"); diff --git a/tests/SPIPE_Test.cpp b/tests/SPIPE_Test.cpp index 47249caebeb..c1f8c287b92 100644 --- a/tests/SPIPE_Test.cpp +++ b/tests/SPIPE_Test.cpp @@ -134,7 +134,7 @@ spawn (void) } int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("SPIPE_Test"); diff --git a/tests/SString_Test.cpp b/tests/SString_Test.cpp index f9b5df0f405..2f5e9598e03 100644 --- a/tests/SString_Test.cpp +++ b/tests/SString_Test.cpp @@ -22,7 +22,7 @@ #include "test_config.h" int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("SString_Test"); diff --git a/tests/SV_Shared_Memory_Test.cpp b/tests/SV_Shared_Memory_Test.cpp index 594918ef216..54ca146294f 100644 --- a/tests/SV_Shared_Memory_Test.cpp +++ b/tests/SV_Shared_Memory_Test.cpp @@ -95,7 +95,7 @@ child (char *shm) #endif /* ACE_HAS_SYSV_IPC */ int -main (int, char * /* argv */[]) +main (int, char *[]) { ACE_START_TEST ("SV_Shared_Memory_Test"); diff --git a/tests/TSS_Test.cpp b/tests/TSS_Test.cpp index 5efd91cf7ce..0642f84e2ba 100644 --- a/tests/TSS_Test.cpp +++ b/tests/TSS_Test.cpp @@ -22,10 +22,10 @@ #include "ace/Service_Config.h" #include "ace/Synch.h" - #include "test_config.h" -#if defined (ACE_HAS_THREADS) +#if defined (ACE_HAS_THREADS) +#if !defined (ACE_TEMPLATES_REQUIRE_PRAGMA) // AIX is evil static const int ITERATIONS = 100; @@ -193,10 +193,11 @@ handler (int signum) template class ACE_TSS<Errno>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ +#endif /* !ACE_TEMPLATES_REQUIRE_PRAGMA */ #endif /* ACE_HAS_THREADS */ int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("TSS_Test"); @@ -213,6 +214,9 @@ main (int, char *argv[]) ACE_OS::perror ("ACE_Thread_Manager::spawn_n"); ACE_Service_Config::thr_mgr ()->wait (); +#elif defined (ACE_TEMPLATES_REQUIRE_PRAGMA) + ACE_ERROR ((LM_ERROR, + "This platform has an evil template instantiation mechanism...\n")); #else ACE_ERROR ((LM_ERROR, "threads are not supported on this platform\n")); diff --git a/tests/Task_Test.cpp b/tests/Task_Test.cpp index 57bbc72eec9..be1256c3023 100644 --- a/tests/Task_Test.cpp +++ b/tests/Task_Test.cpp @@ -91,7 +91,7 @@ Barrier_Task::svc (void) #endif /* ACE_HAS_THREADS */ int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Task_Test"); diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp index a61f92b22eb..8d7c654c4a3 100644 --- a/tests/Thread_Pool_Test.cpp +++ b/tests/Thread_Pool_Test.cpp @@ -190,7 +190,7 @@ produce (Thread_Pool &thread_pool) #endif /* ACE_HAS_THREADS */ int -main (int argc, char *argv[]) +main (int argc, char *[]) { ACE_START_TEST ("Thread_Pool_Test"); #if defined (ACE_HAS_THREADS) diff --git a/tests/Time_Service_Test.cpp b/tests/Time_Service_Test.cpp index f83c14f4429..d26eb9fb83c 100644 --- a/tests/Time_Service_Test.cpp +++ b/tests/Time_Service_Test.cpp @@ -27,7 +27,7 @@ #include "ace/Process.h" int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Time_Service_Test"); diff --git a/tests/Time_Value_Test.cpp b/tests/Time_Value_Test.cpp index fcce2d43f6f..74488ead5e3 100644 --- a/tests/Time_Value_Test.cpp +++ b/tests/Time_Value_Test.cpp @@ -22,7 +22,7 @@ #include "test_config.h" int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Time_Value_Test"); diff --git a/tests/Timer_Queue_Test.cpp b/tests/Timer_Queue_Test.cpp index f305fcf45a0..f0ec1c90815 100644 --- a/tests/Timer_Queue_Test.cpp +++ b/tests/Timer_Queue_Test.cpp @@ -35,7 +35,7 @@ public: }; int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("Timer_Queue_Test"); diff --git a/tests/Tokens_Test.cpp b/tests/Tokens_Test.cpp index f5592e76e20..5e22c657c1f 100644 --- a/tests/Tokens_Test.cpp +++ b/tests/Tokens_Test.cpp @@ -175,7 +175,7 @@ run_test (ACE_Token_Proxy *A, #endif /* ACE_HAS_THREADS */ int -main (int, char* argv[]) +main (int, char *[]) { ACE_START_TEST ("Tokens_Test"); #if defined (ACE_HAS_THREADS) diff --git a/tests/UPIPE_SAP_Test.cpp b/tests/UPIPE_SAP_Test.cpp index 07a2800fe3c..7d43ced4f12 100644 --- a/tests/UPIPE_SAP_Test.cpp +++ b/tests/UPIPE_SAP_Test.cpp @@ -152,7 +152,7 @@ acceptor (void *args) #endif /* ACE_HAS_THREADS */ int -main (int, char *argv[]) +main (int, char *[]) { ACE_START_TEST ("UPIPE_SAP_Test"); |