summaryrefslogtreecommitdiff
path: root/Kokyu/Dispatcher_Task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Kokyu/Dispatcher_Task.cpp')
-rw-r--r--Kokyu/Dispatcher_Task.cpp59
1 files changed, 54 insertions, 5 deletions
diff --git a/Kokyu/Dispatcher_Task.cpp b/Kokyu/Dispatcher_Task.cpp
index e6d86711865..5fcce5f328e 100644
--- a/Kokyu/Dispatcher_Task.cpp
+++ b/Kokyu/Dispatcher_Task.cpp
@@ -68,6 +68,13 @@ Dispatcher_Task::initialize ()
own_allocator_ = 1;
}
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ this->deferrer_attr_.task_ = this;
+ this->deferrer_.init(this->deferrer_attr_);
+
+ this->releases_.open();
+#endif //KOKYU_HAS_RELEASE_GUARD
+
return 0;
}
@@ -138,6 +145,13 @@ int
Dispatcher_Task::enqueue (const Dispatch_Command* cmd,
const QoSDescriptor& qos_info)
{
+ //Subclasses which override this function should now be overriding
+ //enqueue(ACE_Message_Block*) because that is where the main
+ //behavior is defined. This might invalidate those classes, but the
+ //decision seemed reasonable since it allows Dispatch_Deferrer to
+ //use the same Dispatch_Queue_Item as Dispatcher_Task without any
+ //more memory allocation and it doesn't break the interface.
+
void* buf = this->allocator_->malloc (sizeof (Dispatch_Queue_Item));
if (buf == 0)
@@ -145,12 +159,47 @@ Dispatcher_Task::enqueue (const Dispatch_Command* cmd,
ACE_Message_Block *mb =
new (buf) Dispatch_Queue_Item (cmd,
- qos_info,
- &(this->data_block_),
- ACE_Message_Block::DONT_DELETE,
- this->allocator_);
+ qos_info,
+ &(this->data_block_),
+ ACE_Message_Block::DONT_DELETE,
+ this->allocator_);
+
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ //if current release time < last release time + period then defer dispatch
+ ACE_Time_Value now(ACE_OS::gettimeofday());
+ long rel_msec;
+ if (this->releases_.find(qos_info,rel_msec) < 0)
+ {
+ //new QosDescriptor, so just set last release to zero
+ rel_msec = 0;
+ }
+ ACE_Time_Value release;
+ release.msec(rel_msec);
+ release += qos_info.deadline_;
+ if (now < release)
+ {
+ //defer until last release time + period
+ Dispatch_Queue_Item *qitem =
+ ACE_dynamic_cast(Dispatch_Queue_Item*, mb);
+
+ if (qitem == 0)
+ {
+ ACE_Message_Block::release (mb);
+ continue;
+ }
+
+ this->deferrer_.dispatch(qitem);
+ }
+ else
+ {
+ //release!
+#endif //KOKYU_HAS_RELEASE_GUARD
+
+ this->enqueue (mb);
- this->putq (mb);
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ } //else now >= release
+#endif //KOKYU_HAS_RELEASE_GUARD
return 0;
}