diff options
Diffstat (limited to 'tests/MT_Reference_Counted_Event_Handler_Test.cpp')
-rw-r--r-- | tests/MT_Reference_Counted_Event_Handler_Test.cpp | 1389 |
1 files changed, 1389 insertions, 0 deletions
diff --git a/tests/MT_Reference_Counted_Event_Handler_Test.cpp b/tests/MT_Reference_Counted_Event_Handler_Test.cpp new file mode 100644 index 00000000000..938669f403c --- /dev/null +++ b/tests/MT_Reference_Counted_Event_Handler_Test.cpp @@ -0,0 +1,1389 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// MT_Reference_Counted_Event_Handler_Test.cpp +// +// = DESCRIPTION +// +// This test tries to represents what happens in the ORB wrt to +// event handlers, reactors, timer queues, threads, and connection +// caches, minus the other complexities. The following three +// Reactors are tested: Select, TP, and WFMO. +// +// The test checks proper use and shutting down of client-side +// event handlers when it is used by invocation threads and/or +// event loop threads. Server-side event handlers are either +// threaded or reactive. A purger thread is introduced to check the +// connection recycling and cache purging. Nested upcalls are also +// tested. +// +// = AUTHOR +// Irfan Pyarali <irfan@oomworks.com> +// +// ============================================================================ + +#include "test_config.h" +#include "ace/Select_Reactor.h" +#include "ace/TP_Reactor.h" +#include "ace/WFMO_Reactor.h" +#include "ace/Get_Opt.h" +#include "ace/Task.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Connector.h" + +ACE_RCSID(tests, MT_Reference_Counted_Event_Handler_Test, "$Id$") + +#if defined (ACE_HAS_THREADS) + +static const char message[] = "abcdefghijklmnopqrstuvwxyz"; +static const int message_size = 26; +static int test_select_reactor = 1; +static int test_tp_reactor = 1; +static int test_wfmo_reactor = 1; +static int debug = 0; +static int number_of_connections = 5; +static int max_nested_upcall_level = 10; +static int close_timeout = 500; +static int pipe_open_attempts = 10; +static int pipe_retry_timeout = 1; +static int make_invocations = -1; +static int run_event_loop_thread = -1; +static int run_purger_thread = -1; +static int run_receiver_thread = -1; +static int nested_upcalls = -1; + +static ACE_HANDLE server_handle = ACE_INVALID_HANDLE; +static ACE_HANDLE client_handle = ACE_INVALID_HANDLE; + +static int number_of_options = 5; +static int test_configs[][5] = + { + // + // make_invocations, run_event_loop_thread, run_purger_thread, run_receiver_thread, nested_upcalls + // + + // { 0, 0, 0, 0, 0, }, // At least one thread should be running. + // { 0, 0, 0, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations, + // no thread will know that the socket is closed. + // { 0, 0, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded, + // we cannot decide which socket to close. + // { 0, 0, 1, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations, + // no thread will know that the socket is closed. + // { 0, 1, 0, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded, + // we cannot decide which socket to close. + { 0, 1, 0, 1, 0, }, + // { 0, 1, 0, 1, 1, }, // No need for nested upcalls without invocations. + // { 0, 1, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded, + // we cannot decide which socket to close. + { 0, 1, 1, 1, 0, }, + // { 0, 1, 1, 1, 1, }, // No need for nested upcalls without invocations. + // { 1, 0, 0, 0, 0, }, // If both event_loop_thread and receiver are not threaded, + // no thread can receive the messages. + { 1, 0, 0, 1, 0, }, + // { 1, 0, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver. + // { 1, 0, 1, 0, 0, }, // If both event_loop_thread and receiver are not threaded, + // no thread can receive the messages. + { 1, 0, 1, 1, 0, }, + // { 1, 0, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver. + { 1, 1, 0, 0, 0, }, + { 1, 1, 0, 0, 1, }, + { 1, 1, 0, 1, 0, }, + // { 1, 1, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver. + { 1, 1, 1, 0, 0, }, + { 1, 1, 1, 0, 1, }, + { 1, 1, 1, 1, 0, }, + // { 1, 1, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver. + }; + +/* Replication of the ACE_Pipe class. Only difference is that this + class always uses two sockets to create the pipe, even on platforms + that support pipes. */ + +class Pipe +{ +public: + + Pipe (void); + + int open (void); + + ACE_HANDLE read_handle (void) const; + + ACE_HANDLE write_handle (void) const; + +private: + ACE_HANDLE handles_[2]; +}; + +int +Pipe::open (void) +{ + ACE_INET_Addr my_addr; + ACE_SOCK_Acceptor acceptor; + ACE_SOCK_Connector connector; + ACE_SOCK_Stream reader; + ACE_SOCK_Stream writer; + int result = 0; + + // Bind listener to any port and then find out what the port was. + if (acceptor.open (ACE_Addr::sap_any) == -1 + || acceptor.get_local_addr (my_addr) == -1) + result = -1; + else + { + ACE_INET_Addr sv_addr (my_addr.get_port_number (), + ACE_LOCALHOST); + + // Establish a connection within the same process. + if (connector.connect (writer, sv_addr) == -1) + result = -1; + else if (acceptor.accept (reader) == -1) + { + writer.close (); + result = -1; + } + } + + // Close down the acceptor endpoint since we don't need it anymore. + acceptor.close (); + if (result == -1) + return -1; + + this->handles_[0] = reader.get_handle (); + this->handles_[1] = writer.get_handle (); + + return 0; +} + +Pipe::Pipe (void) +{ + this->handles_[0] = ACE_INVALID_HANDLE; + this->handles_[1] = ACE_INVALID_HANDLE; +} + +ACE_HANDLE +Pipe::read_handle (void) const +{ + return this->handles_[0]; +} + +ACE_HANDLE +Pipe::write_handle (void) const +{ + return this->handles_[1]; +} + +class Connection_Cache; +class Event_Loop_Thread; + +static Event_Loop_Thread *global_event_loop_thread_variable = 0; + +class Sender : public ACE_Event_Handler +{ +public: + + Sender (ACE_HANDLE handle, + Connection_Cache &connection_cache); + + ~Sender (void); + + int handle_input (ACE_HANDLE); + + ssize_t send_message (void); + + void close (void); + + ACE_HANDLE handle_; + + Connection_Cache &connection_cache_; + +}; + +class Connection_Cache +{ +public: + + Connection_Cache (void); + + ~Connection_Cache (void); + + void add_connection (Sender *sender); + + void remove_connection (Sender *sender); + + Sender *acquire_connection (void); + + void release_connection (Sender *sender); + + int find (Sender *sender); + + ACE_SYNCH_MUTEX &lock (void); + + enum State + { + IDLE, + BUSY, + NOT_IN_CACHE + }; + + struct Entry + { + Sender *sender_; + State state_; + }; + + Entry *entries_; + + ACE_SYNCH_MUTEX lock_; +}; + +Sender::Sender (ACE_HANDLE handle, + Connection_Cache &connection_cache) + : handle_ (handle), + connection_cache_ (connection_cache) +{ + // Enable reference counting. + this->reference_counting_policy ().value + (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); + + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Reference count in Sender::Sender() is %d\n", + this->reference_count_.value ())); +} + +Sender::~Sender (void) +{ + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Reference count in ~Sender::Sender() is %d\n", + this->reference_count_.value ())); + + // Close the socket that we are responsible for. + ACE_OS::closesocket (this->handle_); +} + +int +Sender::handle_input (ACE_HANDLE) +{ + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Reference count in Sender::handle_input() is %d\n", + this->reference_count_.value ())); + + // + // In this test, this method is only called when the connection has + // been closed. Remove self from Reactor. + // + + ACE_DEBUG ((LM_DEBUG, + "Event loop thread calling Sender::close() for handle %d\n", + this->handle_)); + + this->close (); + + return 0; +} + +void +Sender::close (void) +{ + // Remove socket from Reactor (may fail if another thread has already + // removed the handle from the Reactor). + this->reactor ()->remove_handler (this->handle_, + ACE_Event_Handler::ALL_EVENTS_MASK); + + // Remove self from connection cache (may fail if another thread has + // already removed "this" from the cache). + this->connection_cache_.remove_connection (this); +} + +ssize_t +Sender::send_message (void) +{ + return ACE::send_n (this->handle_, + message, + message_size); +} + +class Event_Loop_Thread : public ACE_Task_Base +{ +public: + + Event_Loop_Thread (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor); + + int svc (void); + + ACE_Reactor &reactor_; + +}; + +class Receiver : public ACE_Task_Base +{ +public: + + Receiver (ACE_Thread_Manager &thread_manager, + ACE_HANDLE handle, + int nested_upcalls); + + ~Receiver (void); + + int svc (void); + + int close (u_long flags); + + int handle_input (ACE_HANDLE); + + int resume_handler (void); + + ACE_HANDLE handle_; + + int counter_; + + int nested_upcalls_; + + int nested_upcalls_level_; + +}; + +Receiver::Receiver (ACE_Thread_Manager &thread_manager, + ACE_HANDLE handle, + int nested_upcalls) + : ACE_Task_Base (&thread_manager), + handle_ (handle), + counter_ (1), + nested_upcalls_ (nested_upcalls), + nested_upcalls_level_ (0) +{ + // Enable reference counting. + this->reference_counting_policy ().value + (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); + + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Reference count in Receiver::Receiver() is %d\n", + this->reference_count_.value ())); +} + +Receiver::~Receiver (void) +{ + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Reference count in ~Receiver::Receiver() is %d\n", + this->reference_count_.value ())); + + // Close the socket that we are responsible for. + ACE_OS::closesocket (this->handle_); +} + +int +Receiver::svc (void) +{ + // + // Continuously receive messages from the Sender. On error, exit + // thread. + // + + int result = 0; + + while (result != -1) + { + result = + this->handle_input (this->handle_); + } + + return 0; +} + +int +Receiver::handle_input (ACE_HANDLE handle) +{ + char buf[message_size + 1]; + + // Receive message. + ssize_t result = + ACE::recv_n (handle, + buf, + sizeof buf - 1); + + if (this->reactor ()) + this->reactor ()->resume_handler (handle); + + if (result == message_size) + { + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Message %d received on handle %d\n", + this->counter_++, + handle)); + + if (this->thr_count () == 0 && + this->nested_upcalls_) + { + this->nested_upcalls_level_++; + + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Nesting level %d\n", + this->nested_upcalls_level_)); + + if (this->nested_upcalls_level_ != max_nested_upcall_level) + global_event_loop_thread_variable->svc (); + + this->nested_upcalls_level_--; + return 0; + } + else + return 0; + } + else + { + if (debug) + ACE_DEBUG ((LM_DEBUG, + "/*** Problem in receiving message %d on handle %d: shutting down receiving thread ***/\n", + this->counter_, + handle)); + + return -1; + } +} + +int +Receiver::resume_handler (void) +{ + /// The application takes responsibility of resuming the handler. + return ACE_APPLICATION_RESUMES_HANDLER; +} + +int +Receiver::close (u_long) +{ + // If threaded, we are responsible for deleting this instance when + // the thread completes. If not threaded, Reactor reference + // counting will handle the deletion of this instance. + delete this; + return 0; +} + +class Connector +{ +public: + + Connector (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor, + int nested_upcalls); + + int connect (ACE_HANDLE &client_handle, + ACE_HANDLE &server_handle, + int run_receiver_thread); + + ACE_Thread_Manager &thread_manager_; + + ACE_Reactor &reactor_; + + int nested_upcalls_; + +}; + +Connector::Connector (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor, + int nested_upcalls) + : thread_manager_ (thread_manager), + reactor_ (reactor), + nested_upcalls_ (nested_upcalls) +{ +} + +int +Connector::connect (ACE_HANDLE &client_handle, + ACE_HANDLE &server_handle, + int run_receiver_thread) +{ + // + // Create a connection and a receiver to receive messages on the + // connection. + // + + Pipe pipe; + int result = 0; + + for (int i = 0; i < pipe_open_attempts; ++i) + { + result = + pipe.open (); + + if (result == 0) + break; + + if (result == -1) + ACE_OS::sleep (pipe_retry_timeout); + } + + ACE_ASSERT (result == 0); + ACE_UNUSED_ARG (result); + + Receiver *receiver = + new Receiver (this->thread_manager_, + pipe.write_handle (), + this->nested_upcalls_); + + // Either the receiver is threaded or register it with the Reactor. + if (run_receiver_thread) + result = + receiver->activate (); + else + { + result = + this->reactor_.register_handler (pipe.write_handle (), + receiver, + ACE_Event_Handler::READ_MASK); + + // The reference count on the receiver was increased by the + // Reactor. + ACE_Event_Handler_var safe_receiver (receiver); + } + + ACE_ASSERT (result == 0); + ACE_UNUSED_ARG (result); + + client_handle = + pipe.read_handle (); + + server_handle = + pipe.write_handle (); + + if (debug) + ACE_DEBUG ((LM_DEBUG, + "New connection: client handle = %d, server handle = %d\n", + client_handle, server_handle)); + + return 0; +} + +Connection_Cache::Connection_Cache (void) +{ + // Initialize the connection cache. + this->entries_ = + new Entry[number_of_connections]; + + for (int i = 0; i < number_of_connections; ++i) + { + this->entries_[i].sender_ = 0; + this->entries_[i].state_ = NOT_IN_CACHE; + } +} + +int +Connection_Cache::find (Sender *sender) +{ + for (int i = 0; i < number_of_connections; ++i) + { + if (this->entries_[i].sender_ == sender) + return i; + } + + return -1; +} + +void +Connection_Cache::add_connection (Sender *sender) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + // Make sure that the state of the connection cache is as + // expected. <sender> should not be already in the cache. + ACE_ASSERT (this->find (sender) == -1); + + int empty_index = + this->find (0); + + sender->add_reference (); + this->entries_[empty_index].sender_ = sender; + this->entries_[empty_index].state_ = BUSY; +} + +void +Connection_Cache::remove_connection (Sender *sender) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + // Make sure that the state of the connection cache is as expected. + // remove_connection() may already have been called. + int index = + this->find (sender); + + if (index == -1) + return; + + // If we still have the sender, remove it. + sender->remove_reference (); + this->entries_[index].sender_ = 0; + this->entries_[index].state_ = NOT_IN_CACHE; +} + +Sender * +Connection_Cache::acquire_connection (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0); + + // Find a valid and IDLE sender. + + int index = -1; + + for (int i = 0; i < number_of_connections; ++i) + { + if (this->entries_[i].sender_ && + this->entries_[i].state_ == IDLE) + index = i; + } + + if (index == -1) + return 0; + + this->entries_[index].sender_->add_reference (); + this->entries_[index].state_ = BUSY; + + return this->entries_[index].sender_; +} + +void +Connection_Cache::release_connection (Sender *sender) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + // Make sure that the state of the connection cache is as expected. + // remove_connection() may have already removed the connection from + // the cache. + int index = + this->find (sender); + + if (index == -1) + return; + + // If we still have the sender, idle it. + this->entries_[index].state_ = IDLE; +} + +ACE_SYNCH_MUTEX & +Connection_Cache::lock (void) +{ + return this->lock_; +} + +Connection_Cache::~Connection_Cache (void) +{ + for (int i = 0; i < number_of_connections; ++i) + { + if (this->entries_[i].sender_) + this->remove_connection (this->entries_[i].sender_); + } + + delete[] this->entries_; +} + +class Invocation_Thread : public ACE_Task_Base +{ +public: + + Invocation_Thread (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor, + Connection_Cache &connection_cache, + ACE_Auto_Event &new_connection_event, + int make_invocations, + int run_receiver_thread, + int nested_upcalls); + + int svc (void); + + Sender *create_connection (void); + + Connection_Cache &connection_cache_; + + ACE_Reactor &reactor_; + + ACE_Thread_Manager &thread_manager_; + + ACE_Auto_Event &new_connection_event_; + + int make_invocations_; + + int run_receiver_thread_; + + int nested_upcalls_; + +}; + +Invocation_Thread::Invocation_Thread (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor, + Connection_Cache &connection_cache, + ACE_Auto_Event &new_connection_event, + int make_invocations, + int run_receiver_thread, + int nested_upcalls) + : ACE_Task_Base (&thread_manager), + connection_cache_ (connection_cache), + reactor_ (reactor), + thread_manager_ (thread_manager), + new_connection_event_ (new_connection_event), + make_invocations_ (make_invocations), + run_receiver_thread_ (run_receiver_thread), + nested_upcalls_ (nested_upcalls) +{ +} + +Sender * +Invocation_Thread::create_connection (void) +{ + int result = 0; + + // Connector for creating new connections. + Connector connector (this->thread_manager_, + this->reactor_, + this->nested_upcalls_); + + // <server_handle> is a global variable. It will be used later by + // the Close_Socket_Thread. + result = + connector.connect (client_handle, + server_handle, + this->run_receiver_thread_); + ACE_ASSERT (result == 0); + ACE_UNUSED_ARG (result); + + // Create a new sender. + Sender *sender = + new Sender (client_handle, + this->connection_cache_); + + // Register it with the cache. + this->connection_cache_.add_connection (sender); + + // + // There might be a race condition here. The sender has been added + // to the cache and is potentially available to other threads + // accessing the cache. Therefore, the other thread may use this + // sender and potentially close the sender before it even gets + // registered with the Reactor. + // + // This is resolved by marking the connection as busy when it is + // first added to the cache. And only once the thread creating the + // connection is done with it, it is marked a available in the + // cache. + // + // This order of registration is important. + // + + // Register the handle with the Reactor. + result = + this->reactor_.register_handler (client_handle, + sender, + ACE_Event_Handler::READ_MASK); + ACE_ASSERT (result == 0); + ACE_UNUSED_ARG (result); + + return sender; +} + +int +Invocation_Thread::svc (void) +{ + int connection_counter = 0; + for (int message_counter = 1;; ++message_counter) + { + // Get a connection from the cache. + Sender *sender = + this->connection_cache_.acquire_connection (); + + // If no connection is available in the cache, create a new one. + if (sender == 0) + { + if (connection_counter < number_of_connections) + { + sender = this->create_connection (); + + // This lets the Close_Socket_Thread know that the new + // connection has been created. + int result = + this->new_connection_event_.signal (); + ACE_ASSERT (result == 0); + ACE_UNUSED_ARG (result); + + ++connection_counter; + message_counter = 1; + } + else + // Stop the thread, if the maximum number of connections + // for the test has been reached. + break; + } + + // The reference count on the sender was increased by the cache + // before it was returned to us. + ACE_Event_Handler_var safe_sender (sender); + + // If the test does not require making invocations, immediately + // release the connection. + if (!this->make_invocations_) + { + this->connection_cache_.release_connection (sender); + + // Sleep for a short while + ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000)); + } + else + { + // Make invocation. + ssize_t result = + sender->send_message (); + + // If successful, release connection. + if (result == message_size) + { + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Message %d:%d delivered on handle %d\n", + connection_counter, + message_counter, + sender->handle_)); + + this->connection_cache_.release_connection (sender); + } + else + { + // If failure in making invocation, close the sender. + if (debug) + ACE_DEBUG ((LM_DEBUG, + "/*** Problem in delivering message %d:%d on handle %d: shutting down invocation thread ***/\n", + connection_counter, + message_counter, + sender->handle_)); + + ACE_DEBUG ((LM_DEBUG, + "Invocation thread calling Sender::close() for handle %d\n", + sender->handle_)); + + sender->close (); + } + } + } + + // Close the Reactor event loop. + this->reactor_.end_reactor_event_loop (); + + return 0; +} + +class Close_Socket_Thread : public ACE_Task_Base +{ +public: + + Close_Socket_Thread (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor, + ACE_Auto_Event &new_connection_event, + int make_invocations, + int run_receiver_thread); + + int svc (void); + + ACE_Auto_Event &new_connection_event_; + + ACE_Reactor &reactor_; + + int make_invocations_; + + int run_receiver_thread_; + +}; + +Close_Socket_Thread::Close_Socket_Thread (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor, + ACE_Auto_Event &new_connection_event, + int make_invocations, + int run_receiver_thread) + : ACE_Task_Base (&thread_manager), + new_connection_event_ (new_connection_event), + reactor_ (reactor), + make_invocations_ (make_invocations), + run_receiver_thread_ (run_receiver_thread) +{ +} + +int +Close_Socket_Thread::svc (void) +{ + ACE_OS::srand ((u_int) ACE_OS::time ()); + ACE_Time_Value timeout (0, close_timeout * 1000); + + for (; !this->reactor_.reactor_event_loop_done ();) + { + // Wait for the new connection to be established. + int result = + this->new_connection_event_.wait (&timeout, + 0); + ACE_ASSERT (result == 0 || + (result == -1 && errno == ETIME)); + + if (result == -1 && + errno == ETIME) + continue; + + // Sleep for half a second. + ACE_OS::sleep (timeout); + + int close_client = 0; + + // If the invocation thread is making invocations and if the + // receiver is threaded, either socket can be closed. + if (this->make_invocations_ && + this->run_receiver_thread_) + // Randomize which socket to close. + close_client = ACE_OS::rand () % 2; + + // If the invocation thread is making invocations, only close + // the client socket. + else if (this->make_invocations_) + close_client = 1; + else + // If the receiver is threaded, only close the server socket. + close_client = 0; + + if (close_client) + { + // Close the client socket. + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Close socket thread closing client handle %d\n", + client_handle)); + + ACE_OS::closesocket (client_handle); + } + else + { + // Close the server socket. + if (debug) + ACE_DEBUG ((LM_DEBUG, + "Close socket thread closing server handle %d\n", + server_handle)); + + ACE_OS::closesocket (server_handle); + } + } + + return 0; +} + +Event_Loop_Thread::Event_Loop_Thread (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor) + : ACE_Task_Base (&thread_manager), + reactor_ (reactor) +{ +} + +int +Event_Loop_Thread::svc (void) +{ + // Simply run the event loop. + this->reactor_.owner (ACE_Thread::self ()); + + while (!this->reactor_.reactor_event_loop_done ()) + { + this->reactor_.handle_events (); + } + + return 0; +} + +class Purger_Thread : public ACE_Task_Base +{ +public: + + Purger_Thread (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor, + Connection_Cache &connection_cache); + + int svc (void); + + ACE_Reactor &reactor_; + + Connection_Cache &connection_cache_; + +}; + +Purger_Thread::Purger_Thread (ACE_Thread_Manager &thread_manager, + ACE_Reactor &reactor, + Connection_Cache &connection_cache) + : ACE_Task_Base (&thread_manager), + reactor_ (reactor), + connection_cache_ (connection_cache) +{ +} + +int +Purger_Thread::svc (void) +{ + for (; !this->reactor_.reactor_event_loop_done ();) + { + // Get a connection from the cache. + Sender *sender = + this->connection_cache_.acquire_connection (); + + // If no connection is available in the cache, sleep for a while. + if (sender == 0) + ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000)); + else + { + // The reference count on the sender was increased by the + // cache before it was returned to us. + ACE_Event_Handler_var safe_sender (sender); + + // Actively close the connection. + ACE_DEBUG ((LM_DEBUG, + "Purger thread calling Sender::close() for handle %d\n", + sender->handle_)); + + sender->close (); + } + } + + return 0; +} + +void +testing (ACE_Reactor *reactor, + int make_invocations, + int run_event_loop_thread, + int run_purger_thread, + int run_receiver_thread, + int nested_upcalls) +{ + ACE_DEBUG ((LM_DEBUG, + "\nConfiguration: \n" + "\tInvocation thread = %d\n" + "\tEvent Loop thread = %d\n" + "\tPurger thread = %d\n" + "\tReceiver thread = %d\n" + "\tNested Upcalls = %d\n\n", + make_invocations, + run_event_loop_thread, + run_purger_thread, + run_receiver_thread, + nested_upcalls)); + + ACE_Thread_Manager thread_manager; + + int result = 0; + + // Create the connection cache. + Connection_Cache connection_cache; + ACE_Auto_Event new_connection_event; + + // Create the invocation thread. + Invocation_Thread invocation_thread (thread_manager, + *reactor, + connection_cache, + new_connection_event, + make_invocations, + run_receiver_thread, + nested_upcalls); + + result = + invocation_thread.activate (); + ACE_ASSERT (result == 0); + + // Create the thread for closing the server socket. + Close_Socket_Thread close_socket_thread (thread_manager, + *reactor, + new_connection_event, + make_invocations, + run_receiver_thread); + result = + close_socket_thread.activate (); + ACE_ASSERT (result == 0); + + global_event_loop_thread_variable = 0; + + // Create a thread to run the event loop. + Event_Loop_Thread event_loop_thread (thread_manager, + *reactor); + if (run_event_loop_thread) + { + result = + event_loop_thread.activate (); + ACE_ASSERT (result == 0); + + global_event_loop_thread_variable = + &event_loop_thread; + } + + // Create a thread to run the purger. + Purger_Thread purger_thread (thread_manager, + *reactor, + connection_cache); + if (run_purger_thread) + { + result = + purger_thread.activate (); + ACE_ASSERT (result == 0); + } + + // Wait for threads to exit. + result = thread_manager.wait (); + ACE_ASSERT (result == 0); +} + +template <class REACTOR_IMPL> +class test +{ +public: + test (int ignore_nested_upcalls, + int require_event_loop_thread); +}; + +template <class REACTOR_IMPL> +test<REACTOR_IMPL>::test (int ignore_nested_upcalls, + int require_event_loop_thread) +{ + for (int i = 0; + i < (int) (sizeof test_configs / (sizeof (int) * number_of_options)); + i++) + { + if ((make_invocations == -1 || + make_invocations == test_configs[i][0]) && + (run_event_loop_thread == -1 || + run_event_loop_thread == test_configs[i][1]) && + (run_purger_thread == -1 || + run_purger_thread == test_configs[i][2]) && + (run_receiver_thread == -1 || + run_receiver_thread == test_configs[i][3]) && + (nested_upcalls == -1 || + nested_upcalls == test_configs[i][4])) + { + +#if defined (linux) + + // @@ I am not sure why but when <make_invocations> is 0 and + // there is no purger thread, the receiver thread does not + // notice that the connection has been closed. + if (!test_configs[i][0] && !test_configs[i][2]) + continue; + + // @@ Linux also does not work correctly in the following + // case: Invocation thread starts and sends messages filling + // the socket buffer. It then blocks in write(). In the + // meantime, the close connection thread closes the socket + // used by invocation thread. However, the invocation thread + // does not notice this as it does not return from write(). + // Meanwhile, the event loop thread notices that a socket in + // it's wait set has been closed, and starts to spin in + // handle_events() since the invocation thread is not taking + // out the closed handle from the Reactor's wait set. + if (test_configs[i][0] && test_configs[i][1] && !test_configs[i][3]) + continue; + +#endif /* linux */ + + if (test_configs[i][4] && ignore_nested_upcalls) + continue; + + if (!test_configs[i][1] && require_event_loop_thread) + continue; + + ACE_Reactor reactor (new REACTOR_IMPL, + 1); + + testing (&reactor, + test_configs[i][0], + test_configs[i][1], + test_configs[i][2], + test_configs[i][3], + test_configs[i][4]); + } + } +} + +static int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:b:c:f:g:k:l:m:n:o:uz:")); + + int cc; + while ((cc = get_opt ()) != -1) + { + switch (cc) + { + case 'a': + test_select_reactor = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'b': + test_tp_reactor = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'c': + test_wfmo_reactor = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'f': + number_of_connections = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'g': + close_timeout = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'k': + make_invocations = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'l': + run_event_loop_thread = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'm': + run_purger_thread = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'n': + run_receiver_thread = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'o': + nested_upcalls = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'z': + debug = ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'u': + default: + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("\nusage: %s \n\n") + ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n") + ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n") + ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n") + ACE_TEXT ("\t[-f number of connections] (defaults to %d)\n") + ACE_TEXT ("\t[-g close timeout] (defaults to %d)\n") + ACE_TEXT ("\t[-k make invocations] (defaults to %d)\n") + ACE_TEXT ("\t[-l run event loop thread] (defaults to %d)\n") + ACE_TEXT ("\t[-m run purger thread] (defaults to %d)\n") + ACE_TEXT ("\t[-n run receiver thread] (defaults to %d)\n") + ACE_TEXT ("\t[-o nested upcalls] (defaults to %d)\n") + ACE_TEXT ("\t[-z debug] (defaults to %d)\n") + ACE_TEXT ("\n"), + argv[0], + test_select_reactor, + test_tp_reactor, + test_wfmo_reactor, + number_of_connections, + close_timeout, + make_invocations, + run_event_loop_thread, + run_purger_thread, + run_receiver_thread, + nested_upcalls, + debug)); + return -1; + } + } + + return 0; +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test")); + + // Validate options. + int result = + parse_args (argc, argv); + if (result != 0) + return result; + +#if defined (SIGPIPE) && !defined (ACE_LACKS_UNIX_SIGNALS) + // There's really no way to deal with this in a portable manner, so + // we just have to suck it up and get preprocessor conditional and + // ugly. + // + // Impractical to have each call to the ORB protect against the + // implementation artifact of potential writes to dead connections, + // as it'd be way expensive. Do it here; who cares about SIGPIPE in + // these kinds of applications, anyway? + (void) ACE_OS::signal (SIGPIPE, (ACE_SignalHandler) SIG_IGN); +#endif /* SIGPIPE */ + + int ignore_nested_upcalls = 1; + int perform_nested_upcalls = 0; + + int event_loop_thread_required = 1; + int event_loop_thread_not_required = 0; + + if (test_select_reactor) + { + ACE_DEBUG ((LM_DEBUG, + "\n\nTesting Select Reactor....\n\n")); + + test<ACE_Select_Reactor> test (ignore_nested_upcalls, + event_loop_thread_not_required); + ACE_UNUSED_ARG (test); + } + + if (test_tp_reactor) + { + ACE_DEBUG ((LM_DEBUG, + "\n\nTesting TP Reactor....\n\n")); + + test<ACE_TP_Reactor> test (perform_nested_upcalls, + event_loop_thread_not_required); + ACE_UNUSED_ARG (test); + } + +#if defined (ACE_WIN32) + + if (test_wfmo_reactor) + { + ACE_DEBUG ((LM_DEBUG, + "\n\nTesting WFMO Reactor....\n\n")); + + test<ACE_WFMO_Reactor> test (ignore_nested_upcalls, + event_loop_thread_required); + ACE_UNUSED_ARG (test); + } + +#else /* ACE_WIN32 */ + + ACE_UNUSED_ARG (event_loop_thread_required); + +#endif /* ACE_WIN32 */ + + ACE_END_TEST; + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class test<ACE_Select_Reactor>; +template class test<ACE_TP_Reactor>; +#if defined (ACE_WIN32) +template class test<ACE_WFMO_Reactor>; +#endif /* ACE_WIN32 */ +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate test<ACE_Select_Reactor> +#pragma instantiate test<ACE_TP_Reactor> +#if defined (ACE_WIN32) +#pragma instantiate test<ACE_WFMO_Reactor> +#endif /* ACE_WIN32 */ +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + +#else /* ACE_HAS_THREADS */ + +int +ACE_TMAIN (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("Timer_Cancellation_Test")); + + ACE_ERROR ((LM_INFO, + ACE_TEXT ("threads not supported on this platform\n"))); + + ACE_END_TEST; + + return 0; +} + +#endif /* ACE_HAS_THREADS */ |