diff options
Diffstat (limited to 'tests/MT_Reference_Counted_Event_Handler_Test.cpp')
-rw-r--r-- | tests/MT_Reference_Counted_Event_Handler_Test.cpp | 1424 |
1 files changed, 0 insertions, 1424 deletions
diff --git a/tests/MT_Reference_Counted_Event_Handler_Test.cpp b/tests/MT_Reference_Counted_Event_Handler_Test.cpp deleted file mode 100644 index f2e52a5c24f..00000000000 --- a/tests/MT_Reference_Counted_Event_Handler_Test.cpp +++ /dev/null @@ -1,1424 +0,0 @@ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// tests -// -// = FILENAME -// MT_Reference_Counted_Event_Handler_Test.cpp -// -// = DESCRIPTION -// -// This test tries to represents what happens in the ORB wrt to -// event handlers, reactors, timer queues, threads, and connection -// caches, minus the other complexities. The following reactors -// are tested: Select, TP, WFMO, and Dev Poll (if enabled). -// -// The test checks proper use and shutting down of client-side -// event handlers when it is used by invocation threads and/or -// event loop threads. Server-side event handlers are either -// threaded or reactive. A purger thread is introduced to check the -// connection recycling and cache purging. Nested upcalls are also -// tested. -// -// = AUTHOR -// Irfan Pyarali <irfan@oomworks.com> -// -// ============================================================================ - -#include "test_config.h" -#include "ace/Reactor.h" -#include "ace/Select_Reactor.h" -#include "ace/TP_Reactor.h" -#include "ace/WFMO_Reactor.h" -#include "ace/Dev_Poll_Reactor.h" -#include "ace/Get_Opt.h" -#include "ace/Task.h" -#include "ace/SOCK_Acceptor.h" -#include "ace/SOCK_Connector.h" -#include "ace/Auto_Event.h" -#include "ace/OS_NS_signal.h" -#include "ace/OS_NS_time.h" -#include "ace/OS_NS_sys_socket.h" -#include "ace/OS_NS_unistd.h" - -ACE_RCSID(tests, MT_Reference_Counted_Event_Handler_Test, "$Id$") - -#if defined (ACE_HAS_THREADS) - -static const char message[] = "abcdefghijklmnopqrstuvwxyz"; -static const int message_size = 26; -static int test_select_reactor = 1; -static int test_tp_reactor = 1; -static int test_wfmo_reactor = 1; -static int test_dev_poll_reactor = 1; -static int debug = 0; -static int number_of_connections = 5; -static int max_nested_upcall_level = 10; -static int close_timeout = 500; -static int pipe_open_attempts = 10; -static int pipe_retry_timeout = 1; -static int make_invocations = -1; -static int run_event_loop_thread = -1; -static int run_purger_thread = -1; -static int run_receiver_thread = -1; -static int nested_upcalls = -1; - -static ACE_HANDLE server_handle = ACE_INVALID_HANDLE; -static ACE_HANDLE client_handle = ACE_INVALID_HANDLE; - -static int number_of_options = 5; -static int test_configs[][5] = - { - // - // make_invocations, run_event_loop_thread, run_purger_thread, run_receiver_thread, nested_upcalls - // - - // { 0, 0, 0, 0, 0, }, // At least one thread should be running. - // { 0, 0, 0, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations, - // no thread will know that the socket is closed. - // { 0, 0, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded, - // we cannot decide which socket to close. - // { 0, 0, 1, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations, - // no thread will know that the socket is closed. - // { 0, 1, 0, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded, - // we cannot decide which socket to close. - { 0, 1, 0, 1, 0, }, - // { 0, 1, 0, 1, 1, }, // No need for nested upcalls without invocations. - // { 0, 1, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded, - // we cannot decide which socket to close. - { 0, 1, 1, 1, 0, }, - // { 0, 1, 1, 1, 1, }, // No need for nested upcalls without invocations. - // { 1, 0, 0, 0, 0, }, // If both event_loop_thread and receiver are not threaded, - // no thread can receive the messages. - { 1, 0, 0, 1, 0, }, - // { 1, 0, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver. - // { 1, 0, 1, 0, 0, }, // If both event_loop_thread and receiver are not threaded, - // no thread can receive the messages. - { 1, 0, 1, 1, 0, }, - // { 1, 0, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver. - { 1, 1, 0, 0, 0, }, - { 1, 1, 0, 0, 1, }, - { 1, 1, 0, 1, 0, }, - // { 1, 1, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver. - { 1, 1, 1, 0, 0, }, - { 1, 1, 1, 0, 1, }, - { 1, 1, 1, 1, 0, }, - // { 1, 1, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver. - }; - -/* Replication of the ACE_Pipe class. Only difference is that this - class always uses two sockets to create the pipe, even on platforms - that support pipes. */ - -class Pipe -{ -public: - - Pipe (void); - - int open (void); - - ACE_HANDLE read_handle (void) const; - - ACE_HANDLE write_handle (void) const; - -private: - ACE_HANDLE handles_[2]; -}; - -int -Pipe::open (void) -{ - ACE_INET_Addr my_addr; - ACE_SOCK_Acceptor acceptor; - ACE_SOCK_Connector connector; - ACE_SOCK_Stream reader; - ACE_SOCK_Stream writer; - int result = 0; - - // Bind listener to any port and then find out what the port was. - if (acceptor.open (ACE_Addr::sap_any) == -1 - || acceptor.get_local_addr (my_addr) == -1) - result = -1; - else - { - ACE_INET_Addr sv_addr (my_addr.get_port_number (), - ACE_LOCALHOST); - - // Establish a connection within the same process. - if (connector.connect (writer, sv_addr) == -1) - result = -1; - else if (acceptor.accept (reader) == -1) - { - writer.close (); - result = -1; - } - } - - // Close down the acceptor endpoint since we don't need it anymore. - acceptor.close (); - if (result == -1) - return -1; - - this->handles_[0] = reader.get_handle (); - this->handles_[1] = writer.get_handle (); - - return 0; -} - -Pipe::Pipe (void) -{ - this->handles_[0] = ACE_INVALID_HANDLE; - this->handles_[1] = ACE_INVALID_HANDLE; -} - -ACE_HANDLE -Pipe::read_handle (void) const -{ - return this->handles_[0]; -} - -ACE_HANDLE -Pipe::write_handle (void) const -{ - return this->handles_[1]; -} - -class Connection_Cache; -class Event_Loop_Thread; - -static Event_Loop_Thread *global_event_loop_thread_variable = 0; - -class Sender : public ACE_Event_Handler -{ -public: - - Sender (ACE_HANDLE handle, - Connection_Cache &connection_cache); - - ~Sender (void); - - int handle_input (ACE_HANDLE); - - ssize_t send_message (void); - - void close (void); - - ACE_HANDLE handle_; - - Connection_Cache &connection_cache_; - -}; - -class Connection_Cache -{ -public: - - Connection_Cache (void); - - ~Connection_Cache (void); - - void add_connection (Sender *sender); - - void remove_connection (Sender *sender); - - Sender *acquire_connection (void); - - void release_connection (Sender *sender); - - int find (Sender *sender); - - ACE_SYNCH_MUTEX &lock (void); - - enum State - { - IDLE, - BUSY, - NOT_IN_CACHE - }; - - struct Entry - { - Sender *sender_; - State state_; - }; - - Entry *entries_; - - ACE_SYNCH_MUTEX lock_; -}; - -Sender::Sender (ACE_HANDLE handle, - Connection_Cache &connection_cache) - : handle_ (handle), - connection_cache_ (connection_cache) -{ - // Enable reference counting. - this->reference_counting_policy ().value - (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); - - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Reference count in Sender::Sender() is %d\n"), - this->reference_count_.value ())); -} - -Sender::~Sender (void) -{ - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Reference count in ~Sender::Sender() is %d\n"), - this->reference_count_.value ())); - - // Close the socket that we are responsible for. - ACE_OS::closesocket (this->handle_); -} - -int -Sender::handle_input (ACE_HANDLE) -{ - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Reference count in Sender::handle_input() is %d\n"), - this->reference_count_.value ())); - - // - // In this test, this method is only called when the connection has - // been closed. Remove self from Reactor. - // - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Event loop thread calling Sender::close() ") - ACE_TEXT ("for handle %d\n"), - this->handle_)); - - this->close (); - - return 0; -} - -void -Sender::close (void) -{ - // Remove socket from Reactor (may fail if another thread has already - // removed the handle from the Reactor). - this->reactor ()->remove_handler (this->handle_, - ACE_Event_Handler::ALL_EVENTS_MASK); - - // Remove self from connection cache (may fail if another thread has - // already removed "this" from the cache). - this->connection_cache_.remove_connection (this); -} - -ssize_t -Sender::send_message (void) -{ - return ACE::send_n (this->handle_, - message, - message_size); -} - -class Event_Loop_Thread : public ACE_Task_Base -{ -public: - - Event_Loop_Thread (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor); - - int svc (void); - - ACE_Reactor &reactor_; - -}; - -class Receiver : public ACE_Task_Base -{ -public: - - Receiver (ACE_Thread_Manager &thread_manager, - ACE_HANDLE handle, - int nested_upcalls); - - ~Receiver (void); - - int svc (void); - - int close (u_long flags); - - int handle_input (ACE_HANDLE); - - int resume_handler (void); - - ACE_HANDLE handle_; - - int counter_; - - int nested_upcalls_; - - int nested_upcalls_level_; - -}; - -Receiver::Receiver (ACE_Thread_Manager &thread_manager, - ACE_HANDLE handle, - int nested_upcalls) - : ACE_Task_Base (&thread_manager), - handle_ (handle), - counter_ (1), - nested_upcalls_ (nested_upcalls), - nested_upcalls_level_ (0) -{ - // Enable reference counting. - this->reference_counting_policy ().value - (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); - - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Reference count in Receiver::Receiver() is %d\n"), - this->reference_count_.value ())); -} - -Receiver::~Receiver (void) -{ - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Reference count in ~Receiver::Receiver() is %d\n"), - this->reference_count_.value ())); - - // Close the socket that we are responsible for. - ACE_OS::closesocket (this->handle_); -} - -int -Receiver::svc (void) -{ - // - // Continuously receive messages from the Sender. On error, exit - // thread. - // - - int result = 0; - - while (result != -1) - { - result = - this->handle_input (this->handle_); - } - - return 0; -} - -int -Receiver::handle_input (ACE_HANDLE handle) -{ - char buf[message_size + 1]; - - // Receive message. - ssize_t result = - ACE::recv_n (handle, - buf, - sizeof buf - 1); - - if (this->reactor ()) - this->reactor ()->resume_handler (handle); - - if (result == message_size) - { - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Message %d received on handle %d\n"), - this->counter_++, - handle)); - - if (this->thr_count () == 0 && - this->nested_upcalls_) - { - this->nested_upcalls_level_++; - - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Nesting level %d\n"), - this->nested_upcalls_level_)); - - if ((this->nested_upcalls_level_ != max_nested_upcall_level) && - (global_event_loop_thread_variable != 0)) - global_event_loop_thread_variable->svc (); - - this->nested_upcalls_level_--; - return 0; - } - else - return 0; - } - else - { - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("/*** Problem in receiving message %d on handle") - ACE_TEXT (" %d: shutting down receiving thread ***/\n"), - this->counter_, - handle)); - - return -1; - } -} - -int -Receiver::resume_handler (void) -{ - /// The application takes responsibility of resuming the handler. - return ACE_APPLICATION_RESUMES_HANDLER; -} - -int -Receiver::close (u_long) -{ - // If threaded, we are responsible for deleting this instance when - // the thread completes. If not threaded, Reactor reference - // counting will handle the deletion of this instance. - delete this; - return 0; -} - -class Connector -{ -public: - - Connector (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor, - int nested_upcalls); - - int connect (ACE_HANDLE &client_handle, - ACE_HANDLE &server_handle, - int run_receiver_thread); - - ACE_Thread_Manager &thread_manager_; - - ACE_Reactor &reactor_; - - int nested_upcalls_; - -}; - -Connector::Connector (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor, - int nested_upcalls) - : thread_manager_ (thread_manager), - reactor_ (reactor), - nested_upcalls_ (nested_upcalls) -{ -} - -int -Connector::connect (ACE_HANDLE &client_handle, - ACE_HANDLE &server_handle, - int run_receiver_thread) -{ - // - // Create a connection and a receiver to receive messages on the - // connection. - // - - Pipe pipe; - int result = 0; - - for (int i = 0; i < pipe_open_attempts; ++i) - { - result = - pipe.open (); - - if (result == 0) - break; - - if (result == -1) - ACE_OS::sleep (pipe_retry_timeout); - } - - ACE_ASSERT (result == 0); - ACE_UNUSED_ARG (result); - - Receiver *receiver = - new Receiver (this->thread_manager_, - pipe.write_handle (), - this->nested_upcalls_); - - // Either the receiver is threaded or register it with the Reactor. - if (run_receiver_thread) - result = - receiver->activate (); - else - { - result = - this->reactor_.register_handler (pipe.write_handle (), - receiver, - ACE_Event_Handler::READ_MASK); - - // The reference count on the receiver was increased by the - // Reactor. - ACE_Event_Handler_var safe_receiver (receiver); - } - - ACE_ASSERT (result == 0); - ACE_UNUSED_ARG (result); - - client_handle = - pipe.read_handle (); - - server_handle = - pipe.write_handle (); - - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("New connection: client handle = %d, ") - ACE_TEXT ("server handle = %d\n"), - client_handle, server_handle)); - - return 0; -} - -Connection_Cache::Connection_Cache (void) -{ - // Initialize the connection cache. - this->entries_ = - new Entry[number_of_connections]; - - for (int i = 0; i < number_of_connections; ++i) - { - this->entries_[i].sender_ = 0; - this->entries_[i].state_ = NOT_IN_CACHE; - } -} - -int -Connection_Cache::find (Sender *sender) -{ - for (int i = 0; i < number_of_connections; ++i) - { - if (this->entries_[i].sender_ == sender) - return i; - } - - return -1; -} - -void -Connection_Cache::add_connection (Sender *sender) -{ - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - // Make sure that the state of the connection cache is as - // expected. <sender> should not be already in the cache. - ACE_ASSERT (this->find (sender) == -1); - - int empty_index = - this->find (0); - - sender->add_reference (); - this->entries_[empty_index].sender_ = sender; - this->entries_[empty_index].state_ = BUSY; -} - -void -Connection_Cache::remove_connection (Sender *sender) -{ - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - // Make sure that the state of the connection cache is as expected. - // remove_connection() may already have been called. - int index = - this->find (sender); - - if (index == -1) - return; - - // If we still have the sender, remove it. - sender->remove_reference (); - this->entries_[index].sender_ = 0; - this->entries_[index].state_ = NOT_IN_CACHE; -} - -Sender * -Connection_Cache::acquire_connection (void) -{ - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0); - - // Find a valid and IDLE sender. - - int index = -1; - - for (int i = 0; i < number_of_connections; ++i) - { - if (this->entries_[i].sender_ && - this->entries_[i].state_ == IDLE) - index = i; - } - - if (index == -1) - return 0; - - this->entries_[index].sender_->add_reference (); - this->entries_[index].state_ = BUSY; - - return this->entries_[index].sender_; -} - -void -Connection_Cache::release_connection (Sender *sender) -{ - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - // Make sure that the state of the connection cache is as expected. - // remove_connection() may have already removed the connection from - // the cache. - int index = - this->find (sender); - - if (index == -1) - return; - - // If we still have the sender, idle it. - this->entries_[index].state_ = IDLE; -} - -ACE_SYNCH_MUTEX & -Connection_Cache::lock (void) -{ - return this->lock_; -} - -Connection_Cache::~Connection_Cache (void) -{ - for (int i = 0; i < number_of_connections; ++i) - { - if (this->entries_[i].sender_) - this->remove_connection (this->entries_[i].sender_); - } - - delete[] this->entries_; -} - -class Invocation_Thread : public ACE_Task_Base -{ -public: - - Invocation_Thread (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor, - Connection_Cache &connection_cache, - ACE_Auto_Event &new_connection_event, - int make_invocations, - int run_receiver_thread, - int nested_upcalls); - - int svc (void); - - Sender *create_connection (void); - - Connection_Cache &connection_cache_; - - ACE_Reactor &reactor_; - - ACE_Thread_Manager &thread_manager_; - - ACE_Auto_Event &new_connection_event_; - - int make_invocations_; - - int run_receiver_thread_; - - int nested_upcalls_; - -}; - -Invocation_Thread::Invocation_Thread (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor, - Connection_Cache &connection_cache, - ACE_Auto_Event &new_connection_event, - int make_invocations, - int run_receiver_thread, - int nested_upcalls) - : ACE_Task_Base (&thread_manager), - connection_cache_ (connection_cache), - reactor_ (reactor), - thread_manager_ (thread_manager), - new_connection_event_ (new_connection_event), - make_invocations_ (make_invocations), - run_receiver_thread_ (run_receiver_thread), - nested_upcalls_ (nested_upcalls) -{ -} - -Sender * -Invocation_Thread::create_connection (void) -{ - int result = 0; - - // Connector for creating new connections. - Connector connector (this->thread_manager_, - this->reactor_, - this->nested_upcalls_); - - // <server_handle> is a global variable. It will be used later by - // the Close_Socket_Thread. - result = - connector.connect (client_handle, - server_handle, - this->run_receiver_thread_); - ACE_ASSERT (result == 0); - ACE_UNUSED_ARG (result); - - // Create a new sender. - Sender *sender = - new Sender (client_handle, - this->connection_cache_); - - // Register it with the cache. - this->connection_cache_.add_connection (sender); - - // - // There might be a race condition here. The sender has been added - // to the cache and is potentially available to other threads - // accessing the cache. Therefore, the other thread may use this - // sender and potentially close the sender before it even gets - // registered with the Reactor. - // - // This is resolved by marking the connection as busy when it is - // first added to the cache. And only once the thread creating the - // connection is done with it, it is marked a available in the - // cache. - // - // This order of registration is important. - // - - // Register the handle with the Reactor. - result = - this->reactor_.register_handler (client_handle, - sender, - ACE_Event_Handler::READ_MASK); -#if 0 - ACE_ASSERT (result == 0); - ACE_UNUSED_ARG (result); -#else - if (result != 0) - ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) create_connection h %d, %p\n"), - client_handle, - ACE_TEXT ("register_handler"))); -#endif - return sender; -} - -int -Invocation_Thread::svc (void) -{ - int connection_counter = 0; - for (int message_counter = 1;; ++message_counter) - { - // Get a connection from the cache. - Sender *sender = - this->connection_cache_.acquire_connection (); - - // If no connection is available in the cache, create a new one. - if (sender == 0) - { - if (connection_counter < number_of_connections) - { - sender = this->create_connection (); - - // This lets the Close_Socket_Thread know that the new - // connection has been created. - int result = - this->new_connection_event_.signal (); - ACE_ASSERT (result == 0); - ACE_UNUSED_ARG (result); - - ++connection_counter; - message_counter = 1; - } - else - // Stop the thread, if the maximum number of connections - // for the test has been reached. - break; - } - - // The reference count on the sender was increased by the cache - // before it was returned to us. - ACE_Event_Handler_var safe_sender (sender); - - // If the test does not require making invocations, immediately - // release the connection. - if (!this->make_invocations_) - { - this->connection_cache_.release_connection (sender); - - // Sleep for a short while - ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000)); - } - else - { - // Make invocation. - ssize_t result = - sender->send_message (); - - // If successful, release connection. - if (result == message_size) - { - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Message %d:%d delivered on handle %d\n"), - connection_counter, - message_counter, - sender->handle_)); - - this->connection_cache_.release_connection (sender); - } - else - { - // If failure in making invocation, close the sender. - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("/*** Problem in delivering message ") - ACE_TEXT ("%d:%d on handle %d: shutting down ") - ACE_TEXT ("invocation thread ***/\n"), - connection_counter, - message_counter, - sender->handle_)); - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Invocation thread calling ") - ACE_TEXT ("Sender::close() for handle %d\n"), - sender->handle_)); - - sender->close (); - } - } - } - - // Close the Reactor event loop. - this->reactor_.end_reactor_event_loop (); - - return 0; -} - -class Close_Socket_Thread : public ACE_Task_Base -{ -public: - - Close_Socket_Thread (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor, - ACE_Auto_Event &new_connection_event, - int make_invocations, - int run_receiver_thread); - - int svc (void); - - ACE_Auto_Event &new_connection_event_; - - ACE_Reactor &reactor_; - - int make_invocations_; - - int run_receiver_thread_; - -}; - -Close_Socket_Thread::Close_Socket_Thread (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor, - ACE_Auto_Event &new_connection_event, - int make_invocations, - int run_receiver_thread) - : ACE_Task_Base (&thread_manager), - new_connection_event_ (new_connection_event), - reactor_ (reactor), - make_invocations_ (make_invocations), - run_receiver_thread_ (run_receiver_thread) -{ -} - -int -Close_Socket_Thread::svc (void) -{ - ACE_OS::srand ((u_int) ACE_OS::time ()); - ACE_Time_Value timeout (0, close_timeout * 1000); - - for (; !this->reactor_.reactor_event_loop_done ();) - { - // Wait for the new connection to be established. - int result = - this->new_connection_event_.wait (&timeout, - 0); - ACE_ASSERT (result == 0 || - (result == -1 && errno == ETIME)); - - if (result == -1 && - errno == ETIME) - continue; - - // Sleep for half a second. - ACE_OS::sleep (timeout); - - int close_client = 0; - - // If the invocation thread is making invocations and if the - // receiver is threaded, either socket can be closed. - if (this->make_invocations_ && - this->run_receiver_thread_) - // Randomize which socket to close. - close_client = ACE_OS::rand () % 2; - - // If the invocation thread is making invocations, only close - // the client socket. - else if (this->make_invocations_) - close_client = 1; - else - // If the receiver is threaded, only close the server socket. - close_client = 0; - - if (close_client) - { - // Close the client socket. - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Close socket thread closing client ") - ACE_TEXT ("handle %d\n"), - client_handle)); - - ACE_OS::shutdown (client_handle, ACE_SHUTDOWN_BOTH); - ACE_OS::closesocket (client_handle); - } - else - { - // Close the server socket. - if (debug) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Close socket thread closing server ") - ACE_TEXT ("handle %d\n"), - server_handle)); - ACE_OS::shutdown (server_handle, ACE_SHUTDOWN_BOTH); - ACE_OS::closesocket (server_handle); - } - } - - return 0; -} - -Event_Loop_Thread::Event_Loop_Thread (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor) - : ACE_Task_Base (&thread_manager), - reactor_ (reactor) -{ -} - -int -Event_Loop_Thread::svc (void) -{ - // Simply run the event loop. - this->reactor_.owner (ACE_Thread::self ()); - - while (!this->reactor_.reactor_event_loop_done ()) - { - this->reactor_.handle_events (); - } - - return 0; -} - -class Purger_Thread : public ACE_Task_Base -{ -public: - - Purger_Thread (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor, - Connection_Cache &connection_cache); - - int svc (void); - - ACE_Reactor &reactor_; - - Connection_Cache &connection_cache_; - -}; - -Purger_Thread::Purger_Thread (ACE_Thread_Manager &thread_manager, - ACE_Reactor &reactor, - Connection_Cache &connection_cache) - : ACE_Task_Base (&thread_manager), - reactor_ (reactor), - connection_cache_ (connection_cache) -{ -} - -int -Purger_Thread::svc (void) -{ - for (; !this->reactor_.reactor_event_loop_done ();) - { - // Get a connection from the cache. - Sender *sender = - this->connection_cache_.acquire_connection (); - - // If no connection is available in the cache, sleep for a while. - if (sender == 0) - ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000)); - else - { - // The reference count on the sender was increased by the - // cache before it was returned to us. - ACE_Event_Handler_var safe_sender (sender); - - // Actively close the connection. - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Purger thread calling Sender::close() ") - ACE_TEXT ("for handle %d\n"), - sender->handle_)); - - sender->close (); - } - } - - return 0; -} - -void -testing (ACE_Reactor *reactor, - int make_invocations, - int run_event_loop_thread, - int run_purger_thread, - int run_receiver_thread, - int nested_upcalls) -{ - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\nConfiguration: \n") - ACE_TEXT ("\tInvocation thread = %d\n") - ACE_TEXT ("\tEvent Loop thread = %d\n") - ACE_TEXT ("\tPurger thread = %d\n") - ACE_TEXT ("\tReceiver thread = %d\n") - ACE_TEXT ("\tNested Upcalls = %d\n\n"), - make_invocations, - run_event_loop_thread, - run_purger_thread, - run_receiver_thread, - nested_upcalls)); - - ACE_Thread_Manager thread_manager; - - int result = 0; - - // Create the connection cache. - Connection_Cache connection_cache; - ACE_Auto_Event new_connection_event; - - // Create the invocation thread. - Invocation_Thread invocation_thread (thread_manager, - *reactor, - connection_cache, - new_connection_event, - make_invocations, - run_receiver_thread, - nested_upcalls); - - result = - invocation_thread.activate (); - ACE_ASSERT (result == 0); - - // Create the thread for closing the server socket. - Close_Socket_Thread close_socket_thread (thread_manager, - *reactor, - new_connection_event, - make_invocations, - run_receiver_thread); - result = - close_socket_thread.activate (); - ACE_ASSERT (result == 0); - - global_event_loop_thread_variable = 0; - - // Create a thread to run the event loop. - Event_Loop_Thread event_loop_thread (thread_manager, - *reactor); - if (run_event_loop_thread) - { - global_event_loop_thread_variable = - &event_loop_thread; - - result = - event_loop_thread.activate (); - ACE_ASSERT (result == 0); - } - - // Create a thread to run the purger. - Purger_Thread purger_thread (thread_manager, - *reactor, - connection_cache); - if (run_purger_thread) - { - result = - purger_thread.activate (); - ACE_ASSERT (result == 0); - } - - // Wait for threads to exit. - result = thread_manager.wait (); - ACE_ASSERT (result == 0); - - // Set the global variable to zero again because the - // event_loop_thread exists on the stack and now - // gets destructed. - global_event_loop_thread_variable = 0; -} - -template <class REACTOR_IMPL> -class test -{ -public: - test (int ignore_nested_upcalls, - int require_event_loop_thread); -}; - -template <class REACTOR_IMPL> -test<REACTOR_IMPL>::test (int ignore_nested_upcalls, - int require_event_loop_thread) -{ - for (int i = 0; - i < (int) (sizeof test_configs / (sizeof (int) * number_of_options)); - i++) - { - if ((make_invocations == -1 || - make_invocations == test_configs[i][0]) && - (run_event_loop_thread == -1 || - run_event_loop_thread == test_configs[i][1]) && - (run_purger_thread == -1 || - run_purger_thread == test_configs[i][2]) && - (run_receiver_thread == -1 || - run_receiver_thread == test_configs[i][3]) && - (nested_upcalls == -1 || - nested_upcalls == test_configs[i][4])) - { - -#if 0 // defined (linux) - - // @@ I am not sure why but when <make_invocations> is 0 and - // there is no purger thread, the receiver thread does not - // notice that the connection has been closed. - if (!test_configs[i][0] && !test_configs[i][2]) - continue; - - // @@ Linux also does not work correctly in the following - // case: Invocation thread starts and sends messages filling - // the socket buffer. It then blocks in write(). In the - // meantime, the close connection thread closes the socket - // used by invocation thread. However, the invocation thread - // does not notice this as it does not return from write(). - // Meanwhile, the event loop thread notices that a socket in - // it's wait set has been closed, and starts to spin in - // handle_events() since the invocation thread is not taking - // out the closed handle from the Reactor's wait set. - if (test_configs[i][0] && test_configs[i][1] && !test_configs[i][3]) - continue; - -#endif /* linux */ - - if (test_configs[i][4] && ignore_nested_upcalls) - continue; - - if (!test_configs[i][1] && require_event_loop_thread) - continue; - - ACE_Reactor reactor (new REACTOR_IMPL, - 1); - - testing (&reactor, - test_configs[i][0], - test_configs[i][1], - test_configs[i][2], - test_configs[i][3], - test_configs[i][4]); - } - } -} - -static int -parse_args (int argc, ACE_TCHAR *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:b:c:d:f:g:k:l:m:n:o:uz:")); - - int cc; - while ((cc = get_opt ()) != -1) - { - switch (cc) - { - case 'a': - test_select_reactor = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'b': - test_tp_reactor = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'c': - test_wfmo_reactor = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'd': - test_dev_poll_reactor = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'f': - number_of_connections = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'g': - close_timeout = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'k': - make_invocations = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'l': - run_event_loop_thread = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'm': - run_purger_thread = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'n': - run_receiver_thread = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'o': - nested_upcalls = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'z': - debug = ACE_OS::atoi (get_opt.opt_arg ()); - break; - case 'u': - default: - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("\nusage: %s \n\n") - ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n") - ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n") - ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n") - ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n") - ACE_TEXT ("\t[-f number of connections] (defaults to %d)\n") - ACE_TEXT ("\t[-g close timeout] (defaults to %d)\n") - ACE_TEXT ("\t[-k make invocations] (defaults to %d)\n") - ACE_TEXT ("\t[-l run event loop thread] (defaults to %d)\n") - ACE_TEXT ("\t[-m run purger thread] (defaults to %d)\n") - ACE_TEXT ("\t[-n run receiver thread] (defaults to %d)\n") - ACE_TEXT ("\t[-o nested upcalls] (defaults to %d)\n") - ACE_TEXT ("\t[-z debug] (defaults to %d)\n") - ACE_TEXT ("\n"), - argv[0], - test_select_reactor, - test_tp_reactor, - test_wfmo_reactor, - test_dev_poll_reactor, - number_of_connections, - close_timeout, - make_invocations, - run_event_loop_thread, - run_purger_thread, - run_receiver_thread, - nested_upcalls, - debug)); - return -1; - } - } - - return 0; -} - -int -run_main (int argc, ACE_TCHAR *argv[]) -{ - ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test")); - - // Validate options. - int result = - parse_args (argc, argv); - if (result != 0) - return result; - -#if defined (SIGPIPE) && !defined (ACE_LACKS_UNIX_SIGNALS) - // There's really no way to deal with this in a portable manner, so - // we just have to suck it up and get preprocessor conditional and - // ugly. - // - // Impractical to have each call to the ORB protect against the - // implementation artifact of potential writes to dead connections, - // as it'd be way expensive. Do it here; who cares about SIGPIPE in - // these kinds of applications, anyway? - (void) ACE_OS::signal (SIGPIPE, (ACE_SignalHandler) SIG_IGN); -#endif /* SIGPIPE */ - - int ignore_nested_upcalls = 1; - int perform_nested_upcalls = 0; - - int event_loop_thread_required = 1; - int event_loop_thread_not_required = 0; - - if (test_select_reactor) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\n\nTesting Select Reactor....\n\n"))); - - test<ACE_Select_Reactor> test (ignore_nested_upcalls, - event_loop_thread_not_required); - ACE_UNUSED_ARG (test); - } - - if (test_tp_reactor) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\n\nTesting TP Reactor....\n\n"))); - - test<ACE_TP_Reactor> test (perform_nested_upcalls, - event_loop_thread_not_required); - ACE_UNUSED_ARG (test); - } - -#if defined (ACE_HAS_EVENT_POLL) - - if (test_dev_poll_reactor) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\n\nTesting Dev Poll Reactor....\n\n"))); - - test<ACE_Dev_Poll_Reactor> test (perform_nested_upcalls, - event_loop_thread_not_required); - ACE_UNUSED_ARG (test); - } - -#endif - -#if defined (ACE_WIN32) - - if (test_wfmo_reactor) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\n\nTesting WFMO Reactor....\n\n"))); - - test<ACE_WFMO_Reactor> test (ignore_nested_upcalls, - event_loop_thread_required); - ACE_UNUSED_ARG (test); - } - -#else /* ACE_WIN32 */ - - ACE_UNUSED_ARG (event_loop_thread_required); - -#endif /* ACE_WIN32 */ - - ACE_END_TEST; - - return 0; -} - -#else /* ACE_HAS_THREADS */ - -int -run_main (int, ACE_TCHAR *[]) -{ - ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test")); - - ACE_ERROR ((LM_INFO, - ACE_TEXT ("threads not supported on this platform\n"))); - - ACE_END_TEST; - - return 0; -} - -#endif /* ACE_HAS_THREADS */ |