summaryrefslogtreecommitdiff
path: root/TAO/tests/Big_Oneways
diff options
context:
space:
mode:
authorWilliam R. Otte <wotte@dre.vanderbilt.edu>2006-07-24 15:50:21 +0000
committerWilliam R. Otte <wotte@dre.vanderbilt.edu>2006-07-24 15:50:21 +0000
commit3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (patch)
tree197c810e5f5bce17b1233a7cb8d7b50c0bcd25e2 /TAO/tests/Big_Oneways
parent6b846cf03c0bcbd8c276cb0af61a181e5f98eaae (diff)
downloadATCD-3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c.tar.gz
Repo restructuring
Diffstat (limited to 'TAO/tests/Big_Oneways')
-rw-r--r--TAO/tests/Big_Oneways/.cvsignore2
-rw-r--r--TAO/tests/Big_Oneways/Big_Oneways.mpc21
-rw-r--r--TAO/tests/Big_Oneways/Coordinator.cpp83
-rw-r--r--TAO/tests/Big_Oneways/Coordinator.h55
-rw-r--r--TAO/tests/Big_Oneways/Peer.cpp51
-rw-r--r--TAO/tests/Big_Oneways/Peer.h39
-rw-r--r--TAO/tests/Big_Oneways/README21
-rw-r--r--TAO/tests/Big_Oneways/Session.cpp282
-rw-r--r--TAO/tests/Big_Oneways/Session.h112
-rw-r--r--TAO/tests/Big_Oneways/Session_Control.cpp75
-rw-r--r--TAO/tests/Big_Oneways/Session_Control.h45
-rw-r--r--TAO/tests/Big_Oneways/Session_Task.cpp19
-rw-r--r--TAO/tests/Big_Oneways/Session_Task.h33
-rw-r--r--TAO/tests/Big_Oneways/Test.idl83
-rw-r--r--TAO/tests/Big_Oneways/client.cpp118
-rwxr-xr-xTAO/tests/Big_Oneways/run_test.pl88
-rw-r--r--TAO/tests/Big_Oneways/server.cpp230
17 files changed, 1357 insertions, 0 deletions
diff --git a/TAO/tests/Big_Oneways/.cvsignore b/TAO/tests/Big_Oneways/.cvsignore
new file mode 100644
index 00000000000..f2ad85300eb
--- /dev/null
+++ b/TAO/tests/Big_Oneways/.cvsignore
@@ -0,0 +1,2 @@
+client
+server
diff --git a/TAO/tests/Big_Oneways/Big_Oneways.mpc b/TAO/tests/Big_Oneways/Big_Oneways.mpc
new file mode 100644
index 00000000000..5cf458a0b1e
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Big_Oneways.mpc
@@ -0,0 +1,21 @@
+// -*- MPC -*-
+// $Id$
+
+project(*Server): taoserver {
+ Source_Files {
+ Coordinator.cpp
+ Session_Control.cpp
+ server.cpp
+ }
+}
+
+project(*Client): taoexe, portableserver {
+ after += *Server
+ Source_Files {
+ Peer.cpp
+ Session.cpp
+ Session_Task.cpp
+ client.cpp
+ }
+}
+
diff --git a/TAO/tests/Big_Oneways/Coordinator.cpp b/TAO/tests/Big_Oneways/Coordinator.cpp
new file mode 100644
index 00000000000..6b6b332a9cc
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Coordinator.cpp
@@ -0,0 +1,83 @@
+//
+// $Id$
+//
+#include "Coordinator.h"
+
+ACE_RCSID(Big_Oneways, Coordinator, "$Id$")
+
+Coordinator::Coordinator (CORBA::ULong peer_count)
+ : peers_ (0)
+ , peer_count_ (0)
+ , peer_max_ (peer_count)
+{
+ ACE_NEW (this->peers_, Test::Peer_var[this->peer_max_]);
+}
+
+Coordinator::~Coordinator (void)
+{
+ delete[] this->peers_;
+}
+
+int
+Coordinator::has_all_peers (void) const
+{
+ return this->peer_count_ == this->peer_max_;
+}
+
+void
+Coordinator::create_session_list (Test::Session_Control_ptr session_control,
+ CORBA::ULong payload_size,
+ CORBA::ULong thread_count,
+ CORBA::ULong message_count,
+ Test::Session_List &session_list
+ ACE_ENV_ARG_DECL)
+{
+ session_list.length (this->peer_count_);
+ CORBA::ULong count = 0;
+ for (Test::Peer_var *i = this->peers_;
+ i != this->peers_ + this->peer_count_;
+ ++i)
+ {
+ session_list[count++] =
+ (*i)->create_session (session_control,
+ payload_size,
+ thread_count,
+ message_count,
+ this->peer_count_
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+}
+
+void
+Coordinator::shutdown_all_peers (ACE_ENV_SINGLE_ARG_DECL)
+{
+ for (Test::Peer_var *i = this->peers_;
+ i != this->peers_ + this->peer_count_;
+ ++i)
+ {
+ ACE_TRY
+ {
+ (*i)->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Coordinator::shutdown, ignored");
+ }
+ ACE_ENDTRY;
+ }
+}
+
+void
+Coordinator::add_peer (Test::Peer_ptr peer
+ ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (this->peer_count_ >= this->peer_max_)
+ return;
+
+ this->peers_[this->peer_count_++] =
+ Test::Peer::_duplicate (peer);
+}
diff --git a/TAO/tests/Big_Oneways/Coordinator.h b/TAO/tests/Big_Oneways/Coordinator.h
new file mode 100644
index 00000000000..7879e47752a
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Coordinator.h
@@ -0,0 +1,55 @@
+//
+// $Id$
+//
+
+#ifndef BIG_ONEWAYS_COORDINATOR_H
+#define BIG_ONEWAYS_COORDINATOR_H
+#include /**/ "ace/pre.h"
+
+#include "TestS.h"
+
+/// Implement the Test::Coordinator interface
+class Coordinator
+ : public virtual POA_Test::Coordinator
+{
+public:
+ /// Constructor
+ Coordinator (CORBA::ULong peer_count);
+
+
+ /// Check if all the peers have registered already
+ int has_all_peers (void) const;
+
+ /// Check a session on each peer
+ void create_session_list (Test::Session_Control_ptr session_control,
+ CORBA::ULong payload_size,
+ CORBA::ULong thread_count,
+ CORBA::ULong message_count,
+ Test::Session_List &session_list
+ ACE_ENV_ARG_DECL);
+
+ /// Shutdown all the peers
+ void shutdown_all_peers (ACE_ENV_SINGLE_ARG_DECL);
+
+ // = The skeleton methods
+ virtual void add_peer (Test::Peer_ptr peer
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+protected:
+ /// Protected Destructor, call _remove_ref() instead
+ virtual ~Coordinator (void);
+
+private:
+ /// Store a reference to each peer
+ Test::Peer_var *peers_;
+
+ /// Current number of elements in the array
+ size_t peer_count_;
+
+ /// Array's capacity
+ size_t peer_max_;
+};
+
+#include /**/ "ace/post.h"
+#endif /* BIG_ONEWAYS_COORDINATOR_H */
diff --git a/TAO/tests/Big_Oneways/Peer.cpp b/TAO/tests/Big_Oneways/Peer.cpp
new file mode 100644
index 00000000000..b93f0e3f7b2
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Peer.cpp
@@ -0,0 +1,51 @@
+//
+// $Id$
+//
+#include "Peer.h"
+#include "Session.h"
+
+ACE_RCSID(Big_Oneways, Peer, "$Id$")
+
+Peer::Peer (CORBA::ORB_ptr orb)
+ : orb_ (CORBA::ORB::_duplicate (orb))
+{
+}
+
+Peer::~Peer (void)
+{
+}
+
+Test::Session_ptr
+Peer::create_session (Test::Session_Control_ptr control,
+ CORBA::ULong payload_size,
+ CORBA::ULong thread_count,
+ CORBA::ULong message_count,
+ CORBA::ULong peer_count
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ Session *session_impl = 0;
+ ACE_NEW_THROW_EX (session_impl,
+ Session (control,
+ payload_size,
+ thread_count,
+ message_count,
+ peer_count),
+ CORBA::NO_MEMORY ());
+ ACE_CHECK_RETURN (Test::Session::_nil ());
+ PortableServer::ServantBase_var transfer_ownership (session_impl);
+
+ return session_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+void
+Peer::shutdown (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Peer::shutdown, waiting for threads\n"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Peer::shutdown, shutting down ORB\n"));
+ this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER);
+}
diff --git a/TAO/tests/Big_Oneways/Peer.h b/TAO/tests/Big_Oneways/Peer.h
new file mode 100644
index 00000000000..2a142889512
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Peer.h
@@ -0,0 +1,39 @@
+//
+// $Id$
+//
+
+#ifndef BIG_ONEWAYS_PEER_H
+#define BIG_ONEWAYS_PEER_H
+#include /**/ "ace/pre.h"
+
+#include "TestS.h"
+
+/// Implement the Test::Peer interface
+class Peer
+ : public virtual POA_Test::Peer
+{
+public:
+ /// Constructor
+ Peer (CORBA::ORB_ptr orb);
+
+ /// Destructor
+ virtual ~Peer (void);
+
+ // = The skeleton methods
+ virtual Test::Session_ptr create_session (Test::Session_Control_ptr control,
+ CORBA::ULong payload_size,
+ CORBA::ULong thread_count,
+ CORBA::ULong message_count,
+ CORBA::ULong peer_count
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+private:
+ /// Keep a pointer to the ORB to shutdown cleanly
+ CORBA::ORB_var orb_;
+};
+
+#include /**/ "ace/post.h"
+#endif /* BIG_ONEWAYS_PEER_H */
diff --git a/TAO/tests/Big_Oneways/README b/TAO/tests/Big_Oneways/README
new file mode 100644
index 00000000000..ed389e9009c
--- /dev/null
+++ b/TAO/tests/Big_Oneways/README
@@ -0,0 +1,21 @@
+/**
+
+@page Big_Oneways Test README File
+
+ This is a stress test for the non-blocking I/O features in the
+ORB. The test connects multiple peer processes together. Each
+process sends messages to all its peers, using multiple threads to
+generate the messages. Without non-blocking I/O the system soon
+deadlocks.
+
+ This is part of the regression testsuite for:
+
+http://ace.cs.wustl.edu/bugzilla/show_bug.cgi?id=132
+
+ To run the test use the run_test.pl script:
+
+$ ./run_test.pl
+
+ the script returns 0 if the test was successful.
+
+*/
diff --git a/TAO/tests/Big_Oneways/Session.cpp b/TAO/tests/Big_Oneways/Session.cpp
new file mode 100644
index 00000000000..e6f77c212c6
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Session.cpp
@@ -0,0 +1,282 @@
+//
+// $Id$
+//
+#include "Session.h"
+#include "tao/debug.h"
+
+ACE_RCSID(Big_Oneways, Session, "$Id$")
+
+Session::Session (Test::Session_Control_ptr control,
+ CORBA::ULong payload_size,
+ CORBA::ULong thread_count,
+ CORBA::ULong message_count,
+ CORBA::ULong peer_count)
+ : control_ (Test::Session_Control::_duplicate (control))
+ , running_ (0)
+ , payload_size_ (payload_size)
+ , thread_count_ (thread_count)
+ , message_count_ (message_count)
+ , active_thread_count_ (0)
+ , expected_messages_ (thread_count * message_count * (peer_count - 1))
+ , task_ (this)
+ , barrier_ (thread_count + 1)
+{
+}
+
+Session::~Session (void)
+{
+}
+
+int
+Session::svc (void)
+{
+ this->barrier_.wait ();
+ CORBA::ULong i = 0;
+
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ // Use the same payload over and over
+ Test::Payload payload (this->payload_size_);
+ payload.length (this->payload_size_);
+
+ for (CORBA::ULong j = 0; j != this->payload_size_; ++j)
+ {
+ payload[j] = j % 256;
+ }
+
+ // Get the number of peers just once.
+ CORBA::ULong session_count =
+ this->other_sessions_.length ();
+
+ this->validate_connections (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ for (; i != this->message_count_; ++i)
+ {
+#if 0
+ if (i % 500 == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Session::svc, "
+ "sending message %d\n",
+ i));
+ }
+#endif /* 0 */
+ for (CORBA::ULong j = 0; j != session_count; ++j)
+ {
+ this->other_sessions_[j]->receive_payload (payload
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ }
+
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+ this->active_thread_count_--;
+ if (this->more_work ())
+ {
+ return 0;
+ }
+ }
+ this->terminate (1 ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_ERROR ((LM_ERROR,
+ "(%P|%t) ERROR: Session::svc, "
+ "send %d messages out of %d\n",
+ i, message_count_));
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Session::svc - ");
+ return -1;
+ }
+ ACE_ENDTRY;
+ ACE_CHECK_RETURN (-1);
+
+ this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+void
+Session::validate_connections (ACE_ENV_SINGLE_ARG_DECL)
+{
+ CORBA::ULong session_count =
+ this->other_sessions_.length ();
+ for (CORBA::ULong i = 0; i != 100; ++i)
+ {
+ for (CORBA::ULong j = 0; j != session_count; ++j)
+ {
+ ACE_TRY
+ {
+ this->other_sessions_[j]->ping (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ }
+ ACE_ENDTRY;
+ }
+ }
+}
+
+void
+Session::start (const Test::Session_List &other_sessions
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ Test::Already_Running,
+ Test::No_Peers))
+{
+ if (other_sessions.length () == 0)
+ ACE_THROW (Test::No_Peers ());
+
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (this->running_)
+ ACE_THROW (Test::Already_Running ());
+
+ this->other_sessions_ = other_sessions;
+
+ for (CORBA::ULong i = 0; i != this->thread_count_; ++i)
+ {
+ // Increase the reference count because the new thread will have
+ // access to this object....
+ ACE_TRY
+ {
+ this->_add_ref (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (this->task_.activate (
+ THR_NEW_LWP | THR_JOINABLE, 1, 1) == -1)
+ {
+ this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ else
+ {
+ this->running_ = 1;
+ this->active_thread_count_++;
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Session::start, ignored");
+ }
+ ACE_ENDTRY;
+ }
+
+ if (this->active_thread_count_ != this->thread_count_)
+ return;
+ }
+
+ this->validate_connections (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->barrier_.wait ();
+
+ if (this->running_ != 0)
+ return;
+
+ /// None of the threads are running, this session is useless at
+ /// this point, report the problem and destroy the local objects
+ this->terminate (0 ACE_ENV_ARG_PARAMETER);
+}
+
+void
+Session::ping (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException))
+{
+}
+
+void
+Session::receive_payload (const Test::Payload &the_payload
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (the_payload.length () != this->payload_size_)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: (%P|%t) Session::receive_payload, "
+ "unexpected payload size (%d != %d)\n",
+ the_payload.length (), this->payload_size_));
+ }
+
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->expected_messages_--;
+
+#if 0
+ int verbose = 0;
+ verbose = this->expected_messages_ % 500 == 0;
+ if (this->expected_messages_ < 500)
+ verbose = (this->expected_messages_ % 100 == 0);
+ if (this->expected_messages_ < 100)
+ verbose = (this->expected_messages_ % 10 == 0);
+ if (this->expected_messages_ < 5)
+ verbose = 1;
+
+ if (verbose)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Session::receive_payload, "
+ "%d messages to go\n",
+ this->expected_messages_));
+ }
+#endif /* 0 */
+
+ if (this->more_work ())
+ return;
+ }
+ this->terminate (1 ACE_ENV_ARG_PARAMETER);
+}
+
+
+void
+Session::destroy (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ // Make sure local resources are released
+
+ PortableServer::POA_var poa =
+ this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var oid =
+ poa->servant_to_id (this ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+}
+
+int
+Session::more_work (void) const
+{
+ if (this->expected_messages_ > 0
+ || this->active_thread_count_ > 0
+ || this->running_ == 0)
+ return 1;
+
+ return 0;
+}
+
+void
+Session::terminate (CORBA::Boolean success
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC (())
+{
+ // Make sure that global resources are released
+ ACE_TRY_EX(GLOBAL)
+ {
+ this->control_->session_finished (success
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK_EX(GLOBAL);
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Session::terminate, ignored");
+ }
+ ACE_ENDTRY;
+
+}
diff --git a/TAO/tests/Big_Oneways/Session.h b/TAO/tests/Big_Oneways/Session.h
new file mode 100644
index 00000000000..ef408f81679
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Session.h
@@ -0,0 +1,112 @@
+// -*- C++ -*-
+//
+// $Id$
+
+#ifndef BIG_ONEWAYS_SESSION_H
+#define BIG_ONEWAYS_SESSION_H
+
+#include /**/ "ace/pre.h"
+
+#include "TestS.h"
+#include "Session_Task.h"
+
+#if defined(ACE_HAS_THREADS)
+# include "ace/Barrier.h"
+#else
+# include "ace/Null_Barrier.h"
+#endif /* ACE_HAS_THREADS */
+
+/// Implement the Test::Session interface
+class Session
+ : public virtual POA_Test::Session
+{
+public:
+ /// Constructor
+ Session (Test::Session_Control_ptr control,
+ CORBA::ULong payload_size,
+ CORBA::ULong thread_count,
+ CORBA::ULong message_count,
+ CORBA::ULong peer_count);
+
+ /// Destructor
+ virtual ~Session (void);
+
+ /// Run one of the experiment threads
+ int svc (void);
+
+ // = The skeleton methods
+ virtual void start (const Test::Session_List &other_sessions
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException,
+ Test::Already_Running,
+ Test::No_Peers));
+
+ virtual void ping (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void receive_payload (const Test::Payload &the_payload
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ virtual void destroy (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+private:
+ /// Helper function used to report any problems and destroy local
+ /// resources
+ void terminate (CORBA::Boolean success
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC (());
+
+ /// Return 1 if all the work in this session has been completed
+ int more_work (void) const;
+
+ /// Make sure that all threads have connections avaiable to the
+ /// other sessions.
+ void validate_connections (ACE_ENV_SINGLE_ARG_DECL);
+
+private:
+ /// Synchronize the internal state
+ ACE_SYNCH_MUTEX mutex_;
+
+ /// Keep a reference to the Session_Control, this is used to report
+ /// when the test finishes.
+ Test::Session_Control_var control_;
+
+ /// Keep track of wether the test is running.
+ int running_;
+
+ /// The other session objects participating in the test
+ Test::Session_List other_sessions_;
+
+ /// Size of each message
+ CORBA::ULong payload_size_;
+
+ /// Number of threads
+ CORBA::ULong thread_count_;
+
+ /// Number of messages to send
+ CORBA::ULong message_count_;
+
+ /// The number of threads currently running, when this reaches 0 the
+ /// session destroys itself.
+ CORBA::ULong active_thread_count_;
+
+ /// Number of messages expected
+ CORBA::ULong expected_messages_;
+
+ /// Helper class to run svc() in a separate thread
+ Session_Task task_;
+
+ /// Barrier to start all threads simultaenously
+#if defined(ACE_HAS_THREADS)
+ typedef ACE_Thread_Barrier Barrier;
+#else
+ typedef ACE_Null_Barrier Barrier;
+#endif /* ACE_HAS_THREADS */
+ Barrier barrier_;
+};
+
+#include /**/ "ace/post.h"
+
+#endif /* BIG_ONEWAYS_SESSION_H */
diff --git a/TAO/tests/Big_Oneways/Session_Control.cpp b/TAO/tests/Big_Oneways/Session_Control.cpp
new file mode 100644
index 00000000000..12f3c2d7566
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Session_Control.cpp
@@ -0,0 +1,75 @@
+//
+// $Id$
+//
+#include "Session_Control.h"
+
+ACE_RCSID(Big_Oneways, Session_Control, "$Id$")
+
+Session_Control::Session_Control (CORBA::ULong session_count)
+ : session_count_ (session_count)
+ , success_ (1)
+{
+}
+
+int
+Session_Control::all_sessions_finished (void) const
+{
+ return this->session_count_ == 0;
+}
+
+Session_Control::~Session_Control (void)
+{
+ if (this->session_count_ == 0 && this->success_)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Session_Control::~Session_control, "
+ "good, all sessions did finish\n"));
+ }
+ else if (session_count_ != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: (%P|%t) Session_Control::~Session_control, "
+ "%d sessions did not finish\n",
+ this->session_count_));
+ }
+ else
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: (%P|%t) Session_Control::~Session_control, "
+ "some sessions failed\n"));
+ }
+}
+
+void
+Session_Control::session_finished (CORBA::Boolean success
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ if (this->session_count_ == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: (%P|%t) Session_Control::session_finished, "
+ "unexpected callback\n"));
+ }
+ if (success == 0)
+ this->success_ = 0;
+
+ this->session_count_--;
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) Session_Control::session_finished, "
+ "%d sessions to go\n",
+ this->session_count_));
+ if (session_count_ == 0)
+ {
+ PortableServer::POA_var poa =
+ this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var oid =
+ poa->servant_to_id (this ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+}
diff --git a/TAO/tests/Big_Oneways/Session_Control.h b/TAO/tests/Big_Oneways/Session_Control.h
new file mode 100644
index 00000000000..2f0232be6d8
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Session_Control.h
@@ -0,0 +1,45 @@
+//
+// $Id$
+//
+
+#ifndef BIG_ONEWAYS_SESSION_CONTROL_H
+#define BIG_ONEWAYS_SESSION_CONTROL_H
+#include /**/ "ace/pre.h"
+
+#include "TestS.h"
+
+/// Implement the Test::Session_Control interface
+class Session_Control
+ : public virtual POA_Test::Session_Control
+{
+public:
+ /// Constructor
+ /**
+ * @param session_count Number of session objects in the experiment.
+ */
+ Session_Control (CORBA::ULong session_count);
+
+ /// Destructor
+ virtual ~Session_Control (void);
+
+ /// Return 1 when all sessions have finished
+ int all_sessions_finished (void) const;
+
+ // = The skeleton methods
+ virtual void session_finished (CORBA::Boolean success
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+private:
+ /// Synchronize the internal state
+ ACE_SYNCH_MUTEX mutex_;
+
+ /// The type of test
+ CORBA::ULong session_count_;
+
+ /// Set to falso if any session reported a failure
+ CORBA::Boolean success_;
+};
+
+#include /**/ "ace/post.h"
+#endif /* BIG_ONEWAYS_SESSION_CONTROL_H */
diff --git a/TAO/tests/Big_Oneways/Session_Task.cpp b/TAO/tests/Big_Oneways/Session_Task.cpp
new file mode 100644
index 00000000000..bd992a66e5d
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Session_Task.cpp
@@ -0,0 +1,19 @@
+//
+// $Id$
+//
+
+#include "Session_Task.h"
+#include "Session.h"
+
+ACE_RCSID(Big_Oneways, Session_Task, "$Id$")
+
+Session_Task::Session_Task (Session *session)
+ : session_ (session)
+{
+}
+
+int
+Session_Task::svc (void)
+{
+ return this->session_->svc ();
+}
diff --git a/TAO/tests/Big_Oneways/Session_Task.h b/TAO/tests/Big_Oneways/Session_Task.h
new file mode 100644
index 00000000000..6f73ee069b0
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Session_Task.h
@@ -0,0 +1,33 @@
+//
+// $Id$
+//
+
+#ifndef BIG_ONEWAYS_SESSION_TASK_H
+#define BIG_ONEWAYS_SESSION_TASK_H
+#include /**/ "ace/pre.h"
+
+#include "ace/Task.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class Session;
+
+/// Implement a Task to run the experiments using multiple threads.
+class Session_Task : public ACE_Task_Base
+{
+public:
+ /// Constructor
+ Session_Task (Session *session);
+
+ /// Thread entry point
+ int svc (void);
+
+private:
+ /// Reference to the test interface
+ Session *session_;
+};
+
+#include /**/ "ace/post.h"
+#endif /* BIG_ONEWAYS_SESSION_TASK_H */
diff --git a/TAO/tests/Big_Oneways/Test.idl b/TAO/tests/Big_Oneways/Test.idl
new file mode 100644
index 00000000000..60065cf85fc
--- /dev/null
+++ b/TAO/tests/Big_Oneways/Test.idl
@@ -0,0 +1,83 @@
+//
+// $Id$
+//
+
+module Test
+{
+ /// The message type, just used to send a lot of data on each
+ /// request
+ typedef sequence<octet> Payload;
+
+ /// A session is a single instance of the test
+ interface Session;
+ typedef sequence<Session> Session_List;
+
+ /// A session control is used to determine if all the Session in an
+ /// experiment have finished.
+ /**
+ * @param success If false then the session failed, the experiment
+ * is successful only if all sessions finish successfully
+ */
+ interface Session_Control
+ {
+ void session_finished (in boolean success);
+ };
+
+ /// A Peer is used to create sessions
+ interface Peer
+ {
+ /// Create a new session
+ /**
+ * @param payload_size The size of each message
+ * @param thread_count The number of threads that each session
+ * must create
+ * @param message_count How many messages does each thread send.
+ */
+ Session create_session (in Session_Control control,
+ in unsigned long payload_size,
+ in unsigned long thread_count,
+ in unsigned long message_count,
+ in unsigned long peer_count);
+
+ /// Shutdown the peer
+ oneway void shutdown ();
+ };
+
+ /// The Session already has an experiment running.
+ exception Already_Running {};
+
+ /// The experiment requires at least two Sessions
+ exception No_Peers {};
+
+ interface Session
+ {
+ /// Start the test, send messages to all the peers
+ /**
+ * @param other_sessions The list of sessions participating in the
+ * experiment, this list must not include the session
+ * receiving the start() call.
+ */
+ void start (in Session_List other_sessions)
+ raises (Already_Running, No_Peers);
+
+ /// Ping the session, used to validate all connections
+ void ping ();
+
+ /// Receive the payload
+ oneway void receive_payload (in Payload the_payload);
+
+ /// Destroy the Session object
+ void destroy ();
+ };
+
+ interface Coordinator
+ {
+ /// Add a new peer.
+ /**
+ * The coordinator starts the test by calling <send_oneways> on
+ * all peers. How does it decide to do that is application
+ * specific.
+ */
+ void add_peer (in Peer the_peer);
+ };
+};
diff --git a/TAO/tests/Big_Oneways/client.cpp b/TAO/tests/Big_Oneways/client.cpp
new file mode 100644
index 00000000000..13465fab43c
--- /dev/null
+++ b/TAO/tests/Big_Oneways/client.cpp
@@ -0,0 +1,118 @@
+// $Id$
+
+#include "Peer.h"
+#include "ace/Get_Opt.h"
+#include "ace/Thread_Manager.h"
+
+ACE_RCSID(Big_Oneways, client, "$Id$")
+
+const char *ior = "file://test.ior";
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "k:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'k':
+ ior = get_opts.opt_arg ();
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-k <ior> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (poa_object.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Panic got a nil RootPOA\n"),
+ 1);
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ CORBA::Object_var tmp =
+ orb->string_to_object(ior ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ Test::Coordinator_var coordinator =
+ Test::Coordinator::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (coordinator.in ()))
+ {
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "Nil coordinator reference <%s>\n",
+ ior),
+ 1);
+ }
+
+ Peer *peer_impl = 0;
+ ACE_NEW_RETURN (peer_impl,
+ Peer (orb.in ()),
+ 1);
+ PortableServer::ServantBase_var peer_owner_transfer(peer_impl);
+
+ Test::Peer_var peer =
+ peer_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ coordinator->add_peer (peer.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ orb->run (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Wait for all the threads.
+ ACE_Thread_Manager::instance ()->wait ();
+
+ root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception caught:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
diff --git a/TAO/tests/Big_Oneways/run_test.pl b/TAO/tests/Big_Oneways/run_test.pl
new file mode 100755
index 00000000000..45175d43e98
--- /dev/null
+++ b/TAO/tests/Big_Oneways/run_test.pl
@@ -0,0 +1,88 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id$
+# -*- perl -*-
+
+use lib '../../../bin';
+use PerlACE::Run_Test;
+use Getopt::Std;
+
+local ($opt_i, $opt_b);
+
+if (!getopts ('i:b:')) {
+ print "Usage: run_test.pl [-b payload_size] [-i iterations]\n";
+ exit 1;
+}
+
+my $server_args = " -p 3";
+if (defined $opt_i) {
+ $server_args .= " -i ".$opt_i;
+}
+if (defined $opt_b) {
+ $server_args .= " -b ".$opt_b;
+}
+
+$iorfile = PerlACE::LocalFile ("server.ior");
+
+$status = 0;
+unlink $iorfile;
+if (PerlACE::is_vxworks_test()) {
+ $SV = new PerlACE::ProcessVX ("server", "-o server.ior $server_args");
+}
+else {
+ $SV = new PerlACE::Process ("server", "-o $iorfile $server_args");
+}
+$CL1 = new PerlACE::Process ("client", " -k file://$iorfile");
+$CL2 = new PerlACE::Process ("client", " -k file://$iorfile");
+$CL3 = new PerlACE::Process ("client", " -k file://$iorfile");
+
+$server = $SV->Spawn ();
+
+if ($server != 0) {
+ print STDERR "ERROR: server returned $server\n";
+ exit 1;
+}
+
+if (PerlACE::waitforfile_timed ($iorfile, 15) == -1) {
+ print STDERR "ERROR: cannot find file <$iorfile>\n";
+ $SV->Kill (); $SV->TimedWait (1);
+ exit 1;
+}
+
+$CL1->Spawn ();
+$CL2->Spawn ();
+$CL3->Spawn ();
+
+$client1 = $CL1->WaitKill (300);
+
+if ($client1 != 0) {
+ print STDERR "ERROR: client 1 returned $client1\n";
+ $status = 1;
+}
+
+$client2 = $CL2->WaitKill (300);
+
+if ($client2 != 0) {
+ print STDERR "ERROR: client 2 returned $client2\n";
+ $status = 1;
+}
+
+$client3 = $CL3->WaitKill (300);
+
+if ($client3 != 0) {
+ print STDERR "ERROR: client 3 returned $client3\n";
+ $status = 1;
+}
+
+$server = $SV->WaitKill (5);
+
+if ($server != 0) {
+ print STDERR "ERROR: server returned $server\n";
+ $status = 1;
+}
+
+unlink $iorfile;
+
+exit $status;
diff --git a/TAO/tests/Big_Oneways/server.cpp b/TAO/tests/Big_Oneways/server.cpp
new file mode 100644
index 00000000000..9aa9d7a7522
--- /dev/null
+++ b/TAO/tests/Big_Oneways/server.cpp
@@ -0,0 +1,230 @@
+// $Id$
+
+#include "Coordinator.h"
+#include "Session_Control.h"
+#include "ace/Get_Opt.h"
+#include "ace/OS_NS_stdio.h"
+
+ACE_RCSID(Big_Oneways, server, "$Id$")
+
+const char *ior_output_file = "test.ior";
+CORBA::ULong peer_count = 4;
+CORBA::ULong payload_size = 1024;
+CORBA::ULong message_count = 1000;
+CORBA::ULong thread_count = 4;
+
+int
+parse_args (int argc, char *argv[])
+{
+ ACE_Get_Opt get_opts (argc, argv, "o:p:b:i:n:");
+ int c;
+
+ while ((c = get_opts ()) != -1)
+ switch (c)
+ {
+ case 'o':
+ ior_output_file = get_opts.opt_arg ();
+ break;
+
+ case 'p':
+ peer_count = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'b':
+ payload_size = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'i':
+ message_count = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case 'n':
+ thread_count = ACE_OS::atoi (get_opts.opt_arg ());
+ break;
+
+ case '?':
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "usage: %s "
+ "-o <iorfile> "
+ "-p <peer_count> "
+ "-b <payload_size> "
+ "-i <message_count> "
+ "-n <thread_count> "
+ "\n",
+ argv [0]),
+ -1);
+ }
+ // Indicates sucessful parsing of the command line
+ return 0;
+}
+
+int
+main (int argc, char *argv[])
+{
+ ACE_TRY_NEW_ENV
+ {
+ CORBA::ORB_var orb =
+ CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var poa_object =
+ orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ PortableServer::POA_var root_poa =
+ PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (CORBA::is_nil (poa_object.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Panic got a nil RootPOA\n"),
+ 1);
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (parse_args (argc, argv) != 0)
+ return 1;
+
+ Coordinator *coordinator_impl = 0;
+ ACE_NEW_RETURN (coordinator_impl,
+ Coordinator (peer_count),
+ 1);
+
+ Test::Coordinator_var coordinator =
+ coordinator_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::String_var ior =
+ orb->object_to_string (coordinator.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // If the ior_output_file exists, output the ior to it
+ FILE *output_file= ACE_OS::fopen (ior_output_file, "w");
+ if (output_file == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Cannot open output file for writing IOR: %s",
+ ior_output_file),
+ 1);
+ ACE_OS::fprintf (output_file, "%s", ior.in ());
+ ACE_OS::fclose (output_file);
+
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "Waiting for peers . . . "));
+ for (int i = 0;
+ i != 60 && !coordinator_impl->has_all_peers ();
+ ++i)
+ {
+ ACE_Time_Value tv (1, 0);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_DEBUG ((LM_DEBUG, "done.\n"));
+
+ if (!coordinator_impl->has_all_peers ())
+ {
+ ACE_ERROR ((LM_DEBUG,
+ "ERROR: timeout, some peers failed to register\n"));
+ return 1;
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Building session list . . . "));
+
+ Session_Control *session_control_impl = 0;
+ ACE_NEW_RETURN (session_control_impl,
+ Session_Control (peer_count),
+ 1);
+
+ Test::Session_Control_var session_control =
+ session_control_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ Test::Session_List session_list;
+ coordinator_impl->create_session_list (session_control.in (),
+ payload_size,
+ thread_count,
+ message_count,
+ session_list
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_ASSERT (session_list.length () == peer_count);
+
+ ACE_DEBUG ((LM_DEBUG, "done.\n"));
+ ACE_DEBUG ((LM_DEBUG, "Giving start signal . . . "));
+
+ CORBA::ULong j;
+ for (j = 0; j != peer_count; ++j)
+ {
+ // Make a copy of the sessions, excluding the j-th element
+ Test::Session_List other_sessions (peer_count - 1);
+ other_sessions.length (peer_count - 1);
+ CORBA::ULong count = 0;
+ for (CORBA::ULong k = 0; k != peer_count; ++k)
+ {
+ if (k == j)
+ continue;
+ other_sessions[count++] =
+ Test::Session::_duplicate (session_list[k]);
+ }
+
+ session_list[j]->start (other_sessions
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ }
+
+ ACE_DEBUG ((LM_DEBUG ,"done\n"));
+ ACE_DEBUG ((LM_DEBUG, "Waiting for sessions to finish. . . \n"));
+ for (int k = 0;
+ k != 300 && !session_control_impl->all_sessions_finished ();
+ ++k)
+ {
+ ACE_Time_Value tv (1, 0);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+
+ if (!session_control_impl->all_sessions_finished ())
+ {
+ ACE_ERROR ((LM_ERROR,
+ "ERROR: timeout waiting for sessions\n"));
+ return 1;
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "All sessions finished, destroy session list . . .\n"));
+
+ for (j = 0; j != peer_count; ++j)
+ {
+ session_list[j]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Shutdown all peers . . .\n"));
+
+ coordinator_impl->shutdown_all_peers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "Shutdown poa and orb . . .\n"));
+
+ root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
+ "Exception caught:");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}