From 8908b74b2e1a848d809a4d1ec478f234194c362f Mon Sep 17 00:00:00 2001 From: thrall Date: Wed, 19 May 2004 16:26:06 +0000 Subject: Replaced EC_Event_Limit with timeout in orb->run(), moved START and STOP events to right next to orb->run() --- .../dynamic_topology_test/Consumer_EC.cpp | 81 ++++++------------- .../dynamic_topology_test/Consumer_Supplier_EC.cpp | 66 +++++---------- .../dynamic_topology_test/Kokyu_EC.cpp | 94 +++++++++++++++++++--- .../dynamic_topology_test/Supplier_EC.cpp | 66 +++++---------- 4 files changed, 148 insertions(+), 159 deletions(-) diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp index 605d63e8092..d0182f04b00 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp @@ -13,7 +13,7 @@ #include "orbsvcs/Event/EC_Kokyu_Factory.h" #include "orbsvcs/Time_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" -#include "orbsvcs/Event/EC_Event_Limit.h" +#include "tao/ORB_Core.h" #include "Kokyu_EC.h" #include "Consumer.h" @@ -32,7 +32,7 @@ namespace ACE_CString sched_type ="rms"; FILE * ior_output_file; } - +/* class Once_Handler: public Service_Handler { public: @@ -74,27 +74,6 @@ public: ACE_DEBUG((LM_DEBUG,"Once_Handler (%P|%t) handle_service_start() START\n")); this->handled_start_++; //set to true - // Uncommenting this causes the Supplier_EC event type 18 to never be pushed again (despite the timeout happening) - //trigger Task 3! - /* - kokyu_ec_->add_timeout_consumer( - supplier_impl_, - timeout_handler_impl_, - timeout_entry_point_, - period_, - crit_, - imp_ - ACE_ENV_ARG_PARAMETER - ); - ACE_CHECK; - - //should be able to just call Kokyu_EC::start() to recompute schedule - //BEWARE if kokyu_ec_ overrides start() to do stuff we don't want to redo! - //which is why we specify the Kokyu_EC version of the function! - kokyu_ec_->Kokyu_EC::start(ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - */ - //WARNING: depending on Reactor, might not be a RT solution! this->timer_handle_ = this->reactor_->schedule_timer(this->timeout_handler_impl_, @@ -127,15 +106,8 @@ private: ACE_Time_Value period_; long timer_handle_; - /* - Timeout_Consumer * timeout_consumer_impl_; - Supplier * supplier_impl_; - const char * timeout_entry_point_; - RtecScheduler::Criticality_t crit_; - RtecScheduler::Importance_t imp_; - Kokyu_EC * kokyu_ec_; - */ }; +*/ class Consumer_EC : public Kokyu_EC { @@ -199,24 +171,15 @@ int main (int argc, char* argv[]) { //TAO_EC_Default_Factory::init_svcs (); -#ifdef ACE_HAS_DSUI - ds_control* ds_cntl = new ds_control ("Dynamic_Test_Consumer","consumer_enabled.dsui"); -#endif // ACE_HAS_DSUI - TAO_EC_Kokyu_Factory::init_svcs (); - //@BT -#ifdef ACE_HAS_DSUI - // ACE_Object_Counter::object_id oid = ACE_OBJECT_COUNTER->increment(); - // DSTRM_EVENT(MAIN_GROUP_FAM, START, 1, sizeof(EC_Event_Counter::event_id), (char*)&eid); - ACE_Time_Value now(ACE_OS::gettimeofday()); - ACE_OS::printf("Consumer_EC START at %isec %iusec\n",now.sec(),now.usec()); - DSTRM_EVENT(MAIN_GROUP_FAM, START, 0, 0, NULL); -#endif //ACE_HAS_DSUI - ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { +#ifdef ACE_HAS_DSUI + ds_control ds_cntl("Dynamic_Test_Consumer","consumer_enabled.dsui"); +#endif // ACE_HAS_DSUI + // ORB initialization boiler plate... CORBA::ORB_var orb = CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); @@ -291,24 +254,28 @@ main (int argc, char* argv[]) //DSTRM_EVENT (MAIN_GROUP_FAM, WORKER_ACTIVATED, 1, 0, NULL); ACE_DEBUG((LM_DEBUG,"Consumer_EC thread %t WORKER_ACTIVATED at %u\n",ACE_OS::gettimeofday().msec())); DSTRM_EVENT (MAIN_GROUP_FAM, WORKER_ACTIVATED, 0, 0, NULL); -#endif //ACE_HAS_DSUI -#ifdef ACE_HAS_DSUI - EC_Event_Limit* e_limit = new EC_Event_Limit (TAO_ORB_Core_instance(), ds_cntl); -#else - EC_Event_Limit* e_limit = new EC_Event_Limit (TAO_ORB_Core_instance()); + //@BT + // ACE_Object_Counter::object_id oid = ACE_OBJECT_COUNTER->increment(); + // DSTRM_EVENT(MAIN_GROUP_FAM, START, 1, sizeof(EC_Event_Counter::event_id), (char*)&eid); + ACE_Time_Value now(ACE_OS::gettimeofday()); + ACE_OS::printf("Consumer_EC START at %isec %iusec\n",now.sec(),now.usec()); + DSTRM_EVENT(MAIN_GROUP_FAM, START, 0, 0, NULL); #endif //ACE_HAS_DSUI - ACE_Time_Value ticker (300); - long timer_id = rt.reactor()->schedule_timer(e_limit,0, ticker); - if (timer_id < 0) - { - ACE_DEBUG((LM_DEBUG,"Consumer_EC (%t) Could not schedule EC_Event_Limit timeout\n")); - } rt.activate(); //need thread creation flags? or priority? - orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_Time_Value stop_time(310,0); + orb->run (stop_time ACE_ENV_ARG_PARAMETER); + //orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_TRY_CHECK; +#ifdef ACE_HAS_DSUI + //@BT + //DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); + ACE_DEBUG((LM_DEBUG,"Consumer_EC thread %t STOP at %u\n",ACE_OS::gettimeofday().msec())); + DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); +#endif //ACE_HAS_DSUI + // **************************************************************** // We should do a lot of cleanup (disconnect from the EC, @@ -318,7 +285,7 @@ main (int argc, char* argv[]) } ACE_CATCHANY { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Consumer_EC - Service"); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Consumer_EC"); return 1; } ACE_ENDTRY; diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp index 7130fabaffc..bd3e89bdc49 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp @@ -12,7 +12,7 @@ #include "orbsvcs/Event/EC_Kokyu_Factory.h" #include "orbsvcs/Time_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" -#include "orbsvcs/Event/EC_Event_Limit.h" +#include "tao/ORB_Core.h" #include "Kokyu_EC.h" #include "Consumer.h" @@ -199,21 +199,16 @@ int main (int argc, char* argv[]) { //TAO_EC_Default_Factory::init_svcs (); -#ifdef ACE_HAS_DSUI - ds_control* ds_cntl = new ds_control ("Dynamic_Test_Consumer_Supplier","consumer_supplier_enabled.dsui"); -#endif //ACE_HAS_DSUI - TAO_EC_Kokyu_Factory::init_svcs (); TAO_EC_Gateway_IIOP_Factory::init_svcs (); - //@BT - ACE_Time_Value now(ACE_OS::gettimeofday()); - ACE_OS::printf("Consumer_Supplier_EC START at %isec %iusec\n",now.sec(),now.usec()); - DSTRM_EVENT(MAIN_GROUP_FAM, START,0,0,NULL); - ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { +#ifdef ACE_HAS_DSUI + ds_control ds_cntl("Dynamic_Test_Consumer_Supplier","consumer_supplier_enabled.dsui"); +#endif //ACE_HAS_DSUI + // ORB initialization boiler plate... CORBA::ORB_var orb = CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); @@ -221,10 +216,6 @@ main (int argc, char* argv[]) if (parse_args (argc, argv) == -1) { - ACE_ERROR ((LM_ERROR, - "Usage: %s -s " - "\n", - argv [0])); return 1; } @@ -288,56 +279,39 @@ main (int argc, char* argv[]) DSTRM_EVENT (WORKER_GROUP_FAM, WORKER_STARTED, 0, 0, NULL); #ifdef ACE_HAS_DSUI - EC_Event_Limit* e_limit = new EC_Event_Limit (TAO_ORB_Core_instance(), ds_cntl); -#else - EC_Event_Limit* e_limit = new EC_Event_Limit (TAO_ORB_Core_instance()); + //@BT + ACE_Time_Value now(ACE_OS::gettimeofday()); + ACE_OS::printf("Consumer_Supplier_EC START at %isec %iusec\n",now.sec(),now.usec()); + DSTRM_EVENT(MAIN_GROUP_FAM, START,0,0,NULL); #endif //ACE_HAS_DSUI - ACE_Time_Value ticker (305); - //orb->orb_core()->reactor()->schedule_timer(e_limit,0, ticker); - long timer_id = rt.reactor()->schedule_timer(e_limit,0,ticker); - if (timer_id < 0) - { - ACE_DEBUG((LM_DEBUG,"Consumer_Supplier_EC (%t) could not schedule EC_Event_Limit timer\n")); - } rt.activate(); //need thread creation flags? or priority? - orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_Time_Value stop_time(305,0); + orb->run (stop_time ACE_ENV_ARG_PARAMETER); + //orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_TRY_CHECK; - // **************************************************************** - - //@BT: ORB shutting down; currently, this isn't expected to happen - //DSTRM_EVENT (MAIN_GROUP_FAM, CALL_SERVER_SHUTDOWN, 1, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Consumer_Supplier_EC thread %t CALL_SERVER_SHUTDOWN at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT (MAIN_GROUP_FAM, CALL_SERVER_SHUTDOWN, 0, 0, NULL); +#ifdef ACE_HAS_DSUI + //@BT + //DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); + ACE_DEBUG((LM_DEBUG,"Consumer_Supplier_EC thread %t STOP at %u\n",ACE_OS::gettimeofday().msec())); + DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); +#endif //ACE_HAS_DSUI - //@BT: Scheduler shuts down with the EC and ORB - //DSTRM_EVENT (MAIN_GROUP_FAM, SCHEDULER_SHUTDOWN, 1, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Consumer_Supplier_EC thread %t SCHEDULER_SHUTDOWN at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT (MAIN_GROUP_FAM, SCHEDULER_SHUTDOWN, 0, 0, NULL); + // **************************************************************** // We should do a lot of cleanup (disconnect from the EC, // deactivate all the objects with the POA, etc.) but this is // just a simple demo so we are going to be lazy. - //@BT: Done clean up - //DSTRM_EVENT (MAIN_GROUP_FAM, AFTER_SERVER_SHUTDOWN, 1, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Consumer_Supplier_EC thread %t AFTER_SERVER_SHUTDOWN at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT (MAIN_GROUP_FAM, AFTER_SERVER_SHUTDOWN, 0, 0, NULL); - } ACE_CATCHANY { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Consumer_Supplier_EC"); return 1; } ACE_ENDTRY; - //@BT - //DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Consumer_Supplier_EC thread %t STOP at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); - return 0; } diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp index 9d66d8d7a97..3d75827452c 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp @@ -1,19 +1,21 @@ // $Id$ -#include "Kokyu_EC.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include //for ACE_OS::strcasecmp -#include // for ACE_OS::gettimeofday - +#include "orbsvcs/Sched/Reconfig_Scheduler.h" +#include "orbsvcs/Runtime_Scheduler.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Scheduler_Factory.h" +#include "orbsvcs/Event/EC_Event_Channel.h" +#include "orbsvcs/Event/EC_Default_Factory.h" +#include "orbsvcs/Event/EC_Kokyu_Factory.h" +#include "ace/OS_NS_strings.h" //for ACE_OS::strcasecmp +#include "ace/OS_NS_sys_time.h" // for ACE_OS::gettimeofday #include +#include "Kokyu/Dispatch_Deferrer.h" + +#include "Kokyu_EC.h" + namespace { typedef TAO_Reconfig_Scheduler RECONFIG_RMS_SCHED_TYPE; @@ -562,3 +564,71 @@ Kokyu_EC::add_consumer( //ACE_DEBUG((LM_DEBUG,"Kokyu_EC add_consumer() DONE\n")); } //add_consumer() + +//************************************************************* + +Reactor_Task::Reactor_Task (void) + : initialized_(0) + , react_(0) +{ +} + +Reactor_Task::~Reactor_Task (void) +{ +} + +int +Reactor_Task::initialize(void) +{ + /* + //We need to set the ACE_Reactor::instance() to be the ORB + //reactor so Kokyu's RG implementation can use it w/o creating + //an extra thread to run the reactor event loop. I hope this + //doesn't screw something else up! + //use Select_Reactor explicitly? + ACE_Reactor *reactor; //TODO: how clean up reactor and stop thread? + ACE_NEW_RETURN(reactor, + ACE_Reactor, + -1); + reactor->open(ACE_Select_Reactor_Impl::DEFAULT_SIZE); + ACE_Reactor::instance(reactor); + + this->react_ = reactor; + */ + //this->react_ = ACE_Reactor::instance(); + this->react_ = Kokyu::Dispatch_Deferrer::Singleton_Reactor::instance(); + //assume reactor is already opened! + + this->initialized_ = 1; + + return 0; +} //initialize() + +ACE_Reactor * +Reactor_Task::reactor(void) +{ + return this->react_; +} + +/// Process the events in the queue. +int +Reactor_Task::svc (void) +{ + ACE_DEBUG((LM_DEBUG,"Reactor_Task (%P|%t) svc(): ENTER\n")); + + if (!this->initialized_) + { + this->initialize(); + } + + this->react_->owner(ACE_Thread::self()); //set this thread as owner + + int err = this->react_->run_reactor_event_loop(); + if (err < 0) + { + ACE_DEBUG((LM_ERROR,"Reactor_Task (%t) error running Reactor event loop\n")); + } + + ACE_DEBUG((LM_DEBUG,"Reactor_Task (%P|%t) svc(): LEAVE\n")); + return 0; +} //svc() diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp index d9c21381752..b3c89ce15bb 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp @@ -14,7 +14,7 @@ #include "orbsvcs/Event/EC_Kokyu_Factory.h" #include "orbsvcs/Time_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" -#include "orbsvcs/Event/EC_Event_Limit.h" +#include "tao/ORB_Core.h" #include "Kokyu_EC.h" #include "Consumer.h" @@ -232,7 +232,7 @@ public: this->handler_ = mode_handler; ACE_NEW(timeout_consumer_impl1_1, Timeout_Consumer(supplier_impl1_1)); - ACE_Time_Value tv(10,200000); //period DEBUG: set to much longer period + ACE_Time_Value tv(1,200000); //period DEBUG: set to much longer period add_supplier_with_timeout(supplier_impl1_1, "supplier1_1", supp1_1_types, @@ -298,22 +298,16 @@ int main (int argc, char* argv[]) { //TAO_EC_Default_Factory::init_svcs (); -#ifdef ACE_HAS_DSUI - ds_control* ds_cntl = new ds_control ("Dynamic_Test_Supplier","supplier_enabled.dsui"); -#endif //ACE_HAS_DSUI - TAO_EC_Kokyu_Factory::init_svcs (); TAO_EC_Gateway_IIOP_Factory::init_svcs (); - //@BT - //DSTRM_EVENT(MAIN_GROUP_FAM, START,1,0,NULL); - ACE_Time_Value now(ACE_OS::gettimeofday()); - ACE_OS::printf("Supplier_EC START at %isec %iusec\n",now.sec(),now.usec()); - DSTRM_EVENT(MAIN_GROUP_FAM, START,0,0,NULL); - ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { +#ifdef ACE_HAS_DSUI + ds_control ds_cntl("Dynamic_Test_Supplier","supplier_enabled.dsui"); +#endif //ACE_HAS_DSUI + // ORB initialization boiler plate... CORBA::ORB_var orb = CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); @@ -388,56 +382,40 @@ main (int argc, char* argv[]) DSTRM_EVENT (WORKER_GROUP_FAM, WORKER_STARTED, 0, 0, NULL); #ifdef ACE_HAS_DSUI - EC_Event_Limit* e_limit = new EC_Event_Limit (TAO_ORB_Core_instance(), ds_cntl); -#else - EC_Event_Limit* e_limit = new EC_Event_Limit (TAO_ORB_Core_instance()); + //@BT + //DSTRM_EVENT(MAIN_GROUP_FAM, START,1,0,NULL); + ACE_Time_Value now(ACE_OS::gettimeofday()); + ACE_OS::printf("Supplier_EC START at %isec %iusec\n",now.sec(),now.usec()); + DSTRM_EVENT(MAIN_GROUP_FAM, START,0,0,NULL); #endif //ACE_HAS_DSUI - ACE_Time_Value ticker (120); - //orb->orb_core()->reactor()->schedule_timer(e_limit,0, ticker); - long timer_id = rt.reactor()->schedule_timer(e_limit,0,ticker); - if (timer_id < 0) - { - ACE_DEBUG((LM_DEBUG,"Supplier_EC (%t) could not schedule EC_Event_Limit timer\n")); - } rt.activate(); //need thread creation flags? or priority? - orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_Time_Value stop_time(300,0); + orb->run (stop_time ACE_ENV_ARG_PARAMETER); + //orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_TRY_CHECK; - // **************************************************************** - - //@BT: ORB shutting down; currently, this isn't expected to happen - //DSTRM_EVENT (MAIN_GROUP_FAM, CALL_SERVER_SHUTDOWN, 1, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Supplier_EC thread %t CALL_SERVER_SHUTDOWN at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT (MAIN_GROUP_FAM, CALL_SERVER_SHUTDOWN, 0, 0, NULL); +#ifdef ACE_HAS_DSUI + //@BT + //DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); + ACE_DEBUG((LM_DEBUG,"Supplier_EC thread %t STOP at %u\n",ACE_OS::gettimeofday().msec())); + DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); +#endif //ACE_HAS_DSUI - //@BT: Scheduler shuts down with the EC and ORB - //DSTRM_EVENT (MAIN_GROUP_FAM, SCHEDULER_SHUTDOWN, 1, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Supplier_EC thread %t SCHEDULER_SHUTDOWN at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT (MAIN_GROUP_FAM, SCHEDULER_SHUTDOWN, 0, 0, NULL); + // **************************************************************** // We should do a lot of cleanup (disconnect from the EC, // deactivate all the objects with the POA, etc.) but this is // just a simple demo so we are going to be lazy. - //@BT: Done clean up - //DSTRM_EVENT (MAIN_GROUP_FAM, AFTER_SERVER_SHUTDOWN, 1, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Supplier_EC thread %t AFTER_SERVER_SHUTDOWN at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT (MAIN_GROUP_FAM, AFTER_SERVER_SHUTDOWN, 0, 0, NULL); - } ACE_CATCHANY { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Service"); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Supplier_EC"); return 1; } ACE_ENDTRY; - //@BT - //DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); - ACE_DEBUG((LM_DEBUG,"Supplier_EC thread %t STOP at %u\n",ACE_OS::gettimeofday().msec())); - DSTRM_EVENT(MAIN_GROUP_FAM, STOP, 1, 0, NULL); - return 0; } -- cgit v1.2.1