diff options
Diffstat (limited to 'Kokyu/Dispatcher_Task.cpp')
-rw-r--r-- | Kokyu/Dispatcher_Task.cpp | 59 |
1 files changed, 54 insertions, 5 deletions
diff --git a/Kokyu/Dispatcher_Task.cpp b/Kokyu/Dispatcher_Task.cpp index e6d86711865..5fcce5f328e 100644 --- a/Kokyu/Dispatcher_Task.cpp +++ b/Kokyu/Dispatcher_Task.cpp @@ -68,6 +68,13 @@ Dispatcher_Task::initialize () own_allocator_ = 1; } +#ifdef KOKYU_HAS_RELEASE_GUARD + this->deferrer_attr_.task_ = this; + this->deferrer_.init(this->deferrer_attr_); + + this->releases_.open(); +#endif //KOKYU_HAS_RELEASE_GUARD + return 0; } @@ -138,6 +145,13 @@ int Dispatcher_Task::enqueue (const Dispatch_Command* cmd, const QoSDescriptor& qos_info) { + //Subclasses which override this function should now be overriding + //enqueue(ACE_Message_Block*) because that is where the main + //behavior is defined. This might invalidate those classes, but the + //decision seemed reasonable since it allows Dispatch_Deferrer to + //use the same Dispatch_Queue_Item as Dispatcher_Task without any + //more memory allocation and it doesn't break the interface. + void* buf = this->allocator_->malloc (sizeof (Dispatch_Queue_Item)); if (buf == 0) @@ -145,12 +159,47 @@ Dispatcher_Task::enqueue (const Dispatch_Command* cmd, ACE_Message_Block *mb = new (buf) Dispatch_Queue_Item (cmd, - qos_info, - &(this->data_block_), - ACE_Message_Block::DONT_DELETE, - this->allocator_); + qos_info, + &(this->data_block_), + ACE_Message_Block::DONT_DELETE, + this->allocator_); + +#ifdef KOKYU_HAS_RELEASE_GUARD + //if current release time < last release time + period then defer dispatch + ACE_Time_Value now(ACE_OS::gettimeofday()); + long rel_msec; + if (this->releases_.find(qos_info,rel_msec) < 0) + { + //new QosDescriptor, so just set last release to zero + rel_msec = 0; + } + ACE_Time_Value release; + release.msec(rel_msec); + release += qos_info.deadline_; + if (now < release) + { + //defer until last release time + period + Dispatch_Queue_Item *qitem = + ACE_dynamic_cast(Dispatch_Queue_Item*, mb); + + if (qitem == 0) + { + ACE_Message_Block::release (mb); + continue; + } + + this->deferrer_.dispatch(qitem); + } + else + { + //release! +#endif //KOKYU_HAS_RELEASE_GUARD + + this->enqueue (mb); - this->putq (mb); +#ifdef KOKYU_HAS_RELEASE_GUARD + } //else now >= release +#endif //KOKYU_HAS_RELEASE_GUARD return 0; } |