diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:21 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:21 +0000 |
commit | 3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (patch) | |
tree | 197c810e5f5bce17b1233a7cb8d7b50c0bcd25e2 /TAO/tests/Big_Oneways | |
parent | 6b846cf03c0bcbd8c276cb0af61a181e5f98eaae (diff) | |
download | ATCD-3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c.tar.gz |
Repo restructuring
Diffstat (limited to 'TAO/tests/Big_Oneways')
-rw-r--r-- | TAO/tests/Big_Oneways/.cvsignore | 2 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Big_Oneways.mpc | 21 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Coordinator.cpp | 83 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Coordinator.h | 55 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Peer.cpp | 51 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Peer.h | 39 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/README | 21 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Session.cpp | 282 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Session.h | 112 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Session_Control.cpp | 75 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Session_Control.h | 45 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Session_Task.cpp | 19 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Session_Task.h | 33 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/Test.idl | 83 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/client.cpp | 118 | ||||
-rwxr-xr-x | TAO/tests/Big_Oneways/run_test.pl | 88 | ||||
-rw-r--r-- | TAO/tests/Big_Oneways/server.cpp | 230 |
17 files changed, 1357 insertions, 0 deletions
diff --git a/TAO/tests/Big_Oneways/.cvsignore b/TAO/tests/Big_Oneways/.cvsignore new file mode 100644 index 00000000000..f2ad85300eb --- /dev/null +++ b/TAO/tests/Big_Oneways/.cvsignore @@ -0,0 +1,2 @@ +client +server diff --git a/TAO/tests/Big_Oneways/Big_Oneways.mpc b/TAO/tests/Big_Oneways/Big_Oneways.mpc new file mode 100644 index 00000000000..5cf458a0b1e --- /dev/null +++ b/TAO/tests/Big_Oneways/Big_Oneways.mpc @@ -0,0 +1,21 @@ +// -*- MPC -*- +// $Id$ + +project(*Server): taoserver { + Source_Files { + Coordinator.cpp + Session_Control.cpp + server.cpp + } +} + +project(*Client): taoexe, portableserver { + after += *Server + Source_Files { + Peer.cpp + Session.cpp + Session_Task.cpp + client.cpp + } +} + diff --git a/TAO/tests/Big_Oneways/Coordinator.cpp b/TAO/tests/Big_Oneways/Coordinator.cpp new file mode 100644 index 00000000000..6b6b332a9cc --- /dev/null +++ b/TAO/tests/Big_Oneways/Coordinator.cpp @@ -0,0 +1,83 @@ +// +// $Id$ +// +#include "Coordinator.h" + +ACE_RCSID(Big_Oneways, Coordinator, "$Id$") + +Coordinator::Coordinator (CORBA::ULong peer_count) + : peers_ (0) + , peer_count_ (0) + , peer_max_ (peer_count) +{ + ACE_NEW (this->peers_, Test::Peer_var[this->peer_max_]); +} + +Coordinator::~Coordinator (void) +{ + delete[] this->peers_; +} + +int +Coordinator::has_all_peers (void) const +{ + return this->peer_count_ == this->peer_max_; +} + +void +Coordinator::create_session_list (Test::Session_Control_ptr session_control, + CORBA::ULong payload_size, + CORBA::ULong thread_count, + CORBA::ULong message_count, + Test::Session_List &session_list + ACE_ENV_ARG_DECL) +{ + session_list.length (this->peer_count_); + CORBA::ULong count = 0; + for (Test::Peer_var *i = this->peers_; + i != this->peers_ + this->peer_count_; + ++i) + { + session_list[count++] = + (*i)->create_session (session_control, + payload_size, + thread_count, + message_count, + this->peer_count_ + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } +} + +void +Coordinator::shutdown_all_peers (ACE_ENV_SINGLE_ARG_DECL) +{ + for (Test::Peer_var *i = this->peers_; + i != this->peers_ + this->peer_count_; + ++i) + { + ACE_TRY + { + (*i)->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Coordinator::shutdown, ignored"); + } + ACE_ENDTRY; + } +} + +void +Coordinator::add_peer (Test::Peer_ptr peer + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (this->peer_count_ >= this->peer_max_) + return; + + this->peers_[this->peer_count_++] = + Test::Peer::_duplicate (peer); +} diff --git a/TAO/tests/Big_Oneways/Coordinator.h b/TAO/tests/Big_Oneways/Coordinator.h new file mode 100644 index 00000000000..7879e47752a --- /dev/null +++ b/TAO/tests/Big_Oneways/Coordinator.h @@ -0,0 +1,55 @@ +// +// $Id$ +// + +#ifndef BIG_ONEWAYS_COORDINATOR_H +#define BIG_ONEWAYS_COORDINATOR_H +#include /**/ "ace/pre.h" + +#include "TestS.h" + +/// Implement the Test::Coordinator interface +class Coordinator + : public virtual POA_Test::Coordinator +{ +public: + /// Constructor + Coordinator (CORBA::ULong peer_count); + + + /// Check if all the peers have registered already + int has_all_peers (void) const; + + /// Check a session on each peer + void create_session_list (Test::Session_Control_ptr session_control, + CORBA::ULong payload_size, + CORBA::ULong thread_count, + CORBA::ULong message_count, + Test::Session_List &session_list + ACE_ENV_ARG_DECL); + + /// Shutdown all the peers + void shutdown_all_peers (ACE_ENV_SINGLE_ARG_DECL); + + // = The skeleton methods + virtual void add_peer (Test::Peer_ptr peer + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + +protected: + /// Protected Destructor, call _remove_ref() instead + virtual ~Coordinator (void); + +private: + /// Store a reference to each peer + Test::Peer_var *peers_; + + /// Current number of elements in the array + size_t peer_count_; + + /// Array's capacity + size_t peer_max_; +}; + +#include /**/ "ace/post.h" +#endif /* BIG_ONEWAYS_COORDINATOR_H */ diff --git a/TAO/tests/Big_Oneways/Peer.cpp b/TAO/tests/Big_Oneways/Peer.cpp new file mode 100644 index 00000000000..b93f0e3f7b2 --- /dev/null +++ b/TAO/tests/Big_Oneways/Peer.cpp @@ -0,0 +1,51 @@ +// +// $Id$ +// +#include "Peer.h" +#include "Session.h" + +ACE_RCSID(Big_Oneways, Peer, "$Id$") + +Peer::Peer (CORBA::ORB_ptr orb) + : orb_ (CORBA::ORB::_duplicate (orb)) +{ +} + +Peer::~Peer (void) +{ +} + +Test::Session_ptr +Peer::create_session (Test::Session_Control_ptr control, + CORBA::ULong payload_size, + CORBA::ULong thread_count, + CORBA::ULong message_count, + CORBA::ULong peer_count + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + Session *session_impl = 0; + ACE_NEW_THROW_EX (session_impl, + Session (control, + payload_size, + thread_count, + message_count, + peer_count), + CORBA::NO_MEMORY ()); + ACE_CHECK_RETURN (Test::Session::_nil ()); + PortableServer::ServantBase_var transfer_ownership (session_impl); + + return session_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +void +Peer::shutdown (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Peer::shutdown, waiting for threads\n")); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Peer::shutdown, shutting down ORB\n")); + this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER); +} diff --git a/TAO/tests/Big_Oneways/Peer.h b/TAO/tests/Big_Oneways/Peer.h new file mode 100644 index 00000000000..2a142889512 --- /dev/null +++ b/TAO/tests/Big_Oneways/Peer.h @@ -0,0 +1,39 @@ +// +// $Id$ +// + +#ifndef BIG_ONEWAYS_PEER_H +#define BIG_ONEWAYS_PEER_H +#include /**/ "ace/pre.h" + +#include "TestS.h" + +/// Implement the Test::Peer interface +class Peer + : public virtual POA_Test::Peer +{ +public: + /// Constructor + Peer (CORBA::ORB_ptr orb); + + /// Destructor + virtual ~Peer (void); + + // = The skeleton methods + virtual Test::Session_ptr create_session (Test::Session_Control_ptr control, + CORBA::ULong payload_size, + CORBA::ULong thread_count, + CORBA::ULong message_count, + CORBA::ULong peer_count + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + /// Keep a pointer to the ORB to shutdown cleanly + CORBA::ORB_var orb_; +}; + +#include /**/ "ace/post.h" +#endif /* BIG_ONEWAYS_PEER_H */ diff --git a/TAO/tests/Big_Oneways/README b/TAO/tests/Big_Oneways/README new file mode 100644 index 00000000000..ed389e9009c --- /dev/null +++ b/TAO/tests/Big_Oneways/README @@ -0,0 +1,21 @@ +/** + +@page Big_Oneways Test README File + + This is a stress test for the non-blocking I/O features in the +ORB. The test connects multiple peer processes together. Each +process sends messages to all its peers, using multiple threads to +generate the messages. Without non-blocking I/O the system soon +deadlocks. + + This is part of the regression testsuite for: + +http://ace.cs.wustl.edu/bugzilla/show_bug.cgi?id=132 + + To run the test use the run_test.pl script: + +$ ./run_test.pl + + the script returns 0 if the test was successful. + +*/ diff --git a/TAO/tests/Big_Oneways/Session.cpp b/TAO/tests/Big_Oneways/Session.cpp new file mode 100644 index 00000000000..e6f77c212c6 --- /dev/null +++ b/TAO/tests/Big_Oneways/Session.cpp @@ -0,0 +1,282 @@ +// +// $Id$ +// +#include "Session.h" +#include "tao/debug.h" + +ACE_RCSID(Big_Oneways, Session, "$Id$") + +Session::Session (Test::Session_Control_ptr control, + CORBA::ULong payload_size, + CORBA::ULong thread_count, + CORBA::ULong message_count, + CORBA::ULong peer_count) + : control_ (Test::Session_Control::_duplicate (control)) + , running_ (0) + , payload_size_ (payload_size) + , thread_count_ (thread_count) + , message_count_ (message_count) + , active_thread_count_ (0) + , expected_messages_ (thread_count * message_count * (peer_count - 1)) + , task_ (this) + , barrier_ (thread_count + 1) +{ +} + +Session::~Session (void) +{ +} + +int +Session::svc (void) +{ + this->barrier_.wait (); + CORBA::ULong i = 0; + + ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + // Use the same payload over and over + Test::Payload payload (this->payload_size_); + payload.length (this->payload_size_); + + for (CORBA::ULong j = 0; j != this->payload_size_; ++j) + { + payload[j] = j % 256; + } + + // Get the number of peers just once. + CORBA::ULong session_count = + this->other_sessions_.length (); + + this->validate_connections (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + for (; i != this->message_count_; ++i) + { +#if 0 + if (i % 500 == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Session::svc, " + "sending message %d\n", + i)); + } +#endif /* 0 */ + for (CORBA::ULong j = 0; j != session_count; ++j) + { + this->other_sessions_[j]->receive_payload (payload + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + } + + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); + this->active_thread_count_--; + if (this->more_work ()) + { + return 0; + } + } + this->terminate (1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) ERROR: Session::svc, " + "send %d messages out of %d\n", + i, message_count_)); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Session::svc - "); + return -1; + } + ACE_ENDTRY; + ACE_CHECK_RETURN (-1); + + this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + return 0; +} + +void +Session::validate_connections (ACE_ENV_SINGLE_ARG_DECL) +{ + CORBA::ULong session_count = + this->other_sessions_.length (); + for (CORBA::ULong i = 0; i != 100; ++i) + { + for (CORBA::ULong j = 0; j != session_count; ++j) + { + ACE_TRY + { + this->other_sessions_[j]->ping (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + } + } +} + +void +Session::start (const Test::Session_List &other_sessions + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + Test::Already_Running, + Test::No_Peers)) +{ + if (other_sessions.length () == 0) + ACE_THROW (Test::No_Peers ()); + + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + if (this->running_) + ACE_THROW (Test::Already_Running ()); + + this->other_sessions_ = other_sessions; + + for (CORBA::ULong i = 0; i != this->thread_count_; ++i) + { + // Increase the reference count because the new thread will have + // access to this object.... + ACE_TRY + { + this->_add_ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (this->task_.activate ( + THR_NEW_LWP | THR_JOINABLE, 1, 1) == -1) + { + this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else + { + this->running_ = 1; + this->active_thread_count_++; + } + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Session::start, ignored"); + } + ACE_ENDTRY; + } + + if (this->active_thread_count_ != this->thread_count_) + return; + } + + this->validate_connections (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->barrier_.wait (); + + if (this->running_ != 0) + return; + + /// None of the threads are running, this session is useless at + /// this point, report the problem and destroy the local objects + this->terminate (0 ACE_ENV_ARG_PARAMETER); +} + +void +Session::ping (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +void +Session::receive_payload (const Test::Payload &the_payload + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (the_payload.length () != this->payload_size_) + { + ACE_ERROR ((LM_ERROR, + "ERROR: (%P|%t) Session::receive_payload, " + "unexpected payload size (%d != %d)\n", + the_payload.length (), this->payload_size_)); + } + + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + this->expected_messages_--; + +#if 0 + int verbose = 0; + verbose = this->expected_messages_ % 500 == 0; + if (this->expected_messages_ < 500) + verbose = (this->expected_messages_ % 100 == 0); + if (this->expected_messages_ < 100) + verbose = (this->expected_messages_ % 10 == 0); + if (this->expected_messages_ < 5) + verbose = 1; + + if (verbose) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Session::receive_payload, " + "%d messages to go\n", + this->expected_messages_)); + } +#endif /* 0 */ + + if (this->more_work ()) + return; + } + this->terminate (1 ACE_ENV_ARG_PARAMETER); +} + + +void +Session::destroy (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // Make sure local resources are released + + PortableServer::POA_var poa = + this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var oid = + poa->servant_to_id (this ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +int +Session::more_work (void) const +{ + if (this->expected_messages_ > 0 + || this->active_thread_count_ > 0 + || this->running_ == 0) + return 1; + + return 0; +} + +void +Session::terminate (CORBA::Boolean success + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (()) +{ + // Make sure that global resources are released + ACE_TRY_EX(GLOBAL) + { + this->control_->session_finished (success + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK_EX(GLOBAL); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Session::terminate, ignored"); + } + ACE_ENDTRY; + +} diff --git a/TAO/tests/Big_Oneways/Session.h b/TAO/tests/Big_Oneways/Session.h new file mode 100644 index 00000000000..ef408f81679 --- /dev/null +++ b/TAO/tests/Big_Oneways/Session.h @@ -0,0 +1,112 @@ +// -*- C++ -*- +// +// $Id$ + +#ifndef BIG_ONEWAYS_SESSION_H +#define BIG_ONEWAYS_SESSION_H + +#include /**/ "ace/pre.h" + +#include "TestS.h" +#include "Session_Task.h" + +#if defined(ACE_HAS_THREADS) +# include "ace/Barrier.h" +#else +# include "ace/Null_Barrier.h" +#endif /* ACE_HAS_THREADS */ + +/// Implement the Test::Session interface +class Session + : public virtual POA_Test::Session +{ +public: + /// Constructor + Session (Test::Session_Control_ptr control, + CORBA::ULong payload_size, + CORBA::ULong thread_count, + CORBA::ULong message_count, + CORBA::ULong peer_count); + + /// Destructor + virtual ~Session (void); + + /// Run one of the experiment threads + int svc (void); + + // = The skeleton methods + virtual void start (const Test::Session_List &other_sessions + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + Test::Already_Running, + Test::No_Peers)); + + virtual void ping (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void receive_payload (const Test::Payload &the_payload + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void destroy (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + /// Helper function used to report any problems and destroy local + /// resources + void terminate (CORBA::Boolean success + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (()); + + /// Return 1 if all the work in this session has been completed + int more_work (void) const; + + /// Make sure that all threads have connections avaiable to the + /// other sessions. + void validate_connections (ACE_ENV_SINGLE_ARG_DECL); + +private: + /// Synchronize the internal state + ACE_SYNCH_MUTEX mutex_; + + /// Keep a reference to the Session_Control, this is used to report + /// when the test finishes. + Test::Session_Control_var control_; + + /// Keep track of wether the test is running. + int running_; + + /// The other session objects participating in the test + Test::Session_List other_sessions_; + + /// Size of each message + CORBA::ULong payload_size_; + + /// Number of threads + CORBA::ULong thread_count_; + + /// Number of messages to send + CORBA::ULong message_count_; + + /// The number of threads currently running, when this reaches 0 the + /// session destroys itself. + CORBA::ULong active_thread_count_; + + /// Number of messages expected + CORBA::ULong expected_messages_; + + /// Helper class to run svc() in a separate thread + Session_Task task_; + + /// Barrier to start all threads simultaenously +#if defined(ACE_HAS_THREADS) + typedef ACE_Thread_Barrier Barrier; +#else + typedef ACE_Null_Barrier Barrier; +#endif /* ACE_HAS_THREADS */ + Barrier barrier_; +}; + +#include /**/ "ace/post.h" + +#endif /* BIG_ONEWAYS_SESSION_H */ diff --git a/TAO/tests/Big_Oneways/Session_Control.cpp b/TAO/tests/Big_Oneways/Session_Control.cpp new file mode 100644 index 00000000000..12f3c2d7566 --- /dev/null +++ b/TAO/tests/Big_Oneways/Session_Control.cpp @@ -0,0 +1,75 @@ +// +// $Id$ +// +#include "Session_Control.h" + +ACE_RCSID(Big_Oneways, Session_Control, "$Id$") + +Session_Control::Session_Control (CORBA::ULong session_count) + : session_count_ (session_count) + , success_ (1) +{ +} + +int +Session_Control::all_sessions_finished (void) const +{ + return this->session_count_ == 0; +} + +Session_Control::~Session_Control (void) +{ + if (this->session_count_ == 0 && this->success_) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Session_Control::~Session_control, " + "good, all sessions did finish\n")); + } + else if (session_count_ != 0) + { + ACE_ERROR ((LM_ERROR, + "ERROR: (%P|%t) Session_Control::~Session_control, " + "%d sessions did not finish\n", + this->session_count_)); + } + else + { + ACE_ERROR ((LM_ERROR, + "ERROR: (%P|%t) Session_Control::~Session_control, " + "some sessions failed\n")); + } +} + +void +Session_Control::session_finished (CORBA::Boolean success + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + if (this->session_count_ == 0) + { + ACE_ERROR ((LM_ERROR, + "ERROR: (%P|%t) Session_Control::session_finished, " + "unexpected callback\n")); + } + if (success == 0) + this->success_ = 0; + + this->session_count_--; + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Session_Control::session_finished, " + "%d sessions to go\n", + this->session_count_)); + if (session_count_ == 0) + { + PortableServer::POA_var poa = + this->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var oid = + poa->servant_to_id (this ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object (oid.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + +} diff --git a/TAO/tests/Big_Oneways/Session_Control.h b/TAO/tests/Big_Oneways/Session_Control.h new file mode 100644 index 00000000000..2f0232be6d8 --- /dev/null +++ b/TAO/tests/Big_Oneways/Session_Control.h @@ -0,0 +1,45 @@ +// +// $Id$ +// + +#ifndef BIG_ONEWAYS_SESSION_CONTROL_H +#define BIG_ONEWAYS_SESSION_CONTROL_H +#include /**/ "ace/pre.h" + +#include "TestS.h" + +/// Implement the Test::Session_Control interface +class Session_Control + : public virtual POA_Test::Session_Control +{ +public: + /// Constructor + /** + * @param session_count Number of session objects in the experiment. + */ + Session_Control (CORBA::ULong session_count); + + /// Destructor + virtual ~Session_Control (void); + + /// Return 1 when all sessions have finished + int all_sessions_finished (void) const; + + // = The skeleton methods + virtual void session_finished (CORBA::Boolean success + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + /// Synchronize the internal state + ACE_SYNCH_MUTEX mutex_; + + /// The type of test + CORBA::ULong session_count_; + + /// Set to falso if any session reported a failure + CORBA::Boolean success_; +}; + +#include /**/ "ace/post.h" +#endif /* BIG_ONEWAYS_SESSION_CONTROL_H */ diff --git a/TAO/tests/Big_Oneways/Session_Task.cpp b/TAO/tests/Big_Oneways/Session_Task.cpp new file mode 100644 index 00000000000..bd992a66e5d --- /dev/null +++ b/TAO/tests/Big_Oneways/Session_Task.cpp @@ -0,0 +1,19 @@ +// +// $Id$ +// + +#include "Session_Task.h" +#include "Session.h" + +ACE_RCSID(Big_Oneways, Session_Task, "$Id$") + +Session_Task::Session_Task (Session *session) + : session_ (session) +{ +} + +int +Session_Task::svc (void) +{ + return this->session_->svc (); +} diff --git a/TAO/tests/Big_Oneways/Session_Task.h b/TAO/tests/Big_Oneways/Session_Task.h new file mode 100644 index 00000000000..6f73ee069b0 --- /dev/null +++ b/TAO/tests/Big_Oneways/Session_Task.h @@ -0,0 +1,33 @@ +// +// $Id$ +// + +#ifndef BIG_ONEWAYS_SESSION_TASK_H +#define BIG_ONEWAYS_SESSION_TASK_H +#include /**/ "ace/pre.h" + +#include "ace/Task.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class Session; + +/// Implement a Task to run the experiments using multiple threads. +class Session_Task : public ACE_Task_Base +{ +public: + /// Constructor + Session_Task (Session *session); + + /// Thread entry point + int svc (void); + +private: + /// Reference to the test interface + Session *session_; +}; + +#include /**/ "ace/post.h" +#endif /* BIG_ONEWAYS_SESSION_TASK_H */ diff --git a/TAO/tests/Big_Oneways/Test.idl b/TAO/tests/Big_Oneways/Test.idl new file mode 100644 index 00000000000..60065cf85fc --- /dev/null +++ b/TAO/tests/Big_Oneways/Test.idl @@ -0,0 +1,83 @@ +// +// $Id$ +// + +module Test +{ + /// The message type, just used to send a lot of data on each + /// request + typedef sequence<octet> Payload; + + /// A session is a single instance of the test + interface Session; + typedef sequence<Session> Session_List; + + /// A session control is used to determine if all the Session in an + /// experiment have finished. + /** + * @param success If false then the session failed, the experiment + * is successful only if all sessions finish successfully + */ + interface Session_Control + { + void session_finished (in boolean success); + }; + + /// A Peer is used to create sessions + interface Peer + { + /// Create a new session + /** + * @param payload_size The size of each message + * @param thread_count The number of threads that each session + * must create + * @param message_count How many messages does each thread send. + */ + Session create_session (in Session_Control control, + in unsigned long payload_size, + in unsigned long thread_count, + in unsigned long message_count, + in unsigned long peer_count); + + /// Shutdown the peer + oneway void shutdown (); + }; + + /// The Session already has an experiment running. + exception Already_Running {}; + + /// The experiment requires at least two Sessions + exception No_Peers {}; + + interface Session + { + /// Start the test, send messages to all the peers + /** + * @param other_sessions The list of sessions participating in the + * experiment, this list must not include the session + * receiving the start() call. + */ + void start (in Session_List other_sessions) + raises (Already_Running, No_Peers); + + /// Ping the session, used to validate all connections + void ping (); + + /// Receive the payload + oneway void receive_payload (in Payload the_payload); + + /// Destroy the Session object + void destroy (); + }; + + interface Coordinator + { + /// Add a new peer. + /** + * The coordinator starts the test by calling <send_oneways> on + * all peers. How does it decide to do that is application + * specific. + */ + void add_peer (in Peer the_peer); + }; +}; diff --git a/TAO/tests/Big_Oneways/client.cpp b/TAO/tests/Big_Oneways/client.cpp new file mode 100644 index 00000000000..13465fab43c --- /dev/null +++ b/TAO/tests/Big_Oneways/client.cpp @@ -0,0 +1,118 @@ +// $Id$ + +#include "Peer.h" +#include "ace/Get_Opt.h" +#include "ace/Thread_Manager.h" + +ACE_RCSID(Big_Oneways, client, "$Id$") + +const char *ior = "file://test.ior"; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "k:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'k': + ior = get_opts.opt_arg (); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-k <ior> " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int +main (int argc, char *argv[]) +{ + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (poa_object.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Panic got a nil RootPOA\n"), + 1); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + CORBA::Object_var tmp = + orb->string_to_object(ior ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + Test::Coordinator_var coordinator = + Test::Coordinator::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (coordinator.in ())) + { + ACE_ERROR_RETURN ((LM_DEBUG, + "Nil coordinator reference <%s>\n", + ior), + 1); + } + + Peer *peer_impl = 0; + ACE_NEW_RETURN (peer_impl, + Peer (orb.in ()), + 1); + PortableServer::ServantBase_var peer_owner_transfer(peer_impl); + + Test::Peer_var peer = + peer_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + coordinator->add_peer (peer.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Wait for all the threads. + ACE_Thread_Manager::instance ()->wait (); + + root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception caught:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} diff --git a/TAO/tests/Big_Oneways/run_test.pl b/TAO/tests/Big_Oneways/run_test.pl new file mode 100755 index 00000000000..45175d43e98 --- /dev/null +++ b/TAO/tests/Big_Oneways/run_test.pl @@ -0,0 +1,88 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib '../../../bin'; +use PerlACE::Run_Test; +use Getopt::Std; + +local ($opt_i, $opt_b); + +if (!getopts ('i:b:')) { + print "Usage: run_test.pl [-b payload_size] [-i iterations]\n"; + exit 1; +} + +my $server_args = " -p 3"; +if (defined $opt_i) { + $server_args .= " -i ".$opt_i; +} +if (defined $opt_b) { + $server_args .= " -b ".$opt_b; +} + +$iorfile = PerlACE::LocalFile ("server.ior"); + +$status = 0; +unlink $iorfile; +if (PerlACE::is_vxworks_test()) { + $SV = new PerlACE::ProcessVX ("server", "-o server.ior $server_args"); +} +else { + $SV = new PerlACE::Process ("server", "-o $iorfile $server_args"); +} +$CL1 = new PerlACE::Process ("client", " -k file://$iorfile"); +$CL2 = new PerlACE::Process ("client", " -k file://$iorfile"); +$CL3 = new PerlACE::Process ("client", " -k file://$iorfile"); + +$server = $SV->Spawn (); + +if ($server != 0) { + print STDERR "ERROR: server returned $server\n"; + exit 1; +} + +if (PerlACE::waitforfile_timed ($iorfile, 15) == -1) { + print STDERR "ERROR: cannot find file <$iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +$CL1->Spawn (); +$CL2->Spawn (); +$CL3->Spawn (); + +$client1 = $CL1->WaitKill (300); + +if ($client1 != 0) { + print STDERR "ERROR: client 1 returned $client1\n"; + $status = 1; +} + +$client2 = $CL2->WaitKill (300); + +if ($client2 != 0) { + print STDERR "ERROR: client 2 returned $client2\n"; + $status = 1; +} + +$client3 = $CL3->WaitKill (300); + +if ($client3 != 0) { + print STDERR "ERROR: client 3 returned $client3\n"; + $status = 1; +} + +$server = $SV->WaitKill (5); + +if ($server != 0) { + print STDERR "ERROR: server returned $server\n"; + $status = 1; +} + +unlink $iorfile; + +exit $status; diff --git a/TAO/tests/Big_Oneways/server.cpp b/TAO/tests/Big_Oneways/server.cpp new file mode 100644 index 00000000000..9aa9d7a7522 --- /dev/null +++ b/TAO/tests/Big_Oneways/server.cpp @@ -0,0 +1,230 @@ +// $Id$ + +#include "Coordinator.h" +#include "Session_Control.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_stdio.h" + +ACE_RCSID(Big_Oneways, server, "$Id$") + +const char *ior_output_file = "test.ior"; +CORBA::ULong peer_count = 4; +CORBA::ULong payload_size = 1024; +CORBA::ULong message_count = 1000; +CORBA::ULong thread_count = 4; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "o:p:b:i:n:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.opt_arg (); + break; + + case 'p': + peer_count = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'b': + payload_size = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'i': + message_count = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'n': + thread_count = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile> " + "-p <peer_count> " + "-b <payload_size> " + "-i <message_count> " + "-n <thread_count> " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int +main (int argc, char *argv[]) +{ + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (poa_object.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Panic got a nil RootPOA\n"), + 1); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + Coordinator *coordinator_impl = 0; + ACE_NEW_RETURN (coordinator_impl, + Coordinator (peer_count), + 1); + + Test::Coordinator_var coordinator = + coordinator_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::String_var ior = + orb->object_to_string (coordinator.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // If the ior_output_file exists, output the ior to it + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "Waiting for peers . . . ")); + for (int i = 0; + i != 60 && !coordinator_impl->has_all_peers (); + ++i) + { + ACE_Time_Value tv (1, 0); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_DEBUG ((LM_DEBUG, "done.\n")); + + if (!coordinator_impl->has_all_peers ()) + { + ACE_ERROR ((LM_DEBUG, + "ERROR: timeout, some peers failed to register\n")); + return 1; + } + + ACE_DEBUG ((LM_DEBUG, "Building session list . . . ")); + + Session_Control *session_control_impl = 0; + ACE_NEW_RETURN (session_control_impl, + Session_Control (peer_count), + 1); + + Test::Session_Control_var session_control = + session_control_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + Test::Session_List session_list; + coordinator_impl->create_session_list (session_control.in (), + payload_size, + thread_count, + message_count, + session_list + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ASSERT (session_list.length () == peer_count); + + ACE_DEBUG ((LM_DEBUG, "done.\n")); + ACE_DEBUG ((LM_DEBUG, "Giving start signal . . . ")); + + CORBA::ULong j; + for (j = 0; j != peer_count; ++j) + { + // Make a copy of the sessions, excluding the j-th element + Test::Session_List other_sessions (peer_count - 1); + other_sessions.length (peer_count - 1); + CORBA::ULong count = 0; + for (CORBA::ULong k = 0; k != peer_count; ++k) + { + if (k == j) + continue; + other_sessions[count++] = + Test::Session::_duplicate (session_list[k]); + } + + session_list[j]->start (other_sessions + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + } + + ACE_DEBUG ((LM_DEBUG ,"done\n")); + ACE_DEBUG ((LM_DEBUG, "Waiting for sessions to finish. . . \n")); + for (int k = 0; + k != 300 && !session_control_impl->all_sessions_finished (); + ++k) + { + ACE_Time_Value tv (1, 0); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + if (!session_control_impl->all_sessions_finished ()) + { + ACE_ERROR ((LM_ERROR, + "ERROR: timeout waiting for sessions\n")); + return 1; + } + + ACE_DEBUG ((LM_DEBUG, "All sessions finished, destroy session list . . .\n")); + + for (j = 0; j != peer_count; ++j) + { + session_list[j]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + } + + ACE_DEBUG ((LM_DEBUG, "Shutdown all peers . . .\n")); + + coordinator_impl->shutdown_all_peers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "Shutdown poa and orb . . .\n")); + + root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception caught:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} |