// $Id$ // ============================================================================ // // = LIBRARY // tests // // = FILENAME // Process_Strategy_Test.cpp // // = DESCRIPTION // This is a test of the and // classes. The uses // either the (which forks a // process-per-connection and runs as a concurrent server // process), the (which spawns a // thread-per-connection and runs as a concurrent server thread), // or (which register the // with the and runs in the main thread of control as an // iterative server). This server queries and increments a // "counting value" in a file. // // This test program can be run in the following ways: // // # Run the server "reactively" (i.e., iteratively) // % Process_Strategy_Test -c REACTIVE // // # Run the server in multi-threads. // % Process_Strategy_Test -c THREAD // // # Run the server in multi-processes // % Process_Strategy_Test -c PROCESS // // = AUTHOR // Douglas C. Schmidt // and Kevin Boyle // // ============================================================================ #include "test_config.h" #include "ace/Acceptor.h" #include "ace/Handle_Set.h" #include "ace/Get_Opt.h" #include "ace/SOCK_Acceptor.h" #include "ace/SOCK_Connector.h" #include "ace/Strategies_T.h" #include "ace/Singleton.h" #include "ace/File_Lock.h" // Counting_Service and Options in here #include "Process_Strategy_Test.h" ACE_RCSID(tests, Process_Strategy_Test, "$Id$") #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Accept_Strategy; template class ACE_Acceptor; template class ACE_Creation_Strategy; template class ACE_Concurrency_Strategy; template class ACE_Guard; template class ACE_Singleton; template class ACE_Process_Strategy; template class ACE_Thread_Strategy; template class ACE_Reactive_Strategy; template class ACE_Read_Guard; template class ACE_Scheduling_Strategy; template class ACE_Strategy_Acceptor; template class ACE_Svc_Handler; template class ACE_Write_Guard; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Accept_Strategy #pragma instantiate ACE_Acceptor #pragma instantiate ACE_Creation_Strategy #pragma instantiate ACE_Concurrency_Strategy #pragma instantiate ACE_Guard #pragma instantiate ACE_Singleton #pragma instantiate ACE_Process_Strategy #pragma instantiate ACE_Thread_Strategy #pragma instantiate ACE_Reactive_Strategy #pragma instantiate ACE_Read_Guard #pragma instantiate ACE_Scheduling_Strategy #pragma instantiate ACE_Strategy_Acceptor #pragma instantiate ACE_Svc_Handler #pragma instantiate ACE_Write_Guard #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ // Define a that's parameterized by the // . typedef ACE_Strategy_Acceptor ACCEPTOR; // Create an Options Singleton. typedef ACE_Singleton OPTIONS; // counter for connections static int connections = 0; // Use this to show down the process gracefully. static void connection_completed (void) { // Increment connection counter. connections++; // If all connections have been serviced. if (connections == ACE_MAX_ITERATIONS + 1) // Make sure that the event loop is interrupted. ACE_Reactor::instance()->wakeup_all_threads (); } // Constructor Process_Strategy::Process_Strategy (size_t n_processes, ACE_Event_Handler *acceptor, ACE_Reactor *r, int avoid_zombies) : ACE_Process_Strategy (n_processes, acceptor, r, avoid_zombies) { } // Destructor. g++ 2.7.2.3 gets very confused ("Internal compiler // error") without it. Process_Strategy::~Process_Strategy (void) { } // Overwrite the process creation method to include connection // counting. int Process_Strategy::activate_svc_handler (Counting_Service *svc_handler, void *arg) { // Call down to the base class int result = ACE_Process_Strategy::activate_svc_handler (svc_handler, arg); // Connection is now complete connection_completed (); return result; } ACE_File_Lock & Options::file_lock (void) { return this->file_lock_; } ACE_Concurrency_Strategy * Options::concurrency_strategy (void) { return this->concurrency_strategy_; } const ACE_TCHAR * Options::filename (void) { return this->filename_; } Options::Options (void) : // Choose to use processes by default. #if !defined (ACE_LACKS_FORK) concurrency_type_ (PROCESS) #elif defined (ACE_HAS_THREADS) concurrency_type_ (THREAD) #else concurrency_type_ (REACTIVE) #endif /* !ACE_LACKS_FORK */ { } Options::~Options (void) { delete this->concurrency_strategy_; this->concurrency_strategy_ = 0; } int Options::parse_args (int argc, ACE_TCHAR *argv[]) { ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:c:f:")); // - 26 is for the "process_strategy_test_temp" that is appended if (ACE::get_temp_dir (this->filename_, MAXPATHLEN - 26) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Temporary path too long\n")), -1); ACE_OS_String::strcat (this->filename_, ACE_TEXT ("process_strategy_test_temp")); for (int c; (c = get_opt ()) != -1; ) switch (c) { case 'c': if (ACE_OS::strcmp (get_opt.optarg, ACE_TEXT ("REACTIVE")) == 0) OPTIONS::instance ()->concurrency_type (Options::REACTIVE); #if !defined (ACE_LACKS_FORK) else if (ACE_OS::strcmp (get_opt.optarg, ACE_TEXT ("PROCESS")) == 0) OPTIONS::instance ()->concurrency_type (Options::PROCESS); #endif /* !ACE_LACKS_FORK */ #if defined (ACE_HAS_THREADS) else if (ACE_OS::strcmp (get_opt.optarg, ACE_TEXT ("THREAD")) == 0) OPTIONS::instance ()->concurrency_type (Options::THREAD); #endif /* ACE_HAS_THREADS */ else ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("WARNING: concurrency strategy \"%s\" ") ACE_TEXT ("is not supported\n"), get_opt.optarg)); break; case 'f': ACE_OS::strcpy (this->filename_, get_opt.optarg); break; default: ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("usage: %n [-f (filename)] ") ACE_TEXT ("[-c (concurrency strategy)]\n%a"), 1)); /* NOTREACHED */ } // Initialize the file lock. Note that this object lives beyond the // lifetime of the Acceptor. if (this->file_lock_.open (this->filename_, O_RDWR | O_CREAT, ACE_DEFAULT_FILE_PERMS) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) opening %s on handle %d.\n"), this->filename_, this->file_lock_.get_handle ())); int count = 0; // Store the initial value of the count in the file. if (ACE_OS::write (this->file_lock_.get_handle (), (const void *) &count, sizeof count) != sizeof count) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("write"))); // Initialize the Concurrency strategy. switch (this->concurrency_type_) { case Options::PROCESS: #if !defined (ACE_LACKS_FORK) ACE_NEW_RETURN (this->concurrency_strategy_, Process_Strategy (1, this, ACE_Reactor::instance (), 1), // Avoid zombies. -1); break; #else # if defined (ACE_PSOS_DIAB) // Workaround for compiler confusion with strings in assertions. const int PROCESS_INVALID_ON_THIS_PLATFORM = 1; ACE_ASSERT (PROCESS_INVALID_ON_THIS_PLATFORM == 0); # else /* ! defined (ACE_PSOS_DIAB) */ ACE_ASSERT ("PROCESS invalid on this platform" == 0); # endif /* defined (ACE_PSOS_DIAB) */ #endif /* !defined (ACE_LACKS_FORK) */ case Options::THREAD: #if defined (ACE_HAS_THREADS) ACE_NEW_RETURN (this->concurrency_strategy_, ACE_Thread_Strategy (ACE_Thread_Manager::instance (), THR_NEW_LWP, 1), -1); break; #else ACE_ASSERT (!"THREAD invalid on this platform"); #endif /* !ACE_HAS_THREADS */ case Options::REACTIVE: // Settle for the purely Reactive strategy. ACE_NEW_RETURN (this->concurrency_strategy_, ACE_Reactive_Strategy (ACE_Reactor::instance ()), -1); break; } return 0; } Options::Concurrency_Type Options::concurrency_type (void) { return this->concurrency_type_; } void Options::concurrency_type (Options::Concurrency_Type cs) { this->concurrency_type_ = cs; } Counting_Service::Counting_Service (ACE_Thread_Manager *) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) creating the Counting_Service\n"))); } // Read the current value from the shared file and return it to the // client. int Counting_Service::read (void) { ACE_READ_GUARD_RETURN (ACE_File_Lock, ace_mon, OPTIONS::instance ()->file_lock (), -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) reading on handle %d.\n"), OPTIONS::instance ()->file_lock ().get_handle ())); int count; if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (), (void *) &count, sizeof count, 0) != sizeof count) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("read")), -1); char buf[BUFSIZ]; int n = ACE_OS::sprintf (buf, "count = %d\n", count); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) count = %d\n"), count)); if (this->peer ().send_n (buf, n) != n) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("send_n")), -1); return 0; } // Increment the current value in the shared file by 1. int Counting_Service::inc (void) { ACE_WRITE_GUARD_RETURN (ACE_File_Lock, ace_mon, OPTIONS::instance ()->file_lock (), -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) incrementing on handle %d.\n"), OPTIONS::instance ()->file_lock ().get_handle ())); int count; if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (), (void *) &count, sizeof count, 0) != sizeof count) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("read")), -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) incrementing count from %d to %d\n"), count, count + 1)); count++; if (ACE_OS::pwrite (OPTIONS::instance ()->file_lock ().get_handle (), (const void *) &count, sizeof count, 0) != sizeof count) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("write")), -1); return 0; } // Receive the request from the client and call the appropriate // operation. int Counting_Service::handle_input (ACE_HANDLE) { char buf[BUFSIZ]; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) reading from peer on %d\n"), this->peer ().get_handle ())); size_t len; // Read the PDU length first. ssize_t bytes = this->peer ().recv ((void *) &len, sizeof len); if (bytes <= 0) return -1; bytes = this->peer ().recv (buf, len); if (bytes <= 0 || buf[0] == (char) EOF) return -1; else { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) %d bytes of input on %d is %*s\n"), bytes, this->peer ().get_handle (), bytes, buf)); // Read and return the current value in the file. if (ACE_OS::strncmp (buf, "read", 4) == 0) return this->read (); // Increment the current value in the file. else if (ACE_OS::strncmp (buf, "inc", 3) == 0) return this->inc (); else ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) no match...\n"))); return 0; } } int Counting_Service::svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) handling thread\n"))); while (this->handle_input () >= 0) continue; return 0; } int Counting_Service::handle_close (ACE_HANDLE, ACE_Reactor_Mask) { // Done with another connection. connection_completed (); // Call down to base class return ACE_Svc_Handler::handle_close (); } // This method is called back by the once the client has // connected and the process is forked or spawned. int Counting_Service::open (void *) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) opening service\n"))); if (OPTIONS::instance ()->concurrency_type () == Options::PROCESS) { // We need to rerun the event loop here since we ended up here // due to being fork'd and we can't just return to our context // because it's in the middle of a different event loop that // won't behave properly since it's meant to handle connection // establishment, *not* data transfer. while (this->handle_input () >= 0) continue; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) About to exit from the child\n"))); // Exit the child. ACE_OS::exit (0); } else if (OPTIONS::instance ()->concurrency_type () == Options::THREAD) // We need to set this to 0 so that our method doesn't // try to deregister from the Reactor. this->reactor (0); return 0; } #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS) // Execute the client tests. static void * client (void *arg) { ACE_INET_Addr *remote_addr = ACE_reinterpret_cast (ACE_INET_Addr *, arg); ACE_INET_Addr server_addr (remote_addr->get_port_number (), ACE_DEFAULT_SERVER_HOST); ACE_SOCK_Stream stream; ACE_SOCK_Connector connector; char buf[BUFSIZ]; const char *command; size_t command_len; size_t i; for (i = 0; i < ACE_MAX_ITERATIONS; i++) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) client iteration %d\n"), i)); if (connector.connect (stream, server_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), 0); command = "inc"; command_len = ACE_OS::strlen (command); if (stream.send (4, &command_len, sizeof command_len, command, command_len) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("send")), 0); command = "read"; command_len = ACE_OS::strlen (command); if (stream.send (4, &command_len, sizeof command_len, command, command_len) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("send")), 0); else if (stream.recv (buf, sizeof buf) <= 0) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n"), ACE_TEXT ("recv")), 0); // ACE_DEBUG ((LM_DEBUG, // ACE_TEXT ("(%P|%t) client iteration %d, buf = %s\n"), // i, buf)); if (stream.close () == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("close")), 0); } command = "read"; command_len = ACE_OS::strlen (command); int bytes_read = 0; if (connector.connect (stream, server_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), 0); else if (stream.send (4, &command_len, sizeof command_len, command, command_len) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("send")), 0); else if ((bytes_read = stream.recv (buf, sizeof buf)) <= 0) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("recv")), 0); else { // Null terminate buf to avoid an uninitialized memory read in // the call to ACE_OS::strrchr (). buf [bytes_read] = '\0'; size_t count = ACE_OS::atoi (ACE_OS::strrchr (ACE_TEXT_CHAR_TO_TCHAR (buf), ACE_TEXT (' '))); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) count = %d\n"), count)); // Make sure that the count is correct. ACE_ASSERT (count == ACE_MAX_ITERATIONS); } if (stream.close () == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("close")), 0); // Remove the filename. ACE_OS::unlink (OPTIONS::instance ()->filename ()); return 0; } // Performs the server activities. // Have all connections been serviced? static int done (void) { return connections == ACE_MAX_ITERATIONS + 1; } static void * server (void *) { int result = 0; ACE_Reactor::instance ()->owner (ACE_Thread::self ()); while (!done () && result != -1) // Run the main event loop. result = ACE_Reactor::instance ()->handle_events (); return 0; } #endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */ int main (int argc, ACE_TCHAR *argv[]) { ACE_START_TEST (ACE_TEXT ("Process_Strategy_Test")); if (OPTIONS::instance ()->parse_args (argc, argv) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("parse_args")), -1); ACCEPTOR acceptor; ACE_INET_Addr server_addr; // Bind acceptor to any port and then find out what the port was. // Note that this implicitly creates the Reactor singleton. if (acceptor.open (ACE_sap_any_cast (const ACE_INET_Addr &), ACE_Reactor::instance(), 0, 0, OPTIONS::instance ()->concurrency_strategy ()) == -1 || acceptor.acceptor ().get_local_addr (server_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), -1); else { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) starting server at port %d\n"), server_addr.get_port_number ())); #if !defined (ACE_LACKS_FORK) // We're running the client and serve as separate processes. pid_t pid = ACE::fork ("child", 1); // Avoid zombies. switch (pid) { case -1: ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n%a"), ACE_TEXT ("fork failed"))); exit (-1); /* NOTREACHED */ case 0: client (&server_addr); break; /* NOTREACHED */ default: server (0); break; /* NOTREACHED */ } #elif defined (ACE_HAS_THREADS) if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (server), (void *) 0, THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n%a"), ACE_TEXT ("thread create failed"))); if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (client), (void *) &server_addr, THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) %p\n%a"), ACE_TEXT ("thread create failed"))); // Wait for the threads to exit. ACE_Thread_Manager::instance ()->wait (); #else ACE_ERROR ((LM_INFO, ACE_TEXT ("(%P|%t) only one thread may be run ") ACE_TEXT ("in a process on this platform\n"))); #endif /* ACE_HAS_THREADS */ } ACE_END_TEST; return 0; }