summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpoberlin <poberlin@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-07-11 02:07:24 +0000
committerpoberlin <poberlin@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2007-07-11 02:07:24 +0000
commite03e1cc599eef72512104de984e2002b677e5f56 (patch)
treefc3912380542da5775e2e2bdba12a514065dacc5
parent22b0a7b0e1028b70da2c93ef5015c1372bde6615 (diff)
downloadATCD-e03e1cc599eef72512104de984e2002b677e5f56.tar.gz
running version of DA_Reactor
-rw-r--r--ACE/ace/Basic_P_Strategy.inl2
-rw-r--r--ACE/ace/DA_Strategy_Base.h12
-rw-r--r--ACE/ace/DA_Strategy_Base.inl7
-rw-r--r--ACE/ace/Efficient_P_Strategy.inl3
-rw-r--r--ACE/ace/Live_P_Strategy.h8
-rw-r--r--ACE/ace/Live_P_Strategy.inl43
-rw-r--r--ACE/ace/k_Efficient_P_Strategy.h8
-rw-r--r--ACE/ace/k_Efficient_P_Strategy.inl75
-rw-r--r--ACE/tests/DA_Reactor_Test.cpp1297
-rw-r--r--ACE/tests/DA_Reactor_Test.h200
10 files changed, 1589 insertions, 66 deletions
diff --git a/ACE/ace/Basic_P_Strategy.inl b/ACE/ace/Basic_P_Strategy.inl
index 656dcbd8a29..b6a8dba546c 100644
--- a/ACE/ace/Basic_P_Strategy.inl
+++ b/ACE/ace/Basic_P_Strategy.inl
@@ -2,7 +2,7 @@
template <typename AnnotationId>
ACE_INLINE
Basic_P_Strategy<AnnotationId>::Basic_P_Strategy(int maxThreads)
-:DA_Strategy_Base(maxThreads)
+:DA_Strategy_Base(maxThreads),
t_r(maxThreads)
{
}
diff --git a/ACE/ace/DA_Strategy_Base.h b/ACE/ace/DA_Strategy_Base.h
index 02ac40d9ef7..2b566d9b35c 100644
--- a/ACE/ace/DA_Strategy_Base.h
+++ b/ACE/ace/DA_Strategy_Base.h
@@ -21,7 +21,7 @@
#define DA_STRATEGY_BASE_H
#include /**/ "ace/pre.h"
-#include "ace/Hash_Map_Manager_T.h"
+#include "ace/Hash_Map_Manager.h"
#include "ace/Thread_Mutex.h"
#include "ace/Atomic_Op_T.h"
@@ -71,9 +71,9 @@ public:
virtual bool is_deadlock_potential(AnnotationId handle)=0;
virtual void grant(AnnotationId handle)=0;
virtual void release(AnnotationId upcall_handle)=0;
- int get_max_threads() { return num_avail_threads_};
- HASH_ANNOTATIONS_CONST_ITER get_annotations_iter();
- virtual int get_annotation (AnnotationId handle);
+ int get_max_threads() { return num_avail_threads_.value();}
+ HASH_ANNOTATIONS_CONST_ITER get_annotations_iter() const;
+ virtual int get_annotation (AnnotationId handle) const;
virtual int add_annotation (AnnotationId handle, int annotation);
virtual int remove_annotation (AnnotationId handle);
virtual int set_annotations_table (const HASH_ANNOTATIONS_REVERSE_ITER& table);
@@ -85,6 +85,10 @@ private:
};
+#if defined (__ACE_INLINE__)
+#include "ace/DA_Strategy_Base.inl"
+#endif /* __ACE_INLINE__ */
+
#include /**/ "ace/post.h"
#endif /* DA_STRATEGY_BASE_H */ \ No newline at end of file
diff --git a/ACE/ace/DA_Strategy_Base.inl b/ACE/ace/DA_Strategy_Base.inl
index e726ef2e011..ad62a9f585a 100644
--- a/ACE/ace/DA_Strategy_Base.inl
+++ b/ACE/ace/DA_Strategy_Base.inl
@@ -13,7 +13,7 @@ DA_Strategy_Base<AnnotationId>::~DA_Strategy_Base()
template <typename AnnotationId>
ACE_INLINE int
-DA_Strategy_Base<AnnotationId>::get_annotation (AnnotationId id)
+DA_Strategy_Base<AnnotationId>::get_annotation (AnnotationId id) const
{
int annotation;
if (annotations_repo_.find (id, annotation) == -1)
@@ -26,7 +26,7 @@ ACE_INLINE int
DA_Strategy_Base<AnnotationId>::set_annotations_table (
const HASH_ANNOTATIONS_REVERSE_ITER& table)
{
- HASH_ANNOTATIONS_CONST_ITER iter(table);
+ HASH_ANNOTATIONS_REVERSE_ITER iter(table);
int rc=0;
for (;!(iter.done()); iter++)
@@ -61,8 +61,9 @@ ACE_INLINE ACE_Hash_Map_Const_Iterator_Ex<AnnotationId,
ACE_Hash<AnnotationId>,
ACE_Equal_To<AnnotationId>,
ACE_Thread_Mutex>
-DA_Strategy_Base<AnnotationId>::get_annotations_iter()
+DA_Strategy_Base<AnnotationId>::get_annotations_iter() const
{
+
return annotations_repo_.begin();
}
diff --git a/ACE/ace/Efficient_P_Strategy.inl b/ACE/ace/Efficient_P_Strategy.inl
index e42a2330f8e..fdd6b76468d 100644
--- a/ACE/ace/Efficient_P_Strategy.inl
+++ b/ACE/ace/Efficient_P_Strategy.inl
@@ -42,7 +42,6 @@ Efficient_P_Strategy<AnnotationId>::grant(AnnotationId handle)
p_r--;
}
}
- return true;
}
template <typename AnnotationId>
@@ -50,7 +49,7 @@ ACE_INLINE
void
Efficient_P_Strategy<AnnotationId>::release(AnnotationId upcall_handle)
{
- int annotation = get_annotation(handle);
+ int annotation = get_annotation(upcall_handle);
{
ACE_Guard<ACE_Thread_Mutex> guard(_lock);
t_r ++;
diff --git a/ACE/ace/Live_P_Strategy.h b/ACE/ace/Live_P_Strategy.h
index 420307d2348..9a5e1782d6f 100644
--- a/ACE/ace/Live_P_Strategy.h
+++ b/ACE/ace/Live_P_Strategy.h
@@ -19,13 +19,14 @@
#include "ace/DA_Strategy_Base.h"
#include "ace/RB_Tree.h"
+#include "ace/Mutex.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
//forward decl
-class LivePTree;
+class Live_P_Tree;
template <typename AnnotationId>
class Live_P_Strategy : public DA_Strategy_Base<AnnotationId> {
@@ -39,7 +40,10 @@ public:
virtual void grant(AnnotationId handle);
virtual void release(AnnotationId upcall_handle);
private:
- LivePTree* tree_pimpl_;
+ Live_P_Tree* tree_pimpl_;
+ bool min_illegal_is_computed_;
+ int min_illegal_;
+ ACE_Mutex computation_mutex_;
};
#if defined (__ACE_INLINE__)
diff --git a/ACE/ace/Live_P_Strategy.inl b/ACE/ace/Live_P_Strategy.inl
index 697c26c45b7..641a50e70f6 100644
--- a/ACE/ace/Live_P_Strategy.inl
+++ b/ACE/ace/Live_P_Strategy.inl
@@ -42,7 +42,7 @@ private:
};
ACE_INLINE
-Live_P_Tree::Live_P_Tree(int maxThreads, int k)
+Live_P_Tree::Live_P_Tree(int maxThreads)
:ACE_RB_Tree(),
T_(maxThreads) {
@@ -162,7 +162,9 @@ Live_P_Tree::calc_max_i(ACE_RB_Tree_Node<int, AnnotationNode>* nodePtr, int extr
template <typename AnnotationId>
ACE_INLINE
Live_P_Strategy<AnnotationId>::Live_P_Strategy(int maxThreads)
-:DA_Strategy_Base(maxThreads)
+:DA_Strategy_Base(maxThreads),
+ min_illegal_is_computed_(false),
+ min_illegal_(0)
{
}
@@ -179,19 +181,18 @@ ACE_INLINE
bool
Live_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle)
{
- int min_illegal = getMaxThreads();
- int annotation = getAnnotation(handle);
-
- if (!min_ilegal_is_computed_)
+ int annotation = get_annotation(handle);
+ computation_mutex_.acquire();
+ if (!min_illegal_is_computed_)
{
- if (tree_->current_size() > 1)
+ if (tree_pimpl_->current_size() > 1)
{
- min_ilegal_ = calc_max();
+ min_illegal_ = tree_pimpl_->calc_max();
}
- min_ilegal_is_computed_ = true;
+ min_illegal_is_computed_ = true;
}
-
- return annotation >= min_ilegal_;
+ computation_mutex_.release();
+ return annotation >= min_illegal_;
}
template <typename AnnotationId>
@@ -199,9 +200,14 @@ ACE_INLINE
void
Live_P_Strategy<AnnotationId>::grant(AnnotationId handle)
{
- int annotation = getAnnotation(handle);
- tree_pimpl_->bind(annotation);
- min_ilegal_is_computed_ = false;
+ int annotation = get_annotation(handle);
+ //since the state of the tree is involved in calculation
+ //of max, we must aquire the lock before changing the
+ //structure of the tree
+ computation_mutex_.acquire();
+ tree_pimpl_->bind(annotation);
+ min_illegal_is_computed_ = false;
+ computation_mutex_.release();
}
template <typename AnnotationId>
@@ -209,7 +215,12 @@ ACE_INLINE
void
Live_P_Strategy<AnnotationId>::release(AnnotationId handle)
{
- min_ilegal_is_computed_ = false;
- int annotation = getAnnotation(handle);
+ //since the state of the tree is involved in calculation
+ //of max, we must aquire the lock before changing the
+ //structure of the tree
+ computation_mutex_.acquire();
+ min_illegal_is_computed_ = false;
+ int annotation = get_annotation(handle);
tree_pimpl_->unbind(annotation);
+ computation_mutex_.release();
} \ No newline at end of file
diff --git a/ACE/ace/k_Efficient_P_Strategy.h b/ACE/ace/k_Efficient_P_Strategy.h
index bce6af929f0..1f4b71a49c1 100644
--- a/ACE/ace/k_Efficient_P_Strategy.h
+++ b/ACE/ace/k_Efficient_P_Strategy.h
@@ -18,21 +18,21 @@
#include /**/ "ace/pre.h"
#include "ace/DA_Strategy_Base.h"
-
+#include "ace/Mutex.h"
#include <vector>
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-template <typename AnnotationId, int k>
+template <typename AnnotationId>
class k_Efficient_P_Strategy : public DA_Strategy_Base<AnnotationId> {
//The annotations consist of an identifier and a resource cost value
public:
//note: k must be less than maxThreads
- k_Efficient_P_Strategy(int maxThreads);
+ k_Efficient_P_Strategy(int maxThreads, int k);
virtual ~k_Efficient_P_Strategy();
virtual bool is_deadlock_potential(AnnotationId handle);
virtual void grant(AnnotationId handle);
@@ -41,6 +41,8 @@ private:
int compute_min_illegal();
int get_min_illegal();
int min_illegal_;
+ ACE_Mutex computation_mutex_;
+ int k_;
bool min_illegal_is_computed_ ;
std::vector<int> a;
std::vector<int> A;
diff --git a/ACE/ace/k_Efficient_P_Strategy.inl b/ACE/ace/k_Efficient_P_Strategy.inl
index dcb575a244f..5d67a95cc58 100644
--- a/ACE/ace/k_Efficient_P_Strategy.inl
+++ b/ACE/ace/k_Efficient_P_Strategy.inl
@@ -1,70 +1,73 @@
-template <typename AnnotationId, int k>
+template <typename AnnotationId>
ACE_INLINE
-k_Efficient_P_Strategy<AnnotationId, k>::k_Efficient_P_Strategy(int maxThreads)
+k_Efficient_P_Strategy<AnnotationId>::k_Efficient_P_Strategy(int maxThreads, int k)
:DA_Strategy_Base(maxThreads),
+ k_(k)
{
- a.resize(k + 1);
- A.resize(k + 1);
- for (unsigned i=0; i<k; ++i) {
+ a.resize(k_ + 1);
+ A.resize(k_ + 1);
+ for (int i=0; i<k_; ++i) {
a[i] = 0;
A[i] = 0;
}
- min_ilegal_ = maxThreads;
- min_ilegal_is_computed_ = true;
+ min_illegal_ = maxThreads;
+ min_illegal_is_computed_ = true;
}
-template <typename AnnotationId, int k>
+template <typename AnnotationId>
ACE_INLINE
-k_Efficient_P_Strategy<AnnotationId, k>::~k_Efficient_P_Strategy()
+k_Efficient_P_Strategy<AnnotationId>::~k_Efficient_P_Strategy()
{
}
-template <typename AnnotationId, int k>
+template <typename AnnotationId>
ACE_INLINE
-bool k_Efficient_P_Strategy<AnnotationId, k>::is_deadlock_potential(AnnotationId handle)
+bool k_Efficient_P_Strategy<AnnotationId>::is_deadlock_potential(AnnotationId handle)
{
int annotation = get_annotation(handle);
- return (annotation >= min_ilegal());
+ return (annotation >= get_min_illegal());
}
-template <typename AnnotationId, int k>
+template <typename AnnotationId>
ACE_INLINE
int
-k_Efficient_P_Strategy<AnnotationId, k>::compute_min_illegal()
+k_Efficient_P_Strategy<AnnotationId>::compute_min_illegal()
{
int T = get_max_threads();
- for (int i=0; i<k; ++i) {
+ for (int i=0; i<k_; ++i) {
if (!(A[i] < (T - i))) {
return i;
}
}
- if (A[k]>0) {
- return (T - A[k]);
+ if (A[k_]>0) {
+ return (T - A[k_]);
}
return T;
}
-template <typename AnnotationId, int k>
+template <typename AnnotationId>
ACE_INLINE
int
-k_Efficient_P_Strategy<AnnotationId, k>::get_min_illegal()
+k_Efficient_P_Strategy<AnnotationId>::get_min_illegal()
{
+ computation_mutex_.acquire();
if (!min_illegal_is_computed_) {
- min_illegal_ = compute_min_ilegal();
+ min_illegal_ = compute_min_illegal();
min_illegal_is_computed_ = true;
}
+ computation_mutex_.release();
return min_illegal_;
}
-template <typename AnnotationId, int k>
+template <typename AnnotationId>
ACE_INLINE
-void k_Efficient_P_Strategy<AnnotationId, k>::grant(AnnotationId handle)
+void k_Efficient_P_Strategy<AnnotationId>::grant(AnnotationId handle)
{
int annotation = get_annotation(handle);
-
- if (annotation < k)
+ computation_mutex_.acquire();
+ if (annotation < k_)
{
a[annotation] ++;
for (int i=0; i<=annotation; ++i)
@@ -74,42 +77,44 @@ void k_Efficient_P_Strategy<AnnotationId, k>::grant(AnnotationId handle)
}
else
{
- a[k] ++;
- for (int i=0; i<=k ; ++i)
+ a[k_] ++;
+ for (int i=0; i<=k_ ; ++i)
{
A[i]++;
}
}
- min_ilegal_is_computed_ = false;
- return true;
+ min_illegal_is_computed_ = false;
+ computation_mutex_.release();
}
-template <typename AnnotationId, int k>
+template <typename AnnotationId>
ACE_INLINE
-void k_Efficient_P_Strategy<AnnotationId, k>::release(AnnotationId handle)
+void k_Efficient_P_Strategy<AnnotationId>::release(AnnotationId handle)
{
int annotation = get_annotation(handle);
+ computation_mutex_.acquire();
/* if (annotation < k ) {
assert(a[annotation]>0);
} else {
assert(a[k] >0);
}
*/
- if (annotation < k)
+ if (annotation < k_)
{
a[annotation] --;
- for (unsigned i=0; i<=annotation; ++i)
+ for (int i=0; i<=annotation; ++i)
{
A[i]--;
}
}
else
{
- a[k] --;
- for (unsigned i=0; i<=k ; ++i)
+ a[k_] --;
+ for (int i=0; i<=k_ ; ++i)
{
A[i]--;
}
}
- min_ilegal_is_computed_ = false;
+ min_illegal_is_computed_ = false;
+ computation_mutex_.release();
} \ No newline at end of file
diff --git a/ACE/tests/DA_Reactor_Test.cpp b/ACE/tests/DA_Reactor_Test.cpp
new file mode 100644
index 00000000000..0b9c64ab426
--- /dev/null
+++ b/ACE/tests/DA_Reactor_Test.cpp
@@ -0,0 +1,1297 @@
+//============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// DAReactor_test.cpp
+//
+// = DESCRIPTION
+// This program illustrates how the <ACE_TP_Reactor> can be used to
+// implement an application that does various operations.
+// usage: DA_Reactor_Test
+// -n number threads in the TP_Reactor thread pool
+// -d duplex mode 1 (full-duplex) vs. 0 (half-duplex)
+// -p port to listen(Server)/connect(Client)
+// -h host to connect (Client mode)
+// -s number of sender's instances ( Client mode)
+// -b run client and server (both modes ) at the same time
+// -v log level
+// 0 - log all messages
+// 1 - log only errors and unusual cases
+// -i time to run in seconds
+// -u show this message
+// -t specify the deadlock avoidance strategy
+// basic - Basic P strategy
+// efficient <k> - k-efficient P strategy (default is 1)
+// live - Live-P strategy
+//
+// The main difference between TP_Reactor_Test.cpp and
+// this test is the addition of deadlock avoidance strategies. In
+// this way, it is a test for Deadlock_Free_TP_Reactor as well as
+// the various visit strategies
+//
+//
+// = AUTHOR
+// Paul Oberlin <pauloberlin@gmail.com>
+//
+//============================================================================
+
+#include "test_config.h"
+
+#if defined(ACE_HAS_THREADS)
+
+#include "TP_Reactor_Test.h"
+
+#include "ace/Signal.h"
+#include "ace/Service_Config.h"
+#include "ace/Get_Opt.h"
+
+#include "ace/Reactor.h"
+#include "ace/Deadlock_Free_TP_Reactor.h"
+#include "ace/OS_NS_signal.h"
+#include "ace/OS_NS_stdio.h"
+#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_strings.h"
+#include "ace/OS_NS_unistd.h"
+#include "ace/Synch_Traits.h"
+#include "ace/Thread_Semaphore.h"
+
+#include "ace/Basic_P_Strategy.h"
+#include "ace/Efficient_P_Strategy.h"
+#include "ace/Live_P_Strategy.h"
+#include "ace/k_Efficient_P_Strategy.h"
+
+ACE_RCSID(TPReactor, TPReactor_Test, "TPReactor_Test.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")
+
+// Some debug helper functions
+static int disable_signal (int sigmin, int sigmax);
+
+
+// both: 0 run client or server / depends on host
+// != 0 run client and server
+static int both = 0;
+
+// Host that we're connecting to.
+static const ACE_TCHAR *host = 0;
+
+// number of Senders instances
+static int senders = 1;
+
+// duplex mode: == 0 half-duplex
+// != 0 full duplex
+static int duplex = 0;
+
+// number threads in the TP_Reactor thread pool
+static int threads = 1;
+
+// Port that we're receiving connections on.
+static u_short port = ACE_DEFAULT_SERVER_PORT;
+
+// Log options
+static int loglevel = 1; // 0 full , 1 only errors
+
+static const size_t MIN_TIME = 1; // min 1 sec
+static const size_t MAX_TIME = 3600; // max 1 hour
+static u_int seconds = 2; // default time to run - 2 seconds
+
+enum StrategyEnum {BASIC, EFFICIENT, LIVE};
+static StrategyEnum strategyEnum = BASIC;
+DA_Strategy_Base<ACE_HANDLE>* strategy = 0;
+static int k = 0;
+
+static char data[] =
+ "GET / HTTP/1.1\r\n"
+ "Accept: */*\r\n"
+ "Accept-Language: C++\r\n"
+ "Accept-Encoding: gzip, deflate\r\n"
+ "User-Agent: DAReactor_Test/1.0 (non-compatible)\r\n"
+ "Connection: Keep-Alive\r\n"
+ "\r\n" ;
+
+// *************************************************************
+
+class LogLocker
+{
+public:
+
+ LogLocker () { ACE_LOG_MSG->acquire (); }
+ virtual ~LogLocker () { ACE_LOG_MSG->release (); }
+};
+// *************************************************************
+
+/**
+ * @class MyTask
+ *
+ * MyTask plays role for DA_Reactor threads pool
+ *
+ * MyTask is ACE_Task resposible for:
+ * 1. Creation and deletion of DA_Reactor and DA_Reactor thread pool
+ * 2. Running DA_Reactor event loop
+ */
+class MyTask : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ MyTask (void): sem_ ((unsigned int) 0),
+ my_reactor_ (0) {}
+
+ virtual ~MyTask () { stop (); }
+
+ virtual int svc (void);
+
+ int start (int num_threads);
+ int stop (void);
+
+private:
+ int create_reactor (int num_threads);
+ int delete_reactor (void);
+
+ ACE_SYNCH_RECURSIVE_MUTEX lock_;
+ ACE_Thread_Semaphore sem_;
+ ACE_Reactor *my_reactor_;
+};
+
+int
+MyTask::create_reactor (int num_threads)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
+ monitor,
+ this->lock_,
+ -1);
+
+ ACE_ASSERT (this->my_reactor_ == 0);
+
+ ACE_TP_Reactor * pImpl = 0;
+
+ switch (strategyEnum) {
+ case BASIC:
+ ACE_NEW_RETURN(strategy, Basic_P_Strategy<ACE_HANDLE>(threads), -1);
+ break;
+ case LIVE:
+ ACE_NEW_RETURN(strategy, Live_P_Strategy<ACE_HANDLE>(threads), -1);
+ break;
+ case EFFICIENT:
+ if ( k == 1) {
+ ACE_NEW_RETURN(strategy, Efficient_P_Strategy<ACE_HANDLE>(threads), -1);
+ } else {
+ ACE_NEW_RETURN(strategy, k_Efficient_P_Strategy<ACE_HANDLE>(threads, k), -1);
+ }
+ break;
+ }
+
+ //Note, this line seems to construct the reactor using the default thread pool size
+ //which is ACE::max_handles(). So, I don't see how the parameter to the program is
+ //setting the number of threads in the reactor. The way I see it, this parameter
+ //is used for starting the reactor event loop in a number of threads equal to this parameter.
+ ACE_NEW_RETURN (pImpl,
+ ACE_Deadlock_Free_TP_Reactor(num_threads, strategy),
+ -1);
+
+ ACE_NEW_RETURN (my_reactor_,
+ ACE_Reactor (pImpl ,1),
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT (" (%t) Create TP_Reactor\n")));
+
+ ACE_Reactor::instance (this->my_reactor_);
+
+ this->reactor (my_reactor_);
+
+ return 0;
+}
+
+int
+MyTask::delete_reactor (void)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
+ monitor,
+ this->lock_,
+ -1);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT (" (%t) Delete TP_Reactor\n")));
+
+ delete this->my_reactor_;
+ ACE_Reactor::instance ((ACE_Reactor *) 0);
+ this->my_reactor_ = 0;
+ this->reactor (0);
+
+ return 0;
+}
+
+int
+MyTask::start (int num_threads)
+{
+ if (this->create_reactor (num_threads) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to create reactor")),
+ -1);
+
+ if (this->activate (THR_NEW_LWP, num_threads) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to activate thread pool")),
+ -1);
+
+ for (; num_threads > 0 ; num_threads--)
+ sem_.acquire ();
+
+ return 0;
+}
+
+int
+MyTask::stop (void)
+{
+ if (this->my_reactor_ != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("End TP_Reactor event loop\n")));
+
+ ACE_Reactor::instance()->end_reactor_event_loop ();
+ }
+
+ if (this->wait () == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to stop thread pool")));
+
+ if (this->delete_reactor () == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p.\n"),
+ ACE_TEXT ("unable to delete reactor")));
+
+ return 0;
+}
+
+int
+MyTask::svc (void)
+{
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask started\n")));
+
+ // signal that we are ready
+ sem_.release (1);
+
+ while (ACE_Reactor::instance()->reactor_event_loop_done () == 0)
+ ACE_Reactor::instance()->run_reactor_event_loop ();
+
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask finished\n")));
+ return 0;
+}
+
+// *************************************************************
+
+Acceptor::Acceptor (void)
+ : ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR> ((ACE_Reactor *) 0),
+ sessions_ (0),
+ total_snd_(0),
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+
+ for (size_t i = 0; i < MAX_RECEIVERS; ++i)
+ this->list_receivers_[i] =0;
+}
+
+Acceptor::~Acceptor (void)
+{
+ this->reactor (0);
+ stop ();
+}
+
+void
+Acceptor::stop (void)
+{
+ // this method can be called only after reactor event loop id done
+ // in all threads
+
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+
+ for (size_t i = 0; i < MAX_RECEIVERS; ++i)
+ {
+ delete this->list_receivers_[i];
+ this->list_receivers_[i] =0;
+ }
+}
+
+void
+Acceptor::on_new_receiver (Receiver &rcvr)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+ this->sessions_++;
+ this->list_receivers_[rcvr.index_] = & rcvr;
+ ACE_DEBUG ((LM_DEBUG,
+ "Receiver::CTOR sessions_=%d\n",
+ this->sessions_));
+}
+
+void
+Acceptor::on_delete_receiver (Receiver &rcvr)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+
+ this->sessions_--;
+
+ this->total_snd_ += rcvr.get_total_snd ();
+ this->total_rcv_ += rcvr.get_total_rcv ();
+ this->total_w_ += rcvr.get_total_w ();
+ this->total_r_ += rcvr.get_total_r ();
+
+ if (rcvr.index_ < MAX_RECEIVERS
+ && this->list_receivers_[rcvr.index_] == &rcvr)
+ this->list_receivers_[rcvr.index_] = 0;
+
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ rcvr.get_total_snd (),
+ rcvr.get_total_w () );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ rcvr.get_total_rcv (),
+ rcvr.get_total_r ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
+ rcvr.index_,
+ bufs,
+ bufr,
+ this->sessions_));
+}
+
+int
+Acceptor::start (const ACE_INET_Addr &addr)
+{
+ if (ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR>
+ ::open (addr,
+ ACE_Reactor::instance (),
+ ACE_NONBLOCK) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p\n",
+ "Acceptor::start () - open failed"),
+ 0);
+ return 1;
+}
+
+int
+Acceptor::make_svc_handler (Receiver *&sh)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+
+ if (sessions_ >= MAX_RECEIVERS)
+ return -1;
+
+ for (size_t i = 0; i < MAX_RECEIVERS; ++i)
+ if (this->list_receivers_ [i] == 0)
+ {
+ ACE_NEW_RETURN (sh,
+ Receiver (this , i),
+ -1);
+ return 0;
+ }
+ return -1;
+}
+
+// *************************************************************
+
+Receiver::Receiver (Acceptor * acceptor, size_t index)
+ : acceptor_ (acceptor),
+ index_ (index),
+ flg_mask_ (ACE_Event_Handler::NULL_MASK),
+ total_snd_(0),
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
+{
+ if (acceptor_ != 0)
+ acceptor_->on_new_receiver (*this);
+}
+
+
+Receiver::~Receiver (void)
+{
+ this->reactor (0);
+ if (acceptor_ != 0)
+ acceptor_->on_delete_receiver (*this);
+
+ this->index_ = 0;
+
+ for (; ;)
+ {
+ ACE_Time_Value tv = ACE_Time_Value::zero;
+ ACE_Message_Block *mb = 0;
+
+ if (this->getq (mb, &tv) < 0)
+ break;
+
+ ACE_Message_Block::release (mb);
+ }
+}
+
+int
+Receiver::check_destroy (void)
+{
+ if (flg_mask_ == ACE_Event_Handler::NULL_MASK)
+ return -1;
+
+ return 0;
+}
+
+int
+Receiver::open (void *)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ ACE_Reactor *TPReactor = ACE_Reactor::instance ();
+
+ this->reactor (TPReactor);
+
+ flg_mask_ = ACE_Event_Handler::NULL_MASK ;
+
+ if (TPReactor->register_handler (this, flg_mask_) == -1)
+ return -1;
+
+ initiate_io (ACE_Event_Handler::READ_MASK);
+
+ return check_destroy ();
+}
+
+int
+Receiver::initiate_io (ACE_Reactor_Mask mask)
+{
+ if (ACE_BIT_ENABLED (flg_mask_, mask))
+ return 0;
+
+ if (ACE_Reactor::instance ()->schedule_wakeup (this, mask) == -1)
+ return -1;
+
+ ACE_SET_BITS (flg_mask_, mask);
+ return 0;
+}
+
+int
+Receiver::terminate_io (ACE_Reactor_Mask mask)
+{
+ if (ACE_BIT_DISABLED (flg_mask_, mask))
+ return 0;
+
+ if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
+ return -1;
+
+ ACE_CLR_BITS (flg_mask_, mask);
+ return 0;
+}
+
+int
+Receiver::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_Reactor *TPReactor = ACE_Reactor::instance ();
+
+ TPReactor->remove_handler (this,
+ ACE_Event_Handler::ALL_EVENTS_MASK |
+ ACE_Event_Handler::DONT_CALL); // Don't call handle_close
+ this->reactor (0);
+ this->destroy ();
+ return 0;
+}
+
+int
+Receiver::handle_input (ACE_HANDLE h)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ),
+ -1);
+
+ int err = 0;
+ ssize_t res = this->peer ().recv (mb->rd_ptr (), BUFSIZ-1);
+
+ this->total_r_++;
+
+ if (res >= 0)
+ {
+ mb->wr_ptr (res);
+ this->total_rcv_ += res;
+ }
+ else
+ err = errno ;
+
+ mb->wr_ptr ()[0] = '\0';
+
+ if (loglevel == 0 || res <= 0 || err!= 0)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_input () SessionId=%d****\n", index_));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
+ }
+
+ if (err == EWOULDBLOCK)
+ {
+ err=0;
+ res=0;
+ return check_destroy ();
+ }
+
+ if (err !=0 || res <= 0)
+ {
+ ACE_Message_Block::release (mb);
+ return -1;
+ }
+
+ ACE_Time_Value tv = ACE_Time_Value::zero;
+
+ int qcount = this->putq (mb, & tv);
+
+ if (qcount <= 0) // failed to putq
+ {
+ ACE_Message_Block::release (mb);
+ return -1 ;
+ }
+
+ int rc = 0;
+
+ if (duplex == 0) // half-duplex , stop read
+ rc = this->terminate_io (ACE_Event_Handler::READ_MASK);
+ else // full duplex
+ {
+ if (qcount >= 20 ) // flow control, stop read
+ rc = this->terminate_io (ACE_Event_Handler::READ_MASK);
+ else
+ rc = this->initiate_io (ACE_Event_Handler::READ_MASK);
+ }
+
+ if (rc == -1)
+ return -1;
+
+ //initiate write
+ if (this->initiate_io (ACE_Event_Handler::WRITE_MASK) != 0)
+ return -1;
+
+ return check_destroy ();
+}
+
+int
+Receiver::handle_output (ACE_HANDLE h)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ ACE_Time_Value tv = ACE_Time_Value::zero;
+ ACE_Message_Block *mb = 0;
+
+ int err = 0;
+ ssize_t res = 0;
+ size_t bytes = 0;
+
+ int qcount = this->getq (mb, &tv);
+
+ if (mb != 0) // qcount >= 0)
+ {
+ bytes = mb->length ();
+ res = this->peer ().send (mb->rd_ptr (), bytes);
+
+ this->total_w_++;
+
+ if (res < 0)
+ err = errno ;
+ else
+ this->total_snd_ += res;
+
+
+ if (loglevel == 0 || res <= 0 || err!= 0)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG, "**** Receiver::handle_output () SessionId=%d****\n", index_));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
+ }
+ }
+
+ ACE_Message_Block::release (mb);
+
+ if (err != 0 || res < 0)
+ return -1;
+
+ if (qcount <= 0) // no more message blocks in queue
+ {
+ if (this->terminate_io (ACE_Event_Handler::WRITE_MASK) != 0)
+ return -1;
+
+ if (this->initiate_io (ACE_Event_Handler::READ_MASK) != 0)
+ return -1;
+ }
+
+ return check_destroy ();
+}
+
+// *************************************************************
+
+Connector::Connector (void)
+ : ACE_Connector<Sender,ACE_SOCK_CONNECTOR> ((ACE_Reactor *) 0),
+ sessions_ (0),
+ total_snd_(0),
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+
+ for (size_t i = 0; i < MAX_SENDERS; ++i)
+ this->list_senders_[i] = 0;
+}
+
+Connector::~Connector (void)
+{
+ this->reactor (0);
+ stop ();
+}
+
+void
+Connector::stop ()
+{
+ // this method can be called only
+ // after reactor event loop id done
+ // in all threads
+
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+
+ for (size_t i = 0; i < MAX_SENDERS; ++i)
+ {
+ delete this->list_senders_[i];
+ this->list_senders_[i] =0;
+ }
+}
+
+void
+Connector::on_new_sender (Sender & sndr)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+ this->sessions_++;
+ this->list_senders_[sndr.index_] = &sndr;
+ ACE_DEBUG ((LM_DEBUG,
+ "Sender::CTOR sessions_=%d\n",
+ this->sessions_));
+}
+
+void
+Connector::on_delete_sender (Sender & sndr)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+
+ this->sessions_--;
+ this->total_snd_ += sndr.get_total_snd();
+ this->total_rcv_ += sndr.get_total_rcv();
+ this->total_w_ += sndr.get_total_w();
+ this->total_r_ += sndr.get_total_r();
+
+ if (sndr.index_ < MAX_SENDERS
+ && this->list_senders_[sndr.index_] == &sndr)
+ this->list_senders_[sndr.index_] = 0;
+
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ sndr.get_total_snd(),
+ sndr.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ sndr.get_total_rcv(),
+ sndr.get_total_r() );
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
+ sndr.index_,
+ bufs,
+ bufr,
+ this->sessions_));
+
+}
+
+int
+Connector::start (const ACE_INET_Addr & addr, int num)
+{
+
+ if (ACE_Connector<Sender,ACE_SOCK_CONNECTOR>
+ ::open (ACE_Reactor::instance (),
+ ACE_NONBLOCK) < 0)
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ "%p\n",
+ "Connector::start () - open failed"),
+ 0);
+
+ int rc = 0;
+
+ for (int i = 0 ; i < num ; i++)
+ {
+ Sender * sender = 0;
+
+ if (ACE_Connector<Sender,ACE_SOCK_CONNECTOR>
+ ::connect (sender, addr) < 0)
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ "%p\n",
+ "Connector::start () - connect failed"),
+ rc);
+ }
+
+ return rc;
+}
+
+int
+Connector::make_svc_handler (Sender * & sh)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);
+
+ if (sessions_ >= MAX_SENDERS)
+ return -1;
+
+ for (size_t i = 0; i < MAX_SENDERS; ++i)
+ if (this->list_senders_ [i] == 0)
+ {
+ ACE_NEW_RETURN (sh,
+ Sender (this , i),
+ -1);
+ return 0;
+ }
+
+ return -1;
+}
+
+// *************************************************************
+
+Sender::Sender (Connector* connector, size_t index)
+ : connector_ (connector),
+ index_ (index),
+ flg_mask_ (ACE_Event_Handler::NULL_MASK),
+ total_snd_(0),
+ total_rcv_(0),
+ total_w_ (0),
+ total_r_ (0)
+{
+ if (connector_ != 0)
+ connector_->on_new_sender (*this);
+
+ ACE_OS::sprintf (send_buf_ ,data);
+}
+
+
+Sender::~Sender (void)
+{
+ this->reactor (0);
+ if (connector_ != 0)
+ connector_->on_delete_sender (*this);
+
+ this->index_ = 0;
+
+ for (; ;)
+ {
+ ACE_Time_Value tv = ACE_Time_Value::zero;
+ ACE_Message_Block *mb = 0;
+
+ if (this->getq (mb, &tv) < 0)
+ break;
+
+ ACE_Message_Block::release (mb);
+ }
+}
+
+int
+Sender::check_destroy (void)
+{
+ if (flg_mask_ == ACE_Event_Handler::NULL_MASK)
+ return -1;
+
+ return 0;
+}
+
+int Sender::open (void *)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ ACE_Reactor * TPReactor = ACE_Reactor::instance ();
+
+ this->reactor (TPReactor);
+
+ flg_mask_ = ACE_Event_Handler::NULL_MASK ;
+
+ if (TPReactor->register_handler (this,flg_mask_) == -1)
+ return -1;
+
+ if (this->initiate_write () == -1)
+ return -1;
+
+ if (duplex != 0)
+ initiate_io (ACE_Event_Handler::READ_MASK);
+
+ return check_destroy ();
+}
+
+int
+Sender::initiate_write (void)
+{
+ if ( this->msg_queue ()->message_count () < 20) // flow control
+ {
+ size_t nbytes = ACE_OS::strlen (send_buf_);
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (nbytes+8),
+ -1);
+
+ mb->init (send_buf_, nbytes);
+ mb->rd_ptr (mb->base ());
+ mb->wr_ptr (mb->base ());
+ mb->wr_ptr (nbytes);
+
+ ACE_Time_Value tv = ACE_Time_Value::zero;
+
+ int qcount =this->putq (mb, & tv);
+
+ if (qcount <= 0)
+ {
+ ACE_Message_Block::release (mb);
+ return -1;
+ }
+ }
+
+ return initiate_io (ACE_Event_Handler::WRITE_MASK);
+}
+
+int
+Sender::initiate_io (ACE_Reactor_Mask mask)
+{
+ if (ACE_BIT_ENABLED (flg_mask_, mask))
+ return 0;
+
+ if (ACE_Reactor::instance ()->schedule_wakeup (this, mask) == -1)
+ return -1;
+
+ ACE_SET_BITS (flg_mask_, mask);
+ return 0;
+}
+
+int
+Sender::terminate_io (ACE_Reactor_Mask mask)
+{
+ if (ACE_BIT_DISABLED (flg_mask_, mask))
+ return 0;
+
+ if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
+ return -1;
+
+ ACE_CLR_BITS (flg_mask_, mask);
+ return 0;
+}
+
+int
+Sender::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_Reactor * TPReactor = ACE_Reactor::instance ();
+
+ TPReactor->remove_handler (this,
+ ACE_Event_Handler::ALL_EVENTS_MASK |
+ ACE_Event_Handler::DONT_CALL); // Don't call handle_close
+ this->reactor (0);
+ this->destroy ();
+ return 0;
+}
+
+int
+Sender::handle_input (ACE_HANDLE h)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block (BUFSIZ),
+ -1);
+
+ int err = 0;
+ ssize_t res = this->peer ().recv (mb->rd_ptr (),
+ BUFSIZ-1);
+ this->total_r_++;
+
+ if (res >= 0)
+ {
+ mb->wr_ptr (res);
+ this->total_rcv_ += res;
+ }
+ else
+ err = errno ;
+
+ mb->wr_ptr ()[0] = '\0';
+
+ if (loglevel == 0 || res <= 0 || err!= 0)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_input () SessionId=%d****\n", index_));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", BUFSIZ));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
+ }
+
+ ACE_Message_Block::release (mb);
+
+ if (err == EWOULDBLOCK)
+ {
+ err=0;
+ res=0;
+ return check_destroy ();
+ }
+
+ if (err !=0 || res <= 0)
+ return -1;
+
+ int rc = 0;
+
+ if (duplex != 0) // full duplex, continue read
+ rc = initiate_io (ACE_Event_Handler::READ_MASK);
+ else
+ rc = terminate_io (ACE_Event_Handler::READ_MASK);
+
+ if (rc != 0)
+ return -1 ;
+
+ rc = initiate_write ();
+ if (rc != 0)
+ return -1;
+
+ return check_destroy ();
+}
+
+int
+Sender::handle_output (ACE_HANDLE h)
+{
+ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
+
+ ACE_Time_Value tv = ACE_Time_Value::zero;
+ ACE_Message_Block *mb = 0;
+
+ int err=0;
+ ssize_t res=0;
+ size_t bytes=0;
+
+ int qcount = this->getq (mb , & tv);
+
+ if (mb != 0) // qcount >= 0
+ {
+ bytes = mb->length ();
+ res = this->peer ().send (mb->rd_ptr (), bytes);
+
+ this->total_w_++;
+
+ if (res < 0)
+ err = errno ;
+ else
+ this->total_snd_ += res;
+
+ if (loglevel == 0 || res <= 0 || err!= 0)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG, "**** Sender::handle_output () SessionId=%d****\n", index_));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", bytes));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", h));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transferred", res));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", err));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", mb->rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG, "**** end of message ****************\n"));
+ }
+ }
+
+ ACE_Message_Block::release (mb);
+
+ if (err != 0 || res < 0)
+ return -1;
+
+ int rc = 0;
+
+ if (qcount <= 0) // no more message blocks in queue
+ {
+ if (duplex != 0 && // full duplex, continue write
+ (this->total_snd_ - this->total_rcv_ ) < 1024*32 ) // flow control
+ rc = initiate_write ();
+ else
+ rc = terminate_io (ACE_Event_Handler::WRITE_MASK);
+
+ if (rc == -1)
+ return -1;
+ }
+
+ rc = initiate_io (ACE_Event_Handler::READ_MASK);
+ if (rc == -1)
+ return -1;
+
+ return check_destroy ();
+}
+
+
+// *************************************************************
+// Configuration helpers
+// *************************************************************
+int
+print_usage (int /* argc */, ACE_TCHAR *argv[])
+{
+ ACE_ERROR
+ ((LM_ERROR,
+ ACE_TEXT ("\nusage: %s")
+ ACE_TEXT ("\n-n <number threads in the thread pool>")
+ ACE_TEXT ("\n-d <duplex mode 1-on/0-off>")
+ ACE_TEXT ("\n-p <port to listen/connect>")
+ ACE_TEXT ("\n-h <host> for Sender mode")
+ ACE_TEXT ("\n-s <number of sender's instances>")
+ ACE_TEXT ("\n-b run client and server at the same time")
+ ACE_TEXT ("\n-v log level")
+ ACE_TEXT ("\n 0 - log all messages")
+ ACE_TEXT ("\n 1 - log only errors and unusual cases")
+ ACE_TEXT ("\n-i time to run in seconds")
+ ACE_TEXT ("\n-u show this message")
+ ACE_TEXT ("\n"),
+ argv[0]
+ ));
+ return -1;
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ if (argc == 1) // no arguments , so one button test
+ {
+ both = 1; // client and server simultaneosly
+ duplex = 1; // full duplex is on
+ host = ACE_TEXT ("localhost"); // server to connect
+ port = ACE_DEFAULT_SERVER_PORT; // port to connect/listen
+ threads = 3; // size of Proactor thread pool
+ senders = 20; // number of senders
+ loglevel = 1; // log level : 0 full/ 1 only errors
+ seconds = 20; // time to run in seconds
+#if defined(SOMAXCONN) // The test is invalid if senders > SOMAXCONN
+ if(SOMAXCONN < senders)
+ senders = SOMAXCONN;
+#endif
+ return 0;
+ }
+
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("i:n:p:d:h:s:v:t:ub"));
+ int c;
+
+ while ((c = get_opt ()) != EOF)
+ {
+ ACE_TCHAR *stratarg = 0;
+ ACE_TCHAR *strategyStr = 0;
+ switch (c)
+ {
+ case 'i': // time to run
+ seconds = ACE_OS::atoi (get_opt.opt_arg());
+ if (seconds < MIN_TIME)
+ seconds = MIN_TIME;
+ if (seconds > MAX_TIME)
+ seconds = MAX_TIME;
+ break;
+ case 'b': // both client and server
+ both = 1;
+ break;
+ case 'v': // log level
+ loglevel = ACE_OS::atoi (get_opt.opt_arg());
+ break;
+ case 'd': // duplex
+ duplex = ACE_OS::atoi (get_opt.opt_arg());
+ break;
+ case 'h': // host for sender
+ host = get_opt.opt_arg();
+ break;
+ case 'p': // port number
+ port = ACE_OS::atoi (get_opt.opt_arg());
+ break;
+ case 'n': // thread pool size
+ threads = ACE_OS::atoi (get_opt.opt_arg());
+ break;
+ case 's': // number of senders
+ senders = ACE_OS::atoi (get_opt.opt_arg());
+ if (size_t (senders) > MAX_SENDERS)
+ senders = MAX_SENDERS;
+ break;
+ case 't': //avoidance strategy
+ stratarg = get_opt.opt_arg();
+
+ strategyStr = ACE_OS::strtok(stratarg, " ");
+ if (ACE_OS::strcmp(strategyStr, "basic"))
+ {
+ strategyEnum = BASIC;
+ }
+ else if (ACE_OS::strcasecmp(strategyStr, "live"))
+ {
+ strategyEnum = LIVE;
+ }
+ else if (ACE_OS::strcmp(strategyStr, "efficient"))
+ {
+ strategyEnum = EFFICIENT;
+ k = ACE_OS::atoi (ACE_OS::strtok(NULL, " "));
+ }
+ else
+ {
+ return print_usage(argc,argv);
+ }
+ break;
+ case 'u':
+ default:
+ return print_usage (argc,argv);
+ } // switch
+ } // while
+
+ return 0;
+}
+
+static int
+disable_signal (int sigmin, int sigmax)
+{
+#if defined (ACE_HAS_PTHREADS_STD) && !defined (ACE_LACKS_PTHREAD_SIGMASK)
+ sigset_t signal_set;
+ if (sigemptyset (&signal_set) == - 1)
+ ACE_ERROR ((LM_ERROR,
+ "Error: (%P | %t):%p\n",
+ "sigemptyset failed"));
+
+ for (int i = sigmin; i <= sigmax; i++)
+ sigaddset (&signal_set, i);
+
+ // Put the <signal_set>.
+ if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
+ ACE_ERROR ((LM_ERROR,
+ "Error: (%P | %t):%p\n",
+ "pthread_sigmask failed"));
+#else
+ ACE_UNUSED_ARG(sigmin);
+ ACE_UNUSED_ARG(sigmax);
+#endif /* ACE_HAS_PTHREADS_STD && !ACE_LACKS_PTHREAD_SIGMASK */
+
+ return 1;
+}
+
+#endif /* ACE_HAS_THREADS */
+
+
+void
+run_task(DA_Strategy_Base<ACE_HANDLE>* strategy) {
+
+}
+
+int
+run_main (int argc, ACE_TCHAR *argv[])
+{
+ ACE_START_TEST (ACE_TEXT ("TP_Reactor_Test"));
+
+#if defined(ACE_HAS_THREADS)
+ if (::parse_args (argc, argv) == -1)
+ return -1;
+
+ ::disable_signal (SIGPIPE, SIGPIPE);
+
+ MyTask task1;
+ Acceptor acceptor;
+ Connector connector;
+
+ if (task1.start (threads) == 0)
+ {
+ int rc = 0;
+
+ ACE_INET_Addr addr (port);
+ if (both != 0 || host == 0) { // Acceptor
+ rc += acceptor.start (addr);
+ }
+ if (both != 0 || host != 0)
+ {
+ if (host == 0)
+ {
+ host = ACE_LOCALHOST;
+ }
+ if (addr.set (port, host, 1, addr.get_type ()) == -1) {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), host));
+ }
+ rc += connector.start (addr, senders);
+
+ }
+
+ if (rc > 0) {
+ ACE_OS::sleep (seconds);
+ }
+ }
+
+ task1.stop ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\nNumber of Receivers objects = %d\n")
+ ACE_TEXT ("\nNumber of Sender objects = %d\n"),
+ acceptor.get_number_sessions (),
+ connector.get_number_sessions ()));
+
+ // As Reactor event loop now is inactive it is safe to destroy all
+ // senders
+
+ connector.stop ();
+ acceptor.stop ();
+
+ //Print statistic
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ connector.get_total_snd(),
+ connector.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ connector.get_total_rcv(),
+ connector.get_total_r() );
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
+ bufs,
+ bufr
+ ));
+
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld(%ld)"),
+ acceptor.get_total_snd(),
+ acceptor.get_total_w() );
+
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld(%ld)"),
+ acceptor.get_total_rcv(),
+ acceptor.get_total_r() );
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
+ bufs,
+ bufr
+ ));
+
+#else /* ACE_HAS_THREADS */
+ ACE_UNUSED_ARG( argc );
+ ACE_UNUSED_ARG( argv );
+#endif /* ACE_HAS_THREADS */
+
+
+ ACE_END_TEST;
+
+ return 0;
+}
diff --git a/ACE/tests/DA_Reactor_Test.h b/ACE/tests/DA_Reactor_Test.h
new file mode 100644
index 00000000000..6ca71ba811c
--- /dev/null
+++ b/ACE/tests/DA_Reactor_Test.h
@@ -0,0 +1,200 @@
+
+// ============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// DA_Reactor_Test.h
+//
+// = DESCRIPTION
+// Define class needed for generating templates. IBM C++ requires this to
+// be in its own file for auto template instantiation.
+//
+// = AUTHOR
+// Paul Oberlin (pauloberlin@gmail.com)
+//
+// ============================================================================
+
+#ifndef ACE_TESTS_DA_REACTOR_TEST_H
+#define ACE_TESTS_DA_REACTOR_TEST_H
+
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Acceptor.h"
+#include "ace/Connector.h"
+#include "ace/Svc_Handler.h"
+#include "ace/Recursive_Thread_Mutex.h"
+
+
+const size_t MAX_SENDERS = 1000;
+const size_t MAX_RECEIVERS = 1000;
+
+
+// *************************************************************
+// Receiver and Acceptor
+// *************************************************************
+// forward declaration
+class Acceptor;
+
+class Receiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
+{
+ friend class Acceptor;
+public:
+
+ Receiver (Acceptor * acceptor=0, size_t index=MAX_RECEIVERS+1);
+
+ ~Receiver (void);
+
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
+
+ // virtual from ACE_Svc_Handler<>
+ virtual int open (void * pVoid);
+
+ // virtual from ACE_Event_Handler
+ virtual int handle_input (ACE_HANDLE h);
+ virtual int handle_output (ACE_HANDLE h);
+ virtual int handle_close (ACE_HANDLE h , ACE_Reactor_Mask mask);
+
+private:
+ int terminate_io (ACE_Reactor_Mask mask);
+ int initiate_io (ACE_Reactor_Mask mask);
+ int check_destroy (void);
+
+ Acceptor * acceptor_;
+ size_t index_;
+ int flg_mask_;
+
+ ACE_Recursive_Thread_Mutex mutex_;
+ long total_snd_;
+ long total_rcv_;
+ long total_w_;
+ long total_r_;
+};
+
+// *************************************************************
+
+class Acceptor : public ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR>
+{
+ friend class Receiver;
+public:
+ size_t get_number_sessions (void) { return sessions_; }
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
+
+ Acceptor (void);
+ virtual ~Acceptor (void);
+
+ void stop (void);
+ int start (const ACE_INET_Addr & addr);
+
+ // virtual from ACE_Acceptor<Receiver,ACE_SOCK_ACCEPTOR>
+ virtual int make_svc_handler (Receiver * & sh);
+
+private:
+
+ ACE_Recursive_Thread_Mutex mutex_;
+ size_t sessions_;
+ Receiver *list_receivers_[MAX_RECEIVERS];
+ long total_snd_;
+ long total_rcv_;
+ long total_w_;
+ long total_r_;
+
+ void on_new_receiver (Receiver & rcvr);
+ void on_delete_receiver (Receiver & rcvr);
+};
+
+
+// *******************************************
+// Sender
+// *******************************************
+
+class Connector;
+
+class Sender : public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>
+{
+ friend class Connector;
+
+public:
+ Sender (Connector * connector=0, size_t index=MAX_SENDERS+1);
+
+ ~Sender (void);
+
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
+
+ // virtual from ACE_Svc_Handler<>
+ virtual int open (void * pVoid);
+
+ // virtual from ACE_Event_Handler
+ virtual int handle_input (ACE_HANDLE h);
+ virtual int handle_output (ACE_HANDLE h);
+ virtual int handle_close (ACE_HANDLE h , ACE_Reactor_Mask mask);
+
+private:
+ int terminate_io (ACE_Reactor_Mask mask);
+ int initiate_io (ACE_Reactor_Mask mask);
+ int initiate_write ();
+ int check_destroy (void);
+
+ Connector * connector_;
+ size_t index_;
+ int flg_mask_;
+
+ ACE_Recursive_Thread_Mutex mutex_;
+
+ char send_buf_ [1024];
+ long total_snd_;
+ long total_rcv_;
+ long total_w_;
+ long total_r_;
+};
+
+// *************************************************************
+
+class Connector: public ACE_Connector<Sender,ACE_SOCK_CONNECTOR>
+{
+ friend class Sender;
+public:
+ size_t get_number_sessions (void) { return sessions_; }
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+ long get_total_w (void) { return this->total_w_; }
+ long get_total_r (void) { return this->total_r_; }
+
+
+ Connector ();
+ virtual ~Connector ();
+
+ void stop ();
+ int start (const ACE_INET_Addr & addr , int num);
+
+ // virtual from ACE_Connector<>
+ virtual int make_svc_handler (Sender * & sh);
+
+private:
+
+ ACE_Recursive_Thread_Mutex mutex_;
+ size_t sessions_;
+ Sender * list_senders_ [MAX_SENDERS];
+ long total_snd_;
+ long total_rcv_;
+ long total_w_;
+ long total_r_;
+
+ void on_new_sender (Sender & sndr);
+ void on_delete_sender (Sender & sndr);
+};
+
+
+#endif /* ACE_TESTS_TP_REACTOR_TEST_H */