summaryrefslogtreecommitdiff
path: root/tests/MT_NonBlocking_Connect_Test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'tests/MT_NonBlocking_Connect_Test.cpp')
-rw-r--r--tests/MT_NonBlocking_Connect_Test.cpp409
1 files changed, 409 insertions, 0 deletions
diff --git a/tests/MT_NonBlocking_Connect_Test.cpp b/tests/MT_NonBlocking_Connect_Test.cpp
new file mode 100644
index 00000000000..2d94aa29707
--- /dev/null
+++ b/tests/MT_NonBlocking_Connect_Test.cpp
@@ -0,0 +1,409 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// tests
+//
+// = FILENAME
+// MT_NonBlocking_Connect_Test.cpp
+//
+// = DESCRIPTION
+//
+// This test makes non-blocking connects from multiple threads. Things are
+// complicated by the fact that after the connection is attempted reactor's
+// handle_events() is called and this leads to possible connection
+// completion in the other thread. This is similar to what TAO does for
+// oneway with SYNC_NONE sync scope policy.
+// The following reactors are tested: Select, TP, WFMO, and Dev Poll
+// (if enabled).
+//
+// = AUTHOR
+// Vladimir Zykov <vladimir.zykov@prismtech.com>
+//
+// ============================================================================
+
+#include "test_config.h"
+#include "ace/Reactor.h"
+#include "ace/Select_Reactor.h"
+#include "ace/TP_Reactor.h"
+#include "ace/WFMO_Reactor.h"
+#include "ace/Dev_Poll_Reactor.h"
+#include "ace/Svc_Handler.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Get_Opt.h"
+#include "ace/Task.h"
+#include "ace/Connector.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/Thread_Mutex.h"
+
+ACE_RCSID(tests,
+ MT_NonBlocking_Connect_Test,
+ "$Id$")
+
+#if defined (ACE_HAS_THREADS)
+
+static int test_select_reactor = 1;
+static int test_tp_reactor = 1;
+static int test_wfmo_reactor = 1;
+static int test_dev_poll_reactor = 1;
+static int number_of_threads = 10;
+static int debug = 0;
+static int result = 0;
+
+static const ACE_TCHAR* hosts[] = {
+ ACE_TEXT ("www.russiantvguide.com:80"),
+ ACE_TEXT ("news.bbc.co.uk:80"),
+ ACE_TEXT ("www.cnn.com:80"),
+ ACE_TEXT ("www.waca.com.au:80"),
+ ACE_TEXT ("www.uganda.co.ug:80"),
+ ACE_TEXT ("www.cs.wustl.edu:80"),
+ ACE_TEXT ("www.dre.vanderbilt.edu:80"),
+ ACE_TEXT ("www.dhm.gov.np:80"),
+ ACE_TEXT ("www.msn.com:80"),
+ ACE_TEXT ("www.presidencymaldives.gov.mv:80")
+};
+
+class Svc_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
+{
+public:
+ Svc_Handler (void)
+ {
+ ACE_TEST_ASSERT (0);
+ }
+
+ Svc_Handler (ACE_Thread_Manager *);
+
+ int open (void *);
+
+ int close (u_long flags);
+
+ bool connected_;
+};
+
+Svc_Handler::Svc_Handler (ACE_Thread_Manager *)
+ : connected_ (false)
+{
+ this->reference_counting_policy ().value (
+ ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
+}
+
+int
+Svc_Handler::open (void *)
+{
+ this->connected_ = true;
+ return 0;
+}
+
+int
+Svc_Handler::close (u_long)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "%t close connection on handle %d.\n",
+ this->get_handle ()));
+ return 0;
+}
+
+template<class SVC_HANDLER>
+class Concurrency_Strategy :
+ public ACE_Concurrency_Strategy<SVC_HANDLER>
+{
+public:
+ virtual int activate_svc_handler (SVC_HANDLER *svc_handler, void *arg);
+};
+
+template<class SVC_HANDLER> int
+Concurrency_Strategy<SVC_HANDLER>::
+ activate_svc_handler (SVC_HANDLER *svc_handler, void *arg)
+{
+ // Every fourth connection fails.
+ static long count = 0;
+ if (++count % 4 == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "%t connection on handle %d has artificially failed.\n",
+ svc_handler->get_handle ()));
+ return -1;
+ }
+ else
+ {
+ return ACE_Concurrency_Strategy<SVC_HANDLER>::activate_svc_handler (
+ svc_handler, arg);
+ }
+}
+
+typedef ACE_Creation_Strategy<Svc_Handler> CREATION_STRATEGY;
+typedef ACE_Connect_Strategy<Svc_Handler,
+ ACE_SOCK_CONNECTOR> CONNECT_STRATEGY;
+typedef Concurrency_Strategy<Svc_Handler> CONCURRENCY_STRATEGY;
+typedef ACE_Strategy_Connector<Svc_Handler,
+ ACE_SOCK_CONNECTOR> BASE_CONNECTOR;
+
+class Connect_Thread : public ACE_Task_Base
+{
+public:
+ Connect_Thread (ACE_Thread_Manager &thread_manager,
+ ACE_Reactor &reactor,
+ ACE_Thread_Mutex &reactor_lock)
+ : ACE_Task_Base (&thread_manager)
+ , reactor_ (reactor)
+ , reactor_lock_ (reactor_lock)
+ , threads_ (number_of_threads)
+ {
+ // Open the connector.
+ this->base_connector_.open (&this->reactor_,
+ &this->cns_, &this->cts_, &this->cys_);
+ }
+
+ int svc (void);
+
+private:
+ ACE_Reactor &reactor_;
+ ACE_Thread_Mutex &reactor_lock_;
+
+ CREATION_STRATEGY cns_;
+ CONNECT_STRATEGY cts_;
+ CONCURRENCY_STRATEGY cys_;
+ BASE_CONNECTOR base_connector_;
+
+ ACE_Atomic_Op<ACE_Thread_Mutex, long> threads_;
+};
+
+int
+Connect_Thread::svc (void)
+{
+ size_t const nr_names = sizeof hosts / sizeof (char *);
+ ACE_INET_Addr *addresses = new ACE_INET_Addr[nr_names];
+
+ for (size_t i = 0; i < nr_names; ++i)
+ {
+ if (addresses[i].set (hosts[i]) != 0)
+ {
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("%p\n"),
+ hosts[i]));
+ }
+ }
+
+ ACE_INET_Addr local_addr;
+ ACE_Synch_Options synch_options (ACE_Synch_Options::USE_REACTOR);
+
+ for (size_t i = 0; i < nr_names; ++i)
+ {
+ ACE_INET_Addr &remote_addr = addresses[i];
+
+ ACE_DEBUG ((LM_DEBUG,
+ "%t connecting to %s...\n",
+ hosts[i]));
+
+ // Create a new handler. It's what begin_connection() in TAO does.
+ Svc_Handler *svc_handler = 0;
+ result = this->base_connector_.connect (svc_handler,
+ remote_addr,
+ synch_options,
+ local_addr);
+ ACE_Event_Handler_var release_guard (svc_handler);
+
+ // Complete connection. It's what complete_connection() in TAO does.
+ // Not exactly but for the test it's enough.
+ int run_for_wfmo = 5;
+ while (this->reactor_.work_pending () && run_for_wfmo)
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon,
+ this->reactor_lock_, -1);
+
+ if (this->reactor_.work_pending () && run_for_wfmo)
+ {
+ --run_for_wfmo;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "%t waiting for connection to %s...\n",
+ hosts[i]));
+
+ this->reactor_.owner (ACE_Thread::self ());
+
+ this->reactor_.handle_events ();
+ }
+ }
+
+ if (svc_handler->connected_)
+ {
+ this->reactor_.register_handler (svc_handler,
+ ACE_Event_Handler::READ_MASK);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "%t connection for %s completed on handle %d.\n",
+ hosts[i],
+ svc_handler->get_handle ()));
+ // We are connected successfully. Close the connection.
+ this->reactor_.remove_handler (svc_handler->get_handle (),
+ ACE_Event_Handler::ALL_EVENTS_MASK |
+ ACE_Event_Handler::DONT_CALL);
+ }
+ else
+ {
+ // Connection didn't succeed. svc_handler will be deleted when
+ // we go out of the scope of this loop.
+ ACE_DEBUG ((LM_DEBUG,
+ "%t connection for %s did not complete.\n",
+ hosts[i]));
+ }
+ }
+
+ if (--this->threads_ == 0)
+ {
+ // Close the connector.
+ this->base_connector_.close ();
+ }
+
+ delete[] addresses;
+
+ return 0;
+}
+
+void
+test (ACE_Reactor_Impl *reactor_impl)
+{
+ ACE_Reactor reactor (reactor_impl, true);
+
+ ACE_Thread_Mutex reactor_lock;
+ ACE_Thread_Manager thread_manager;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Starting %d connection threads...\n",
+ number_of_threads));
+
+ Connect_Thread connect_thread (thread_manager, reactor, reactor_lock);
+ result = connect_thread.activate (THR_NEW_LWP|THR_JOINABLE,
+ number_of_threads);
+ ACE_TEST_ASSERT (result == 0);
+
+ // Wait for threads to exit.
+ result = thread_manager.wait ();
+ ACE_TEST_ASSERT (result == 0);
+}
+
+static int
+parse_args (int argc, ACE_TCHAR *argv[])
+{
+ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:b:c:d:f:g:k:lm:n:o:uz:"));
+
+ int cc;
+ while ((cc = get_opt ()) != -1)
+ {
+ switch (cc)
+ {
+ case 'a':
+ test_select_reactor = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'b':
+ test_tp_reactor = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'c':
+ test_wfmo_reactor = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'd':
+ test_dev_poll_reactor = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'f':
+ number_of_threads = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'z':
+ debug = ACE_OS::atoi (get_opt.opt_arg ());
+ break;
+ case 'u':
+ default:
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("\nusage: %s \n\n")
+ ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
+ ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
+ ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
+ ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
+ ACE_TEXT ("\t[-f number of threads] (defaults to %d)\n")
+ ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
+ ACE_TEXT ("\n"),
+ argv[0],
+ test_select_reactor,
+ test_tp_reactor,
+ test_wfmo_reactor,
+ test_dev_poll_reactor,
+ number_of_threads,
+ debug));
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int
+run_main (int argc, ACE_TCHAR *argv[])
+{
+ ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
+
+ // Validate options.
+ result = parse_args (argc, argv);
+ if (result != 0)
+ return result;
+
+ if (test_select_reactor)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
+
+ test (new ACE_Select_Reactor);
+ }
+
+ if (test_tp_reactor)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
+
+ test (new ACE_TP_Reactor);
+ }
+
+#if defined (ACE_HAS_EVENT_POLL)
+
+ if (test_dev_poll_reactor)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
+
+ test (new ACE_Dev_Poll_Reactor);
+ }
+
+#endif
+
+#if defined (ACE_WIN32)
+
+ if (test_wfmo_reactor)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
+
+ test (new ACE_WFMO_Reactor);
+ }
+
+#endif /* ACE_WIN32 */
+
+ ACE_END_TEST;
+
+ return result;
+}
+
+#else /* ACE_HAS_THREADS */
+
+int
+run_main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("MT_NonBlocking_Connect_Test"));
+
+ ACE_ERROR ((LM_INFO,
+ ACE_TEXT ("threads not supported on this platform\n")));
+
+ ACE_END_TEST;
+
+ return 0;
+}
+
+#endif /* ACE_HAS_THREADS */