diff options
-rw-r--r-- | ACE/ace/Basic_P_Strategy.inl | 2 | ||||
-rw-r--r-- | ACE/ace/DA_Strategy_Base.h | 12 | ||||
-rw-r--r-- | ACE/ace/DA_Strategy_Base.inl | 7 | ||||
-rw-r--r-- | ACE/ace/Efficient_P_Strategy.inl | 3 | ||||
-rw-r--r-- | ACE/ace/Live_P_Strategy.h | 8 | ||||
-rw-r--r-- | ACE/ace/Live_P_Strategy.inl | 43 | ||||
-rw-r--r-- | ACE/ace/k_Efficient_P_Strategy.h | 8 | ||||
-rw-r--r-- | ACE/ace/k_Efficient_P_Strategy.inl | 75 | ||||
-rw-r--r-- | ACE/tests/DA_Reactor_Test.cpp | 1297 | ||||
-rw-r--r-- | ACE/tests/DA_Reactor_Test.h | 200 |
10 files changed, 1589 insertions, 66 deletions
diff --git a/ACE/ace/Basic_P_Strategy.inl b/ACE/ace/Basic_P_Strategy.inl index 656dcbd8a29..b6a8dba546c 100644 --- a/ACE/ace/Basic_P_Strategy.inl +++ b/ACE/ace/Basic_P_Strategy.inl @@ -2,7 +2,7 @@ template <typename AnnotationId> ACE_INLINE Basic_P_Strategy<AnnotationId>::Basic_P_Strategy(int maxThreads) -:DA_Strategy_Base(maxThreads) +:DA_Strategy_Base(maxThreads), t_r(maxThreads) { } diff --git a/ACE/ace/DA_Strategy_Base.h b/ACE/ace/DA_Strategy_Base.h index 02ac40d9ef7..2b566d9b35c 100644 --- a/ACE/ace/DA_Strategy_Base.h +++ b/ACE/ace/DA_Strategy_Base.h @@ -21,7 +21,7 @@ #define DA_STRATEGY_BASE_H #include /**/ "ace/pre.h" -#include "ace/Hash_Map_Manager_T.h" +#include "ace/Hash_Map_Manager.h" #include "ace/Thread_Mutex.h" #include "ace/Atomic_Op_T.h" @@ -71,9 +71,9 @@ public: virtual bool is_deadlock_potential(AnnotationId handle)=0; virtual void grant(AnnotationId handle)=0; virtual void release(AnnotationId upcall_handle)=0; - int get_max_threads() { return num_avail_threads_}; - HASH_ANNOTATIONS_CONST_ITER get_annotations_iter(); - virtual int get_annotation (AnnotationId handle); + int get_max_threads() { return num_avail_threads_.value();} + HASH_ANNOTATIONS_CONST_ITER get_annotations_iter() const; + virtual int get_annotation (AnnotationId handle) const; virtual int add_annotation (AnnotationId handle, int annotation); virtual int remove_annotation (AnnotationId handle); virtual int set_annotations_table (const HASH_ANNOTATIONS_REVERSE_ITER& table); @@ -85,6 +85,10 @@ private: }; +#if defined (__ACE_INLINE__) +#include "ace/DA_Strategy_Base.inl" +#endif /* __ACE_INLINE__ */ + #include /**/ "ace/post.h" #endif /* DA_STRATEGY_BASE_H */
\ No newline at end of file diff --git a/ACE/ace/DA_Strategy_Base.inl b/ACE/ace/DA_Strategy_Base.inl index e726ef2e011..ad62a9f585a 100644 --- a/ACE/ace/DA_Strategy_Base.inl +++ b/ACE/ace/DA_Strategy_Base.inl @@ -13,7 +13,7 @@ DA_Strategy_Base<AnnotationId>::~DA_Strategy_Base() template <typename AnnotationId> ACE_INLINE int -DA_Strategy_Base<AnnotationId>::get_annotation (AnnotationId id) +DA_Strategy_Base<AnnotationId>::get_annotation (AnnotationId id) const { int annotation; if (annotations_repo_.find (id, annotation) == -1) @@ -26,7 +26,7 @@ ACE_INLINE int DA_Strategy_Base<AnnotationId>::set_annotations_table ( const HASH_ANNOTATIONS_REVERSE_ITER& table) { - HASH_ANNOTATIONS_CONST_ITER iter(table); + HASH_ANNOTATIONS_REVERSE_ITER iter(table); int rc=0; for (;!(iter.done()); iter++) @@ -61,8 +61,9 @@ ACE_INLINE ACE_Hash_Map_Const_Iterator_Ex<AnnotationId, ACE_Hash<AnnotationId>, ACE_Equal_To<AnnotationId>, ACE_Thread_Mutex> -DA_Strategy_Base<AnnotationId>::get_annotations_iter() +DA_Strategy_Base<AnnotationId>::get_annotations_iter() const { + return annotations_repo_.begin(); } diff --git a/ACE/ace/Efficient_P_Strategy.inl b/ACE/ace/Efficient_P_Strategy.inl index e42a2330f8e..fdd6b76468d 100644 --- a/ACE/ace/Efficient_P_Strategy.inl +++ b/ACE/ace/Efficient_P_Strategy.inl @@ -42,7 +42,6 @@ Efficient_P_Strategy<AnnotationId>::grant(AnnotationId handle) p_r--; } } - return true; } template <typename AnnotationId> @@ -50,7 +49,7 @@ ACE_INLINE void Efficient_P_Strategy<AnnotationId>::release(AnnotationId upcall_handle) { - int annotation = get_annotation(handle); + int annotation = get_annotation(upcall_handle); { ACE_Guard<ACE_Thread_Mutex> guard(_lock); t_r ++; diff --git a/ACE/ace/Live_P_Strategy.h b/ACE/ace/Live_P_Strategy.h index 420307d2348..9a5e1782d6f 100644 --- a/ACE/ace/Live_P_Strategy.h +++ b/ACE/ace/Live_P_Strategy.h @@ -19,13 +19,14 @@ #include "ace/DA_Strategy_Base.h" #include "ace/RB_Tree.h" +#include "ace/Mutex.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ //forward decl -class LivePTree; +class Live_P_Tree; template <typename AnnotationId> class Live_P_Strategy : public DA_Strategy_Base<AnnotationId> { @@ -39,7 +40,10 @@ public: virtual void grant(AnnotationId handle); virtual void release(AnnotationId upcall_handle); private: - LivePTree* tree_pimpl_; + Live_P_Tree* tree_pimpl_; + bool min_illegal_is_computed_; + int min_illegal_; + ACE_Mutex computation_mutex_; }; #if defined (__ACE_INLINE__) diff --git a/ACE/ace/Live_P_Strategy.inl b/ACE/ace/Live_P_Strategy.inl index 697c26c45b7..641a50e70f6 100644 --- a/ACE/ace/Live_P_Strategy.inl +++ b/ACE/ace/Live_P_Strategy.inl @@ -42,7 +42,7 @@ private: }; ACE_INLINE -Live_P_Tree::Live_P_Tree(int maxThreads, int k) +Live_P_Tree::Live_P_Tree(int maxThreads) :ACE_RB_Tree(), T_(maxThreads) { @@ -162,7 +162,9 @@ Live_P_Tree::calc_max_i(ACE_RB_Tree_Node<int, AnnotationNode>* nodePtr, int extr template <typename AnnotationId> ACE_INLINE Live_P_Strategy<AnnotationId>::Live_P_Strategy(int maxThreads) -:DA_Strategy_Base(maxThreads) +:DA_Strategy_Base(maxThreads), + min_illegal_is_computed_(false), + min_illegal_(0) { } @@ -179,19 +181,18 @@ ACE_INLINE bool Live_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle) { - int min_illegal = getMaxThreads(); - int annotation = getAnnotation(handle); - - if (!min_ilegal_is_computed_) + int annotation = get_annotation(handle); + computation_mutex_.acquire(); + if (!min_illegal_is_computed_) { - if (tree_->current_size() > 1) + if (tree_pimpl_->current_size() > 1) { - min_ilegal_ = calc_max(); + min_illegal_ = tree_pimpl_->calc_max(); } - min_ilegal_is_computed_ = true; + min_illegal_is_computed_ = true; } - - return annotation >= min_ilegal_; + computation_mutex_.release(); + return annotation >= min_illegal_; } template <typename AnnotationId> @@ -199,9 +200,14 @@ ACE_INLINE void Live_P_Strategy<AnnotationId>::grant(AnnotationId handle) { - int annotation = getAnnotation(handle); - tree_pimpl_->bind(annotation); - min_ilegal_is_computed_ = false; + int annotation = get_annotation(handle); + //since the state of the tree is involved in calculation + //of max, we must aquire the lock before changing the + //structure of the tree + computation_mutex_.acquire(); + tree_pimpl_->bind(annotation); + min_illegal_is_computed_ = false; + computation_mutex_.release(); } template <typename AnnotationId> @@ -209,7 +215,12 @@ ACE_INLINE void Live_P_Strategy<AnnotationId>::release(AnnotationId handle) { - min_ilegal_is_computed_ = false; - int annotation = getAnnotation(handle); + //since the state of the tree is involved in calculation + //of max, we must aquire the lock before changing the + //structure of the tree + computation_mutex_.acquire(); + min_illegal_is_computed_ = false; + int annotation = get_annotation(handle); tree_pimpl_->unbind(annotation); + computation_mutex_.release(); }
\ No newline at end of file diff --git a/ACE/ace/k_Efficient_P_Strategy.h b/ACE/ace/k_Efficient_P_Strategy.h index bce6af929f0..1f4b71a49c1 100644 --- a/ACE/ace/k_Efficient_P_Strategy.h +++ b/ACE/ace/k_Efficient_P_Strategy.h @@ -18,21 +18,21 @@ #include /**/ "ace/pre.h" #include "ace/DA_Strategy_Base.h" - +#include "ace/Mutex.h" #include <vector> #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -template <typename AnnotationId, int k> +template <typename AnnotationId> class k_Efficient_P_Strategy : public DA_Strategy_Base<AnnotationId> { //The annotations consist of an identifier and a resource cost value public: //note: k must be less than maxThreads - k_Efficient_P_Strategy(int maxThreads); + k_Efficient_P_Strategy(int maxThreads, int k); virtual ~k_Efficient_P_Strategy(); virtual bool is_deadlock_potential(AnnotationId handle); virtual void grant(AnnotationId handle); @@ -41,6 +41,8 @@ private: int compute_min_illegal(); int get_min_illegal(); int min_illegal_; + ACE_Mutex computation_mutex_; + int k_; bool min_illegal_is_computed_ ; std::vector<int> a; std::vector<int> A; diff --git a/ACE/ace/k_Efficient_P_Strategy.inl b/ACE/ace/k_Efficient_P_Strategy.inl index dcb575a244f..5d67a95cc58 100644 --- a/ACE/ace/k_Efficient_P_Strategy.inl +++ b/ACE/ace/k_Efficient_P_Strategy.inl @@ -1,70 +1,73 @@ -template <typename AnnotationId, int k> +template <typename AnnotationId> ACE_INLINE -k_Efficient_P_Strategy<AnnotationId, k>::k_Efficient_P_Strategy(int maxThreads) +k_Efficient_P_Strategy<AnnotationId>::k_Efficient_P_Strategy(int maxThreads, int k) :DA_Strategy_Base(maxThreads), + k_(k) { - a.resize(k + 1); - A.resize(k + 1); - for (unsigned i=0; i<k; ++i) { + a.resize(k_ + 1); + A.resize(k_ + 1); + for (int i=0; i<k_; ++i) { a[i] = 0; A[i] = 0; } - min_ilegal_ = maxThreads; - min_ilegal_is_computed_ = true; + min_illegal_ = maxThreads; + min_illegal_is_computed_ = true; } -template <typename AnnotationId, int k> +template <typename AnnotationId> ACE_INLINE -k_Efficient_P_Strategy<AnnotationId, k>::~k_Efficient_P_Strategy() +k_Efficient_P_Strategy<AnnotationId>::~k_Efficient_P_Strategy() { } -template <typename AnnotationId, int k> +template <typename AnnotationId> ACE_INLINE -bool k_Efficient_P_Strategy<AnnotationId, k>::is_deadlock_potential(AnnotationId handle) +bool k_Efficient_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle) { int annotation = get_annotation(handle); - return (annotation >= min_ilegal()); + return (annotation >= get_min_illegal()); } -template <typename AnnotationId, int k> +template <typename AnnotationId> ACE_INLINE int -k_Efficient_P_Strategy<AnnotationId, k>::compute_min_illegal() +k_Efficient_P_Strategy<AnnotationId>::compute_min_illegal() { int T = get_max_threads(); - for (int i=0; i<k; ++i) { + for (int i=0; i<k_; ++i) { if (!(A[i] < (T - i))) { return i; } } - if (A[k]>0) { - return (T - A[k]); + if (A[k_]>0) { + return (T - A[k_]); } return T; } -template <typename AnnotationId, int k> +template <typename AnnotationId> ACE_INLINE int -k_Efficient_P_Strategy<AnnotationId, k>::get_min_illegal() +k_Efficient_P_Strategy<AnnotationId>::get_min_illegal() { + computation_mutex_.acquire(); if (!min_illegal_is_computed_) { - min_illegal_ = compute_min_ilegal(); + min_illegal_ = compute_min_illegal(); min_illegal_is_computed_ = true; } + computation_mutex_.release(); return min_illegal_; } -template <typename AnnotationId, int k> +template <typename AnnotationId> ACE_INLINE -void k_Efficient_P_Strategy<AnnotationId, k>::grant(AnnotationId handle) +void k_Efficient_P_Strategy<AnnotationId>::grant(AnnotationId handle) { int annotation = get_annotation(handle); - - if (annotation < k) + computation_mutex_.acquire(); + if (annotation < k_) { a[annotation] ++; for (int i=0; i<=annotation; ++i) @@ -74,42 +77,44 @@ void k_Efficient_P_Strategy<AnnotationId, k>::grant(AnnotationId handle) } else { - a[k] ++; - for (int i=0; i<=k ; ++i) + a[k_] ++; + for (int i=0; i<=k_ ; ++i) { A[i]++; } } - min_ilegal_is_computed_ = false; - return true; + min_illegal_is_computed_ = false; + computation_mutex_.release(); } -template <typename AnnotationId, int k> +template <typename AnnotationId> ACE_INLINE -void k_Efficient_P_Strategy<AnnotationId, k>::release(AnnotationId handle) +void k_Efficient_P_Strategy<AnnotationId>::release(AnnotationId handle) { int annotation = get_annotation(handle); + computation_mutex_.acquire(); /* if (annotation < k ) { assert(a[annotation]>0); } else { assert(a[k] >0); } */ - if (annotation < k) + if (annotation < k_) { a[annotation] --; - for (unsigned i=0; i<=annotation; ++i) + for (int i=0; i<=annotation; ++i) { A[i]--; } } else { - a[k] --; - for (unsigned i=0; i<=k ; ++i) + a[k_] --; + for (int i=0; i<=k_ ; ++i) { A[i]--; } } - min_ilegal_is_computed_ = false; + min_illegal_is_computed_ = false; + computation_mutex_.release(); }
\ No newline at end of file diff --git a/ACE/tests/DA_Reactor_Test.cpp b/ACE/tests/DA_Reactor_Test.cpp new file mode 100644 index 00000000000..0b9c64ab426 --- /dev/null +++ b/ACE/tests/DA_Reactor_Test.cpp @@ -0,0 +1,1297 @@ +//============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// DAReactor_test.cpp +// +// = DESCRIPTION +// This program illustrates how the <ACE_TP_Reactor> can be used to +// implement an application that does various operations. +// usage: DA_Reactor_Test +// -n number threads in the TP_Reactor thread pool +// -d duplex mode 1 (full-duplex) vs. 0 (half-duplex) +// -p port to listen(Server)/connect(Client) +// -h host to connect (Client mode) +// -s number of sender's instances ( Client mode) +// -b run client and server (both modes ) at the same time +// -v log level +// 0 - log all messages +// 1 - log only errors and unusual cases +// -i time to run in seconds +// -u show this message +// -t specify the deadlock avoidance strategy +// basic - Basic P strategy +// efficient <k> - k-efficient P strategy (default is 1) +// live - Live-P strategy +// +// The main difference between TP_Reactor_Test.cpp and +// this test is the addition of deadlock avoidance strategies. In +// this way, it is a test for Deadlock_Free_TP_Reactor as well as +// the various visit strategies +// +// +// = AUTHOR +// Paul Oberlin <pauloberlin@gmail.com> +// +//============================================================================ + +#include "test_config.h" + +#if defined(ACE_HAS_THREADS) + +#include "TP_Reactor_Test.h" + +#include "ace/Signal.h" +#include "ace/Service_Config.h" +#include "ace/Get_Opt.h" + +#include "ace/Reactor.h" +#include "ace/Deadlock_Free_TP_Reactor.h" +#include "ace/OS_NS_signal.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_strings.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Synch_Traits.h" +#include "ace/Thread_Semaphore.h" + +#include "ace/Basic_P_Strategy.h" +#include "ace/Efficient_P_Strategy.h" +#include "ace/Live_P_Strategy.h" +#include "ace/k_Efficient_P_Strategy.h" + +ACE_RCSID(TPReactor, TPReactor_Test, "TPReactor_Test.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp") + +// Some debug helper functions +static int disable_signal (int sigmin, int sigmax); + + +// both: 0 run client or server / depends on host +// != 0 run client and server +static int both = 0; + +// Host that we're connecting to. +static const ACE_TCHAR *host = 0; + +// number of Senders instances +static int senders = 1; + +// duplex mode: == 0 half-duplex +// != 0 full duplex +static int duplex = 0; + +// number threads in the TP_Reactor thread pool +static int threads = 1; + +// Port that we're receiving connections on. +static u_short port = ACE_DEFAULT_SERVER_PORT; + +// Log options +static int loglevel = 1; // 0 full , 1 only errors + +static const size_t MIN_TIME = 1; // min 1 sec +static const size_t MAX_TIME = 3600; // max 1 hour +static u_int seconds = 2; // default time to run - 2 seconds + +enum StrategyEnum {BASIC, EFFICIENT, LIVE}; +static StrategyEnum strategyEnum = BASIC; +DA_Strategy_Base<ACE_HANDLE>* strategy = 0; +static int k = 0; + +static char data[] = + "GET / HTTP/1.1\r\n" + "Accept: */*\r\n" + "Accept-Language: C++\r\n" + "Accept-Encoding: gzip, deflate\r\n" + "User-Agent: DAReactor_Test/1.0 (non-compatible)\r\n" + "Connection: Keep-Alive\r\n" + "\r\n" ; + +// ************************************************************* + +class LogLocker +{ +public: + + LogLocker () { ACE_LOG_MSG->acquire (); } + virtual ~LogLocker () { ACE_LOG_MSG->release (); } +}; +// ************************************************************* + +/** + * @class MyTask + * + * MyTask plays role for DA_Reactor threads pool + * + * MyTask is ACE_Task resposible for: + * 1. Creation and deletion of DA_Reactor and DA_Reactor thread pool + * 2. Running DA_Reactor event loop + */ +class MyTask : public ACE_Task<ACE_MT_SYNCH> +{ +public: + MyTask (void): sem_ ((unsigned int) 0), + my_reactor_ (0) {} + + virtual ~MyTask () { stop (); } + + virtual int svc (void); + + int start (int num_threads); + int stop (void); + +private: + int create_reactor (int num_threads); + int delete_reactor (void); + + ACE_SYNCH_RECURSIVE_MUTEX lock_; + ACE_Thread_Semaphore sem_; + ACE_Reactor *my_reactor_; +}; + +int +MyTask::create_reactor (int num_threads) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, + monitor, + this->lock_, + -1); + + ACE_ASSERT (this->my_reactor_ == 0); + + ACE_TP_Reactor * pImpl = 0; + + switch (strategyEnum) { + case BASIC: + ACE_NEW_RETURN(strategy, Basic_P_Strategy<ACE_HANDLE>(threads), -1); + break; + case LIVE: + ACE_NEW_RETURN(strategy, Live_P_Strategy<ACE_HANDLE>(threads), -1); + break; + case EFFICIENT: + if ( k == 1) { + ACE_NEW_RETURN(strategy, Efficient_P_Strategy<ACE_HANDLE>(threads), -1); + } else { + ACE_NEW_RETURN(strategy, k_Efficient_P_Strategy<ACE_HANDLE>(threads, k), -1); + } + break; + } + + //Note, this line seems to construct the reactor using the default thread pool size + //which is ACE::max_handles(). So, I don't see how the parameter to the program is + //setting the number of threads in the reactor. The way I see it, this parameter + //is used for starting the reactor event loop in a number of threads equal to this parameter. + ACE_NEW_RETURN (pImpl, + ACE_Deadlock_Free_TP_Reactor(num_threads, strategy), + -1); + + ACE_NEW_RETURN (my_reactor_, + ACE_Reactor (pImpl ,1), + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT (" (%t) Create TP_Reactor\n"))); + + ACE_Reactor::instance (this->my_reactor_); + + this->reactor (my_reactor_); + + return 0; +} + +int +MyTask::delete_reactor (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, + monitor, + this->lock_, + -1); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT (" (%t) Delete TP_Reactor\n"))); + + delete this->my_reactor_; + ACE_Reactor::instance ((ACE_Reactor *) 0); + this->my_reactor_ = 0; + this->reactor (0); + + return 0; +} + +int +MyTask::start (int num_threads) +{ + if (this->create_reactor (num_threads) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to create reactor")), + -1); + + if (this->activate (THR_NEW_LWP, num_threads) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to activate thread pool")), + -1); + + for (; num_threads > 0 ; num_threads--) + sem_.acquire (); + + return 0; +} + +int +MyTask::stop (void) +{ + if (this->my_reactor_ != 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("End TP_Reactor event loop\n"))); + + ACE_Reactor::instance()->end_reactor_event_loop (); + } + + if (this->wait () == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to stop thread pool"))); + + if (this->delete_reactor () == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p.\n"), + ACE_TEXT ("unable to delete reactor"))); + + return 0; +} + +int +MyTask::svc (void) +{ + ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask started\n"))); + + // signal that we are ready + sem_.release (1); + + while (ACE_Reactor::instance()->reactor_event_loop_done () == 0) + ACE_Reactor::instance()->run_reactor_event_loop (); + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask finished\n"))); + return 0; +} + +// ************************************************************* + +Acceptor::Acceptor (void) + : ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> ((ACE_Reactor *) 0), + sessions_ (0), + total_snd_(0), + total_rcv_(0), + total_w_ (0), + total_r_ (0) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + + for (size_t i = 0; i < MAX_RECEIVERS; ++i) + this->list_receivers_[i] =0; +} + +Acceptor::~Acceptor (void) +{ + this->reactor (0); + stop (); +} + +void +Acceptor::stop (void) +{ + // this method can be called only after reactor event loop id done + // in all threads + + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + + for (size_t i = 0; i < MAX_RECEIVERS; ++i) + { + delete this->list_receivers_[i]; + this->list_receivers_[i] =0; + } +} + +void +Acceptor::on_new_receiver (Receiver &rcvr) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + this->sessions_++; + this->list_receivers_[rcvr.index_] = & rcvr; + ACE_DEBUG ((LM_DEBUG, + "Receiver::CTOR sessions_=%d\n", + this->sessions_)); +} + +void +Acceptor::on_delete_receiver (Receiver &rcvr) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + + this->sessions_--; + + this->total_snd_ += rcvr.get_total_snd (); + this->total_rcv_ += rcvr.get_total_rcv (); + this->total_w_ += rcvr.get_total_w (); + this->total_r_ += rcvr.get_total_r (); + + if (rcvr.index_ < MAX_RECEIVERS + && this->list_receivers_[rcvr.index_] == &rcvr) + this->list_receivers_[rcvr.index_] = 0; + + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + rcvr.get_total_snd (), + rcvr.get_total_w () ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + rcvr.get_total_rcv (), + rcvr.get_total_r ()); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), + rcvr.index_, + bufs, + bufr, + this->sessions_)); +} + +int +Acceptor::start (const ACE_INET_Addr &addr) +{ + if (ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> + ::open (addr, + ACE_Reactor::instance (), + ACE_NONBLOCK) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "Acceptor::start () - open failed"), + 0); + return 1; +} + +int +Acceptor::make_svc_handler (Receiver *&sh) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + + if (sessions_ >= MAX_RECEIVERS) + return -1; + + for (size_t i = 0; i < MAX_RECEIVERS; ++i) + if (this->list_receivers_ [i] == 0) + { + ACE_NEW_RETURN (sh, + Receiver (this , i), + -1); + return 0; + } + return -1; +} + +// ************************************************************* + +Receiver::Receiver (Acceptor * acceptor, size_t index) + : acceptor_ (acceptor), + index_ (index), + flg_mask_ (ACE_Event_Handler::NULL_MASK), + total_snd_(0), + total_rcv_(0), + total_w_ (0), + total_r_ (0) +{ + if (acceptor_ != 0) + acceptor_->on_new_receiver (*this); +} + + +Receiver::~Receiver (void) +{ + this->reactor (0); + if (acceptor_ != 0) + acceptor_->on_delete_receiver (*this); + + this->index_ = 0; + + for (; ;) + { + ACE_Time_Value tv = ACE_Time_Value::zero; + ACE_Message_Block *mb = 0; + + if (this->getq (mb, &tv) < 0) + break; + + ACE_Message_Block::release (mb); + } +} + +int +Receiver::check_destroy (void) +{ + if (flg_mask_ == ACE_Event_Handler::NULL_MASK) + return -1; + + return 0; +} + +int +Receiver::open (void *) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + ACE_Reactor *TPReactor = ACE_Reactor::instance (); + + this->reactor (TPReactor); + + flg_mask_ = ACE_Event_Handler::NULL_MASK ; + + if (TPReactor->register_handler (this, flg_mask_) == -1) + return -1; + + initiate_io (ACE_Event_Handler::READ_MASK); + + return check_destroy (); +} + +int +Receiver::initiate_io (ACE_Reactor_Mask mask) +{ + if (ACE_BIT_ENABLED (flg_mask_, mask)) + return 0; + + if (ACE_Reactor::instance ()->schedule_wakeup (this, mask) == -1) + return -1; + + ACE_SET_BITS (flg_mask_, mask); + return 0; +} + +int +Receiver::terminate_io (ACE_Reactor_Mask mask) +{ + if (ACE_BIT_DISABLED (flg_mask_, mask)) + return 0; + + if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1) + return -1; + + ACE_CLR_BITS (flg_mask_, mask); + return 0; +} + +int +Receiver::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_Reactor *TPReactor = ACE_Reactor::instance (); + + TPReactor->remove_handler (this, + ACE_Event_Handler::ALL_EVENTS_MASK | + ACE_Event_Handler::DONT_CALL); // Don't call handle_close + this->reactor (0); + this->destroy (); + return 0; +} + +int +Receiver::handle_input (ACE_HANDLE h) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ), + -1); + + int err = 0; + ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1); + + this->total_r_++; + + if (res >= 0) + { + mb->wr_ptr (res); + this->total_rcv_ += res; + } + else + err = errno ; + + mb->wr_ptr ()[0] = '\0'; + + if (loglevel == 0 || res <= 0 || err!= 0) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_input () SessionId=%d****\n", index_)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err)); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n")); + } + + if (err == EWOULDBLOCK) + { + err=0; + res=0; + return check_destroy (); + } + + if (err !=0 || res <= 0) + { + ACE_Message_Block::release (mb); + return -1; + } + + ACE_Time_Value tv = ACE_Time_Value::zero; + + int qcount = this->putq (mb, & tv); + + if (qcount <= 0) // failed to putq + { + ACE_Message_Block::release (mb); + return -1 ; + } + + int rc = 0; + + if (duplex == 0) // half-duplex , stop read + rc = this->terminate_io (ACE_Event_Handler::READ_MASK); + else // full duplex + { + if (qcount >= 20 ) // flow control, stop read + rc = this->terminate_io (ACE_Event_Handler::READ_MASK); + else + rc = this->initiate_io (ACE_Event_Handler::READ_MASK); + } + + if (rc == -1) + return -1; + + //initiate write + if (this->initiate_io (ACE_Event_Handler::WRITE_MASK) != 0) + return -1; + + return check_destroy (); +} + +int +Receiver::handle_output (ACE_HANDLE h) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + ACE_Time_Value tv = ACE_Time_Value::zero; + ACE_Message_Block *mb = 0; + + int err = 0; + ssize_t res = 0; + size_t bytes = 0; + + int qcount = this->getq (mb, &tv); + + if (mb != 0) // qcount >= 0) + { + bytes = mb->length (); + res = this->peer ().send (mb->rd_ptr (), bytes); + + this->total_w_++; + + if (res < 0) + err = errno ; + else + this->total_snd_ += res; + + + if (loglevel == 0 || res <= 0 || err!= 0) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_output () SessionId=%d****\n", index_)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err)); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n")); + } + } + + ACE_Message_Block::release (mb); + + if (err != 0 || res < 0) + return -1; + + if (qcount <= 0) // no more message blocks in queue + { + if (this->terminate_io (ACE_Event_Handler::WRITE_MASK) != 0) + return -1; + + if (this->initiate_io (ACE_Event_Handler::READ_MASK) != 0) + return -1; + } + + return check_destroy (); +} + +// ************************************************************* + +Connector::Connector (void) + : ACE_Connector<Sender,ACE_SOCK_CONNECTOR> ((ACE_Reactor *) 0), + sessions_ (0), + total_snd_(0), + total_rcv_(0), + total_w_ (0), + total_r_ (0) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + + for (size_t i = 0; i < MAX_SENDERS; ++i) + this->list_senders_[i] = 0; +} + +Connector::~Connector (void) +{ + this->reactor (0); + stop (); +} + +void +Connector::stop () +{ + // this method can be called only + // after reactor event loop id done + // in all threads + + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + + for (size_t i = 0; i < MAX_SENDERS; ++i) + { + delete this->list_senders_[i]; + this->list_senders_[i] =0; + } +} + +void +Connector::on_new_sender (Sender & sndr) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + this->sessions_++; + this->list_senders_[sndr.index_] = &sndr; + ACE_DEBUG ((LM_DEBUG, + "Sender::CTOR sessions_=%d\n", + this->sessions_)); +} + +void +Connector::on_delete_sender (Sender & sndr) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + + this->sessions_--; + this->total_snd_ += sndr.get_total_snd(); + this->total_rcv_ += sndr.get_total_rcv(); + this->total_w_ += sndr.get_total_w(); + this->total_r_ += sndr.get_total_r(); + + if (sndr.index_ < MAX_SENDERS + && this->list_senders_[sndr.index_] == &sndr) + this->list_senders_[sndr.index_] = 0; + + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + sndr.get_total_snd(), + sndr.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + sndr.get_total_rcv(), + sndr.get_total_r() ); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), + sndr.index_, + bufs, + bufr, + this->sessions_)); + +} + +int +Connector::start (const ACE_INET_Addr & addr, int num) +{ + + if (ACE_Connector<Sender,ACE_SOCK_CONNECTOR> + ::open (ACE_Reactor::instance (), + ACE_NONBLOCK) < 0) + ACE_ERROR_RETURN + ((LM_ERROR, + "%p\n", + "Connector::start () - open failed"), + 0); + + int rc = 0; + + for (int i = 0 ; i < num ; i++) + { + Sender * sender = 0; + + if (ACE_Connector<Sender,ACE_SOCK_CONNECTOR> + ::connect (sender, addr) < 0) + ACE_ERROR_RETURN + ((LM_ERROR, + "%p\n", + "Connector::start () - connect failed"), + rc); + } + + return rc; +} + +int +Connector::make_svc_handler (Sender * & sh) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_); + + if (sessions_ >= MAX_SENDERS) + return -1; + + for (size_t i = 0; i < MAX_SENDERS; ++i) + if (this->list_senders_ [i] == 0) + { + ACE_NEW_RETURN (sh, + Sender (this , i), + -1); + return 0; + } + + return -1; +} + +// ************************************************************* + +Sender::Sender (Connector* connector, size_t index) + : connector_ (connector), + index_ (index), + flg_mask_ (ACE_Event_Handler::NULL_MASK), + total_snd_(0), + total_rcv_(0), + total_w_ (0), + total_r_ (0) +{ + if (connector_ != 0) + connector_->on_new_sender (*this); + + ACE_OS::sprintf (send_buf_ ,data); +} + + +Sender::~Sender (void) +{ + this->reactor (0); + if (connector_ != 0) + connector_->on_delete_sender (*this); + + this->index_ = 0; + + for (; ;) + { + ACE_Time_Value tv = ACE_Time_Value::zero; + ACE_Message_Block *mb = 0; + + if (this->getq (mb, &tv) < 0) + break; + + ACE_Message_Block::release (mb); + } +} + +int +Sender::check_destroy (void) +{ + if (flg_mask_ == ACE_Event_Handler::NULL_MASK) + return -1; + + return 0; +} + +int Sender::open (void *) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + ACE_Reactor * TPReactor = ACE_Reactor::instance (); + + this->reactor (TPReactor); + + flg_mask_ = ACE_Event_Handler::NULL_MASK ; + + if (TPReactor->register_handler (this,flg_mask_) == -1) + return -1; + + if (this->initiate_write () == -1) + return -1; + + if (duplex != 0) + initiate_io (ACE_Event_Handler::READ_MASK); + + return check_destroy (); +} + +int +Sender::initiate_write (void) +{ + if ( this->msg_queue ()->message_count () < 20) // flow control + { + size_t nbytes = ACE_OS::strlen (send_buf_); + + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (nbytes+8), + -1); + + mb->init (send_buf_, nbytes); + mb->rd_ptr (mb->base ()); + mb->wr_ptr (mb->base ()); + mb->wr_ptr (nbytes); + + ACE_Time_Value tv = ACE_Time_Value::zero; + + int qcount =this->putq (mb, & tv); + + if (qcount <= 0) + { + ACE_Message_Block::release (mb); + return -1; + } + } + + return initiate_io (ACE_Event_Handler::WRITE_MASK); +} + +int +Sender::initiate_io (ACE_Reactor_Mask mask) +{ + if (ACE_BIT_ENABLED (flg_mask_, mask)) + return 0; + + if (ACE_Reactor::instance ()->schedule_wakeup (this, mask) == -1) + return -1; + + ACE_SET_BITS (flg_mask_, mask); + return 0; +} + +int +Sender::terminate_io (ACE_Reactor_Mask mask) +{ + if (ACE_BIT_DISABLED (flg_mask_, mask)) + return 0; + + if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1) + return -1; + + ACE_CLR_BITS (flg_mask_, mask); + return 0; +} + +int +Sender::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_Reactor * TPReactor = ACE_Reactor::instance (); + + TPReactor->remove_handler (this, + ACE_Event_Handler::ALL_EVENTS_MASK | + ACE_Event_Handler::DONT_CALL); // Don't call handle_close + this->reactor (0); + this->destroy (); + return 0; +} + +int +Sender::handle_input (ACE_HANDLE h) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, + ACE_Message_Block (BUFSIZ), + -1); + + int err = 0; + ssize_t res = this->peer ().recv (mb->rd_ptr (), + BUFSIZ-1); + this->total_r_++; + + if (res >= 0) + { + mb->wr_ptr (res); + this->total_rcv_ += res; + } + else + err = errno ; + + mb->wr_ptr ()[0] = '\0'; + + if (loglevel == 0 || res <= 0 || err!= 0) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_input () SessionId=%d****\n", index_)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err)); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n")); + } + + ACE_Message_Block::release (mb); + + if (err == EWOULDBLOCK) + { + err=0; + res=0; + return check_destroy (); + } + + if (err !=0 || res <= 0) + return -1; + + int rc = 0; + + if (duplex != 0) // full duplex, continue read + rc = initiate_io (ACE_Event_Handler::READ_MASK); + else + rc = terminate_io (ACE_Event_Handler::READ_MASK); + + if (rc != 0) + return -1 ; + + rc = initiate_write (); + if (rc != 0) + return -1; + + return check_destroy (); +} + +int +Sender::handle_output (ACE_HANDLE h) +{ + ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); + + ACE_Time_Value tv = ACE_Time_Value::zero; + ACE_Message_Block *mb = 0; + + int err=0; + ssize_t res=0; + size_t bytes=0; + + int qcount = this->getq (mb , & tv); + + if (mb != 0) // qcount >= 0 + { + bytes = mb->length (); + res = this->peer ().send (mb->rd_ptr (), bytes); + + this->total_w_++; + + if (res < 0) + err = errno ; + else + this->total_snd_ += res; + + if (loglevel == 0 || res <= 0 || err!= 0) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_output () SessionId=%d****\n", index_)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res)); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err)); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n")); + } + } + + ACE_Message_Block::release (mb); + + if (err != 0 || res < 0) + return -1; + + int rc = 0; + + if (qcount <= 0) // no more message blocks in queue + { + if (duplex != 0 && // full duplex, continue write + (this->total_snd_ - this->total_rcv_ ) < 1024*32 ) // flow control + rc = initiate_write (); + else + rc = terminate_io (ACE_Event_Handler::WRITE_MASK); + + if (rc == -1) + return -1; + } + + rc = initiate_io (ACE_Event_Handler::READ_MASK); + if (rc == -1) + return -1; + + return check_destroy (); +} + + +// ************************************************************* +// Configuration helpers +// ************************************************************* +int +print_usage (int /* argc */, ACE_TCHAR *argv[]) +{ + ACE_ERROR + ((LM_ERROR, + ACE_TEXT ("\nusage: %s") + ACE_TEXT ("\n-n <number threads in the thread pool>") + ACE_TEXT ("\n-d <duplex mode 1-on/0-off>") + ACE_TEXT ("\n-p <port to listen/connect>") + ACE_TEXT ("\n-h <host> for Sender mode") + ACE_TEXT ("\n-s <number of sender's instances>") + ACE_TEXT ("\n-b run client and server at the same time") + ACE_TEXT ("\n-v log level") + ACE_TEXT ("\n 0 - log all messages") + ACE_TEXT ("\n 1 - log only errors and unusual cases") + ACE_TEXT ("\n-i time to run in seconds") + ACE_TEXT ("\n-u show this message") + ACE_TEXT ("\n"), + argv[0] + )); + return -1; +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + if (argc == 1) // no arguments , so one button test + { + both = 1; // client and server simultaneosly + duplex = 1; // full duplex is on + host = ACE_TEXT ("localhost"); // server to connect + port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen + threads = 3; // size of Proactor thread pool + senders = 20; // number of senders + loglevel = 1; // log level : 0 full/ 1 only errors + seconds = 20; // time to run in seconds +#if defined(SOMAXCONN) // The test is invalid if senders > SOMAXCONN + if(SOMAXCONN < senders) + senders = SOMAXCONN; +#endif + return 0; + } + + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("i:n:p:d:h:s:v:t:ub")); + int c; + + while ((c = get_opt ()) != EOF) + { + ACE_TCHAR *stratarg = 0; + ACE_TCHAR *strategyStr = 0; + switch (c) + { + case 'i': // time to run + seconds = ACE_OS::atoi (get_opt.opt_arg()); + if (seconds < MIN_TIME) + seconds = MIN_TIME; + if (seconds > MAX_TIME) + seconds = MAX_TIME; + break; + case 'b': // both client and server + both = 1; + break; + case 'v': // log level + loglevel = ACE_OS::atoi (get_opt.opt_arg()); + break; + case 'd': // duplex + duplex = ACE_OS::atoi (get_opt.opt_arg()); + break; + case 'h': // host for sender + host = get_opt.opt_arg(); + break; + case 'p': // port number + port = ACE_OS::atoi (get_opt.opt_arg()); + break; + case 'n': // thread pool size + threads = ACE_OS::atoi (get_opt.opt_arg()); + break; + case 's': // number of senders + senders = ACE_OS::atoi (get_opt.opt_arg()); + if (size_t (senders) > MAX_SENDERS) + senders = MAX_SENDERS; + break; + case 't': //avoidance strategy + stratarg = get_opt.opt_arg(); + + strategyStr = ACE_OS::strtok(stratarg, " "); + if (ACE_OS::strcmp(strategyStr, "basic")) + { + strategyEnum = BASIC; + } + else if (ACE_OS::strcasecmp(strategyStr, "live")) + { + strategyEnum = LIVE; + } + else if (ACE_OS::strcmp(strategyStr, "efficient")) + { + strategyEnum = EFFICIENT; + k = ACE_OS::atoi (ACE_OS::strtok(NULL, " ")); + } + else + { + return print_usage(argc,argv); + } + break; + case 'u': + default: + return print_usage (argc,argv); + } // switch + } // while + + return 0; +} + +static int +disable_signal (int sigmin, int sigmax) +{ +#if defined (ACE_HAS_PTHREADS_STD) && !defined (ACE_LACKS_PTHREAD_SIGMASK) + sigset_t signal_set; + if (sigemptyset (&signal_set) == - 1) + ACE_ERROR ((LM_ERROR, + "Error: (%P | %t):%p\n", + "sigemptyset failed")); + + for (int i = sigmin; i <= sigmax; i++) + sigaddset (&signal_set, i); + + // Put the <signal_set>. + if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0) + ACE_ERROR ((LM_ERROR, + "Error: (%P | %t):%p\n", + "pthread_sigmask failed")); +#else + ACE_UNUSED_ARG(sigmin); + ACE_UNUSED_ARG(sigmax); +#endif /* ACE_HAS_PTHREADS_STD && !ACE_LACKS_PTHREAD_SIGMASK */ + + return 1; +} + +#endif /* ACE_HAS_THREADS */ + + +void +run_task(DA_Strategy_Base<ACE_HANDLE>* strategy) { + +} + +int +run_main (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("TP_Reactor_Test")); + +#if defined(ACE_HAS_THREADS) + if (::parse_args (argc, argv) == -1) + return -1; + + ::disable_signal (SIGPIPE, SIGPIPE); + + MyTask task1; + Acceptor acceptor; + Connector connector; + + if (task1.start (threads) == 0) + { + int rc = 0; + + ACE_INET_Addr addr (port); + if (both != 0 || host == 0) { // Acceptor + rc += acceptor.start (addr); + } + if (both != 0 || host != 0) + { + if (host == 0) + { + host = ACE_LOCALHOST; + } + if (addr.set (port, host, 1, addr.get_type ()) == -1) { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host)); + } + rc += connector.start (addr, senders); + + } + + if (rc > 0) { + ACE_OS::sleep (seconds); + } + } + + task1.stop (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("\nNumber of Receivers objects = %d\n") + ACE_TEXT ("\nNumber of Sender objects = %d\n"), + acceptor.get_number_sessions (), + connector.get_number_sessions ())); + + // As Reactor event loop now is inactive it is safe to destroy all + // senders + + connector.stop (); + acceptor.stop (); + + //Print statistic + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + connector.get_total_snd(), + connector.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + connector.get_total_rcv(), + connector.get_total_r() ); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"), + bufs, + bufr + )); + + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"), + acceptor.get_total_snd(), + acceptor.get_total_w() ); + + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"), + acceptor.get_total_rcv(), + acceptor.get_total_r() ); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"), + bufs, + bufr + )); + +#else /* ACE_HAS_THREADS */ + ACE_UNUSED_ARG( argc ); + ACE_UNUSED_ARG( argv ); +#endif /* ACE_HAS_THREADS */ + + + ACE_END_TEST; + + return 0; +} diff --git a/ACE/tests/DA_Reactor_Test.h b/ACE/tests/DA_Reactor_Test.h new file mode 100644 index 00000000000..6ca71ba811c --- /dev/null +++ b/ACE/tests/DA_Reactor_Test.h @@ -0,0 +1,200 @@ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// DA_Reactor_Test.h +// +// = DESCRIPTION +// Define class needed for generating templates. IBM C++ requires this to +// be in its own file for auto template instantiation. +// +// = AUTHOR +// Paul Oberlin (pauloberlin@gmail.com) +// +// ============================================================================ + +#ifndef ACE_TESTS_DA_REACTOR_TEST_H +#define ACE_TESTS_DA_REACTOR_TEST_H + +#include "ace/INET_Addr.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Stream.h" +#include "ace/Acceptor.h" +#include "ace/Connector.h" +#include "ace/Svc_Handler.h" +#include "ace/Recursive_Thread_Mutex.h" + + +const size_t MAX_SENDERS = 1000; +const size_t MAX_RECEIVERS = 1000; + + +// ************************************************************* +// Receiver and Acceptor +// ************************************************************* +// forward declaration +class Acceptor; + +class Receiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> +{ + friend class Acceptor; +public: + + Receiver (Acceptor * acceptor=0, size_t index=MAX_RECEIVERS+1); + + ~Receiver (void); + + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } + + // virtual from ACE_Svc_Handler<> + virtual int open (void * pVoid); + + // virtual from ACE_Event_Handler + virtual int handle_input (ACE_HANDLE h); + virtual int handle_output (ACE_HANDLE h); + virtual int handle_close (ACE_HANDLE h , ACE_Reactor_Mask mask); + +private: + int terminate_io (ACE_Reactor_Mask mask); + int initiate_io (ACE_Reactor_Mask mask); + int check_destroy (void); + + Acceptor * acceptor_; + size_t index_; + int flg_mask_; + + ACE_Recursive_Thread_Mutex mutex_; + long total_snd_; + long total_rcv_; + long total_w_; + long total_r_; +}; + +// ************************************************************* + +class Acceptor : public ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> +{ + friend class Receiver; +public: + size_t get_number_sessions (void) { return sessions_; } + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } + + Acceptor (void); + virtual ~Acceptor (void); + + void stop (void); + int start (const ACE_INET_Addr & addr); + + // virtual from ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> + virtual int make_svc_handler (Receiver * & sh); + +private: + + ACE_Recursive_Thread_Mutex mutex_; + size_t sessions_; + Receiver *list_receivers_[MAX_RECEIVERS]; + long total_snd_; + long total_rcv_; + long total_w_; + long total_r_; + + void on_new_receiver (Receiver & rcvr); + void on_delete_receiver (Receiver & rcvr); +}; + + +// ******************************************* +// Sender +// ******************************************* + +class Connector; + +class Sender : public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH> +{ + friend class Connector; + +public: + Sender (Connector * connector=0, size_t index=MAX_SENDERS+1); + + ~Sender (void); + + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } + + // virtual from ACE_Svc_Handler<> + virtual int open (void * pVoid); + + // virtual from ACE_Event_Handler + virtual int handle_input (ACE_HANDLE h); + virtual int handle_output (ACE_HANDLE h); + virtual int handle_close (ACE_HANDLE h , ACE_Reactor_Mask mask); + +private: + int terminate_io (ACE_Reactor_Mask mask); + int initiate_io (ACE_Reactor_Mask mask); + int initiate_write (); + int check_destroy (void); + + Connector * connector_; + size_t index_; + int flg_mask_; + + ACE_Recursive_Thread_Mutex mutex_; + + char send_buf_ [1024]; + long total_snd_; + long total_rcv_; + long total_w_; + long total_r_; +}; + +// ************************************************************* + +class Connector: public ACE_Connector<Sender,ACE_SOCK_CONNECTOR> +{ + friend class Sender; +public: + size_t get_number_sessions (void) { return sessions_; } + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + long get_total_w (void) { return this->total_w_; } + long get_total_r (void) { return this->total_r_; } + + + Connector (); + virtual ~Connector (); + + void stop (); + int start (const ACE_INET_Addr & addr , int num); + + // virtual from ACE_Connector<> + virtual int make_svc_handler (Sender * & sh); + +private: + + ACE_Recursive_Thread_Mutex mutex_; + size_t sessions_; + Sender * list_senders_ [MAX_SENDERS]; + long total_snd_; + long total_rcv_; + long total_w_; + long total_r_; + + void on_new_sender (Sender & sndr); + void on_delete_sender (Sender & sndr); +}; + + +#endif /* ACE_TESTS_TP_REACTOR_TEST_H */ |