From 8038a4b8d6dade77cf177c6cb466744faeaaed31 Mon Sep 17 00:00:00 2001 From: thrall Date: Wed, 28 Jan 2004 21:43:30 +0000 Subject: Fixed some of Stephen's changes, uncommented some stuff. Should be able to run consumers on the supplier-side with appropriate XML files --- TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp | 42 ++++++++++++---------- TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp | 10 ++---- .../examples/RtEC/test_driver/TimeoutConsumer.cpp | 10 +++--- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp index 814f89487c4..98b4b33263e 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp @@ -56,7 +56,8 @@ const char *supplier_schedule = const char *consumer_schedule = "consumer_schedule.out"; const char *remote_inet_addr = - "bhangra.doc.wustl.edu"; +"humpty.doc.wustl.edu"; +// "bhangra.doc.wustl.edu"; int remote_inet_port = 42424; template @@ -266,6 +267,7 @@ ECConfig::configure (TCFG_SET_WPTR testconfigs) ACE_OS::fclose (output_file); //now we block until the client writes its IOR + //this->barrier(true); this->barrier(true); //block forever so I can debug effectively! @@ -273,7 +275,7 @@ ECConfig::configure (TCFG_SET_WPTR testconfigs) } //CONSUMER writes IORs and blocks - if (cons_size > 0 && this->use_federated) + if (cons_size > 0 && supp_size==0 && this->use_federated) { //since there are consumers, we assume we are a consumer only //save IOR for federation @@ -307,7 +309,7 @@ ECConfig::configure (TCFG_SET_WPTR testconfigs) this->barrier(false); } - if (this->use_federated && cons_size>0) //only federate on consumer side + if (this->use_federated && cons_size>0 && supp_size==0) //only federate on consumer side { //gateway EC_Control does not appear to setup gateway on //activation, so we need to set it up BEFORE any consumers @@ -361,7 +363,7 @@ ECConfig::configure (TCFG_SET_WPTR testconfigs) anomalies.in (), supplier_schedule); } - if (this->consumers.size() > 0) + if (this->consumers.size() > 0 && this->suppliers.size()==0) { ACE_Scheduler_Factory::dump_schedule (infos.in (), deps.in(), @@ -420,18 +422,19 @@ ECConfig::run (void) //printf("data->orb = %p\n",(void*)(data->orb)); data->ready = &(this->ready); data->use_federated = this->use_federated; - data->is_server = this->suppliers.size()>0; //assume client if no suppliers - //int ret = inst->spawn(ECConfig::run_orb,data); + data->is_server = (this->suppliers.size()>0); //assume client if no suppliers + ACE_DEBUG((LM_DEBUG,"data->is_server? %u\tsuppliers size: %u\n",data->is_server,this->suppliers.size())); + int ret = inst->spawn(ECConfig::run_orb,data); //int ret = inst->spawn(ECConfig::run_orb,reactor); //no need for getting tid? - /* + if (ret == -1) { ACE_DEBUG ((LM_DEBUG, "ERROR: Couldn't spawn ORB->run() thread: %s\n", ACE_OS::strerror(errno))); return 1; } - */ + orb->run(); //this method returns when orb->shutdown() is called; then thread exits @@ -649,7 +652,7 @@ ECConfig::connect_suppliers (ACE_ENV_SINGLE_ARG_DECL) template void ECConfig::connect_consumers (ACE_ENV_SINGLE_ARG_DECL) { - //ACE_DEBUG((LM_DEBUG,"Consumers to connect: %d\n",this->consumers.size())); + ACE_DEBUG((LM_DEBUG,"Consumers to connect: %d\n",this->consumers.size())); //this->consumers already has correct size size_t cons_idx = 0; for (size_t i=0; itestcfgs.size(); ++i) @@ -736,7 +739,7 @@ ECConfig::print_RT_Infos (ACE_Array cfg_se //ACE_DEBUG ((LM_DEBUG, "\n")); } /* - ACE_DEBUG ((LM_DEBUG, rt_info_format, + ACE_DEBUG ((LM_DEBUG, rt_info_format, (const char *) info.entry_point, info.handle, ACE_CU64_TO_CU32 (info.worst_case_execution_time), @@ -779,12 +782,12 @@ ECConfig::barrier(bool is_supplier) addr_str += port_str; remote_inet_port++; //increment port for next barrier - if (is_supplier) + if (is_supplier == 1) { //now we block on a socket connect until a consumer opens it //this way, we don't start running until the consumer is ready ACE_SOCK_Stream accstrm; - //ACE_DEBUG((LM_DEBUG,"Opening supplier socket %s\n",addr_str.c_str())); + ACE_DEBUG((LM_DEBUG,"Opening supplier socket %s\n",addr_str.c_str())); ACE_INET_Addr addr(addr_str.c_str()); ACE_SOCK_Acceptor acc(addr); if (acc.accept(accstrm,&addr) != 0) //blocks until consumer opens @@ -793,17 +796,17 @@ ECConfig::barrier(bool is_supplier) "Cannot accept socket: %s\n", ACE_OS::strerror(errno))); } - //ACE_DEBUG((LM_DEBUG,"Supplier: unblocked on socket\n")); + ACE_DEBUG((LM_DEBUG,"Supplier: unblocked on socket\n")); //once opened, no need for socket any more acc.close(); accstrm.close(); - //ACE_DEBUG((LM_DEBUG, "Supplier: closed socket\n")); + ACE_DEBUG((LM_DEBUG, "Supplier: closed socket\n")); } else { //now we open a socket to start up the supplier - //ACE_DEBUG((LM_DEBUG,"Connecting consumer socket %s\n",addr_str.c_str())); + ACE_DEBUG((LM_DEBUG,"Connecting consumer socket %s\n",addr_str.c_str())); ACE_SOCK_Stream connstrm; ACE_INET_Addr addr(addr_str.c_str()); ACE_SOCK_Connector conn; @@ -813,11 +816,11 @@ ECConfig::barrier(bool is_supplier) "Consumer cannot connect socket: %s\n", ACE_OS::strerror(errno))); } - //ACE_DEBUG((LM_DEBUG,"Consumer: connected socket\n")); + ACE_DEBUG((LM_DEBUG,"Consumer: connected socket\n")); //once opened, no need for socket any more connstrm.close(); - //ACE_DEBUG((LM_DEBUG, "Consumer: closed socket\n")); + ACE_DEBUG((LM_DEBUG, "Consumer: closed socket\n")); } } @@ -840,7 +843,8 @@ ECConfig::run_orb(void *data) if (data_ptr->use_federated) { //assume if have no suppliers that we are client - ACE_DEBUG((LM_DEBUG,"Barrier to wait until both apps configured and orbs running\n")); + ACE_DEBUG((LM_DEBUG,"Barrier to wait until both apps configured and orbs running (server? %u %d)\n", + data_ptr->is_server,data_ptr->is_server)); //not really sure orbs running, but certainly nothing else between the spawn and the run at this point! ECConfig::barrier(data_ptr->is_server); //wait until both apps are ready to run *(data_ptr->ready) = 1; //checked by suppliers to start reacting to timeouts @@ -879,7 +883,7 @@ ECConfig::run_orb(void *data) //ACE_DEBUG((LM_DEBUG,"DONE; stopping reactor event loop\n")); ACE_Reactor::instance()->end_reactor_event_loop(); */ - //ACE_DEBUG((LM_DEBUG,"ORB Thread exiting\n")); + ACE_DEBUG((LM_DEBUG,"ORB Thread exiting\n")); return 0; } diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp index 16468e1846e..ba58c062e94 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp @@ -20,9 +20,7 @@ Supplier::~Supplier() void Supplier::update(ACE_ENV_SINGLE_ARG_DECL) { - /* ACE_DEBUG((LM_DEBUG,"Supplier %d (%P|%t) received update\n",this->_supplier_id)); - */ //only react to update if ready=1 if (*this->_ready == 1 && this->_num_sent < this->_to_send) @@ -34,16 +32,12 @@ Supplier::update(ACE_ENV_SINGLE_ARG_DECL) this->_consumer_proxy->push(this->_events ACE_ENV_ARG_PARAMETER); ++this->_num_sent; - /* ACE_DEBUG((LM_DEBUG,"Sent events; %d sent\t%d total\n",this->_num_sent,this->_to_send)); - */ if (this->_num_sent >= this->_to_send) { //just finished; only want to do this once! - /* ACE_DEBUG((LM_DEBUG,"RELEASE read lock from Supplier %d\n", this->_supplier_id)); - */ this->_done->release(); this->_hold_mtx = 0; } @@ -151,8 +145,8 @@ Supplier::connect (int *ready, ACE_ENV_ARG_PARAMETER); ACE_DEBUG((LM_DEBUG, "Supplier (%t) ")); printf("object pointer (%p) ---> push_consumer (%p)\n", - this, - this->_consumer_proxy.in()); + this, + this->_consumer_proxy.in()); ACE_CHECK; diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp index 37e5af7ccd9..1e661a1cf7d 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp @@ -177,10 +177,10 @@ TimeoutConsumer::push (const RtecEventComm::EventSet& events { if (ACE_ES_EVENT_INTERVAL_TIMEOUT == events[i].header.type) { - // ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) received timeout event\n",this->entry_pt.str().c_str())); + //ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) received timeout event\n",this->entry_pt.str().c_str())); if (this->_observer != 0) { - // ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) updating observer\n",this->entry_pt.str().c_str())); + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) updating observer\n",this->entry_pt.str().c_str())); this->_observer->update(); } } @@ -190,9 +190,9 @@ TimeoutConsumer::push (const RtecEventComm::EventSet& events ACE_hthread_t handle; ACE_Thread::self(handle); ACE_Thread::getprio(handle,prio); - //ACE_thread_t tid = ACE_Thread::self(); -// ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer %s @%d (%P|%t) we received event type %d\n", -// this->entry_pt.str().c_str(),prio,events[0].header.type)); + ACE_thread_t tid = ACE_Thread::self(); + ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer %s @%d (%P|%t) we received event type %d\n", + this->entry_pt.str().c_str(),prio,events[0].header.type)); } } } -- cgit v1.2.1