summaryrefslogtreecommitdiff
path: root/TAO/local/bin/Event_Service/Dispatching_Modules.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/local/bin/Event_Service/Dispatching_Modules.cpp')
-rw-r--r--TAO/local/bin/Event_Service/Dispatching_Modules.cpp603
1 files changed, 0 insertions, 603 deletions
diff --git a/TAO/local/bin/Event_Service/Dispatching_Modules.cpp b/TAO/local/bin/Event_Service/Dispatching_Modules.cpp
deleted file mode 100644
index 13d8cbe0d46..00000000000
--- a/TAO/local/bin/Event_Service/Dispatching_Modules.cpp
+++ /dev/null
@@ -1,603 +0,0 @@
-// $Id$
-
-#include "ace/Sched_Params.h"
-#include "Memory_Pools.h"
-#include "Scheduler_Factory.h"
-
-#include "Dispatching_Modules.h"
-
-#if !defined (__ACE_INLINE__)
-#include "Dispatching_Modules.i"
-#endif /* __ACE_INLINE__ */
-
-typedef ACE_EventChannel::DISPATCH_ERROR DISPATCH_ERROR;
-typedef ACE_EventChannel::SYNCHRONIZATION_ERROR SYNC_ERROR;
-typedef ACE_EventChannel::QOS_ERROR QOS_ERROR;
-
-// ************************************************************
-
-void
-ACE_ES_Dispatch_Request::make_copy (RtecEventComm::EventSet &dest) const
-{
- if (use_single_event_)
- {
- dest.length (1);
- dest[0] = single_event_;
- }
- else
- {
- dest.length (event_set_.length ());
-
- for (CORBA::ULong index=0; index < event_set_.length (); index++)
- {
- RtecEventComm::Event &dest_event = dest[index];
- ACE_ES_Event_Container_var &source_event_var = ((ACE_ES_Event_Container_var &) event_set_[index]);
- dest_event = *(source_event_var.operator->());
- }
- }
-}
-
-int
-ACE_ES_Dispatch_Request::execute (u_long &command_action)
-{
- ACE_TIMEPROBE ("dispatch (dequeue) the event");
-
- return dispatching_module_->dispatch_event (this, command_action);
-}
-
-#if 0
-void *
-ACE_ES_Dispatch_Request::operator new (size_t nbytes)
-{
- if (nbytes > sizeof (ACE_ES_Dispatch_Request))
- {
- ACE_ERROR ((LM_ERROR, "nbytes = %d, sizeof (ACE_ES_Dispatch_Request_Chunk) = %d.\n",
- sizeof (ACE_ES_Dispatch_Request)));
- ACE_ASSERT (nbytes <= sizeof (ACE_ES_Dispatch_Request));
- }
-
- return ACE_ES_Memory_Pools::new_Dispatch_Request ();
-}
-
-void
-ACE_ES_Dispatch_Request::operator delete (void *mem)
-{
- ACE_ES_Memory_Pools::delete_Dispatch_Request (mem);
-}
-#endif /* 0 */
-
-// ************************************************************
-
-ACE_ES_Priority_Dispatching::ACE_ES_Priority_Dispatching (ACE_EventChannel *channel,
- int threads_per_queue)
- : ACE_ES_Dispatching_Base (channel),
- notification_strategy_ (this),
- highest_priority_ (0),
- shutdown_ (0),
- threads_per_queue_ (threads_per_queue)
-{
- // If we're single threaded, then we need to use the notification strategy.
- if ((threads_per_queue_ == 0) &&
- (notification_strategy_.open () == -1))
- ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_ES_Priority_Dispatching"));
-
- // Initialize the queues.
- for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
- {
- queues_[x] = 0;
- delete_me_queues_[x] = 0;
- }
-
- this->initialize_queues ();
-}
-
-ACE_ES_Priority_Dispatching::~ACE_ES_Priority_Dispatching (void)
-{
- // Delete the queues.
- for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
- delete delete_me_queues_[x];
-}
-
-
-void
-ACE_ES_Priority_Dispatching::initialize_queues (void)
-{
- for (int x=0; x < ACE_Scheduler_MAX_PRIORITIES; x++)
- {
- RtecScheduler::Period tv = ACE_Scheduler_Rates[x];
- queues_[x] = new ACE_ES_Dispatch_Queue (this, &notification_strategy_);
- if (queues_[x] == 0 ||
- queues_[x]->open_queue (tv, threads_per_queue_) == -1)
- {
- ACE_ERROR ((LM_ERROR, "%p.\n", "ACE_ES_Priority_Dispatching::initialize_queues"));
- return;
- }
-
- queue_count_[x] = 1;
- }
-
- highest_priority_ = ACE_Scheduler_MAX_PRIORITIES - 1;
-}
-
-void
-ACE_ES_Priority_Dispatching::connected (ACE_Push_Consumer_Proxy *consumer,
- CORBA::Environment &_env)
-{
- down_->connected (consumer, _env);
-
- // This code does dynamic allocation of channel dispatch threads.
- // It requires that consumer's priorities are known at connection
- // time and that threads can request priorities from the scheduler
- // at run-time. These are both antithetical to static scheduling.
- // The constructor now allocates a thread per rate group.
-#if 0
- // We have to tell the lower portions of the channel about the
- // consumer first. This is so that any changes to the consumer's
- // qos will take effect when we get the dispatch priority.
- down_->connected (consumer, _env);
-
- RtecScheduler::OS_Priority priority =
- ACE_Scheduler::instance ().preemption_priority (consumer->qos ().rt_info_);
-
- {
- ACE_ES_GUARD ace_mon (lock_);
-
- // If a queue has not been created for the consumer's priority,
- // create one.
- if (queues_[priority] == 0)
- {
- // Allocate a new dispatch queue.
- queues_[priority] = new ACE_ES_Dispatch_Queue (this, &notification_strategy_);
- if (queues_[priority] == 0)
- ACE_THROW (CORBA::NO_MEMORY (0, CORBA::COMPLETED_NO,
- "ACE_ES_Priority_Dispatching::connected"));
-
- // Initialize the dispatch queue corresponding to the
- // consumer's priority. With a full implementation of the
- // run-time scheduler, the dispatch queue can find it's
- // scheduling qos online. However, we pass the rate in case
- // one is not found. The rate can be used to obtain the
- // proper priority. If threads_per_queue_ == 0, then these
- // queues will be passive. Otherwise, they will be active.
- // This switches us between MT_ORB and ST_ORB. If we're
- // single-threaded, this registers us with the ReactorEx using
- // our notification_strategy_. If we're multi-threaded, this
- // spawns the threads.
- if (queues_[priority]->open_queue (priority,
- threads_per_queue_) == -1)
- ACE_THROW (DISPATCH_ERROR (0, CORBA::COMPLETED_NO,
- "ACE_ES_Priority_Dispatching::connected:"
- "queue open failed.\n"));
-
- // When this goes down to 0, we will shutdown the queue.
- queue_count_[priority] = 1;
-
- // Keep track of this to optimize handle_signal.
- if (priority > highest_priority_)
- highest_priority_ = priority;
-
- ACE_DEBUG ((LM_DEBUG, "Created queue priority = %d.\n", priority));
- }
- else
- queue_count_[priority]++;
- }
-#endif
-}
-
-void
-ACE_ES_Priority_Dispatching::disconnected (ACE_Push_Consumer_Proxy *consumer)
-{
- // We'll not dynamically close down queues.
- ACE_UNUSED_ARG (consumer);
-
-#if 0
- RtecScheduler::OS_Priority priority =
- ACE_Scheduler::instance ().preemption_priority (consumer->qos ().rt_info_);
-
- {
- ACE_ES_GUARD ace_mon (lock_);
-
- // If there are no more users of this queue, then we *could* shut
- // it down. However, we will not.
- if (--queue_count_[priority] <= 0)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) unused dispatch queue priority = %d, "
- "is_empty = %d.\n",
- priority, queues_[priority]->msg_queue ()->is_empty ()));
-
- queues_[priority]->shutdown_task ();
- }
- }
-#endif
-}
-
-// @@ This method could have a bypass optimization.
-// <request> has been dynamically allocated by the filtering module.
-void
-ACE_ES_Priority_Dispatching::push (ACE_ES_Dispatch_Request *request,
- CORBA::Environment &_env)
-{
- ACE_TIMEPROBE (" push_source_type: Correlation Module");
-
- RtecScheduler::OS_Priority thread_priority;
- RtecScheduler::Sub_Priority subpriority;
- RtecScheduler::Preemption_Priority preemption_priority;
-
- if (request->rt_info () != 0)
- {
- ACE_Scheduler_Factory::server ()->priority
- (request->rt_info (),
- thread_priority,
- subpriority,
- preemption_priority,
- _env);
- if (_env.exception ())
- {
- return;
- }
- }
- else
- {
- thread_priority =
- ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
- ACE_SCOPE_PROCESS);
- subpriority = ACE_Scheduler_MIN_SUB_PRIORITY;
- preemption_priority = ACE_Scheduler_MIN_PREEMPTION_PRIORITY;
- }
-
- // If it's a request to forward an event, it needs a reference to us
- // to call dispatch_event.
- request->set (this, preemption_priority, subpriority);
-
- // Make sure that a queue exists for this priority.
- if (queues_[preemption_priority] == 0)
- {
- ACE_ERROR ((LM_ERROR, "Push to closed queue %d, dropping event.\n", preemption_priority));
- return;
-#if 0
- ACE_THROW (SYNC_ERROR (0, CORBA::COMPLETED_NO, "ACE_ES_Priority_Dispatching::push"));
-#endif /* 0 */
- }
-
- // Enqueue the request. If we're multi-threaded, this request is a
- // command object that will be called by the threads in the queue,
- // or will be dequeued by this->handle_signal if we're
- // single-threaded.
- if (queues_[preemption_priority]->try_put (request) == -1)
- {
- if (ACE_ES_Dispatch_Request::release (request) != 0)
- ACE_ERROR ((LM_ERROR, "ACE_ES_Priority_Dispatching::push"
- " release failed.\n"));
- if (errno != EPIPE)
- {
- ACE_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO));
- // @@ Orbix parameters
- // 0, CORBA::COMPLETED_NO,
- // "ACE_ES_Priority_Dispatching::push enqueue failed"));
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG,
- "Request rejected from closed queue %d.\n",
- preemption_priority));
- }
- }
-}
-
-// Start at highest priority queue checking for queued events
-// continuing to lowest priority queue. If an event is ever found,
-// dispatch it and then start back at the highest priority queue
-// again.
-int
-ACE_ES_Priority_Dispatching::handle_signal (int, siginfo_t *, ucontext_t *)
-{
- int done;
-
- do
- {
- done = 1;
- for (int x = 0; x <= highest_priority_; x++)
- {
- // If the queue is not empty, dispatch the request and then
- // start the for loop from the beginning.
- if ((queues_[x] != 0) && (!queues_[x]->msg_queue ()->is_empty ()))
- {
- // Dequeue and service the request.
- queues_[x]->svc_one ();
-
- // Exit the for loop and start over.
- done = 0;
- break;
- }
-
- // If we get through the whole for loop without dispatching
- // anything, then we're done.
- }
- }
- while (!done);
-
-
- return 0;
-}
-
-// This is only for the non-win32 single-threaded implementation.
-int
-ACE_ES_Priority_Dispatching::handle_input (ACE_HANDLE)
-{
- return this->handle_signal (0, 0, 0);
-}
-
-// Shutdown each queue. When each queue exits, they will call back
-// this->dispatch_queue_closed which allows us to free up resources.
-// When the last queue has closed, we'll delete ourselves.
-void
-ACE_ES_Priority_Dispatching::shutdown (void)
-{
- if (shutdown_)
- return;
-
- ACE_DEBUG ((LM_DEBUG, "(%t) ACE_ES_Priority_Dispatching "
- "module shutting down.\n"));
-
- shutdown_ = 1;
-
- // If we're single threaded, then we need to shut down the
- // notification strategy so it can remove itself from the reactor.
- if (threads_per_queue_ == 0)
- notification_strategy_.shutdown ();
-
- // Whether these are active or not, they must be shut down.
- for (int x = 0; x <= highest_priority_; x++)
- if (queues_[x] != 0)
- {
- ACE_DEBUG ((LM_DEBUG, "shutting down dispatch queue %d.\n", x));
- queues_[x]->shutdown_task ();
- }
-}
-
-// This gets called every time a Dispatch Queue closes down. We
-// search for <queue> and delete it. If we have been told to delete
-// ourself, after the last queue has been deleted, we delete
-// ourselves.
-void
-ACE_ES_Priority_Dispatching::dispatch_queue_closed (ACE_ES_Dispatch_Queue *queue)
-{
- ACE_ES_GUARD ace_mon (lock_);
-
- // Find the queue.
- for (int x = 0; x <= highest_priority_; x++)
- {
- if (queues_[x] == queue)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) Dispatch queue %d is closed.\n", x));
-
- // Store the queue for deleting in this object's destructor.
- delete_me_queues_[x] = queues_[x];
- queues_[x] = 0;
-
- // Reset highest_priority_.
- if (x == highest_priority_)
- {
- while ((--highest_priority_ >= 0) &&
- (queues_[highest_priority_] == 0));
-
- if (highest_priority_ < 0)
- {
- ACE_DEBUG ((LM_DEBUG, "Dispatching module shut down.\n"));
- up_->shutdown ();
- return;
- }
- }
-
- // If we found the queue, we can exit the for loop.
- break;
- }
- }
-}
-
-/*
-ACE_HANDLE
-ACE_ES_Priority_Dispatching::get_handle (void) const
-{
- ACE_ES_Priority_Dispatching *fake_this = (ACE_ES_Priority_Dispatching *) this;
- return fake_this->notification_strategy_.get_handle ();
-}
-*/
-
-// ************************************************************
-
-ACE_ES_Dispatch_Queue::ACE_ES_Dispatch_Queue (ACE_ES_Dispatching_Base *dispatching_module,
- ACE_ES_Notification_Strategy *notification_strategy)
- : dispatching_module_ (dispatching_module),
- notification_strategy_ (notification_strategy)
-{
-}
-
-int
-ACE_ES_Dispatch_Queue::open_queue (RtecScheduler::Period &period,
- int threads)
-{
- // First set up the correct message queue according to whether the
- // dispatch queue will be active or not.
-
- // If there are no threads specified, we'll register with the
- // reactor to be called back when someone queues a message.
- if (threads == 0)
- {
- // Allocate a message queue that notifies a reactor when events
- // arrive via the msg_queue call. If that succeeds, set the
- // notification strategy in our message queue via the open call.
- if (this->msg_queue () == 0 ||
- this->msg_queue ()->open (ACE_ES_QUEUE::DEFAULT_HWM,
- ACE_ES_QUEUE::DEFAULT_LWM,
- notification_strategy_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p msg_queue.open failed.\n",
- "ACE_ES_Dispatch_Queue::open_queue"), -1);
- }
- else
- {
- // Allocate a message queue that does not notify.
- ACE_ES_MQ *mq = new ACE_ES_MQ;
- if (mq == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
- "ACE_ES_Dispatch_Queue::open_queue"), -1);
- else
- {
- // This deletes previous message queue.
- this->msg_queue (mq);
- // Set this so that the destructor of ACE_Task deletes our
- // message queue. Note, this must be after the call to
- // msg_queue.
- delete_msg_queue_ = 1;
- }
- }
-
- // Create a name for ourselves using the priority.
- char temp[64];
- ACE_OS::sprintf (temp, "ACE_ES_Dispatch_Queue-%u", period);
-
- // Open the task. This will query the scheduler for our qos
- // structure. It will also synch_threads if it succeeds.
- int result = this->open_task (temp);
-
- switch (result)
- {
- case -1:
- // Error.
- ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
- "ACE_ES_Dispatch_Queue::open_queue"), -1);
- case 0:
- ACE_TRY
- {// @@ TODO: Handle exceptions...
- ACE_Scheduler_Factory::server()->set(rt_info_,
- 0, 0, 0, 0,
- RtecScheduler::VERY_LOW,
- RtecScheduler::NO_QUANTUM,
- 1, ACE_TRY_ENV);
- ACE_CHECK_ENV;
- }
- ACE_CATCHANY
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- "ACE_ES_Display_Queue::exception"), -1);
- }
- ACE_ENDTRY;
- case 1:
- // Found.
- break;
- }
-
- // Spawn threads.
- return this->synch_threads (threads);
-}
-
-// This is called back by ACE_RT_Task when all the threads have
-// exited. We're going to forward this event to the dispatching
-// module so it can clean up any resources.
-void
-ACE_ES_Dispatch_Queue::threads_closed (void)
-{
- dispatching_module_->dispatch_queue_closed (this);
-}
-
-// ************************************************************
-
-ACE_ES_EFD_Dispatching::ACE_ES_EFD_Dispatching (ACE_EventChannel *channel)
- : ACE_ES_Dispatching_Base (channel)
-{
-}
-
-void
-ACE_ES_EFD_Dispatching::push (ACE_ES_Dispatch_Request *request,
- CORBA::Environment &)
-{
- // If it's a request to forward an event, it needs a reference to us
- // to call dispatch_event.
- request->set (this, 0, ACE_Scheduler_MIN_SUB_PRIORITY);
-
- u_long command_action = ACE_RT_Task_Command::RELEASE;
-
- // This may be a request to delete a push consumer proxy, so we
- // should execute it instead of just forwarding it.
- request->execute (command_action);
-
- switch (command_action)
- {
- case ACE_RT_Task_Command::RELEASE:
- // Free the request.
- if (ACE_ES_Dispatch_Request::release (request) != 0)
- ACE_ERROR ((LM_ERROR, "ACE_ES_EFD_Dispatching::push"
- " release failed.\n"));
- break;
-
- default:
- ACE_ERROR ((LM_ERROR, "ACE_ES_EFD_Dispatching::push: unknown command action.\n"));
- }
-}
-
-// ************************************************************
-
-ACE_ES_RTU_Dispatching::ACE_ES_RTU_Dispatching (ACE_EventChannel *channel)
- : ACE_ES_Priority_Dispatching (channel, 0)
-{
-}
-
-// We're called from a dispatch queue, so we can not release the request.
-int
-ACE_ES_RTU_Dispatching::dispatch_event (ACE_ES_Dispatch_Request *request,
- u_long &command_action)
-{
- // Store the priority of the task currently running.
- channel_->rtu_manager ()->priority (request->priority ());
-
- ACE_TRY
- {
- // Forward the request.
- up_->push (request, ACE_TRY_ENV);
- ACE_CHECK_ENV;
- }
- ACE_CATCHANY
- {
- ACE_ERROR ((LM_ERROR, "ACE_ES_RTU_Dispatching::dispatch_event unknown exception.\n"));
- }
- ACE_ENDTRY;
-
- // Reset the priority.
- channel_->rtu_manager ()->priority (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, ACE_SCOPE_PROCESS));
-
- // If the task was preempted, enqueue the request on the head of the
- // dispatch queue.
- if (channel_->rtu_manager ()->not_done ())
- command_action = ACE_RT_Task_Command::UNGETQ;
- else
- // We're done with it.
- command_action = ACE_RT_Task_Command::RELEASE;
-
- return 0;
-}
-
-void
-ACE_ES_RTU_Dispatching::push (ACE_ES_Dispatch_Request *request,
- CORBA::Environment &_env)
-{
- // First enqueue the message in the proper queue.
- ACE_ES_Priority_Dispatching::push (request, _env);
-
- // If the current event is higher priority (lower value) than the
- // current running task, then tell the task to preempt itself.
- int request_priority = request->priority ();
- int running_priority = channel_->rtu_manager ()->priority ();
- if (request_priority < running_priority)
- channel_->rtu_manager ()->should_preempt (1);
- return;
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-template class ACE_CORBA_Sequence<ACE_CORBA_var<ACE_ES_Event_Container> >;
-template void operator+=(ACE_CORBA_Sequence<ACE_CORBA_var<ACE_ES_Event_Container> > &, ACE_CORBA_var<ACE_ES_Event_Container> const &);
-
-#if defined(ACE_ES_LACKS_ORB)
-template class ACE_CORBA_Sequence<ACE_ES_Event>;
-#endif /* ACE_ES_LACKS_ORB */
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */