summaryrefslogtreecommitdiff
path: root/TAO/examples/CSD_Strategy/ThreadPool5/ServerApp.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/examples/CSD_Strategy/ThreadPool5/ServerApp.cpp')
-rw-r--r--TAO/examples/CSD_Strategy/ThreadPool5/ServerApp.cpp357
1 files changed, 357 insertions, 0 deletions
diff --git a/TAO/examples/CSD_Strategy/ThreadPool5/ServerApp.cpp b/TAO/examples/CSD_Strategy/ThreadPool5/ServerApp.cpp
new file mode 100644
index 00000000000..b8cd5ed04a1
--- /dev/null
+++ b/TAO/examples/CSD_Strategy/ThreadPool5/ServerApp.cpp
@@ -0,0 +1,357 @@
+// $Id$
+#include "ServerApp.h"
+#include "OrbTask.h"
+#include "FooServantList.h"
+#include "ClientTask.h"
+#include "OrbShutdownTask.h"
+#include "ace/Get_Opt.h"
+#include "tao/CSD_ThreadPool/CSD_TP_Strategy.h"
+#include "tao/Intrusive_Ref_Count_Handle_T.h"
+// To force static load the service.
+#include "tao/PI/PI.h"
+#include "tao/CSD_ThreadPool/CSD_ThreadPool.h"
+
+
+ServerApp::ServerApp()
+ : ior_filename_("foo"),
+ num_servants_(1),
+ num_csd_threads_ (1),
+ num_clients_(1),
+ num_orb_threads_ (1),
+ collocated_test_ (0),
+ servant_to_deactivate_ (-1)
+{
+}
+
+
+ServerApp::~ServerApp()
+{
+}
+
+
+int
+ServerApp::run(int argc, char* argv[] ACE_ENV_ARG_DECL)
+{
+ CORBA::ORB_var orb = CORBA::ORB_init(argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ // Parse the command-line args for this application.
+ // * Raises -1 if problems are encountered.
+ // * Returns 1 if the usage statement was explicitly requested.
+ // * Returns 0 otherwise.
+ int result = this->parse_args(argc, argv);
+ if (result != 0)
+ {
+ return result;
+ }
+
+ TheOrbShutdownTask::instance()->orb (orb.in ());
+
+ CORBA::Object_var obj
+ = orb->resolve_initial_references("RootPOA"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (CORBA::is_nil(obj.in()))
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Failed to resolve initial ref for 'RootPOA'.\n"));
+ ACE_THROW_RETURN (TestException(), -1);
+ }
+
+ PortableServer::POA_var root_poa
+ = PortableServer::POA::_narrow(obj.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (CORBA::is_nil(root_poa.in()))
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Failed to narrow obj ref to POA interface.\n"));
+ ACE_THROW_RETURN (TestException(), -1);
+ }
+
+ PortableServer::POAManager_var poa_manager
+ = root_poa->the_POAManager(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ // Create the child POA.
+ CORBA::PolicyList policies(1);
+ policies.length(1);
+
+ policies[0]
+ = root_poa->create_id_assignment_policy(PortableServer::USER_ID
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ PortableServer::POA_var child_poa
+ = root_poa->create_POA("ChildPoa",
+ poa_manager.in(),
+ policies
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (CORBA::is_nil(child_poa.in()))
+ {
+ ACE_ERROR((LM_ERROR, "(%P|%t) ERROR [ServerApp::run()]: "
+ "Failed to create the child POA.\n"));
+ ACE_THROW_RETURN (TestException(), -1);
+ }
+
+ policies[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ // Create the thread pool servant dispatching strategy object, and
+ // hold it in a (local) smart pointer variable.
+ TAO_Intrusive_Ref_Count_Handle<TAO::CSD::TP_Strategy> csd_tp_strategy =
+ new TAO::CSD::TP_Strategy();
+
+ csd_tp_strategy->set_num_threads(this->num_csd_threads_);
+
+ // Tell the strategy to apply itself to the child poa.
+ if (csd_tp_strategy->apply_to(child_poa.in() ACE_ENV_ARG_PARAMETER) == false)
+ {
+ ACE_ERROR((LM_ERROR, "(%P|%t) ERROR [ServerApp::run()]: "
+ "Failed to apply custom dispatching strategy to child poa.\n"));
+ ACE_THROW_RETURN (TestException(), -1);
+ }
+ ACE_CHECK_RETURN (-1);
+
+ FooServantList servants(this->ior_filename_.c_str(),
+ this->num_servants_,
+ this->num_clients_,
+ this->collocated_test_,
+ this->servant_to_deactivate_,
+ orb.in());
+
+ // Activate the POA Manager before start the ClientTask thread so that
+ // we do not need coordinate the ClientTask and main thread for the
+ // collocated test.
+ poa_manager->activate(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ servants.create_and_activate(orb.in (),
+ child_poa.in()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_DEBUG((LM_DEBUG,
+ "(%P|%t) ServerApp is ready.\n"));
+
+ // If the num_orb_threads_ is exactly one, then just use the current
+ // (mainline) thread to run the ORB event loop.
+ if (this->num_orb_threads_ == 1)
+ {
+ // Since the num_orb_threads_ is exactly one, we just use the current
+ // (mainline) thread to run the ORB event loop.
+ orb->run(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ }
+ else
+ {
+ // The num_orb_threads_ is greater than 1, so we will use an OrbTask
+ // (active object) to run the ORB event loop in (num_orb_threads_ - 1)
+ // threads. We use the current (mainline) thread as the other thread
+ // running the ORB event loop.
+ OrbTask orb_task(orb.in(), this->num_orb_threads_ - 1);
+
+ // Activate the OrbTask worker threads
+ if (orb_task.open() != 0)
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Failed to open the OrbTask.\n"));
+ ACE_THROW_RETURN (TestException(), -1);
+ }
+
+ // This will use the current (mainline) thread to run the ORB event loop.
+ orb->run(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ // Now that the current thread has unblocked from running the orb,
+ // make sure to wait for all of the worker threads to complete.
+ orb_task.wait();
+ }
+
+ ACE_DEBUG((LM_DEBUG,
+ "(%P|%t) ServerApp is waiting for OrbShutdownTask.\n"));
+ TheOrbShutdownTask::instance()->wait ();
+
+ // Sleep for 2 second to let the done() two-way call complete
+ // before cleanup.
+ ACE_OS::sleep (2);
+
+ if (collocated_test_)
+ {
+ servants.collocated_client ()->wait ();
+ }
+
+ ACE_DEBUG((LM_DEBUG,
+ "(%P|%t) ServerApp is destroying the Root POA.\n"));
+
+ // Tear-down the root poa and orb.
+ root_poa->destroy(1, 1 ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_DEBUG((LM_DEBUG,
+ "(%P|%t) ServerApp is destroying the ORB.\n"));
+
+ orb->destroy(ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ ACE_DEBUG((LM_DEBUG,
+ "(%P|%t) ServerApp has completed running successfully.\n"));
+
+ return 0;
+}
+
+
+int
+ServerApp::parse_args(int argc, char* argv[])
+{
+ this->exe_name_ = argv[0];
+
+ ACE_Get_Opt get_opts(argc, argv, "p:s:c:t:l:d:n:");
+
+ int c;
+ int tmp;
+
+ while ((c = get_opts()) != -1)
+ {
+ int parse_error = 0;
+
+ switch (c)
+ {
+ case 'p':
+ this->ior_filename_ = get_opts.opt_arg();
+ break;
+
+ case 's':
+ tmp = ACE_OS::atoi(get_opts.opt_arg());
+ if (tmp < 1)
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Error. -s must be followed by an integer "
+ "value greater than 0.\n"));
+ parse_error = 1;
+ }
+
+ this->num_servants_ = tmp;
+ break;
+
+ case 'c':
+ tmp = ACE_OS::atoi(get_opts.opt_arg());
+ if (tmp < 1)
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Error. -c must be followed by an integer "
+ "value greater than 0.\n"));
+ parse_error = 1;
+ }
+
+ this->num_clients_ = tmp;
+ break;
+
+ case 't':
+ tmp = ACE_OS::atoi(get_opts.opt_arg());
+ if (tmp < 1)
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Error. -t must be followed by an integer "
+ "value greater than 0.\n"));
+ parse_error = 1;
+ }
+
+ this->num_orb_threads_ = tmp;
+ break;
+
+ case 'n':
+ tmp = ACE_OS::atoi(get_opts.opt_arg());
+ if (tmp < 1)
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Error. -n must be followed by an integer "
+ "value greater than 0.\n"));
+ parse_error = 1;
+ }
+
+ this->num_csd_threads_ = tmp;
+ break;
+
+ case 'l':
+ tmp = ACE_OS::atoi(get_opts.opt_arg());
+ if (tmp < 0)
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Error. -l must be followed by an integer "
+ "value greater than -1.\n"));
+ parse_error = 1;
+ }
+
+ this->collocated_test_ = tmp;
+ break;
+
+ case 'd':
+ tmp = ACE_OS::atoi(get_opts.opt_arg());
+ if (tmp < 0)
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Error. -d must be followed by an integer "
+ "value >= 0.\n"));
+ parse_error = 1;
+ }
+
+ this->servant_to_deactivate_ = tmp;
+ break;
+
+ case '?':
+ this->usage_statement();
+ return 1;
+
+ default:
+ this->usage_statement();
+ return -1;
+ }
+
+ if (parse_error != 0)
+ {
+ this->usage_statement();
+ return parse_error;
+ }
+ }
+
+ // The deadlock will happen with the collocated callback test
+ // when we have one working thread, so create at least one more
+ // working thread would resolve the deadlock.
+ if (this->collocated_test_ == 1 && this->num_csd_threads_ == 1)
+ {
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) Error. The num_csd_threads_ should be "
+ ">= 1.\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+
+void
+ServerApp::usage_statement()
+{
+ ACE_ERROR((LM_ERROR,
+ "(%P|%t) usage: %s \n"
+ "\t[-p <ior_filename_prefix>] \n"
+ "\t[-s <num_servants>] \n"
+ "\t[-c <num_clients>] \n"
+ "\t[-n <num_csd_threads>] \n"
+ "\t[-t <num_orb_threads>] \n"
+ "\t[-l <collocation_test>] \n"
+ "\t[-d <servant_to_deactivate>] \n"
+ "Default ior_filename_prefix is 'foo'.\n"
+ "Default num_servants is 1.\n"
+ "Default num_clients is 1.\n"
+ "Default num_orb_threads is 1.\n"
+ "Default collocation_test flag is 0.\n"
+ "Default servant_to_deactivate is -1 means not deactivate servant.\n"
+ " 0 means deactivate all servant. \n"
+ " >0 means the index (servant_to_deactivate-1) of the servant in the servant list.\n",
+ this->exe_name_.c_str ()));
+}