diff options
Diffstat (limited to 'ace/TP_Reactor.cpp')
-rw-r--r-- | ace/TP_Reactor.cpp | 321 |
1 files changed, 52 insertions, 269 deletions
diff --git a/ace/TP_Reactor.cpp b/ace/TP_Reactor.cpp index 16822dc36e7..0d6da2830fa 100644 --- a/ace/TP_Reactor.cpp +++ b/ace/TP_Reactor.cpp @@ -1,6 +1,5 @@ // $Id$ - #include "ace/TP_Reactor.h" #include "ace/Reactor.h" #include "ace/Thread.h" @@ -11,7 +10,6 @@ ACE_RCSID(ace, TP_Reactor, "$Id$") - ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor) int @@ -39,7 +37,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time) ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook)); } - // Now that this thread owns the token let us make // Check for timeouts and errors. if (result == -1) { @@ -55,7 +52,6 @@ ACE_TP_Token_Guard::grab_token (ACE_Time_Value *max_wait_time) return result; } - int ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time) { @@ -79,7 +75,6 @@ ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time) ACE_MT (result = this->token_.acquire ()); } - // Now that this thread owns the token let us make // Check for timeouts and errors. if (result == -1) { @@ -148,14 +143,14 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) // called. ACE_Countdown_Time countdown (max_wait_time); + // // The order of these events is very subtle, modify with care. - + // // Instantiate the token guard which will try grabbing the token for // this thread. ACE_TP_Token_Guard guard (this->token_); - int result = guard.grab_token (max_wait_time); // If the guard is NOT the owner just return the retval @@ -169,145 +164,10 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) // Update the countdown to reflect time waiting for the token. countdown.update (); - return this->dispatch_i (max_wait_time, guard); } - -int -ACE_TP_Reactor::remove_handler (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask) -{ - int result = 0; - // Artificial scoping for grabbing and releasing the token - { - ACE_TP_Token_Guard guard (this->token_); - - // Acquire the token - result = guard.acquire_token (); - - if (!guard.is_owner ()) - return result; - - // Call the remove_handler_i () with a DONT_CALL mask. We dont - // want to call the handle_close with the token held. - result = this->remove_handler_i (eh->get_handle (), - mask | ACE_Event_Handler::DONT_CALL); - - if (result == -1) - return -1; - } - - // Close down the <Event_Handler> unless we've been instructed not - // to. - if (result == 0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)) - eh->handle_close (ACE_INVALID_HANDLE, mask); - - return 0; -} - -int -ACE_TP_Reactor::remove_handler (ACE_HANDLE handle, - ACE_Reactor_Mask mask) -{ - - ACE_Event_Handler *eh = 0; - int result = 0; - // Artificial scoping for grabbing and releasing the token - { - ACE_TP_Token_Guard guard (this->token_); - - // Acquire the token - result = guard.acquire_token (); - - if (!guard.is_owner ()) - return result; - - size_t slot = 0; - eh = this->handler_rep_.find (handle, &slot); - - if (eh == 0) - return -1; - - // Call the remove_handler_i () with a DONT_CALL mask. We dont - // want to call the handle_close with the token held. - result = this->remove_handler_i (handle, - mask | ACE_Event_Handler::DONT_CALL); - - if (result == -1) - return -1; - } - - // Close down the <Event_Handler> unless we've been instructed not - // to. - // @@ Note: The check for result ==0 may be redundant, but shouldnt - // be a problem. - if (result ==0 && (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)) - eh->handle_close (handle, mask); - - return 0; -} - - -int -ACE_TP_Reactor::remove_handler (const ACE_Handle_Set &handles, - ACE_Reactor_Mask m) -{ - // Array of <Event_Handlers> corresponding to <handles> - ACE_Event_Handler **aeh = 0; - - // Allocate memory for the size of the handle set - ACE_NEW_RETURN (aeh, - ACE_Event_Handler *[handles.num_set ()], - -1); - - size_t index = 0; - - // Artificial scoping for grabbing and releasing the token - { - ACE_TP_Token_Guard guard (this->token_); - - // Acquire the token - int result = guard.acquire_token (); - - if (!guard.is_owner ()) - return result; - - ACE_HANDLE h; - - ACE_Handle_Set_Iterator handle_iter (handles); - - while ((h = handle_iter ()) != ACE_INVALID_HANDLE) - { - size_t slot = 0; - ACE_Event_Handler *eh = - this->handler_rep_.find (h, &slot); - - if (this->remove_handler_i (h, - m | ACE_Event_Handler::DONT_CALL) == -1) - { - delete [] aeh; - return -1; - } - - aeh [index] = eh; - index ++; - } - } - - // Close down the <Event_Handler> unless we've been instructed not - // to. - if (ACE_BIT_ENABLED (m, ACE_Event_Handler::DONT_CALL) == 0) - { - for (size_t i = 0; i < index; i++) - aeh[i]->handle_close (ACE_INVALID_HANDLE, m); - } - - delete [] aeh; - return 0; -} - int ACE_TP_Reactor::register_handler (int, ACE_Event_Handler *, @@ -378,22 +238,6 @@ ACE_TP_Reactor::register_handler (const ACE_Handle_Set &handles, } int -ACE_TP_Reactor::remove_handler (int /*signum*/, - ACE_Sig_Action * /*new_disp*/, - ACE_Sig_Action * /*old_disp*/, - int /*sigkey*/) -{ - ACE_NOTSUP_RETURN (-1); -} - -int -ACE_TP_Reactor::remove_handler (const ACE_Sig_Set & /*sigset*/) -{ - ACE_NOTSUP_RETURN (-1); -} - - -int ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, ACE_TP_Token_Guard &guard) { @@ -413,6 +257,7 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, // a later point of time, we decide to handle signals we have to // release the lock before we make any upcalls.. What is here // now is not the right thing... + // // @@ We need to do better.. return this->handle_signals (event_count, guard); @@ -421,8 +266,8 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, // If there are no signals and if we had received a proper // event_count then first look at dispatching timeouts. We need to // handle timers early since they may have higher latency - // constraints than I/O handlers. Ideally, the order of - // dispatching should be a strategy... + // constraints than I/O handlers. Ideally, the order of dispatching + // should be a strategy... // NOTE: The event count does not have the number of timers that // needs dispatching. But we are still passing this along. We dont @@ -435,14 +280,13 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, if (result > 0) return result; - - // Else justgo ahead fall through for further handling + // Else just go ahead fall through for further handling. if (event_count > 0) { // Next dispatch the notification handlers (if there are any to - // dispatch). These are required to handle multiple-threads that - // are trying to update the <Reactor>. + // dispatch). These are required to handle multiple-threads + // that are trying to update the <Reactor>. result = this->handle_notify_events (event_count, guard); @@ -460,12 +304,8 @@ ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time, } return 0; - } - - - int ACE_TP_Reactor::handle_signals (int & /*event_count*/, ACE_TP_Token_Guard & /*guard*/) @@ -496,7 +336,7 @@ ACE_TP_Reactor::handle_signals (int & /*event_count*/, // result of signals they should be dispatched since // they may be time critical... active_handle_count = this->any_ready (dispatch_set); - #else +#else // active_handle_count = 0; #endif @@ -525,14 +365,25 @@ ACE_TP_Reactor::handle_timer_events (int & /*event_count*/, if (this->timer_queue_->dispatch_info (cur_time, info)) { + const void *upcall_act = 0; + + // Preinvoke. + this->timer_queue_->preinvoke (info, + cur_time, + upcall_act); + // Release the token before dispatching notifies... guard.release_token (); // call the functor - this->timer_queue_->upcall (info.type_, - info.act_, + this->timer_queue_->upcall (info, cur_time); + // Postinvoke + this->timer_queue_->postinvoke (info, + cur_time, + upcall_act); + // We have dispatched a timer return 1; } @@ -540,8 +391,6 @@ ACE_TP_Reactor::handle_timer_events (int & /*event_count*/, return 0; } - - int ACE_TP_Reactor::handle_notify_events (int & /*event_count*/, ACE_TP_Token_Guard &guard) @@ -568,7 +417,7 @@ ACE_TP_Reactor::handle_notify_events (int & /*event_count*/, while (this->notify_handler_->read_notify_pipe (notify_handle, buffer) > 0) { - // Just figure out whether we can read any buffer that has + // Just figure out whether we can read any buffer that has // dispatchable info. If not we have just been unblocked by // another thread trying to update the reactor. If we get any // buffer that needs dispatching we will dispatch that after @@ -612,109 +461,54 @@ ACE_TP_Reactor::handle_socket_events (int &event_count, return 0; } - // Suspend the handler so that other threads don't start - // dispatching it. + // Suspend the handler so that other threads don't start dispatching + // it. + // // NOTE: This check was performed in older versions of the // TP_Reactor. Looks like it is a waste.. if (dispatch_info.event_handler_ != this->notify_handler_) this->suspend_i (dispatch_info.handle_); - // Release the lock. Others threads can start waiting. - guard.release_token (); + int resume_flag = + dispatch_info.event_handler_->resume_handler (); - int result = 0; + int reference_counting_required = + dispatch_info.event_handler_->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; - // If there was an event handler ready, dispatch it. - // Decrement the event left - --event_count; - - if (this->dispatch_socket_event (dispatch_info) == 0) - ++result; // Dispatched an event - - // This is to get around a problem/ which is well described in - // 1361. This is just a work around that would help applications - // from resuming handles at the most inopportune moment. - int flag = - ACE_Event_Handler::ACE_EVENT_HANDLER_NOT_RESUMED; - - // Acquire the token since we want to access the handler - // repository. The call to find () does not hold a lock and hence - // this is required. - guard.acquire_token (); - - // Get the handler for the handle that we have dispatched to. - ACE_Event_Handler *eh = - this->handler_rep_.find (dispatch_info.handle_); - - // This check is required for the following reasons - // 1. If dispatch operation returned a -1, then there is a - // possibility that the event handler could be deleted. In such - // cases the pointer to the event_handler that <dispatch_info> - // holds is set to 0. - // - // 2. If the application did its own memory management, a return - // value of 0 cannot be believed since the event handler could - // be deleted by the application based on some conditions. This - // is *bad*. But we dont have much of a choice with the existing - // reactor setup. To get around this, we can make a check for - // the handler registered with the repository for the handle - // that we have and compare with the handler that we - // posses. Yeah, I know it is like touching your nose by taking - // your hand around your head. But that is the way it is. This - // is a fix for [BUGID 1231] - - if (dispatch_info.event_handler_ != 0 && - eh == dispatch_info.event_handler_) + // Call add_reference() if needed. + if (reference_counting_required) { - flag = - dispatch_info.event_handler_->resume_handler (); + dispatch_info.event_handler_->add_reference (); } - // Use resume_i () since we hold the token already. - if (dispatch_info.handle_ != ACE_INVALID_HANDLE && - dispatch_info.event_handler_ != this->notify_handler_ && - flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER) - this->resume_i (dispatch_info.handle_); - - // Let me release the token here. This is not required since the - // destruction of the object on the stack will take care of this. + // Release the lock. Others threads can start waiting. guard.release_token (); - return result; -} - -int -ACE_TP_Reactor::mask_ops (ACE_HANDLE handle, - ACE_Reactor_Mask mask, - int ops) -{ - ACE_TRACE ("ACE_TP_Reactor::mask_ops"); - ACE_MT (ACE_GUARD_RETURN (ACE_Select_Reactor_Token, - ace_mon, this->token_, -1)); - int result = 0; - // If it looks like the handle isn't suspended, then - // set the ops on the wait_set_, otherwise set the suspend_set_. + // If there was an event handler ready, dispatch it. + // Decrement the event left + --event_count; - if (this->suspend_set_.rd_mask_.is_set (handle) == 0 - && this->suspend_set_.wr_mask_.is_set (handle) == 0 - && this->suspend_set_.ex_mask_.is_set (handle) == 0) + // Dispatched an event + if (this->dispatch_socket_event (dispatch_info) == 0) + ++result; - result = this->bit_ops (handle, mask, - this->wait_set_, - ops); - else + // Resume handler if required. + if (dispatch_info.event_handler_ != this->notify_handler_ && + resume_flag == ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER) + this->resume_handler (dispatch_info.handle_); - result = this->bit_ops (handle, mask, - this->suspend_set_, - ops); + // Call remove_reference() if needed. + if (reference_counting_required) + { + dispatch_info.event_handler_->remove_reference (); + } return result; } - - int ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) { @@ -756,7 +550,8 @@ ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time) int ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event) { - event.reset (); // Nothing to dispatch yet + // Nothing to dispatch yet + event.reset (); // Check for dispatch in write, except, read. Only catch one, but if // one is caught, be sure to clear the handle from each mask in case @@ -863,10 +658,6 @@ ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info) int retval = this->remove_handler (handle, mask); - // As the handler is no longer valid, invalidate the handle - dispatch_info.event_handler_ = 0; - dispatch_info.handle_ = ACE_INVALID_HANDLE; - return retval; } @@ -886,14 +677,6 @@ ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time) return this->handle_events (&max_wait_time); } -int -ACE_TP_Reactor::mask_ops (ACE_Event_Handler *eh, - ACE_Reactor_Mask mask, - int ops) -{ - return this->mask_ops (eh->get_handle (), mask, ops); -} - void ACE_TP_Reactor::notify_handle (ACE_HANDLE, ACE_Reactor_Mask, |