diff options
Diffstat (limited to 'TAO/orbsvcs/examples/Notify/ThreadPool/Consumer_Client.cpp')
-rw-r--r-- | TAO/orbsvcs/examples/Notify/ThreadPool/Consumer_Client.cpp | 274 |
1 files changed, 274 insertions, 0 deletions
diff --git a/TAO/orbsvcs/examples/Notify/ThreadPool/Consumer_Client.cpp b/TAO/orbsvcs/examples/Notify/ThreadPool/Consumer_Client.cpp new file mode 100644 index 00000000000..480dbb5005c --- /dev/null +++ b/TAO/orbsvcs/examples/Notify/ThreadPool/Consumer_Client.cpp @@ -0,0 +1,274 @@ +// $Id$ + +#include "Consumer_Client.h" +#include "Consumer.h" +#include "ORB_Run_Task.h" +#include "ace/Arg_Shifter.h" +#include "orbsvcs/NotifyExtC.h" +#include "orbsvcs/CosNamingC.h" +#include "tao/ORB_Core.h" +#include "ace/Sched_Params.h" +#include "ace/OS_NS_errno.h" + +ACE_RCSID (Notify, TAO_Notify_ThreadPool_Consumer_Client, "$Id$") + +TAO_Notify_ThreadPool_Consumer_Client::TAO_Notify_ThreadPool_Consumer_Client (TAO_Notify_ORB_Objects& orb_objects) + : orb_objects_ (orb_objects) + , consumer_ (0) + , proxy_supplier_thread_count_ (0) + , max_events_ (10) + , delay_ (0) +{ +} + +TAO_Notify_ThreadPool_Consumer_Client::~TAO_Notify_ThreadPool_Consumer_Client () +{ +} + +int +TAO_Notify_ThreadPool_Consumer_Client::parse_args (int argc, char *argv[]) +{ + ACE_Arg_Shifter arg_shifter (argc, argv); + + const ACE_TCHAR *current_arg = 0; + + while (arg_shifter.is_anything_left ()) + { + if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-ProxySupplier_ThreadPool")))) // Specify a threadpool. + { + this->proxy_supplier_thread_count_ = ACE_OS::atoi (arg_shifter.get_current ()); + + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-MaxEvents")))) // Max Events + { + this->max_events_ = ACE_OS::atoi (arg_shifter.get_current ()); + + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-Delay")))) // seconds wait in consumer per push. + { + this->delay_ = ACE_OS::atoi (current_arg); + + arg_shifter.consume_arg (); + } + else + { + arg_shifter.ignore_arg (); + } + } + + return 0; +} + +void +TAO_Notify_ThreadPool_Consumer_Client::_init (ACE_ENV_SINGLE_ARG_DECL) +{ + PortableServer::POAManager_var poa_manager = + this->orb_objects_.root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Resolve the Notification Factory. + CosNotifyChannelAdmin::EventChannelFactory_var ecf = this->orb_objects_.notify_factory (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Find the EventChannel created by the supplier. + CosNotifyChannelAdmin::ChannelIDSeq_var channel_seq = ecf->get_all_channels (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + CosNotifyChannelAdmin::EventChannel_var ec; + + if (channel_seq->length() > 0) + { + ec = ecf->get_event_channel (channel_seq[0] ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + else + { + ACE_DEBUG ((LM_DEBUG, "No Event Channel active!\n")); + return; + } + + // Create a Consumer Admin + CosNotifyChannelAdmin::AdminID adminid = 0; + + CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->new_for_consumers (CosNotifyChannelAdmin::AND_OP, adminid ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_ASSERT (!CORBA::is_nil (consumer_admin.in ())); + + PortableServer::POA_var rt_poa = this->create_rt_poa (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Create a Consumer + this->consumer_ = new TAO_Notify_ThreadPool_Consumer (this->orb_objects_); + + // Initialize it. + this->consumer_->init (rt_poa, consumer_admin, this->proxy_supplier_thread_count_, this->max_events_, this->delay_ ACE_ENV_ARG_PARAMETER); +} + +PortableServer::POA_ptr +TAO_Notify_ThreadPool_Consumer_Client::create_rt_poa (ACE_ENV_SINGLE_ARG_DECL) +{ + PortableServer::POA_var rt_poa; + + // Create an RT POA with a lane at the given priority. + CORBA::Policy_var priority_model_policy; + CORBA::Policy_var thread_pool_policy; + + CORBA::Policy_var activation_policy = + this->orb_objects_.root_poa_->create_implicit_activation_policy (PortableServer::IMPLICIT_ACTIVATION ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (rt_poa._retn ()); + + // Create a priority model policy. + priority_model_policy = + this->orb_objects_.rt_orb_->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED + , 0 + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (rt_poa._retn ()); + + CORBA::ULong stacksize = 0; + CORBA::ULong static_threads = 1; + CORBA::ULong dynamic_threads = 0; + RTCORBA::Priority default_priority = 0; + CORBA::Boolean allow_request_buffering = 0; + CORBA::ULong max_buffered_requests = 0; + CORBA::ULong max_request_buffer_size = 0; + + // Create the thread-pool. + RTCORBA::ThreadpoolId threadpool_id = + this->orb_objects_.rt_orb_->create_threadpool (stacksize, + static_threads, + dynamic_threads, + default_priority, + allow_request_buffering, + max_buffered_requests, + max_request_buffer_size + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (rt_poa._retn ()); + + thread_pool_policy = + this->orb_objects_.rt_orb_->create_threadpool_policy (threadpool_id + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (rt_poa._retn ()); + + CORBA::PolicyList poa_policy_list; + + poa_policy_list.length (3); + poa_policy_list[0] = priority_model_policy; + poa_policy_list[1] = activation_policy; + poa_policy_list[2] = thread_pool_policy; + + PortableServer::POAManager_var poa_manager = + this->orb_objects_.root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (rt_poa._retn ()); + + rt_poa = this->orb_objects_.root_poa_->create_POA ("RT POA!", + poa_manager.in (), + poa_policy_list + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (rt_poa._retn ()); + + return rt_poa._retn (); +} + +void +TAO_Notify_ThreadPool_Consumer_Client::run (ACE_ENV_SINGLE_ARG_DECL) +{ + this->consumer_->run (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +void +TAO_Notify_ThreadPool_Consumer_Client::dump_stats (void) +{ + this->consumer_->dump_throughput (); +} + +int +TAO_Notify_ThreadPool_Consumer_Client::svc (void) +{ + ACE_TRY_NEW_ENV + { + // Initialize this threads priority. + this->orb_objects_.current_->the_priority (0 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + this->_init (ACE_ENV_SINGLE_ARG_PARAMETER); //Init the Client + ACE_TRY_CHECK; + + this->run (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, + ACE_TEXT ("Supplier error ")); + + } + ACE_ENDTRY; + + return 0; +} + +int +main (int argc, char *argv []) +{ + ACE_TRY_NEW_ENV + { + // Initialize an ORB + CORBA::ORB_var orb = CORBA::ORB_init (argc, + argv, + "" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + TAO_Notify_ORB_Objects orb_objects; + + orb_objects.init (orb ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + TAO_Notify_ORB_Run_Task orb_run_task (orb_objects); + + TAO_Notify_ThreadPool_Consumer_Client client (orb_objects); + + if (client.parse_args (argc, argv) != 0) + { + ACE_DEBUG ((LM_DEBUG, "Consumer_Client::Error parsing options\n")); + return -1; + } + + long flags = THR_NEW_LWP | THR_JOINABLE; + + flags |= + orb->orb_core ()->orb_params ()->thread_creation_flags (); + + + if (orb_run_task.activate (flags) == -1 || client.activate (flags) == -1) + { + if (ACE_OS::last_error () == EPERM) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")), + -1); + else + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("(%t) Task activation at priority %d failed. \n"))); + } + + orb_run_task.thr_mgr ()->wait (); + client.thr_mgr ()->wait (); + + client.dump_stats (); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, + ACE_TEXT ("Consumer Client error ")); + } + ACE_ENDTRY; + + return 0; +} |