summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-11-14 22:09:24 +0000
committerthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-11-14 22:09:24 +0000
commit0fc18561a47269cf4431e42034607ccd415ea163 (patch)
treecfd043a768885c568267a946b84cb3f0839e995c
parent47fbd5a91a3e9dd3ef2693fcbbef8c6d1b535ac7 (diff)
downloadATCD-0fc18561a47269cf4431e42034607ccd415ea163.tar.gz
Split TimeoutConsumer (consumes timeouts and supplies events to the EC) into a TimeoutConsumer (consumes timeouts and notifies observers) and a Supplier (modified from original supplier, supplies events to the EC)
-rw-r--r--TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp171
-rw-r--r--TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h64
-rw-r--r--TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp239
-rw-r--r--TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h77
4 files changed, 317 insertions, 234 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp
index badc65b011d..c85c56c1a23 100644
--- a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp
+++ b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp
@@ -1,13 +1,184 @@
// $Id$
#include "Supplier.h"
+#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory
+#include "orbsvcs/Event_Service_Constants.h"
ACE_RCSID(EC_Examples, Supplier, "$Id$")
Supplier::Supplier (void)
+ : timeoutconsumer(this)
+ , _supplier(this)
{
}
+Supplier::~Supplier()
+{
+}
+
+void
+Supplier::update(ACE_ENV_SINGLE_ARG_DECL)
+{
+ ACE_DEBUG((LM_DEBUG,"Supplier %d (%P|%t) received update\n",this->_supplier_id));
+
+ if (this->_num_sent < this->_to_send)
+ {
+ //send this->_events
+ this->_consumer_proxy->push(this->_events ACE_ENV_ARG_PARAMETER);
+
+ ++this->_num_sent;
+ ACE_DEBUG((LM_DEBUG,"Sent events; %d sent\t%d total\n",this->_num_sent,this->_to_send));
+ if (this->_num_sent >= this->_to_send)
+ {
+ //just finished; only want to do this once!
+ ACE_DEBUG((LM_DEBUG,"RELEASE read lock from Supplier %d\n",
+ this->_supplier_id));
+ this->_done->release();
+ this->_hold_mtx = 0;
+ }
+ }
+ else
+ {
+ //do nothing
+ }
+}
+
+void
+Supplier::connect (ACE_RW_Mutex* done,
+ RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventComm::EventSourceID supplier_id,
+ size_t to_send,
+ const RtecEventComm::EventSet& events,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL)
+{
+ this->_supplier_id = supplier_id;
+ this->_to_send = to_send;
+ this->_num_sent = 0;
+ this->_hold_mtx = 0;
+ this->_done = done;
+ if (this->_done!= 0 && this->_num_sent<this->_to_send)
+ {
+ int ret = done->acquire_read();
+ if (ret == -1)
+ {
+ ACE_DEBUG((LM_DEBUG,"ERROR: Could not acquire read lock for Supplier: %s\n",
+ ACE_OS::strerror(errno)));
+ } else
+ {
+ ACE_DEBUG((LM_DEBUG,"ACQUIRED read lock for Supplier %d\n",this->_supplier_id));
+ this->_hold_mtx = 1;
+ }
+ } else
+ {
+ ACE_DEBUG((LM_DEBUG,"Already done; did not grab read lock for Supplier %d\n",this->_supplier_id));
+ }
+
+ this->_events.length(events.length());
+ for (size_t i=0; i<events.length(); ++i)
+ {
+ this->_events[i] = events[i]; //copy event to local set
+ this->_events[i].header.source = this->_supplier_id; //make sure event source is this
+ }
+
+ //create supplier RT_Info
+ std::ostringstream supp_entry_pt;
+ supp_entry_pt << entry_prefix << " Supplier " << this->_supplier_id; //unique RT_Info entry point
+ ACE_DEBUG((LM_DEBUG,"Creating %s\n",supp_entry_pt.str().c_str()));
+ RtecScheduler::handle_t rt_info = scheduler->create (supp_entry_pt.str().c_str()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ACE_Time_Value tv (0,0);
+ TimeBase::TimeT tmp;
+ ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv);
+ scheduler->set (rt_info,
+ criticality,
+ tmp,tmp,tmp,
+ period,
+ importance,
+ tmp,
+ 0,
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Register as supplier of events
+ ACE_SupplierQOS_Factory supplierQOS;
+ for (size_t i=0; i<events.length(); ++i)
+ {
+ //insert type for each event
+ supplierQOS.insert (this->_supplier_id,
+ events[i].header.type,
+ rt_info,
+ 1);
+ }
+
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_consumer_proxy =
+ supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ RtecEventComm::PushSupplier_var supplierv =
+ this->_supplier._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_consumer_proxy->connect_push_supplier (supplierv.in (),
+ supplierQOS.get_SupplierQOS ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_DEBUG((LM_DEBUG,"Supplier %d connected\n",this->_supplier_id));
+ for (size_t i=0; i<events.length(); ++i)
+ {
+ ACE_DEBUG((LM_DEBUG,"\tEvent Type: %d\n",events[i].header.type));
+ }
+
+ //connect TimeoutConsumer for timeouts.
+ this->timeoutconsumer.connect(scheduler,supp_entry_pt.str().c_str(),period,
+ importance,criticality,ec ACE_ENV_ARG_PARAMETER);
+
+ //Add Scheduler dependency between TimeoutConsumer and Supplier
+ scheduler->add_dependency (this->timeoutconsumer.get_RT_Info(),
+ rt_info,
+ 1,
+ RtecBase::TWO_WAY_CALL
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+}
+
+void
+Supplier::disconnect (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (! CORBA::is_nil (this->_consumer_proxy.in ()))
+ {
+ this->_consumer_proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->_consumer_proxy =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+
+ // Deactivate the servant
+ PortableServer::POA_var poa =
+ this->_supplier._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->_supplier ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+ this->timeoutconsumer.disconnect(ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
void
Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h
index b0391f7602b..3d31dce16d3 100644
--- a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h
+++ b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.h
@@ -10,37 +10,62 @@
// Supplier
//
// = AUTHOR
-// Carlos O'Ryan (coryan@cs.wustl.edu)
+// Bryan Thrall (thrall@cse.wustl.edu)
//
// ============================================================================
#ifndef SUPPLIER_H
#define SUPPLIER_H
-#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/RtecEventCommC.h"
+#include "orbsvcs/RtecEventCommC.h"
+#include "orbsvcs/RtecSchedulerC.h"
+#include "orbsvcs/Channel_Clients_T.h"
+#include "ace/RW_Mutex.h"
+#include "TimeoutConsumer.h"
+#include <sstream> //for ostringstream
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class Supplier : public POA_RtecEventComm::PushSupplier
+class Supplier : Timeout_Observer
{
// = TITLE
- // Simple supplier object
+ // Simple supplier object which responds to timeout events.
//
// = DESCRIPTION
- // This class is a supplier of events.
- // It simply register for two event typesone event type
- // The class is just a helper to simplify common tasks in EC
- // tests, such as subscribing for a range of events, disconnecting
- // from the EC, informing the driver of shutdown messages, etc.
+ // This class is an event supplier which responds to EC timeouts.
+ // For each timeout event it is notified of (via a TimeoutConsumer object),
+ // it pushes a specified EventSet into the EC.
//
// There are several ways to connect and disconnect this class,
// and it is up to the driver program to use the right one.
//
public:
Supplier (void);
- // Constructor
+ // Default Constructor.
+
+ virtual ~Supplier (void);
+
+ virtual void update(ACE_ENV_SINGLE_ARG_DECL);
+
+ void connect (ACE_RW_Mutex* done,
+ RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventComm::EventSourceID supplier_id,
+ size_t to_send,
+ const RtecEventComm::EventSet& events,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL);
+ // This method connects the supplier to the EC.
+
+ void disconnect (ACE_ENV_SINGLE_ARG_DECL);
+ // Disconnect from the EC.
// = The RtecEventComm::PushSupplier methods
@@ -49,6 +74,25 @@ public:
// The skeleton methods.
private:
+ size_t _to_send; //number of times to push on timeout
+ size_t _num_sent; //number of pushes so far
+ int _hold_mtx; //1 when hold _done mutex; 0 else
+ ACE_RW_Mutex* _done; //release read lock when _num_sent >= _to_send
+
+ TimeoutConsumer timeoutconsumer;
+
+ RtecEventComm::EventSourceID _supplier_id;
+ // We generate an id based on the name....
+
+ RtecEventChannelAdmin::ProxyPushConsumer_var _consumer_proxy;
+ // We talk to the EC (as a supplier) using this proxy.
+
+ ACE_PushSupplier_Adapter<Supplier> _supplier;
+ // We connect to the EC as a supplier so we can push events
+ // every time we receive a timeout event.
+
+ RtecEventComm::EventSet _events;
+ // set of events to push when a timeout event is received.
};
#endif /* SUPPLIER_H */
diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp
index 8bd30cc0e82..70e6198a29b 100644
--- a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp
+++ b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp
@@ -4,155 +4,63 @@
#include <sstream> //for ostringstream
-#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory
-#include "orbsvcs/Event_Service_Constants.h"
-#include "orbsvcs/RtecSchedulerC.h"
-#include "orbsvcs/RtecEventCommC.h"
-#include "ace/RW_Mutex.h"
+#include "orbsvcs/Event_Utilities.h" //for ACE_ConsumerQOS_Factory
+//#include "orbsvcs/Event_Service_Constants.h"
+//#include "orbsvcs/RtecSchedulerC.h"
+//#include "orbsvcs/RtecEventCommC.h"
ACE_RCSID(EC_Examples, Consumer, "$Id$")
-TimeoutConsumer::TimeoutConsumer (void)
- : _to_send(0)
- , _num_sent(0)
- , _hold_mtx(0)
- , _done(0)
- , _supplier(this)
+TimeoutConsumer::TimeoutConsumer (Timeout_Observer* obs)
+ : _observer(obs)
, _consumer(this)
- , _events(0)
{
}
TimeoutConsumer::~TimeoutConsumer (void)
{
// TODO this->disconnect() ???
+}
- if (this->_hold_mtx && this->_done!=0)
- {
- this->_done->release();
- this->_hold_mtx = 0;
- }
+RtecScheduler::handle_t
+TimeoutConsumer::get_RT_Info(void)
+{
+ return this->_rt_info;
}
void
-TimeoutConsumer::connect (ACE_RW_Mutex* done,
- RtecScheduler::Scheduler_ptr scheduler,
+TimeoutConsumer::connect (RtecScheduler::Scheduler_ptr scheduler,
const char *entry_prefix,
TimeBase::TimeT period,
RtecScheduler::Importance_t importance,
RtecScheduler::Criticality_t criticality,
- RtecEventComm::EventSourceID supplier_id,
- size_t to_send,
- const RtecEventComm::EventSet& events,
RtecEventChannelAdmin::EventChannel_ptr ec
ACE_ENV_ARG_DECL)
{
- this->_supplier_id = supplier_id;
- this->_to_send = to_send;
- this->_num_sent = 0;
- this->_hold_mtx = 0;
- this->_done = done;
- if (this->_done!= 0 && this->_num_sent<this->_to_send)
- {
- int ret = done->acquire_read();
- if (ret == -1)
- {
- ACE_DEBUG((LM_DEBUG,"ERROR: Could not acquire read lock for TimeoutConsumer: %s\n",
- ACE_OS::strerror(errno)));
- } else
- {
- ACE_DEBUG((LM_DEBUG,"ACQUIRED read lock for TimeoutConsumer %d\n",this->_supplier_id));
- this->_hold_mtx = 1;
- }
- } else
- {
- ACE_DEBUG((LM_DEBUG,"Already done; did not grab read lock for TimeoutConsumer\n"));
- }
-
- this->_events.length(events.length());
- for (size_t i=0; i<events.length(); ++i)
- {
- this->_events[i] = events[i]; //copy event to local set
- this->_events[i].header.source = this->_supplier_id; //make sure event source is this
- }
-
- //create supplier RT_Info
- std::ostringstream supp_entry_pt;
- supp_entry_pt << entry_prefix << " Supplier " << this->_supplier_id; //unique RT_Info entry point
- ACE_DEBUG((LM_DEBUG,"Creating %s\n",supp_entry_pt.str().c_str()));
- RtecScheduler::handle_t rt_info = scheduler->create (supp_entry_pt.str().c_str()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- ACE_Time_Value tv (0,0);
- TimeBase::TimeT tmp;
- ORBSVCS_Time::Time_Value_to_TimeT (tmp, tv);
- scheduler->set (rt_info,
- criticality,
- tmp,tmp,tmp,
- period,
- importance,
- tmp,
- 0,
- RtecScheduler::OPERATION
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- // Register as supplier of events
- ACE_SupplierQOS_Factory supplierQOS;
- for (size_t i=0; i<events.length(); ++i)
- {
- //insert type for each event
- supplierQOS.insert (this->_supplier_id,
- events[i].header.type,
- rt_info,
- 1);
- }
-
- RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
- ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- this->_consumer_proxy =
- supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- RtecEventComm::PushSupplier_var supplierv =
- this->_supplier._this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- this->_consumer_proxy->connect_push_supplier (supplierv.in (),
- supplierQOS.get_SupplierQOS ()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- ACE_DEBUG((LM_DEBUG,"TimeoutConsumer connected as event supplier\n"));
- for (size_t i=0; i<events.length(); ++i)
- {
- ACE_DEBUG((LM_DEBUG,"\tEvent Type: %d\n",events[i].header.type));
- }
-
+ this->_scheduler = scheduler;
+
//create consumer RT_Info
std::ostringstream cons_entry_pt;
- cons_entry_pt << entry_prefix << " Consumer"; //unique RT_Info entry point
+ cons_entry_pt << entry_prefix << " TimeoutConsumer"; //unique RT_Info entry point
ACE_DEBUG((LM_DEBUG,"Creating %s\n",cons_entry_pt.str().c_str()));
- rt_info = scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER);
+ this->_rt_info = this->_scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- scheduler->set (rt_info,
- criticality,
- period, period, period,
- period,
- importance,
- period,
- 0,
- RtecScheduler::OPERATION
- ACE_ENV_ARG_PARAMETER);
+ this->_scheduler->set (this->_rt_info,
+ criticality,
+ period, period, period,
+ period,
+ importance,
+ period,
+ 0,
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
// Register as consumer of timeout events
ACE_ConsumerQOS_Factory timeoutQOS;
timeoutQOS.insert_time(ACE_ES_EVENT_INTERVAL_TIMEOUT /*??*/,
period, //TimeBase::TimeT
- rt_info);
+ this->_rt_info);
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
@@ -180,26 +88,7 @@ TimeoutConsumer::connect (ACE_RW_Mutex* done,
void
TimeoutConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL)
{
- if (! CORBA::is_nil (this->_consumer_proxy.in ()))
- {
- this->_consumer_proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- this->_consumer_proxy =
- RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
-
- // Deactivate the servant
- PortableServer::POA_var poa =
- this->_supplier._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->_supplier ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
-
- //disconnect consumer ???
+ //disconnect consumer
if (! CORBA::is_nil (this->_supplier_proxy.in()))
{
this->_supplier_proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
@@ -217,61 +106,45 @@ TimeoutConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL)
poa->deactivate_object(id.in() ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
-
- if (this->_hold_mtx && this->_done!=0)
- {
- this->_done->release();
- this->_hold_mtx = 0;
- }
}
void
TimeoutConsumer::push (const RtecEventComm::EventSet& events
- ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException))
{
+ RtecScheduler::RT_Info* info = this->_scheduler->get(this->_rt_info ACE_ENV_ARG_PARAMETER);
+
if (events.length () == 0)
{
- ACE_DEBUG ((LM_DEBUG,"TimeoutConsumer (%P|%t) push but no events\n"));
+ ACE_DEBUG ((LM_DEBUG,"TimeoutConsumer %s (%P|%t) push but no events\n",info->entry_point.in()));
return;
}
- ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %d (%P|%t) received %d events:\n",this->_supplier_id,
+
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) received %d events:\n",info->entry_point.in(),
events.length()));
- if (this->_num_sent < this->_to_send)
- {
- for (size_t i=0; i<events.length(); ++i)
- {
- if (ACE_ES_EVENT_INTERVAL_TIMEOUT == events[i].header.type)
- {
- ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %d (%P|%t) received timeout event\n",this->_supplier_id));
-
- //TODO send this->_events
- ++this->_num_sent;
- ACE_DEBUG((LM_DEBUG,"Sent events; %d sent\t%d total\n",this->_num_sent,this->_to_send));
- if (this->_num_sent >= this->_to_send)
- {
- //done
- ACE_DEBUG((LM_DEBUG,"RELEASE read lock from TimeoutConsumer %d\n",
- this->_supplier_id));
- this->_done->release();
- this->_hold_mtx = 0;
- }
- }
- else
- {
- int prio = -1;
- ACE_hthread_t handle;
- ACE_Thread::self(handle);
- ACE_Thread::getprio(handle,prio);
- //ACE_thread_t tid = ACE_Thread::self();
- ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer @%d (%P|%t) we received event type %d\n",
- prio,events[0].header.type));
- }
- }
- } else
+ for (size_t i=0; i<events.length(); ++i)
{
- //do nothing
+ if (ACE_ES_EVENT_INTERVAL_TIMEOUT == events[i].header.type)
+ {
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) received timeout event\n",info->entry_point.in()));
+ if (this->_observer != 0)
+ {
+ ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) updating observer\n",info->entry_point.in()));
+ this->_observer->update();
+ }
+ }
+ else
+ {
+ int prio = -1;
+ ACE_hthread_t handle;
+ ACE_Thread::self(handle);
+ ACE_Thread::getprio(handle,prio);
+ //ACE_thread_t tid = ACE_Thread::self();
+ ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer %s @%d (%P|%t) we received event type %d\n",
+ info->entry_point.in(),prio,events[0].header.type));
+ }
}
}
@@ -281,12 +154,6 @@ TimeoutConsumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
{
}
-void
-TimeoutConsumer::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
-}
-
// ****************************************************************
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h
index db55b9970c3..81b5010bea3 100644
--- a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h
+++ b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h
@@ -21,43 +21,59 @@
#include "orbsvcs/RtecEventCommC.h"
#include "orbsvcs/RtecSchedulerC.h"
#include "orbsvcs/Channel_Clients_T.h"
-//#include "ace/Task.h"
-//#include "ace/Synch.h"
-#include "ace/RW_Mutex.h"
+#include "TestConfig.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+// TODO: Convert to Observer (or other similar) pattern.
+
+class Timeout_Observer {
+ // = TITLE
+ // Interface for all TimeoutConsumer observers.
+ //
+ // = DESCRIPTION
+ // Any class which wants to receive notifications of timeout events
+ // needs to be a subclass of this interface.
+
+public:
+ virtual void update(ACE_ENV_SINGLE_ARG_DECL) = 0;
+ // Called by the TimeoutConsumer when a timeout occurs.
+ //
+ // For now, the notification is binary ("A timeout occurred"), but
+ // in the future, it should be useful to pass the event type(s) or
+ // even the events themselves.
+};
+
class TimeoutConsumer
{
// = TITLE
- // Simple consumer object which responds to timeout events.
+ // Simple consumer object of timeout events.
//
- // = DESCRIPTION
- // This class is a consumer of timeout events.
- // For each timeout event it consumes, it pushes a specified EventSet into the EC.
+ // = DESCRIPTION This class is a consumer of timeout events. For
+ // each timeout event it consumes, it notifies its observer
+ // (specified in its constructor).
//
// There are several ways to connect and disconnect this class,
// and it is up to the driver program to use the right one.
//
public:
- TimeoutConsumer (void);
- // Default Constructor.
+ TimeoutConsumer (Timeout_Observer* obs);
+ // For now, only handle a single observer. In the future, handle any number.
+ // Note that the TimeoutConsumer does NOT take ownership of the observer.
virtual ~TimeoutConsumer (void);
- void connect (ACE_RW_Mutex* done,
- RtecScheduler::Scheduler_ptr scheduler,
- const char *entry_prefix,
- TimeBase::TimeT period,
- RtecScheduler::Importance_t importance,
- RtecScheduler::Criticality_t criticality,
- RtecEventComm::EventSourceID supplier_id,
- size_t to_send,
- const RtecEventComm::EventSet& events,
- RtecEventChannelAdmin::EventChannel_ptr ec
- ACE_ENV_ARG_DECL);
+ RtecScheduler::handle_t get_RT_Info(void);
+
+ void connect (RtecScheduler::Scheduler_ptr scheduler,
+ const char *entry_prefix,
+ TimeBase::TimeT period,
+ RtecScheduler::Importance_t importance,
+ RtecScheduler::Criticality_t criticality,
+ RtecEventChannelAdmin::EventChannel_ptr ec
+ ACE_ENV_ARG_DECL);
// This method connects the supplier to the EC.
void disconnect (ACE_ENV_SINGLE_ARG_DECL);
@@ -74,27 +90,12 @@ public:
ACE_THROW_SPEC ((CORBA::SystemException));
// The skeleton methods.
- // = The RtecEventComm::PushSupplier methods
-
- virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
- ACE_THROW_SPEC ((CORBA::SystemException));
- // The skeleton methods.
-
private:
- size_t _to_send; //number of times to push on timeout
- size_t _num_sent; //number of pushes so far
- int _hold_mtx; //1 when hold _done mutex; 0 else
- ACE_RW_Mutex* _done; //release read lock when _num_sent >= _to_send
-
- RtecEventComm::EventSourceID _supplier_id;
- // We generate an id based on the name....
+ Timeout_Observer* _observer;
- RtecEventChannelAdmin::ProxyPushConsumer_var _consumer_proxy;
- // We talk to the EC (as a supplier) using this proxy.
+ RtecScheduler::handle_t _rt_info;
- ACE_PushSupplier_Adapter<TimeoutConsumer> _supplier;
- // We connect to the EC as a supplier so we can push events
- // every time we receive a timeout event.
+ RtecScheduler::Scheduler_ptr _scheduler;
RtecEventChannelAdmin::ProxyPushSupplier_var _supplier_proxy;
// We talk to the EC (as a consumer) using this proxy.