From 0fc18561a47269cf4431e42034607ccd415ea163 Mon Sep 17 00:00:00 2001 From: thrall Date: Fri, 14 Nov 2003 22:09:24 +0000 Subject: Split TimeoutConsumer (consumes timeouts and supplies events to the EC) into a TimeoutConsumer (consumes timeouts and notifies observers) and a Supplier (modified from original supplier, supplies events to the EC) --- TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp | 171 +++++++++++++++ TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h | 64 +++++- .../examples/RtEC/test_driver/TimeoutConsumer.cpp | 239 +++++---------------- .../examples/RtEC/test_driver/TimeoutConsumer.h | 77 +++---- 4 files changed, 317 insertions(+), 234 deletions(-) diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp index badc65b011d..c85c56c1a23 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp @@ -1,13 +1,184 @@ // $Id$ #include "Supplier.h" +#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory +#include "orbsvcs/Event_Service_Constants.h" ACE_RCSID(EC_Examples, Supplier, "$Id$") Supplier::Supplier (void) + : timeoutconsumer(this) + , _supplier(this) { } +Supplier::~Supplier() +{ +} + +void +Supplier::update(ACE_ENV_SINGLE_ARG_DECL) +{ + ACE_DEBUG((LM_DEBUG,"Supplier %d (%P|%t) received update\n",this->_supplier_id)); + + if (this->_num_sent < this->_to_send) + { + //send this->_events + this->_consumer_proxy->push(this->_events ACE_ENV_ARG_PARAMETER); + + ++this->_num_sent; + ACE_DEBUG((LM_DEBUG,"Sent events; %d sent\t%d total\n",this->_num_sent,this->_to_send)); + if (this->_num_sent >= this->_to_send) + { + //just finished; only want to do this once! + ACE_DEBUG((LM_DEBUG,"RELEASE read lock from Supplier %d\n", + this->_supplier_id)); + this->_done->release(); + this->_hold_mtx = 0; + } + } + else + { + //do nothing + } +} + +void +Supplier::connect (ACE_RW_Mutex* done, + RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventComm::EventSourceID supplier_id, + size_t to_send, + const RtecEventComm::EventSet& events, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + this->_supplier_id = supplier_id; + this->_to_send = to_send; + this->_num_sent = 0; + this->_hold_mtx = 0; + this->_done = done; + if (this->_done!= 0 && this->_num_sent_to_send) + { + int ret = done->acquire_read(); + if (ret == -1) + { + ACE_DEBUG((LM_DEBUG,"ERROR: Could not acquire read lock for Supplier: %s\n", + ACE_OS::strerror(errno))); + } else + { + ACE_DEBUG((LM_DEBUG,"ACQUIRED read lock for Supplier %d\n",this->_supplier_id)); + this->_hold_mtx = 1; + } + } else + { + ACE_DEBUG((LM_DEBUG,"Already done; did not grab read lock for Supplier %d\n",this->_supplier_id)); + } + + this->_events.length(events.length()); + for (size_t i=0; i_events[i] = events[i]; //copy event to local set + this->_events[i].header.source = this->_supplier_id; //make sure event source is this + } + + //create supplier RT_Info + std::ostringstream supp_entry_pt; + supp_entry_pt << entry_prefix << " Supplier " << this->_supplier_id; //unique RT_Info entry point + ACE_DEBUG((LM_DEBUG,"Creating %s\n",supp_entry_pt.str().c_str())); + RtecScheduler::handle_t rt_info = scheduler->create (supp_entry_pt.str().c_str() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + ACE_Time_Value tv (0,0); + TimeBase::TimeT tmp; + ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); + scheduler->set (rt_info, + criticality, + tmp,tmp,tmp, + period, + importance, + tmp, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Register as supplier of events + ACE_SupplierQOS_Factory supplierQOS; + for (size_t i=0; i_supplier_id, + events[i].header.type, + rt_info, + 1); + } + + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_consumer_proxy = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + RtecEventComm::PushSupplier_var supplierv = + this->_supplier._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_consumer_proxy->connect_push_supplier (supplierv.in (), + supplierQOS.get_SupplierQOS () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_DEBUG((LM_DEBUG,"Supplier %d connected\n",this->_supplier_id)); + for (size_t i=0; itimeoutconsumer.connect(scheduler,supp_entry_pt.str().c_str(),period, + importance,criticality,ec ACE_ENV_ARG_PARAMETER); + + //Add Scheduler dependency between TimeoutConsumer and Supplier + scheduler->add_dependency (this->timeoutconsumer.get_RT_Info(), + rt_info, + 1, + RtecBase::TWO_WAY_CALL + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + +} + +void +Supplier::disconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + if (! CORBA::is_nil (this->_consumer_proxy.in ())) + { + this->_consumer_proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_consumer_proxy = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); + + // Deactivate the servant + PortableServer::POA_var poa = + this->_supplier._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (&this->_supplier ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + this->timeoutconsumer.disconnect(ACE_ENV_SINGLE_ARG_PARAMETER); +} + void Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException)) diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h index b0391f7602b..3d31dce16d3 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h @@ -10,37 +10,62 @@ // Supplier // // = AUTHOR -// Carlos O'Ryan (coryan@cs.wustl.edu) +// Bryan Thrall (thrall@cse.wustl.edu) // // ============================================================================ #ifndef SUPPLIER_H #define SUPPLIER_H -#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventCommC.h" +#include "orbsvcs/RtecEventCommC.h" +#include "orbsvcs/RtecSchedulerC.h" +#include "orbsvcs/Channel_Clients_T.h" +#include "ace/RW_Mutex.h" +#include "TimeoutConsumer.h" +#include //for ostringstream + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -class Supplier : public POA_RtecEventComm::PushSupplier +class Supplier : Timeout_Observer { // = TITLE - // Simple supplier object + // Simple supplier object which responds to timeout events. // // = DESCRIPTION - // This class is a supplier of events. - // It simply register for two event typesone event type - // The class is just a helper to simplify common tasks in EC - // tests, such as subscribing for a range of events, disconnecting - // from the EC, informing the driver of shutdown messages, etc. + // This class is an event supplier which responds to EC timeouts. + // For each timeout event it is notified of (via a TimeoutConsumer object), + // it pushes a specified EventSet into the EC. // // There are several ways to connect and disconnect this class, // and it is up to the driver program to use the right one. // public: Supplier (void); - // Constructor + // Default Constructor. + + virtual ~Supplier (void); + + virtual void update(ACE_ENV_SINGLE_ARG_DECL); + + void connect (ACE_RW_Mutex* done, + RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventComm::EventSourceID supplier_id, + size_t to_send, + const RtecEventComm::EventSet& events, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL); + // This method connects the supplier to the EC. + + void disconnect (ACE_ENV_SINGLE_ARG_DECL); + // Disconnect from the EC. // = The RtecEventComm::PushSupplier methods @@ -49,6 +74,25 @@ public: // The skeleton methods. private: + size_t _to_send; //number of times to push on timeout + size_t _num_sent; //number of pushes so far + int _hold_mtx; //1 when hold _done mutex; 0 else + ACE_RW_Mutex* _done; //release read lock when _num_sent >= _to_send + + TimeoutConsumer timeoutconsumer; + + RtecEventComm::EventSourceID _supplier_id; + // We generate an id based on the name.... + + RtecEventChannelAdmin::ProxyPushConsumer_var _consumer_proxy; + // We talk to the EC (as a supplier) using this proxy. + + ACE_PushSupplier_Adapter _supplier; + // We connect to the EC as a supplier so we can push events + // every time we receive a timeout event. + + RtecEventComm::EventSet _events; + // set of events to push when a timeout event is received. }; #endif /* SUPPLIER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp index 8bd30cc0e82..70e6198a29b 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp @@ -4,155 +4,63 @@ #include //for ostringstream -#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory -#include "orbsvcs/Event_Service_Constants.h" -#include "orbsvcs/RtecSchedulerC.h" -#include "orbsvcs/RtecEventCommC.h" -#include "ace/RW_Mutex.h" +#include "orbsvcs/Event_Utilities.h" //for ACE_ConsumerQOS_Factory +//#include "orbsvcs/Event_Service_Constants.h" +//#include "orbsvcs/RtecSchedulerC.h" +//#include "orbsvcs/RtecEventCommC.h" ACE_RCSID(EC_Examples, Consumer, "$Id$") -TimeoutConsumer::TimeoutConsumer (void) - : _to_send(0) - , _num_sent(0) - , _hold_mtx(0) - , _done(0) - , _supplier(this) +TimeoutConsumer::TimeoutConsumer (Timeout_Observer* obs) + : _observer(obs) , _consumer(this) - , _events(0) { } TimeoutConsumer::~TimeoutConsumer (void) { // TODO this->disconnect() ??? +} - if (this->_hold_mtx && this->_done!=0) - { - this->_done->release(); - this->_hold_mtx = 0; - } +RtecScheduler::handle_t +TimeoutConsumer::get_RT_Info(void) +{ + return this->_rt_info; } void -TimeoutConsumer::connect (ACE_RW_Mutex* done, - RtecScheduler::Scheduler_ptr scheduler, +TimeoutConsumer::connect (RtecScheduler::Scheduler_ptr scheduler, const char *entry_prefix, TimeBase::TimeT period, RtecScheduler::Importance_t importance, RtecScheduler::Criticality_t criticality, - RtecEventComm::EventSourceID supplier_id, - size_t to_send, - const RtecEventComm::EventSet& events, RtecEventChannelAdmin::EventChannel_ptr ec ACE_ENV_ARG_DECL) { - this->_supplier_id = supplier_id; - this->_to_send = to_send; - this->_num_sent = 0; - this->_hold_mtx = 0; - this->_done = done; - if (this->_done!= 0 && this->_num_sent_to_send) - { - int ret = done->acquire_read(); - if (ret == -1) - { - ACE_DEBUG((LM_DEBUG,"ERROR: Could not acquire read lock for TimeoutConsumer: %s\n", - ACE_OS::strerror(errno))); - } else - { - ACE_DEBUG((LM_DEBUG,"ACQUIRED read lock for TimeoutConsumer %d\n",this->_supplier_id)); - this->_hold_mtx = 1; - } - } else - { - ACE_DEBUG((LM_DEBUG,"Already done; did not grab read lock for TimeoutConsumer\n")); - } - - this->_events.length(events.length()); - for (size_t i=0; i_events[i] = events[i]; //copy event to local set - this->_events[i].header.source = this->_supplier_id; //make sure event source is this - } - - //create supplier RT_Info - std::ostringstream supp_entry_pt; - supp_entry_pt << entry_prefix << " Supplier " << this->_supplier_id; //unique RT_Info entry point - ACE_DEBUG((LM_DEBUG,"Creating %s\n",supp_entry_pt.str().c_str())); - RtecScheduler::handle_t rt_info = scheduler->create (supp_entry_pt.str().c_str() - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - ACE_Time_Value tv (0,0); - TimeBase::TimeT tmp; - ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv); - scheduler->set (rt_info, - criticality, - tmp,tmp,tmp, - period, - importance, - tmp, - 0, - RtecScheduler::OPERATION - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - // Register as supplier of events - ACE_SupplierQOS_Factory supplierQOS; - for (size_t i=0; i_supplier_id, - events[i].header.type, - rt_info, - 1); - } - - RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = - ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - this->_consumer_proxy = - supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - RtecEventComm::PushSupplier_var supplierv = - this->_supplier._this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - this->_consumer_proxy->connect_push_supplier (supplierv.in (), - supplierQOS.get_SupplierQOS () - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - ACE_DEBUG((LM_DEBUG,"TimeoutConsumer connected as event supplier\n")); - for (size_t i=0; i_scheduler = scheduler; + //create consumer RT_Info std::ostringstream cons_entry_pt; - cons_entry_pt << entry_prefix << " Consumer"; //unique RT_Info entry point + cons_entry_pt << entry_prefix << " TimeoutConsumer"; //unique RT_Info entry point ACE_DEBUG((LM_DEBUG,"Creating %s\n",cons_entry_pt.str().c_str())); - rt_info = scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER); + this->_rt_info = this->_scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER); ACE_CHECK; - scheduler->set (rt_info, - criticality, - period, period, period, - period, - importance, - period, - 0, - RtecScheduler::OPERATION - ACE_ENV_ARG_PARAMETER); + this->_scheduler->set (this->_rt_info, + criticality, + period, period, period, + period, + importance, + period, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); ACE_CHECK; // Register as consumer of timeout events ACE_ConsumerQOS_Factory timeoutQOS; timeoutQOS.insert_time(ACE_ES_EVENT_INTERVAL_TIMEOUT /*??*/, period, //TimeBase::TimeT - rt_info); + this->_rt_info); RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); @@ -180,26 +88,7 @@ TimeoutConsumer::connect (ACE_RW_Mutex* done, void TimeoutConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) { - if (! CORBA::is_nil (this->_consumer_proxy.in ())) - { - this->_consumer_proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - this->_consumer_proxy = - RtecEventChannelAdmin::ProxyPushConsumer::_nil (); - - // Deactivate the servant - PortableServer::POA_var poa = - this->_supplier._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->_supplier ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - } - - //disconnect consumer ??? + //disconnect consumer if (! CORBA::is_nil (this->_supplier_proxy.in())) { this->_supplier_proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); @@ -217,61 +106,45 @@ TimeoutConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) poa->deactivate_object(id.in() ACE_ENV_ARG_PARAMETER); ACE_CHECK; } - - if (this->_hold_mtx && this->_done!=0) - { - this->_done->release(); - this->_hold_mtx = 0; - } } void TimeoutConsumer::push (const RtecEventComm::EventSet& events - ACE_ENV_ARG_DECL_NOT_USED) + ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { + RtecScheduler::RT_Info* info = this->_scheduler->get(this->_rt_info ACE_ENV_ARG_PARAMETER); + if (events.length () == 0) { - ACE_DEBUG ((LM_DEBUG,"TimeoutConsumer (%P|%t) push but no events\n")); + ACE_DEBUG ((LM_DEBUG,"TimeoutConsumer %s (%P|%t) push but no events\n",info->entry_point.in())); return; } - ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %d (%P|%t) received %d events:\n",this->_supplier_id, + + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) received %d events:\n",info->entry_point.in(), events.length())); - if (this->_num_sent < this->_to_send) - { - for (size_t i=0; i_supplier_id)); - - //TODO send this->_events - ++this->_num_sent; - ACE_DEBUG((LM_DEBUG,"Sent events; %d sent\t%d total\n",this->_num_sent,this->_to_send)); - if (this->_num_sent >= this->_to_send) - { - //done - ACE_DEBUG((LM_DEBUG,"RELEASE read lock from TimeoutConsumer %d\n", - this->_supplier_id)); - this->_done->release(); - this->_hold_mtx = 0; - } - } - else - { - int prio = -1; - ACE_hthread_t handle; - ACE_Thread::self(handle); - ACE_Thread::getprio(handle,prio); - //ACE_thread_t tid = ACE_Thread::self(); - ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer @%d (%P|%t) we received event type %d\n", - prio,events[0].header.type)); - } - } - } else + for (size_t i=0; ientry_point.in())); + if (this->_observer != 0) + { + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) updating observer\n",info->entry_point.in())); + this->_observer->update(); + } + } + else + { + int prio = -1; + ACE_hthread_t handle; + ACE_Thread::self(handle); + ACE_Thread::getprio(handle,prio); + //ACE_thread_t tid = ACE_Thread::self(); + ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer %s @%d (%P|%t) we received event type %d\n", + info->entry_point.in(),prio,events[0].header.type)); + } } } @@ -281,12 +154,6 @@ TimeoutConsumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) { } -void -TimeoutConsumer::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) - ACE_THROW_SPEC ((CORBA::SystemException)) -{ -} - // **************************************************************** #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h index db55b9970c3..81b5010bea3 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h +++ b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h @@ -21,43 +21,59 @@ #include "orbsvcs/RtecEventCommC.h" #include "orbsvcs/RtecSchedulerC.h" #include "orbsvcs/Channel_Clients_T.h" -//#include "ace/Task.h" -//#include "ace/Synch.h" -#include "ace/RW_Mutex.h" +#include "TestConfig.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +// TODO: Convert to Observer (or other similar) pattern. + +class Timeout_Observer { + // = TITLE + // Interface for all TimeoutConsumer observers. + // + // = DESCRIPTION + // Any class which wants to receive notifications of timeout events + // needs to be a subclass of this interface. + +public: + virtual void update(ACE_ENV_SINGLE_ARG_DECL) = 0; + // Called by the TimeoutConsumer when a timeout occurs. + // + // For now, the notification is binary ("A timeout occurred"), but + // in the future, it should be useful to pass the event type(s) or + // even the events themselves. +}; + class TimeoutConsumer { // = TITLE - // Simple consumer object which responds to timeout events. + // Simple consumer object of timeout events. // - // = DESCRIPTION - // This class is a consumer of timeout events. - // For each timeout event it consumes, it pushes a specified EventSet into the EC. + // = DESCRIPTION This class is a consumer of timeout events. For + // each timeout event it consumes, it notifies its observer + // (specified in its constructor). // // There are several ways to connect and disconnect this class, // and it is up to the driver program to use the right one. // public: - TimeoutConsumer (void); - // Default Constructor. + TimeoutConsumer (Timeout_Observer* obs); + // For now, only handle a single observer. In the future, handle any number. + // Note that the TimeoutConsumer does NOT take ownership of the observer. virtual ~TimeoutConsumer (void); - void connect (ACE_RW_Mutex* done, - RtecScheduler::Scheduler_ptr scheduler, - const char *entry_prefix, - TimeBase::TimeT period, - RtecScheduler::Importance_t importance, - RtecScheduler::Criticality_t criticality, - RtecEventComm::EventSourceID supplier_id, - size_t to_send, - const RtecEventComm::EventSet& events, - RtecEventChannelAdmin::EventChannel_ptr ec - ACE_ENV_ARG_DECL); + RtecScheduler::handle_t get_RT_Info(void); + + void connect (RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL); // This method connects the supplier to the EC. void disconnect (ACE_ENV_SINGLE_ARG_DECL); @@ -74,27 +90,12 @@ public: ACE_THROW_SPEC ((CORBA::SystemException)); // The skeleton methods. - // = The RtecEventComm::PushSupplier methods - - virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) - ACE_THROW_SPEC ((CORBA::SystemException)); - // The skeleton methods. - private: - size_t _to_send; //number of times to push on timeout - size_t _num_sent; //number of pushes so far - int _hold_mtx; //1 when hold _done mutex; 0 else - ACE_RW_Mutex* _done; //release read lock when _num_sent >= _to_send - - RtecEventComm::EventSourceID _supplier_id; - // We generate an id based on the name.... + Timeout_Observer* _observer; - RtecEventChannelAdmin::ProxyPushConsumer_var _consumer_proxy; - // We talk to the EC (as a supplier) using this proxy. + RtecScheduler::handle_t _rt_info; - ACE_PushSupplier_Adapter _supplier; - // We connect to the EC as a supplier so we can push events - // every time we receive a timeout event. + RtecScheduler::Scheduler_ptr _scheduler; RtecEventChannelAdmin::ProxyPushSupplier_var _supplier_proxy; // We talk to the EC (as a consumer) using this proxy. -- cgit v1.2.1