diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-14 14:50:23 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-14 14:50:23 +0000 |
commit | 32d0085cb17faa23f7d293c8421b6bba8db1aa2b (patch) | |
tree | 0ec1033953e503e84c17f9e80d8b4894e3baada5 /TAO/orbsvcs/orbsvcs | |
parent | 9785229767b720fd29e52b4f9e1386d5203a1191 (diff) | |
download | ATCD-32d0085cb17faa23f7d293c8421b6bba8db1aa2b.tar.gz |
ChangeLogTag:Fri May 14 09:30:14 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs')
55 files changed, 1467 insertions, 94 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp index eebbaa2299c..31c65a2a770 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp @@ -10,6 +10,7 @@ #include "EC_ProxySupplier.h" #include "EC_SupplierFiltering.h" #include "EC_ObserverStrategy.h" +#include "EC_Null_Scheduling.h" #include "EC_ProxyPushSupplier_Set_T.h" #include "EC_Reactive_Timeout_Generator.h" @@ -141,6 +142,18 @@ TAO_EC_Basic_Factory::create_proxy_push_supplier_set (TAO_EC_Event_Channel *) return new TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH> (); } +TAO_EC_Scheduling_Strategy* +TAO_EC_Basic_Factory::create_scheduling_strategy (TAO_EC_Event_Channel*) +{ + return new TAO_EC_Null_Scheduling; +} + +void +TAO_EC_Basic_Factory::destroy_scheduling_strategy (TAO_EC_Scheduling_Strategy* x) +{ + delete x; +} + void TAO_EC_Basic_Factory::destroy_proxy_push_supplier_set (TAO_EC_ProxyPushSupplier_Set *x) { diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h index f7b3239b0cd..cee001ac85c 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.h @@ -92,6 +92,10 @@ public: create_observer_strategy (TAO_EC_Event_Channel*); virtual void destroy_observer_strategy (TAO_EC_ObserverStrategy*); + virtual TAO_EC_Scheduling_Strategy* + create_scheduling_strategy (TAO_EC_Event_Channel*); + virtual void + destroy_scheduling_strategy (TAO_EC_Scheduling_Strategy*); virtual TAO_EC_ProxyPushSupplier_Set* create_proxy_push_supplier_set (TAO_EC_Event_Channel*); virtual void diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Filter_Builder.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Filter_Builder.cpp index 5b58ad98a61..b180b3492cf 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Filter_Builder.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Filter_Builder.cpp @@ -20,7 +20,8 @@ TAO_EC_Basic_Filter_Builder::~TAO_EC_Basic_Filter_Builder (void) TAO_EC_Filter* TAO_EC_Basic_Filter_Builder::build ( TAO_EC_ProxyPushSupplier *supplier, - RtecEventChannelAdmin::ConsumerQOS& qos) const + RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::Environment&) const { CORBA::ULong pos = 0; return this->recursive_build (supplier, qos, pos); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Filter_Builder.h b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Filter_Builder.h index 516c4fbb37b..8b245bd0ce0 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Filter_Builder.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Filter_Builder.h @@ -53,7 +53,8 @@ public: // = The TAO_EC_Filter_Builder methods... TAO_EC_Filter* build (TAO_EC_ProxyPushSupplier *supplier, - RtecEventChannelAdmin::ConsumerQOS& qos) const; + RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::Environment &env) const; private: TAO_EC_Filter* recursive_build (TAO_EC_ProxyPushSupplier *supplier, diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp index 8bef3f88a71..9349a960ba3 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.cpp @@ -62,6 +62,24 @@ TAO_EC_Conjunction_Filter::all_received (void) const return 0; } +TAO_EC_Filter::ChildrenIterator +TAO_EC_Conjunction_Filter::begin (void) const +{ + return this->children_; +} + +TAO_EC_Filter::ChildrenIterator +TAO_EC_Conjunction_Filter::end (void) const +{ + return this->children_ + this->n_; +} + +int +TAO_EC_Conjunction_Filter::size (void) const +{ + return this->n_; +} + int TAO_EC_Conjunction_Filter::filter (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, @@ -180,3 +198,12 @@ TAO_EC_Conjunction_Filter::can_match ( } return 0; } + +int +TAO_EC_Conjunction_Filter::add_dependencies ( + const RtecEventComm::EventHeader&, + const TAO_EC_QOS_Info&, + CORBA::Environment &) +{ + return 0; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h index 29b4c11c634..04d4efb7b61 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.h @@ -54,6 +54,9 @@ public: // = The TAO_EC_Filter methods, please check the documentation in // TAO_EC_Filter. + virtual ChildrenIterator begin (void) const; + virtual ChildrenIterator end (void) const; + virtual int size (void) const; virtual int filter (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& env); @@ -69,13 +72,9 @@ public: virtual void clear (void); virtual CORBA::ULong max_event_size (void) const; virtual int can_match (const RtecEventComm::EventHeader& header) const; - - typedef TAO_EC_Filter* value_type; - typedef TAO_EC_Filter* const const_value_type; - typedef const_value_type* ChildrenIterator; - ChildrenIterator begin (void) const; - ChildrenIterator end (void) const; - // STL-like iterators... + virtual int add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV); typedef unsigned int Word; diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.i b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.i index b6621443d40..cfa1da318d3 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Conjunction_Filter.i @@ -1,13 +1 @@ // $Id$ - -ACE_INLINE TAO_EC_Conjunction_Filter::ChildrenIterator -TAO_EC_Conjunction_Filter::begin (void) const -{ - return this->children_; -} - -ACE_INLINE TAO_EC_Conjunction_Filter::ChildrenIterator -TAO_EC_Conjunction_Filter::end (void) const -{ - return this->children_ + this->n_; -} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp index 086a6bcc1a8..c56aa9dda40 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp @@ -3,6 +3,7 @@ #include "EC_Default_Factory.h" #include "EC_Priority_Dispatching.h" #include "EC_Basic_Filter_Builder.h" +#include "EC_Sched_Filter_Builder.h" #include "EC_ConsumerAdmin.h" #include "EC_SupplierAdmin.h" #include "EC_ProxyConsumer.h" @@ -10,8 +11,11 @@ #include "EC_Trivial_Supplier_Filter.h" #include "EC_Per_Supplier_Filter.h" #include "EC_ObserverStrategy.h" +#include "EC_Null_Scheduling.h" +#include "EC_Priority_Scheduling.h" #include "EC_ProxyPushSupplier_Set_T.h" #include "EC_Reactive_Timeout_Generator.h" +#include "EC_Event_Channel.h" #include "ace/Arg_Shifter.h" #if ! defined (__ACE_INLINE__) @@ -81,6 +85,10 @@ TAO_EC_Default_Factory::init (int argc, char* argv[]) { this->filtering_ = 1; } + else if (ACE_OS::strcasecmp (opt, "priority") == 0) + { + this->filtering_ = 2; + } else { ACE_ERROR ((LM_ERROR, @@ -172,6 +180,32 @@ TAO_EC_Default_Factory::init (int argc, char* argv[]) } } + else if (ACE_OS::strcmp (arg, "-ECscheduling") == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + char* opt = arg_shifter.get_current (); + if (ACE_OS::strcasecmp (opt, "null") == 0) + { + this->scheduling_ = 0; + } + else if (ACE_OS::strcasecmp (opt, "priority") == 0) + { + this->scheduling_ = 1; + } + else + { + ACE_ERROR ((LM_ERROR, + "EC_Default_Factory - " + "unsupported scheduling <%s>\n", + opt)); + } + arg_shifter.consume_arg (); + } + } + else if (ACE_OS::strcmp (arg, "-ECpushsupplierset") == 0) { arg_shifter.consume_arg (); @@ -343,12 +377,12 @@ TAO_EC_Default_Factory::fini (void) // **************************************************************** TAO_EC_Dispatching* -TAO_EC_Default_Factory::create_dispatching (TAO_EC_Event_Channel *) +TAO_EC_Default_Factory::create_dispatching (TAO_EC_Event_Channel *ec) { if (this->dispatching_ == 0) return new TAO_EC_Reactive_Dispatching (); else if (this->dispatching_ == 1) - return new TAO_EC_Priority_Dispatching (); + return new TAO_EC_Priority_Dispatching (ec); return 0; } @@ -365,6 +399,8 @@ TAO_EC_Default_Factory::create_filter_builder (TAO_EC_Event_Channel *ec) return new TAO_EC_Null_Filter_Builder (); else if (this->filtering_ == 1) return new TAO_EC_Basic_Filter_Builder (ec); + else if (this->filtering_ == 2) + return new TAO_EC_Sched_Filter_Builder (ec); return 0; } @@ -482,6 +518,25 @@ TAO_EC_Default_Factory::destroy_observer_strategy (TAO_EC_ObserverStrategy *x) delete x; } +TAO_EC_Scheduling_Strategy* +TAO_EC_Default_Factory::create_scheduling_strategy (TAO_EC_Event_Channel* ec) +{ + if (this->scheduling_ == 0) + return new TAO_EC_Null_Scheduling; + else if (this->scheduling_ == 1) + { + RtecScheduler::Scheduler_var scheduler = ec->scheduler (); + return new TAO_EC_Priority_Scheduling (scheduler.in ()); + } + return 0; +} + +void +TAO_EC_Default_Factory::destroy_scheduling_strategy (TAO_EC_Scheduling_Strategy* x) +{ + delete x; +} + TAO_EC_ProxyPushSupplier_Set* TAO_EC_Default_Factory::create_proxy_push_supplier_set (TAO_EC_Event_Channel *) { diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.h index 10e227d42ee..4c4ac433c98 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.h @@ -99,6 +99,10 @@ public: create_observer_strategy (TAO_EC_Event_Channel*); virtual void destroy_observer_strategy (TAO_EC_ObserverStrategy*); + virtual TAO_EC_Scheduling_Strategy* + create_scheduling_strategy (TAO_EC_Event_Channel*); + virtual void + destroy_scheduling_strategy (TAO_EC_Scheduling_Strategy*); virtual TAO_EC_ProxyPushSupplier_Set* create_proxy_push_supplier_set (TAO_EC_Event_Channel*); virtual void @@ -120,6 +124,7 @@ private: int supplier_filtering_; int timeout_; int observer_; + int scheduling_; int supplier_set_; int consumer_lock_; int supplier_lock_; diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.i b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.i index c6c761a7cf0..7518517f769 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.i @@ -7,6 +7,7 @@ TAO_EC_Default_Factory::TAO_EC_Default_Factory (void) supplier_filtering_ (0), timeout_ (0), observer_ (0), + scheduling_ (0), supplier_set_ (0), consumer_lock_ (0), supplier_lock_ (0), diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp index 6e4f2ad1703..a2ecf5c688e 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.cpp @@ -38,6 +38,24 @@ TAO_EC_Disjunction_Filter::~TAO_EC_Disjunction_Filter (void) this->n_ = 0; } +TAO_EC_Filter::ChildrenIterator +TAO_EC_Disjunction_Filter::begin (void) const +{ + return this->children_; +} + +TAO_EC_Filter::ChildrenIterator +TAO_EC_Disjunction_Filter::end (void) const +{ + return this->children_ + this->n_; +} + +ACE_INLINE int +TAO_EC_Disjunction_Filter::size (void) const +{ + return this->n_; +} + int TAO_EC_Disjunction_Filter::filter (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, @@ -134,3 +152,12 @@ TAO_EC_Disjunction_Filter::can_match ( } return 0; } + +int +TAO_EC_Disjunction_Filter::add_dependencies ( + const RtecEventComm::EventHeader&, + const TAO_EC_QOS_Info &, + CORBA::Environment &) +{ + return 0; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h index 6a6cbbebfc9..2bb90482cb3 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.h @@ -52,8 +52,12 @@ public: virtual ~TAO_EC_Disjunction_Filter (void); // Destructor + // = The TAO_EC_Filter methods, please check the documentation in // TAO_EC_Filter. + virtual ChildrenIterator begin (void) const; + virtual ChildrenIterator end (void) const; + virtual int size (void) const; virtual int filter (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& env); @@ -69,13 +73,9 @@ public: virtual void clear (void); virtual CORBA::ULong max_event_size (void) const; virtual int can_match (const RtecEventComm::EventHeader& header) const; - - typedef TAO_EC_Filter* value_type; - typedef TAO_EC_Filter* const const_value_type; - typedef const_value_type* ChildrenIterator; - ChildrenIterator begin (void) const; - ChildrenIterator end (void) const; - // STL-like iterators... + virtual int add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV); private: ACE_UNIMPLEMENTED_FUNC (TAO_EC_Disjunction_Filter diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.i b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.i index d5a62ac2a0f..cfa1da318d3 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Disjunction_Filter.i @@ -1,13 +1 @@ // $Id$ - -ACE_INLINE TAO_EC_Disjunction_Filter::ChildrenIterator -TAO_EC_Disjunction_Filter::begin (void) const -{ - return this->children_; -} - -ACE_INLINE TAO_EC_Disjunction_Filter::ChildrenIterator -TAO_EC_Disjunction_Filter::end (void) const -{ - return this->children_ + this->n_; -} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp index b385968279d..a00800f2bf0 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp @@ -52,6 +52,25 @@ TAO_EC_Dispatching_Task::svc (void) return 0; } +void +TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy, + RtecEventComm::EventSet& event, + CORBA::Environment &ACE_TRY_ENV) +{ + void* buf = this->allocator_->malloc (sizeof (TAO_EC_Push_Command)); + + if (buf == 0) + ACE_THROW (CORBA::NO_MEMORY (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_NO)); + + ACE_Message_Block *mb = + new (mb) TAO_EC_Push_Command (proxy, + event, + this->data_block_.duplicate (), + this->allocator_); + this->putq (mb); +} + // **************************************************************** TAO_EC_Dispatch_Command::~TAO_EC_Dispatch_Command (void) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.h b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.h index ab47084b785..3133fcdf1c4 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.h @@ -26,6 +26,7 @@ #define TAO_EC_DISPATCHING_TASK_H #include "ace/Task.h" +#include "ace/Message_Block.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -49,6 +50,17 @@ public: virtual int svc (void); // Process the events in the queue. + + virtual void push (TAO_EC_ProxyPushSupplier *proxy, + RtecEventComm::EventSet& event, + CORBA::Environment &env); + +private: + ACE_Allocator *allocator_; + // An per-task allocator + + ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> > data_block_; + // Helper data structure to minimize memory allocations... }; // **************************************************************** @@ -56,10 +68,11 @@ public: class TAO_ORBSVCS_Export TAO_EC_Dispatch_Command : public ACE_Message_Block { public: - TAO_EC_Dispatch_Command (void); + TAO_EC_Dispatch_Command (ACE_Allocator *mb_allocator = 0); // Constructor, it will allocate its own data block - TAO_EC_Dispatch_Command (ACE_Data_Block*); + TAO_EC_Dispatch_Command (ACE_Data_Block*, + ACE_Allocator *mb_allocator = 0); // Constructor, it assumes ownership of the data block virtual ~TAO_EC_Dispatch_Command (void); @@ -74,7 +87,7 @@ public: class TAO_ORBSVCS_Export TAO_EC_Shutdown_Command : public TAO_EC_Dispatch_Command { public: - TAO_EC_Shutdown_Command (void); + TAO_EC_Shutdown_Command (ACE_Allocator *mb_allocator = 0); // Constructor virtual int execute (CORBA::Environment&); @@ -88,7 +101,8 @@ class TAO_ORBSVCS_Export TAO_EC_Push_Command : public TAO_EC_Dispatch_Command public: TAO_EC_Push_Command (TAO_EC_ProxyPushSupplier* proxy, RtecEventComm::EventSet& event, - ACE_Data_Block* data_block); + ACE_Data_Block* data_block, + ACE_Allocator *mb_allocator); // Constructor virtual int execute (CORBA::Environment&); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.i b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.i index df7dc8a925d..550d4859ce8 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.i @@ -10,21 +10,23 @@ TAO_EC_Dispatching_Task (ACE_Thread_Manager* thr_manager) // **************************************************************** ACE_INLINE -TAO_EC_Dispatch_Command::TAO_EC_Dispatch_Command (void) - : ACE_Message_Block () +TAO_EC_Dispatch_Command::TAO_EC_Dispatch_Command (ACE_Allocator *mb_allocator) + : ACE_Message_Block (mb_allocator) { } ACE_INLINE -TAO_EC_Dispatch_Command::TAO_EC_Dispatch_Command (ACE_Data_Block* data_block) - : ACE_Message_Block (data_block) +TAO_EC_Dispatch_Command::TAO_EC_Dispatch_Command (ACE_Data_Block *data_block, + ACE_Allocator *mb_allocator) + : ACE_Message_Block (data_block, mb_allocator) { } // **************************************************************** ACE_INLINE -TAO_EC_Shutdown_Command::TAO_EC_Shutdown_Command (void) +TAO_EC_Shutdown_Command::TAO_EC_Shutdown_Command (ACE_Allocator *mb_allocator) + : TAO_EC_Dispatch_Command (mb_allocator) { } @@ -33,8 +35,9 @@ TAO_EC_Shutdown_Command::TAO_EC_Shutdown_Command (void) ACE_INLINE TAO_EC_Push_Command::TAO_EC_Push_Command (TAO_EC_ProxyPushSupplier* proxy, RtecEventComm::EventSet& event, - ACE_Data_Block* data_block) - : TAO_EC_Dispatch_Command (data_block), + ACE_Data_Block* data_block, + ACE_Allocator *mb_allocator) + : TAO_EC_Dispatch_Command (data_block, mb_allocator), proxy_ (proxy) { // diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp index 510ea24b02d..55a36426f62 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp @@ -54,6 +54,16 @@ TAO_EC_Event_Channel (const TAO_EC_Event_Channel_Attributes& attr, this->factory_->create_timeout_generator (this); this->observer_strategy_ = this->factory_->create_observer_strategy (this); + + this->scheduler_ = + RtecScheduler::Scheduler::_duplicate (attr.scheduler); + + this->scheduling_strategy_ = + this->factory_->create_scheduling_strategy (this); + + this->consumer_admin_->busy_hwm (attr.consumer_admin_busy_hwm); + this->consumer_admin_->max_write_delay (attr.consumer_admin_max_write_delay); + } TAO_EC_Event_Channel::~TAO_EC_Event_Channel (void) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h index ee9509bd103..fd213d32c8b 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h @@ -67,9 +67,14 @@ public: // Can consumers or suppliers invoke connect_push_* multiple times? int consumer_admin_busy_hwm; - int max_write_delay; + int consumer_admin_max_write_delay; // Flags for the Consumer Admin + RtecScheduler::Scheduler_ptr scheduler; + // The scheduling service that we will use with this event channel. + // Notice that this is optional and will only take effect if the EC + // is configured with the right filtering strategies. + private: friend class TAO_EC_Event_Channel; // Only the EC can read the private fields. @@ -132,6 +137,9 @@ public: TAO_EC_Timeout_Generator* timeout_generator (void) const; // Access the timer module... + TAO_EC_Scheduling_Strategy* scheduling_strategy (void) const; + // Access the scheduling strategy + // = The factory methods, they delegate on the EC_Factory. TAO_EC_ProxyPushSupplier* create_proxy_push_supplier (void); void destroy_proxy_push_supplier (TAO_EC_ProxyPushSupplier*); @@ -186,6 +194,9 @@ public: int supplier_reconnect (void) const; // Can the suppliers reconnect to the EC? + RtecScheduler::Scheduler_ptr scheduler (void); + // Obtain the scheduler, the user must release + // = The RtecEventChannelAdmin::EventChannel methods... virtual RtecEventChannelAdmin::ConsumerAdmin_ptr for_consumers (CORBA::Environment& env); @@ -248,6 +259,12 @@ private: TAO_EC_ObserverStrategy *observer_strategy_; // The observer strategy + RtecScheduler::Scheduler_var scheduler_; + // The scheduler (may be nil) + + TAO_EC_Scheduling_Strategy *scheduling_strategy_; + // The scheduling strategy + int consumer_reconnect_; int supplier_reconnect_; // Consumer/Supplier reconnection flags diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i index 979da8826eb..7a767f662a6 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i @@ -6,6 +6,9 @@ TAO_EC_Event_Channel_Attributes (PortableServer::POA_ptr s_poa, PortableServer::POA_ptr c_poa) : consumer_reconnect (0), supplier_reconnect (0), + consumer_admin_busy_hwm (0), + consumer_admin_max_write_delay (0), + scheduler (RtecScheduler::Scheduler::_nil ()), supplier_poa (s_poa), consumer_poa (c_poa) { @@ -47,6 +50,12 @@ TAO_EC_Event_Channel::timeout_generator (void) const return this->timeout_generator_; } +ACE_INLINE TAO_EC_Scheduling_Strategy* +TAO_EC_Event_Channel::scheduling_strategy (void) const +{ + return this->scheduling_strategy_; +} + ACE_INLINE TAO_EC_ProxyPushSupplier* TAO_EC_Event_Channel::create_proxy_push_supplier (void) { @@ -154,3 +163,9 @@ TAO_EC_Event_Channel::supplier_reconnect (void) const { return this->supplier_reconnect_; } + +ACE_INLINE RtecScheduler::Scheduler_ptr +TAO_EC_Event_Channel::scheduler (void) +{ + return RtecScheduler::Scheduler::_duplicate (this->scheduler_.in ()); +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_Factory.h index ec49f151e58..20fb1863401 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Factory.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Factory.h @@ -48,6 +48,7 @@ class TAO_EC_ProxyPushSupplier; class TAO_EC_ObserverStrategy; class TAO_EC_ProxyPushSupplier_Set; class TAO_EC_Timeout_Generator; +class TAO_EC_Scheduling_Strategy; class TAO_ORBSVCS_Export TAO_EC_Factory : public ACE_Service_Object { @@ -121,6 +122,12 @@ public: destroy_observer_strategy (TAO_EC_ObserverStrategy*) = 0; // Create and destroy the observer strategy. + virtual TAO_EC_Scheduling_Strategy* + create_scheduling_strategy (TAO_EC_Event_Channel*) = 0; + virtual void + destroy_scheduling_strategy (TAO_EC_Scheduling_Strategy*) = 0; + // Create and destroy the observer strategy. + virtual TAO_EC_ProxyPushSupplier_Set* create_proxy_push_supplier_set (TAO_EC_Event_Channel*) = 0; virtual void diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp index 9295afc37d0..cee603fc208 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.cpp @@ -18,6 +18,32 @@ TAO_EC_Filter::adopt_child (TAO_EC_Filter* child) child->parent_ = this; } +TAO_EC_Filter::ChildrenIterator +TAO_EC_Filter::begin (void) const +{ + return 0; +} + +TAO_EC_Filter::ChildrenIterator +TAO_EC_Filter::end (void) const +{ + return 0; +} + +int +TAO_EC_Filter::size (void) const +{ + return 0; +} + +void +TAO_EC_Filter::get_qos_info (TAO_EC_QOS_Info&, + CORBA::Environment &ACE_TRY_ENV) +{ + ACE_THROW (CORBA::NO_IMPLEMENT (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_NO)); +} + // **************************************************************** int @@ -81,6 +107,16 @@ TAO_EC_Null_Filter::can_match (const RtecEventComm::EventHeader&) const return 1; } +int +TAO_EC_Null_Filter::add_dependencies ( + const RtecEventComm::EventHeader &, + const TAO_EC_QOS_Info &, + CORBA::Environment &) + +{ + return 0; +} + // **************************************************************** #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.h index 00afafc720c..ccc1ccc2d51 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Filter.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Filter.h @@ -72,6 +72,18 @@ public: // matches two event headers. // @@ TODO: strategize this... + typedef TAO_EC_Filter* value_type; + typedef TAO_EC_Filter* const const_value_type; + typedef const_value_type* ChildrenIterator; + + virtual ChildrenIterator begin (void) const; + virtual ChildrenIterator end (void) const; + virtual int size (void) const; + // STL-like iterators + // Filters follow the Composite pattern. All filters expose the same + // interface as if they all had children, but for simple filters the + // iterators return an empty range. + virtual int filter (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& env) = 0; @@ -105,6 +117,29 @@ public: // Returns 0 if an event with that header could never be accepted. // This can used by the suppliers to filter out consumers that // couldn't possibly be interested in their events. + // The rt_info and + + virtual int add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV) = 0; + // This is used for computing the scheduling dependencies: + // + // Leaf filters check if the header could be matched, similar to the + // can_match() method; if it does they return 1, and 0 otherwise. + // Intermediate nodes always return 0. + // + // This is used to build precise dependencies between the suppliers + // and the leaf of the filters that accept that event. Notice that + // only the nodes doing scheduling recurse through the list, so in + // configurations that do no require scheduling the recursion stops + // fairly soon. + + virtual void get_qos_info (TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV); + // Obtain the QOS information for this filter, the default + // implementation returns an invalid QOS. Only the filters that + // support scheduling information implement this method. + // Returns 0 on success and -1 on failure private: TAO_EC_Filter* parent_; @@ -150,6 +185,9 @@ public: virtual void clear (void); virtual CORBA::ULong max_event_size (void) const; virtual int can_match (const RtecEventComm::EventHeader& header) const; + virtual int add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV); }; // **************************************************************** diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Filter_Builder.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Filter_Builder.cpp index 2dee250a79b..dfcc830e1d7 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Filter_Builder.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Filter_Builder.cpp @@ -22,7 +22,8 @@ TAO_EC_Null_Filter_Builder::~TAO_EC_Null_Filter_Builder (void) TAO_EC_Filter* TAO_EC_Null_Filter_Builder::build ( TAO_EC_ProxyPushSupplier *, - RtecEventChannelAdmin::ConsumerQOS&) const + RtecEventChannelAdmin::ConsumerQOS&, + CORBA::Environment&) const { return new TAO_EC_Null_Filter; } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Filter_Builder.h b/TAO/orbsvcs/orbsvcs/Event/EC_Filter_Builder.h index 0d4b010aea9..e0cfe6dddf4 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Filter_Builder.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Filter_Builder.h @@ -52,7 +52,8 @@ public: virtual TAO_EC_Filter* build (TAO_EC_ProxyPushSupplier *supplier, - RtecEventChannelAdmin::ConsumerQOS& qos) const = 0; + RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::Environment &ACE_TRY_ENV) const = 0; // Create the filter, the caller must assume ownership of the filter // returned. @@ -77,7 +78,8 @@ public: // = The TAO_EC_Filter_Builder methods... TAO_EC_Filter* build (TAO_EC_ProxyPushSupplier *supplier, - RtecEventChannelAdmin::ConsumerQOS& qos) const; + RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::Environment &ACE_TRY_ENV) const; }; // **************************************************************** diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp index 63f5869946f..71acd51ec2e 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp @@ -3,12 +3,14 @@ #include "EC_Null_Factory.h" #include "EC_Dispatching.h" #include "EC_Filter_Builder.h" +#include "EC_Trivial_Supplier_Filter.h" #include "EC_ConsumerAdmin.h" #include "EC_SupplierAdmin.h" #include "EC_ProxyConsumer.h" #include "EC_ProxySupplier.h" #include "EC_SupplierFiltering.h" #include "EC_ObserverStrategy.h" +#include "EC_Null_Scheduling.h" #include "EC_ProxyPushSupplier_Set_T.h" #include "EC_Reactive_Timeout_Generator.h" @@ -46,6 +48,18 @@ TAO_EC_Null_Factory::destroy_filter_builder (TAO_EC_Filter_Builder *x) delete x; } +TAO_EC_Supplier_Filter_Builder* +TAO_EC_Null_Factory::create_supplier_filter_builder (TAO_EC_Event_Channel *ec) +{ + return new TAO_EC_Trivial_Supplier_Filter_Builder (ec); +} + +void +TAO_EC_Null_Factory::destroy_supplier_filter_builder (TAO_EC_Supplier_Filter_Builder *x) +{ + delete x; +} + TAO_EC_ConsumerAdmin* TAO_EC_Null_Factory::create_consumer_admin (TAO_EC_Event_Channel *ec) { @@ -120,6 +134,18 @@ TAO_EC_Null_Factory::destroy_observer_strategy (TAO_EC_ObserverStrategy *x) delete x; } +TAO_EC_Scheduling_Strategy* +TAO_EC_Null_Factory::create_scheduling_strategy (TAO_EC_Event_Channel*) +{ + return new TAO_EC_Null_Scheduling; +} + +void +TAO_EC_Null_Factory::destroy_scheduling_strategy (TAO_EC_Scheduling_Strategy* x) +{ + delete x; +} + TAO_EC_ProxyPushSupplier_Set* TAO_EC_Null_Factory::create_proxy_push_supplier_set (TAO_EC_Event_Channel *) { diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h index 9f987cf2846..0d31d1f9627 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.h @@ -63,6 +63,10 @@ public: create_filter_builder (TAO_EC_Event_Channel*); virtual void destroy_filter_builder (TAO_EC_Filter_Builder*); + virtual TAO_EC_Supplier_Filter_Builder* + create_supplier_filter_builder (TAO_EC_Event_Channel*); + virtual void + destroy_supplier_filter_builder (TAO_EC_Supplier_Filter_Builder*); virtual TAO_EC_ConsumerAdmin* create_consumer_admin (TAO_EC_Event_Channel*); virtual void @@ -87,6 +91,10 @@ public: create_observer_strategy (TAO_EC_Event_Channel*); virtual void destroy_observer_strategy (TAO_EC_ObserverStrategy*); + virtual TAO_EC_Scheduling_Strategy* + create_scheduling_strategy (TAO_EC_Event_Channel*); + virtual void + destroy_scheduling_strategy (TAO_EC_Scheduling_Strategy*); virtual TAO_EC_ProxyPushSupplier_Set* create_proxy_push_supplier_set (TAO_EC_Event_Channel*); virtual void diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.cpp new file mode 100644 index 00000000000..df75136d3b9 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.cpp @@ -0,0 +1,27 @@ +// $Id$ + +#include "EC_Null_Scheduling.h" +#include "EC_QOS_Info.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_Null_Scheduling.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_Null_Scheduling, "$Id$") + +void +TAO_EC_Null_Scheduling::add_proxy_supplier_dependencies ( + TAO_EC_ProxyPushSupplier *, + TAO_EC_ProxyPushConsumer *, + CORBA::Environment &) +{ +} + +void +TAO_EC_Null_Scheduling::init_event_qos ( + const RtecEventComm::EventHeader&, + TAO_EC_ProxyPushConsumer *, + TAO_EC_QOS_Info&, + CORBA::Environment &) +{ +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.h b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.h new file mode 100644 index 00000000000..56c5070acd5 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.h @@ -0,0 +1,74 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel +// +// = FILENAME +// EC_Null_Scheduling +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// = CREDITS +// Based on previous work by Tim Harrison (harrison@cs.wustl.edu) +// and other members of the DOC group. +// More details can be found in: +// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz +// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz +// +// +// ============================================================================ + +#ifndef TAO_EC_NULL_SCHEDULING_H +#define TAO_EC_NULL_SCHEDULING_H + +#include "EC_Scheduling_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_ORBSVCS_Export TAO_EC_Null_Scheduling : public TAO_EC_Scheduling_Strategy +{ + // = TITLE + // A scheduling strategy that uses TAO's real-time scheduler + // + // = DESCRIPTION + // This implementation of the Scheduling_Strategy uses TAO's + // real-time scheduler. + // + // = MEMORY MANAGMENT + // +public: + TAO_EC_Null_Scheduling (void); + // Constructor. + + virtual void add_proxy_supplier_dependencies ( + TAO_EC_ProxyPushSupplier *supplier, + TAO_EC_ProxyPushConsumer *consumer, + CORBA::Environment &ACE_TRY_ENV); + // Add all the dependencies between <supplier> and <consumer> + + virtual void init_event_qos ( + const RtecEventComm::EventHeader& header, + TAO_EC_ProxyPushConsumer *consumer, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV); + // Initializes <qos_info> based on the QoS information for + // <consumer> and the event header. + +private: + ACE_UNIMPLEMENTED_FUNC (TAO_EC_Null_Scheduling + (const TAO_EC_Null_Scheduling&)) + ACE_UNIMPLEMENTED_FUNC (TAO_EC_Null_Scheduling& operator= + (const TAO_EC_Null_Scheduling&)) +}; + +#if defined (__ACE_INLINE__) +#include "EC_Null_Scheduling.i" +#endif /* __ACE_INLINE__ */ + +#endif /* TAO_EC_NULL_SCHEDULING_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.i b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.i new file mode 100644 index 00000000000..60c72cb9e56 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Scheduling.i @@ -0,0 +1,7 @@ +// $Id$ + +ACE_INLINE +TAO_EC_Null_Scheduling::TAO_EC_Null_Scheduling (void) +{ +} + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp index 3df5936021b..d7c8740deff 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp @@ -5,6 +5,7 @@ #include "EC_ProxyPushSupplier_Set.h" #include "EC_ProxySupplier.h" #include "EC_ProxyConsumer.h" +#include "EC_Scheduling_Strategy.h" #include "EC_QOS_Info.h" #include "orbsvcs/Event_Service_Constants.h" @@ -87,6 +88,8 @@ void TAO_EC_Per_Supplier_Filter::push (const RtecEventComm::EventSet& event, CORBA::Environment &ACE_TRY_ENV) { + TAO_EC_Scheduling_Strategy* scheduling_strategy = + this->event_channel_->scheduling_strategy (); for (CORBA::ULong j = 0; j < event.length (); ++j) { const RtecEventComm::Event& e = event[j]; @@ -94,6 +97,13 @@ TAO_EC_Per_Supplier_Filter::push (const RtecEventComm::EventSet& event, ACE_const_cast(RtecEventComm::Event*, &e); RtecEventComm::EventSet single_event (1, 1, buffer, 0); + TAO_EC_QOS_Info event_info; + scheduling_strategy->init_event_qos (e.header, + this->consumer_, + event_info, + ACE_TRY_ENV); + ACE_CHECK; + ACE_GUARD_THROW_EX (TAO_EC_ProxyPushSupplier_Set::Busy_Lock, ace_mon, this->supplier_set_->busy_lock (), RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); @@ -107,7 +117,7 @@ TAO_EC_Per_Supplier_Filter::push (const RtecEventComm::EventSet& event, i != end; ++i) { - TAO_EC_QOS_Info qos_info; + TAO_EC_QOS_Info qos_info = event_info; (*i)->filter (single_event, qos_info, ACE_TRY_ENV); ACE_CHECK; diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.cpp index e2e41517cfc..525560c8d3f 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.cpp @@ -2,6 +2,8 @@ #include "EC_Priority_Dispatching.h" #include "EC_Dispatching_Task.h" +#include "EC_Event_Channel.h" +#include "EC_QOS_Info.h" #include "orbsvcs/Event_Service_Constants.h" #include "ace/Sched_Params.h" @@ -11,6 +13,14 @@ ACE_RCSID(Event, EC_Priority_Dispatching, "$Id$") +ACE_INLINE +TAO_EC_Priority_Dispatching::TAO_EC_Priority_Dispatching (TAO_EC_Event_Channel *ec) + : ntasks_ (0), + tasks_ (0), + scheduler_ (ec->scheduler ()) +{ +} + void TAO_EC_Priority_Dispatching::activate (void) { @@ -21,6 +31,7 @@ TAO_EC_Priority_Dispatching::activate (void) this->ntasks_ = ACE_Scheduler_MAX_PRIORITIES; ACE_NEW (this->tasks_, TAO_EC_Dispatching_Task*[this->ntasks_]); + // @@ Query the scheduler to obtain the priorities! int priority = (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO) + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2; @@ -77,13 +88,18 @@ void TAO_EC_Priority_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy, RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, - CORBA::Environment& ACE_TRY_ENV) + CORBA::Environment &ACE_TRY_ENV) { if (this->tasks_ == 0) this->activate (); - // @@ Use the QOS_Info to select the right queue.... - this->tasks_[0]->putq (new TAO_EC_Push_Command (proxy, - event, - this->data_block_.duplicate ())); + int i = qos_info.preemption_priority; + if (i < 0 || i >= this->ntasks_) + { + // @@ Throw something? + i = 0; + } + + + this->tasks_[i]->push (proxy, event, ACE_TRY_ENV); } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.h b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.h index 90661564892..121e9bdc452 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.h @@ -25,15 +25,16 @@ #ifndef TAO_EC_PRIORITY_DISPATCHING_H #define TAO_EC_PRIORITY_DISPATCHING_H -#include "EC_Dispatching.h" +#include "orbsvcs/RtecSchedulerC.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "ace/Message_Block.h" +#include "EC_Dispatching.h" class TAO_EC_Dispatching_Task; +class TAO_EC_Event_Channel; class TAO_ORBSVCS_Export TAO_EC_Priority_Dispatching : public TAO_EC_Dispatching { @@ -53,7 +54,7 @@ class TAO_ORBSVCS_Export TAO_EC_Priority_Dispatching : public TAO_EC_Dispatching // analyze and schedule. // public: - TAO_EC_Priority_Dispatching (void); + TAO_EC_Priority_Dispatching (TAO_EC_Event_Channel* ec); // The scheduler is used to find the range of priorities and similar // info. @@ -79,9 +80,8 @@ private: TAO_EC_Dispatching_Task** tasks_; // The tasks.. - ACE_Locked_Data_Block<ACE_Lock_Adapter<ACE_SYNCH_MUTEX> > data_block_; - // Helper data structure to minimize memory allocations... - // @@ Should be a per-queue object! + RtecScheduler::Scheduler_var scheduler_; + // The scheduler }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.i b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.i index 204e43cd7c6..74e88caa0c5 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Dispatching.i @@ -1,8 +1,2 @@ // $Id$ -ACE_INLINE -TAO_EC_Priority_Dispatching::TAO_EC_Priority_Dispatching (void) - : ntasks_ (0), - tasks_ (0) -{ -} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.cpp new file mode 100644 index 00000000000..51c28b3fa4e --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.cpp @@ -0,0 +1,79 @@ +// $Id$ + +#include "EC_Priority_Scheduling.h" +#include "EC_QOS_Info.h" +#include "EC_ProxyConsumer.h" +#include "EC_ProxySupplier.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_Priority_Scheduling.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_Priority_Scheduling, "$Id$") + +TAO_EC_Priority_Scheduling::~TAO_EC_Priority_Scheduling (void) +{ +} + +void +TAO_EC_Priority_Scheduling::add_proxy_supplier_dependencies ( + TAO_EC_ProxyPushSupplier *supplier, + TAO_EC_ProxyPushConsumer *consumer, + CORBA::Environment &ACE_TRY_ENV) +{ + const RtecEventChannelAdmin::SupplierQOS& qos = + consumer->publications (); + for (CORBA::ULong i = 0; i < qos.publications.length (); ++i) + { + const RtecEventComm::EventHeader &header = + qos.publications[i].event.header; + TAO_EC_QOS_Info qos_info; + qos_info.rt_info = qos.publications[i].dependency_info.rt_info; + + RtecScheduler::OS_Priority os_priority; + RtecScheduler::Preemption_Subpriority_t p_subpriority; + RtecScheduler::Preemption_Priority_t p_priority; + this->scheduler_->priority (qos_info.rt_info, + os_priority, + p_subpriority, + p_priority, + ACE_TRY_ENV); + ACE_CHECK; + qos_info.preemption_priority = p_priority; + + supplier->add_dependencies (header, qos_info, ACE_TRY_ENV); + ACE_CHECK; + } +} + +void +TAO_EC_Priority_Scheduling::init_event_qos ( + const RtecEventComm::EventHeader &header, + TAO_EC_ProxyPushConsumer *consumer, + TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + const RtecEventChannelAdmin::SupplierQOS& qos = + consumer->publications (); + for (CORBA::ULong i = 0; i < qos.publications.length (); ++i) + { + const RtecEventComm::EventHeader &qos_header = + qos.publications[i].event.header; + + if (TAO_EC_Filter::matches (header, qos_header) == 0) + continue; + + qos_info.rt_info = qos.publications[i].dependency_info.rt_info; + + RtecScheduler::OS_Priority os_priority; + RtecScheduler::Preemption_Subpriority_t p_subpriority; + RtecScheduler::Preemption_Priority_t p_priority; + this->scheduler_->priority (qos_info.rt_info, + os_priority, + p_subpriority, + p_priority, + ACE_TRY_ENV); + ACE_CHECK; + qos_info.preemption_priority = p_priority; + } +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.h b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.h new file mode 100644 index 00000000000..f07673689b7 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.h @@ -0,0 +1,84 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel +// +// = FILENAME +// EC_Priority_Scheduling +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// = CREDITS +// Based on previous work by Tim Harrison (harrison@cs.wustl.edu) +// and other members of the DOC group. +// More details can be found in: +// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz +// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz +// +// +// ============================================================================ + +#ifndef TAO_EC_PRIORITY_SCHEDULING_H +#define TAO_EC_PRIORITY_SCHEDULING_H + +#include "EC_Scheduling_Strategy.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_ORBSVCS_Export TAO_EC_Priority_Scheduling : public TAO_EC_Scheduling_Strategy +{ + // = TITLE + // A scheduling strategy that uses TAO's real-time scheduler + // + // = DESCRIPTION + // This implementation of the Scheduling_Strategy uses TAO's + // real-time scheduler. + // + // = MEMORY MANAGMENT + // +public: + TAO_EC_Priority_Scheduling (RtecScheduler::Scheduler_ptr scheduler); + // Constructor. + + virtual ~TAO_EC_Priority_Scheduling (void); + // Destructor + + virtual void add_proxy_supplier_dependencies ( + TAO_EC_ProxyPushSupplier *supplier, + TAO_EC_ProxyPushConsumer *consumer, + CORBA::Environment &ACE_TRY_ENV); + // Add all the dependencies between <supplier> and <consumer> + + virtual void init_event_qos ( + const RtecEventComm::EventHeader& header, + TAO_EC_ProxyPushConsumer *consumer, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV); + // Initializes <qos_info> based on the QoS information for + // <consumer> and the event header. + +private: + ACE_UNIMPLEMENTED_FUNC (TAO_EC_Priority_Scheduling + (const TAO_EC_Priority_Scheduling&)) + ACE_UNIMPLEMENTED_FUNC (TAO_EC_Priority_Scheduling& operator= + (const TAO_EC_Priority_Scheduling&)) + + void init_rt_info (CORBA::Environment& env); + // Initialize our RT_Info handle and dependencies + +private: + RtecScheduler::Scheduler_var scheduler_; + // The scheduler we are going to use +}; + +#if defined (__ACE_INLINE__) +#include "EC_Priority_Scheduling.i" +#endif /* __ACE_INLINE__ */ + +#endif /* TAO_EC_PRIORITY_SCHEDULING_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.i b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.i new file mode 100644 index 00000000000..af004f6fcde --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Priority_Scheduling.i @@ -0,0 +1,8 @@ +// $Id$ + +ACE_INLINE +TAO_EC_Priority_Scheduling::TAO_EC_Priority_Scheduling ( + RtecScheduler::Scheduler_ptr scheduler) + : scheduler_ (RtecScheduler::Scheduler::_duplicate (scheduler)) +{ +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp index e506c0bd59d..090bb115750 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp @@ -3,6 +3,7 @@ #include "EC_ProxySupplier.h" #include "EC_Dispatching.h" #include "EC_Filter_Builder.h" +#include "EC_QOS_Info.h" #include "EC_Event_Channel.h" #if ! defined (__ACE_INLINE__) @@ -187,7 +188,10 @@ TAO_EC_ProxyPushSupplier::connect_push_consumer ( this->child_ = this->event_channel_->filter_builder ()->build (this, - this->qos_); + this->qos_, + ACE_TRY_ENV); + ACE_CHECK; + this->adopt_child (this->child_); } @@ -433,6 +437,22 @@ TAO_EC_ProxyPushSupplier::can_match ( return this->child_->can_match (header); } +int +TAO_EC_ProxyPushSupplier::add_dependencies ( + const RtecEventComm::EventHeader &header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK_RETURN (0); + + return this->child_->add_dependencies (header, + qos_info, + ACE_TRY_ENV); +} + PortableServer::POA_ptr TAO_EC_ProxyPushSupplier::_default_POA (CORBA::Environment&) { diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h index e5edd9cf6a7..fc79092b87f 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h @@ -141,6 +141,9 @@ public: virtual void clear (void); virtual CORBA::ULong max_event_size (void) const; virtual int can_match (const RtecEventComm::EventHeader &header) const; + virtual int add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV); // = The Servant methods virtual PortableServer::POA_ptr _default_POA (CORBA::Environment& env); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_QOS_Info.h b/TAO/orbsvcs/orbsvcs/Event/EC_QOS_Info.h index 6a59096ddcb..a5597b40d3d 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_QOS_Info.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_QOS_Info.h @@ -24,7 +24,7 @@ #ifndef TAO_EC_QOS_INFO_H #define TAO_EC_QOS_INFO_H -#include "orbsvcs/orbsvcs_export.h" +#include "orbsvcs/RtecSchedulerC.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -37,7 +37,7 @@ class TAO_ORBSVCS_Export TAO_EC_QOS_Info // filters. // // = DESCRIPTION - // Filters compute QOS information for real-time dispatching, this + // Filters compute QOS information for real-time dispatching, this // class encapsulates that information. // This first implementation is just a place-holder. // @@ -45,6 +45,8 @@ public: TAO_EC_QOS_Info (void); // constructor + RtecScheduler::handle_t rt_info; + RtecScheduler::Preemption_Priority_t preemption_priority; }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_QOS_Info.i b/TAO/orbsvcs/orbsvcs/Event/EC_QOS_Info.i index 2e50c43a45f..df63cf59ad5 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_QOS_Info.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_QOS_Info.i @@ -2,5 +2,7 @@ ACE_INLINE TAO_EC_QOS_Info::TAO_EC_QOS_Info (void) + : rt_info (-1), + preemption_priority (0) { } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.cpp new file mode 100644 index 00000000000..d5fa6eafba3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.cpp @@ -0,0 +1,231 @@ +// $Id$ + +#include "EC_Sched_Filter.h" +#include "EC_QOS_Info.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_Sched_Filter.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_Sched_Filter, "$Id$") + +TAO_EC_Sched_Filter:: + TAO_EC_Sched_Filter (const char* name, + RtecScheduler::Scheduler_ptr scheduler, + TAO_EC_Filter* body, + RtecScheduler::handle_t body_info, + RtecScheduler::Info_Type_t info_type) + + : rt_info_ (-1), + name_ (name), + scheduler_ (RtecScheduler::Scheduler::_duplicate (scheduler)), + body_ (body), + body_info_ (body_info), + info_type_ (info_type) +{ + this->adopt_child (this->body_); +} + +TAO_EC_Sched_Filter::~TAO_EC_Sched_Filter (void) +{ + delete this->body_; +} + +TAO_EC_Filter::ChildrenIterator +TAO_EC_Sched_Filter::begin (void) const +{ + return this->body_->begin (); +} + +TAO_EC_Filter::ChildrenIterator +TAO_EC_Sched_Filter::end (void) const +{ + return this->body_->end (); +} + +int +TAO_EC_Sched_Filter::size (void) const +{ + return this->body_->size (); +} + +int +TAO_EC_Sched_Filter::filter (const RtecEventComm::EventSet &event, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + return this->body_->filter (event, qos_info, ACE_TRY_ENV); +} + +int +TAO_EC_Sched_Filter::filter_nocopy (RtecEventComm::EventSet &event, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + return this->body_->filter_nocopy (event, qos_info, ACE_TRY_ENV); +} + +// This is private, so we can make it inline in the .cpp file... +ACE_INLINE void +TAO_EC_Sched_Filter::compute_qos_info (TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + this->init_rt_info (ACE_TRY_ENV); + ACE_CHECK; + + qos_info.rt_info = this->rt_info_; + switch (this->info_type_) + { + default: + case RtecScheduler::DISJUNCTION: + break; + + case RtecScheduler::CONJUNCTION: + case RtecScheduler::OPERATION: + { + RtecScheduler::OS_Priority os_priority; + RtecScheduler::Preemption_Subpriority_t p_subpriority; + RtecScheduler::Preemption_Priority_t p_priority; + this->scheduler_->priority (this->rt_info_, + os_priority, + p_subpriority, + p_priority, + ACE_TRY_ENV); + ACE_CHECK; + qos_info.preemption_priority = p_priority; + } + } +} + +void +TAO_EC_Sched_Filter::push (const RtecEventComm::EventSet &event, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + if (this->parent () != 0) + { + this->compute_qos_info (qos_info, ACE_TRY_ENV); + ACE_CHECK; + + this->parent ()->push (event, qos_info, ACE_TRY_ENV); + } +} + +void +TAO_EC_Sched_Filter::push_nocopy (RtecEventComm::EventSet &event, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + if (this->parent () != 0) + { + this->compute_qos_info (qos_info, ACE_TRY_ENV); + ACE_CHECK; + + this->parent ()->push_nocopy (event, qos_info, ACE_TRY_ENV); + } +} + +void +TAO_EC_Sched_Filter::clear (void) +{ + this->body_->clear (); +} + +CORBA::ULong +TAO_EC_Sched_Filter::max_event_size (void) const +{ + return this->body_->max_event_size (); +} + +int +TAO_EC_Sched_Filter::can_match (const RtecEventComm::EventHeader& header) const +{ + return this->body_->can_match (header); +} + +int +TAO_EC_Sched_Filter::add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + this->init_rt_info (ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + int matches = this->body_->add_dependencies (header, + qos_info, + ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + if (matches != 0) + this->scheduler_->add_dependency (this->rt_info_, qos_info.rt_info, 1, + RtecScheduler::TWO_WAY_CALL, + ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + ChildrenIterator end = this->end (); + for (ChildrenIterator i = this->begin (); i != end; ++i) + { + (*i)->add_dependencies (header, qos_info, ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + } + return 0; +} + +void +TAO_EC_Sched_Filter::get_qos_info (TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV) +{ + this->init_rt_info (ACE_TRY_ENV); + ACE_CHECK; + + qos_info.rt_info = this->rt_info_; +} + +void +TAO_EC_Sched_Filter::init_rt_info (CORBA::Environment &ACE_TRY_ENV) +{ + if (this->rt_info_ != -1) + return; + + // Create an entry in the Scheduling service... + RtecScheduler::handle_t rt_info = + this->scheduler_->create (this->name_.c_str (), + ACE_TRY_ENV); + ACE_CHECK; + + // Provide dummy values the scheduler will compute them based on the + // dependencies and the fact that this is a DISJUNCTION. + this->scheduler_->set (rt_info, + RtecScheduler::VERY_LOW_CRITICALITY, + 0, // worst_cast_execution_time + 0, // typical_cast_execution_time + 0, // cached_cast_execution_time + 0, // period + RtecScheduler::VERY_LOW_IMPORTANCE, + 0, // quantum + 0, // threads + this->info_type_, + ACE_TRY_ENV); + ACE_CHECK; + + ChildrenIterator end = this->end (); + for (ChildrenIterator i = this->begin (); i != end; ++i) + { + TAO_EC_Filter* filter = *i; + + TAO_EC_QOS_Info child; + filter->get_qos_info (child, ACE_TRY_ENV); + ACE_CHECK; + + this->scheduler_->add_dependency (rt_info, child.rt_info, 1, + RtecScheduler::TWO_WAY_CALL, + ACE_TRY_ENV); + ACE_CHECK; + } + this->scheduler_->add_dependency (this->body_info_, rt_info, 1, + RtecScheduler::TWO_WAY_CALL, + ACE_TRY_ENV); + ACE_CHECK; + + this->rt_info_ = rt_info; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.h new file mode 100644 index 00000000000..8779eda1d55 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.h @@ -0,0 +1,125 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel +// +// = FILENAME +// EC_Sched_Filter +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// = CREDITS +// Based on previous work by Tim Harrison (harrison@cs.wustl.edu) +// and other members of the DOC group. +// More details can be found in: +// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz +// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz +// +// +// ============================================================================ + +#ifndef TAO_EC_SCHED_FILTER_H +#define TAO_EC_SCHED_FILTER_H + +#include "orbsvcs/RtecSchedulerC.h" +#include "EC_Filter.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_ORBSVCS_Export TAO_EC_Sched_Filter : public TAO_EC_Filter +{ + // = TITLE + // Decorate a filter with scheduling information + // + // = DESCRIPTION + // This filter decorates a regular filter with scheduling + // information. It creates a new RT_Info entry for the filter and + // it adds the dependencies between the filter and any childrens + // it may have. + // + // = MEMORY MANAGMENT + // It assumes ownership of the children. + // +public: + TAO_EC_Sched_Filter (const char* name, + RtecScheduler::Scheduler_ptr scheduler, + TAO_EC_Filter* body, + RtecScheduler::handle_t body_info, + RtecScheduler::Info_Type_t info_type); + // Constructor. + // It assumes ownership of the <body>, makes a copy of the other + // parameters + + virtual ~TAO_EC_Sched_Filter (void); + // Destructor + + // = The TAO_EC_Filter methods, please check the documentation in + // TAO_EC_Filter. + virtual ChildrenIterator begin (void) const; + virtual ChildrenIterator end (void) const; + virtual int size (void) const; + virtual int filter (const RtecEventComm::EventSet& event, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment& env); + virtual int filter_nocopy (RtecEventComm::EventSet& event, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment& env); + virtual void push (const RtecEventComm::EventSet& event, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment& env); + virtual void push_nocopy (RtecEventComm::EventSet& event, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment& env); + virtual void clear (void); + virtual CORBA::ULong max_event_size (void) const; + virtual int can_match (const RtecEventComm::EventHeader& header) const; + virtual int add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV); + virtual void get_qos_info (TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV); + +private: + ACE_UNIMPLEMENTED_FUNC (TAO_EC_Sched_Filter + (const TAO_EC_Sched_Filter&)) + ACE_UNIMPLEMENTED_FUNC (TAO_EC_Sched_Filter& operator= + (const TAO_EC_Sched_Filter&)) + + void init_rt_info (CORBA::Environment& env); + // Initialize our RT_Info handle and dependencies + + void compute_qos_info (TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV); + // Compute a new qos_info to push up. + +private: + RtecScheduler::handle_t rt_info_; + // The RT_Info handle for this object + + ACE_CString name_; + // Our operation name + + RtecScheduler::Scheduler_var scheduler_; + // The scheduler we are going to use + + TAO_EC_Filter* body_; + // The implementation + + RtecScheduler::handle_t body_info_; + // The RT_Info handle for the body + + RtecScheduler::Info_Type_t info_type_; + // Required for the scheduling service +}; + +#if defined (__ACE_INLINE__) +#include "EC_Sched_Filter.i" +#endif /* __ACE_INLINE__ */ + +#endif /* TAO_EC_SCHED_FILTER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.i b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.i new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter.i @@ -0,0 +1 @@ +// $Id$ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.cpp new file mode 100644 index 00000000000..8211c29f723 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.cpp @@ -0,0 +1,171 @@ +// $Id$ + +#include "EC_Sched_Filter.h" +#include "orbsvcs/Event_Service_Constants.h" +#include "EC_Sched_Filter_Builder.h" +#include "EC_Type_Filter.h" +#include "EC_Conjunction_Filter.h" +#include "EC_Disjunction_Filter.h" +#include "EC_Timeout_Filter.h" +#include "EC_Event_Channel.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_Sched_Filter_Builder.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_Sched_Filter_Builder, "$Id$") + +TAO_EC_Sched_Filter_Builder::~TAO_EC_Sched_Filter_Builder (void) +{ +} + +TAO_EC_Filter* +TAO_EC_Sched_Filter_Builder::build ( + TAO_EC_ProxyPushSupplier *supplier, + RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::Environment &ACE_TRY_ENV) const +{ + CORBA::ULong pos = 0; + RtecScheduler::Scheduler_var scheduler = + this->event_channel_->scheduler (); + return this->recursive_build (supplier, qos, pos, + scheduler.in (), 0, + ACE_TRY_ENV); +} + +TAO_EC_Filter* +TAO_EC_Sched_Filter_Builder:: recursive_build ( + TAO_EC_ProxyPushSupplier *supplier, + RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::ULong& pos, + RtecScheduler::Scheduler_ptr scheduler, + const char* base_name, + CORBA::Environment& ACE_TRY_ENV) const +{ + const RtecEventComm::Event& e = qos.dependencies[pos].event; + ACE_CString name; + RtecScheduler::handle_t body_info = qos.dependencies[pos].rt_info; + if (base_name == 0) + { + RtecScheduler::RT_Info_var info = + scheduler->get (body_info, ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + name = info->entry_point.in (); + } + else + name = base_name; + + if (e.header.type == ACE_ES_CONJUNCTION_DESIGNATOR) + { + pos++; // Consume the designator + CORBA::ULong n = this->count_children (qos, pos); + + TAO_EC_Filter** children; + ACE_NEW_RETURN (children, TAO_EC_Filter*[n], 0); + for (CORBA::ULong i = 0; i != n; ++i) + { + ACE_CString child_name = name; + char buf[16]; + ACE_OS::sprintf (buf, "/%04.4x", i); + child_name += buf; + children[i] = this->recursive_build (supplier, qos, pos, + scheduler, + child_name.c_str (), + ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + pos++; + } + return new TAO_EC_Sched_Filter (name.c_str (), + scheduler, + new TAO_EC_Conjunction_Filter(children, + n), + body_info, + RtecScheduler::CONJUNCTION); + } + + else if (e.header.type == ACE_ES_DISJUNCTION_DESIGNATOR) + { + pos++; // Consume the designator + CORBA::ULong n = this->count_children (qos, pos); + + TAO_EC_Filter** children; + ACE_NEW_RETURN (children, TAO_EC_Filter*[n], 0); + for (CORBA::ULong i = 0; i != n; ++i) + { + ACE_CString child_name = name; + char buf[16]; + ACE_OS::sprintf (buf, "/%04.4x", i); + child_name += buf; + + children[i] = this->recursive_build (supplier, qos, pos, + scheduler, + child_name.c_str (), + ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + pos++; + } + return new TAO_EC_Sched_Filter (name.c_str (), + scheduler, + new TAO_EC_Disjunction_Filter (children, + n), + body_info, + RtecScheduler::DISJUNCTION); + } + + else if (e.header.type == ACE_ES_EVENT_TIMEOUT + || e.header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT + || e.header.type == ACE_ES_EVENT_DEADLINE_TIMEOUT) + { + pos++; + TAO_EC_QOS_Info qos_info; + + qos_info.rt_info = + scheduler->create (name.c_str (), ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + // Provide dummy values the scheduler will compute them based on the + // dependencies and the fact that this is a DISJUNCTION. + scheduler->set (qos_info.rt_info, + RtecScheduler::VERY_LOW_CRITICALITY, + 0, // worst_cast_execution_time + 0, // typical_cast_execution_time + 0, // cached_cast_execution_time + e.header.creation_time, // period + RtecScheduler::VERY_LOW_IMPORTANCE, + 0, // quantum + 0, // threads + RtecScheduler::OPERATION, + ACE_TRY_ENV); + ACE_CHECK_RETURN (0); + + return new TAO_EC_Timeout_Filter (this->event_channel_, + supplier, + qos_info, + e.header.type, + e.header.creation_time); + } + + return new TAO_EC_Sched_Filter (name.c_str (), + scheduler, + new TAO_EC_Type_Filter (e.header), + body_info, + RtecScheduler::OPERATION); +} + +CORBA::ULong +TAO_EC_Sched_Filter_Builder:: + count_children (RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::ULong pos) const +{ + CORBA::ULong l = qos.dependencies.length (); + CORBA::ULong i; + for (i = pos; i != l; ++i) + { + const RtecEventComm::Event& e = qos.dependencies[i].event; + if (e.header.type == ACE_ES_CONJUNCTION_DESIGNATOR + || e.header.type == ACE_ES_DISJUNCTION_DESIGNATOR) + break; + } + return i - 1; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.h b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.h new file mode 100644 index 00000000000..27d50c44ccf --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.h @@ -0,0 +1,82 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel +// +// = FILENAME +// EC_Sched_Filter_Builder +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// = CREDITS +// Based on previous work by Tim Harrison (harrison@cs.wustl.edu) +// and other members of the DOC group. +// More details can be found in: +// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz +// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz +// +// +// ============================================================================ + +#ifndef TAO_EC_SCHED_FILTER_BUILDER_H +#define TAO_EC_SCHED_FILTER_BUILDER_H + +#include "EC_Filter_Builder.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_EC_Filter; +class TAO_EC_Event_Channel; + +class TAO_ORBSVCS_Export TAO_EC_Sched_Filter_Builder : public TAO_EC_Filter_Builder +{ + // = TITLE + // Implement a builder for the fundamental filters. + // + // = DESCRIPTION + // The sched filtering mechanisms in the Event channel + // (source/type based filtering + disjunctions and conjunctions) + // are constructed using this class. + // +public: + TAO_EC_Sched_Filter_Builder (TAO_EC_Event_Channel* ec); + // constructor. + + virtual ~TAO_EC_Sched_Filter_Builder (void); + // destructor... + + // = The TAO_EC_Filter_Builder methods... + TAO_EC_Filter* build (TAO_EC_ProxyPushSupplier *supplier, + RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::Environment &env) const; + +private: + TAO_EC_Filter* recursive_build (TAO_EC_ProxyPushSupplier *supplier, + RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::ULong& pos, + RtecScheduler::Scheduler_ptr scheduler, + const char* base_name, + CORBA::Environment& env) const; + // Recursively build the filter tree. + + CORBA::ULong count_children (RtecEventChannelAdmin::ConsumerQOS& qos, + CORBA::ULong pos) const; + // Count the number of children of the current node, i.e. until a + // conjunction or disjunction starts. + +private: + TAO_EC_Event_Channel* event_channel_; + // The event channel. +}; + +#if defined (__ACE_INLINE__) +#include "EC_Sched_Filter_Builder.i" +#endif /* __ACE_INLINE__ */ + +#endif /* TAO_EC_SCHED_FILTER_BUILDER_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.i b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.i new file mode 100644 index 00000000000..a6c08b50552 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Sched_Filter_Builder.i @@ -0,0 +1,8 @@ +// $Id$ + +ACE_INLINE +TAO_EC_Sched_Filter_Builder:: + TAO_EC_Sched_Filter_Builder (TAO_EC_Event_Channel *ec) + : event_channel_ (ec) +{ +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.cpp new file mode 100644 index 00000000000..b96abfd40a8 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.cpp @@ -0,0 +1,14 @@ +// $Id$ + +#include "EC_Scheduling_Strategy.h" +#include "EC_QOS_Info.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_Scheduling_Strategy.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_Scheduling_Strategy, "$Id$") + +TAO_EC_Scheduling_Strategy::~TAO_EC_Scheduling_Strategy (void) +{ +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.h b/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.h new file mode 100644 index 00000000000..46615edcdc0 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.h @@ -0,0 +1,78 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel +// +// = FILENAME +// EC_Scheduling_Strategy +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// = CREDITS +// Based on previous work by Tim Harrison (harrison@cs.wustl.edu) +// and other members of the DOC group. +// More details can be found in: +// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz +// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz +// +// +// ============================================================================ + +#ifndef TAO_EC_SCHEDULING_STRATEGY_H +#define TAO_EC_SCHEDULING_STRATEGY_H + +#include "orbsvcs/RtecSchedulerC.h" +#include "orbsvcs/RtecEventCommC.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_EC_ProxyPushConsumer; +class TAO_EC_ProxyPushSupplier; +class TAO_EC_QOS_Info; + +class TAO_ORBSVCS_Export TAO_EC_Scheduling_Strategy +{ + // = TITLE + // Define the interface for the scheduling strategy + // + // = DESCRIPTION + // The scheduling strategy controls the actions that the event + // channel must take to update the dependency information in the + // scheduler and to query the scheduler for the priority of each + // event pushed by a supplier. + // + // The base + // + // = MEMORY MANAGMENT + // +public: + virtual ~TAO_EC_Scheduling_Strategy (void); + // Destructor + + virtual void add_proxy_supplier_dependencies ( + TAO_EC_ProxyPushSupplier *supplier, + TAO_EC_ProxyPushConsumer *consumer, + CORBA::Environment &ACE_TRY_ENV) = 0; + // Add all the dependencies between <supplier> and <consumer> + + virtual void init_event_qos ( + const RtecEventComm::EventHeader& header, + TAO_EC_ProxyPushConsumer *consumer, + TAO_EC_QOS_Info& qos_info, + CORBA::Environment &ACE_TRY_ENV) = 0; + // Initializes <qos_info> based on the QoS information for + // <consumer> and the event header. + +}; + +#if defined (__ACE_INLINE__) +#include "EC_Scheduling_Strategy.i" +#endif /* __ACE_INLINE__ */ + +#endif /* TAO_EC_SCHEDULING_STRATEGY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.i b/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.i new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Scheduling_Strategy.i @@ -0,0 +1 @@ +// $Id$ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp index 3f159a987db..77f15f3ea28 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.cpp @@ -122,3 +122,12 @@ TAO_EC_Timeout_Filter::can_match ( { return 0; } + +int +TAO_EC_Timeout_Filter::add_dependencies ( + const RtecEventComm::EventHeader&, + const TAO_EC_QOS_Info &, + CORBA::Environment &) +{ + return 0; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.h index 03fa9eaac6b..9232bdb6e24 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Timeout_Filter.h @@ -84,6 +84,9 @@ public: virtual void clear (void); virtual CORBA::ULong max_event_size (void) const; virtual int can_match (const RtecEventComm::EventHeader& header) const; + virtual int add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV); private: ACE_UNIMPLEMENTED_FUNC (TAO_EC_Timeout_Filter diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp index 9830f6bdb97..e6a03a59a66 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp @@ -54,14 +54,6 @@ TAO_EC_Trivial_Supplier_Filter::push (const RtecEventComm::EventSet& event, TAO_EC_ConsumerAdmin* consumer_admin = this->event_channel_->consumer_admin (); - ACE_GUARD_THROW_EX (TAO_EC_ConsumerAdmin::Busy_Lock, - ace_mon, consumer_admin->busy_lock (), - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - - TAO_EC_ConsumerAdmin::SupplierSetIterator end = - consumer_admin->end (); - for (CORBA::ULong j = 0; j < event.length (); ++j) { const RtecEventComm::Event& e = event[j]; @@ -69,6 +61,15 @@ TAO_EC_Trivial_Supplier_Filter::push (const RtecEventComm::EventSet& event, ACE_const_cast(RtecEventComm::Event*, &e); RtecEventComm::EventSet single_event (1, 1, buffer, 0); + ACE_GUARD_THROW_EX ( + TAO_EC_ConsumerAdmin::Busy_Lock, + ace_mon, consumer_admin->busy_lock (), + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + + TAO_EC_ConsumerAdmin::SupplierSetIterator end = + consumer_admin->end (); + for (TAO_EC_ConsumerAdmin::SupplierSetIterator i = consumer_admin->begin (); i != end; diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp index bf8272708f1..fa8d1cc68ff 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.cpp @@ -80,14 +80,24 @@ int TAO_EC_Type_Filter::can_match ( const RtecEventComm::EventHeader& header) const { - if (this->header_.source == 0) - if (this->header_.type == 0) - return 1; + if (this->header_.source == 0) + if (this->header_.type == 0) + return 1; else - return this->header_.type == header.type; - else if (this->header_.type == 0) - return this->header_.source == header.source; - + return this->header_.type == header.type; + else if (this->header_.type == 0) + return this->header_.source == header.source; + return (this->header_.type == header.type && this->header_.source == header.source); } + +int +TAO_EC_Type_Filter::add_dependencies ( + const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &, + CORBA::Environment &) +{ + return this->can_match (header); +} + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h index 81c7835a916..c9d6b7d7b1a 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Type_Filter.h @@ -61,6 +61,9 @@ public: virtual void clear (void); virtual CORBA::ULong max_event_size (void) const; virtual int can_match (const RtecEventComm::EventHeader& header) const; + virtual int add_dependencies (const RtecEventComm::EventHeader& header, + const TAO_EC_QOS_Info &qos_info, + CORBA::Environment &ACE_TRY_ENV); private: ACE_UNIMPLEMENTED_FUNC (TAO_EC_Type_Filter diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile index 80d7e568852..3cb0a106a00 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile +++ b/TAO/orbsvcs/orbsvcs/Makefile @@ -109,7 +109,12 @@ ifneq (,$(findstring Event2,$(TAO_ORBSVCS))) Event/EC_Timeout_Generator \ Event/EC_Reactive_Timeout_Generator \ Event/EC_Priority_Dispatching \ - Event/EC_Dispatching_Task + Event/EC_Dispatching_Task \ + Event/EC_Sched_Filter \ + Event/EC_Sched_Filter_Builder \ + Event/EC_Scheduling_Strategy \ + Event/EC_Null_Scheduling \ + Event/EC_Priority_Scheduling #### TAO's Event requires its Sched Service. ifeq (,$(findstring Sched,$(TAO_ORBSVCS))) |