diff options
Diffstat (limited to 'ace/WFMO_Reactor.cpp')
-rw-r--r-- | ace/WFMO_Reactor.cpp | 310 |
1 files changed, 201 insertions, 109 deletions
diff --git a/ace/WFMO_Reactor.cpp b/ace/WFMO_Reactor.cpp index cd689927ec5..d1c7043b35c 100644 --- a/ace/WFMO_Reactor.cpp +++ b/ace/WFMO_Reactor.cpp @@ -27,8 +27,8 @@ int ACE_WFMO_Reactor_Handler_Repository::open (size_t size) { if (size > MAXIMUM_WAIT_OBJECTS) - ACE_ERROR_RETURN ((LM_ERROR, - ASYS_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"), + ACE_ERROR_RETURN ((LM_ERROR, + ASYS_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"), size, MAXIMUM_WAIT_OBJECTS), -1); @@ -71,40 +71,131 @@ ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void) delete [] this->to_be_added_info_; } -void -ACE_WFMO_Reactor_Handler_Repository::remove_network_events_i (long &existing_masks, - ACE_Reactor_Mask to_be_removed_masks) +ACE_Reactor_Mask +ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks, + ACE_Reactor_Mask change_masks, + int operation) { - if (ACE_BIT_ENABLED (to_be_removed_masks, - ACE_Event_Handler::READ_MASK)) + // + // Find the old reactor masks. This automatically does the work of + // the GET_MASK operation. + // + + ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK; + + if (ACE_BIT_ENABLED (existing_masks, FD_READ) || + ACE_BIT_ENABLED (existing_masks, FD_CLOSE)) { - ACE_CLR_BITS (existing_masks, FD_READ); - ACE_CLR_BITS (existing_masks, FD_CLOSE); + ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK); } - if (ACE_BIT_ENABLED (to_be_removed_masks, - ACE_Event_Handler::WRITE_MASK)) - ACE_CLR_BITS (existing_masks, FD_WRITE); + if (ACE_BIT_ENABLED (existing_masks, FD_WRITE)) + ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK); + + if (ACE_BIT_ENABLED (existing_masks, FD_OOB)) + ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK); + + if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT)) + ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK); + + if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT)) + ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK); + + if (ACE_BIT_ENABLED (existing_masks, FD_QOS)) + ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK); + + if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS)) + ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK); + + switch (operation) + { + case ACE_Reactor::CLR_MASK: + + // + // For the CLR_MASK operation, clear only the specific masks. + // + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK)) + { + ACE_CLR_BITS (existing_masks, FD_READ); + ACE_CLR_BITS (existing_masks, FD_CLOSE); + } + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK)) + ACE_CLR_BITS (existing_masks, FD_WRITE); + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK)) + ACE_CLR_BITS (existing_masks, FD_OOB); + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK)) + ACE_CLR_BITS (existing_masks, FD_ACCEPT); + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK)) + ACE_CLR_BITS (existing_masks, FD_CONNECT); + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK)) + ACE_CLR_BITS (existing_masks, FD_QOS); + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK)) + ACE_CLR_BITS (existing_masks, FD_GROUP_QOS); + + break; + + case ACE_Reactor::SET_MASK: + + // + // If the operation is a set, first reset any existing masks + // + + existing_masks = 0; + /* FALLTHRU */ + + case ACE_Reactor::ADD_MASK: + + // + // For the ADD_MASK and the SET_MASK operation, add only the + // specific masks. + // + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK)) + { + ACE_SET_BITS (existing_masks, FD_READ); + ACE_SET_BITS (existing_masks, FD_CLOSE); + } - if (ACE_BIT_ENABLED (to_be_removed_masks, - ACE_Event_Handler::EXCEPT_MASK)) - ACE_CLR_BITS (existing_masks, FD_OOB); + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK)) + ACE_SET_BITS (existing_masks, FD_WRITE); - if (ACE_BIT_ENABLED (to_be_removed_masks, - ACE_Event_Handler::ACCEPT_MASK)) - ACE_CLR_BITS (existing_masks, FD_ACCEPT); + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK)) + ACE_SET_BITS (existing_masks, FD_OOB); - if (ACE_BIT_ENABLED (to_be_removed_masks, - ACE_Event_Handler::CONNECT_MASK)) - ACE_CLR_BITS (existing_masks, FD_CONNECT); + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK)) + ACE_SET_BITS (existing_masks, FD_ACCEPT); - if (ACE_BIT_ENABLED (to_be_removed_masks, - ACE_Event_Handler::QOS_MASK)) - ACE_CLR_BITS (existing_masks, FD_QOS); + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK)) + ACE_SET_BITS (existing_masks, FD_CONNECT); - if (ACE_BIT_ENABLED (to_be_removed_masks, - ACE_Event_Handler::GROUP_QOS_MASK)) - ACE_CLR_BITS (existing_masks, FD_GROUP_QOS); + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK)) + ACE_SET_BITS (existing_masks, FD_QOS); + + if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK)) + ACE_SET_BITS (existing_masks, FD_GROUP_QOS); + + break; + + case ACE_Reactor::GET_MASK: + + // + // The work for this operation is done in all cases at the + // begining of the function. + // + + ACE_UNUSED_ARG (change_masks); + + break; + } + + return old_masks; } int @@ -133,7 +224,8 @@ ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle, // Make sure that it is not already marked for deleted !this->current_info_[i].delete_entry_) { - result = this->remove_handler_i (i, mask); + result = this->remove_handler_i (i, + mask); if (result == -1) error = 1; } @@ -147,7 +239,8 @@ ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle, // Make sure that it is not already marked for deleted !this->current_suspended_info_[i].delete_entry_) { - result = this->remove_suspended_handler_i (i, mask); + result = this->remove_suspended_handler_i (i, + mask); if (result == -1) error = 1; } @@ -161,7 +254,8 @@ ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle, // Make sure that it is not already marked for deleted !this->to_be_added_info_[i].delete_entry_) { - result = this->remove_to_be_added_handler_i (i, mask); + result = this->remove_to_be_added_handler_i (i, + mask); if (result == -1) error = 1; } @@ -183,8 +277,9 @@ ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t index, { // See if there are other events that the <Event_Handler> is // interested in - this->remove_network_events_i (this->current_info_[index].network_events_, - to_be_removed_masks); + this->bit_ops (this->current_info_[index].network_events_, + to_be_removed_masks, + ACE_Reactor::CLR_MASK); // Disassociate/Reassociate the event from/with the I/O handle. // This will depend on the value of remaining set of network @@ -216,7 +311,7 @@ ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t index, // Remember the mask this->current_info_[index].close_masks_ = to_be_removed_masks; // Increment the handle count - this->handles_to_be_deleted_++; + this->handles_to_be_deleted_++; } else { @@ -229,7 +324,7 @@ ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t index, if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0) { ACE_HANDLE handle = this->current_info_[index].io_handle_; - this->current_info_[index].event_handler_->handle_close (handle, + this->current_info_[index].event_handler_->handle_close (handle, to_be_removed_masks); } } @@ -246,8 +341,9 @@ ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t index, { // See if there are other events that the <Event_Handler> is // interested in - this->remove_network_events_i (this->current_suspended_info_[index].network_events_, - to_be_removed_masks); + this->bit_ops (this->current_suspended_info_[index].network_events_, + to_be_removed_masks, + ACE_Reactor::CLR_MASK); // Disassociate/Reassociate the event from/with the I/O handle. // This will depend on the value of remaining set of network @@ -309,8 +405,9 @@ ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t index, { // See if there are other events that the <Event_Handler> is // interested in - this->remove_network_events_i (this->to_be_added_info_[index].network_events_, - to_be_removed_masks); + this->bit_ops (this->to_be_added_info_[index].network_events_, + to_be_removed_masks, + ACE_Reactor::CLR_MASK); // Disassociate/Reassociate the event from/with the I/O handle. // This will depend on the value of remaining set of network @@ -355,7 +452,7 @@ ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t index, if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0) { ACE_HANDLE handle = this->to_be_added_info_[index].io_handle_; - this->to_be_added_info_[index].event_handler_->handle_close (handle, + this->to_be_added_info_[index].event_handler_->handle_close (handle, to_be_removed_masks); } } @@ -821,7 +918,7 @@ ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void) return 0; } -void +void ACE_WFMO_Reactor_Handler_Repository::dump (void) const { size_t i = 0; @@ -830,7 +927,7 @@ ACE_WFMO_Reactor_Handler_Repository::dump (void) const ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Max size = %d\n"), this->max_size_)); @@ -883,7 +980,7 @@ ACE_WFMO_Reactor_Handler_Repository::dump (void) const ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } - + /************************************************************/ ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh, @@ -1032,7 +1129,7 @@ ACE_WFMO_Reactor::open (size_t size, // Open the notification handler if (this->notify_handler_->open (this, this->timer_queue_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, + ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("opening notify handler ")), -1); @@ -1138,7 +1235,7 @@ int ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle, ACE_HANDLE io_handle, ACE_Event_Handler *event_handler, - ACE_Reactor_Mask mask) + ACE_Reactor_Mask new_masks) { // Make sure that the <handle> is valid if (io_handle == ACE_INVALID_HANDLE) @@ -1149,22 +1246,24 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle, long new_network_events = 0; int delete_event = 0; - auto_ptr <ACE_Auto_Event> event; - // Look up the repository to see if the <Event_Handler> is already + // Look up the repository to see if the <event_handler> is already // there. - int found = this->handler_rep_.add_network_events_i (mask, - io_handle, - new_network_events, - event_handle, - delete_event); + ACE_Reactor_Mask old_masks; + int found = this->handler_rep_.modify_network_events_i (io_handle, + new_masks, + old_masks, + new_network_events, + event_handle, + delete_event, + ACE_Reactor::ADD_MASK); // Check to see if the user passed us a valid event; If not then we // need to create one if (event_handle == ACE_INVALID_HANDLE) { - event = auto_ptr <ACE_Auto_Event> (new ACE_Auto_Event); + event = auto_ptr<ACE_Auto_Event> (new ACE_Auto_Event); event_handle = event->handle (); delete_event = 1; } @@ -1183,8 +1282,8 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle, event_handle, delete_event) != -1) { - // The <Event_Handler was not found in the repository Add to the - // repository. + // The <event_handler> was not found in the repository, add to + // the repository. if (delete_event) event->handle (ACE_INVALID_HANDLE); return 0; @@ -1194,8 +1293,9 @@ ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle, } int -ACE_WFMO_Reactor::schedule_wakeup_i (ACE_HANDLE io_handle, - ACE_Reactor_Mask masks_to_be_added) +ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle, + ACE_Reactor_Mask new_masks, + int operation) { // Make sure that the <handle> is valid if (this->handler_rep_.invalid_handle (io_handle)) @@ -1207,28 +1307,40 @@ ACE_WFMO_Reactor::schedule_wakeup_i (ACE_HANDLE io_handle, // Look up the repository to see if the <Event_Handler> is already // there. - int found = this->handler_rep_.add_network_events_i (masks_to_be_added, - io_handle, - new_network_events, - event_handle, - delete_event); + ACE_Reactor_Mask old_masks; + int found = this->handler_rep_.modify_network_events_i (io_handle, + new_masks, + old_masks, + new_network_events, + event_handle, + delete_event, + operation); if (found) - return ::WSAEventSelect ((SOCKET) io_handle, - event_handle, - new_network_events); + { + int result = ::WSAEventSelect ((SOCKET) io_handle, + event_handle, + new_network_events); + if (result == 0) + return old_masks; + else + return result; + } else return -1; } + int -ACE_WFMO_Reactor_Handler_Repository::add_network_events_i (ACE_Reactor_Mask mask, - ACE_HANDLE io_handle, - long &new_masks, - ACE_HANDLE &event_handle, - int &delete_event) +ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle, + ACE_Reactor_Mask new_masks, + ACE_Reactor_Mask &old_masks, + long &new_network_events, + ACE_HANDLE &event_handle, + int &delete_event, + int operation) { - long *modified_masks = &new_masks; + long *modified_network_events = &new_network_events; int found = 0; size_t i; @@ -1241,7 +1353,7 @@ ACE_WFMO_Reactor_Handler_Repository::add_network_events_i (ACE_Reactor_Mask mask !this->current_info_[i].delete_entry_) { found = 1; - modified_masks = &this->current_info_[i].network_events_; + modified_network_events = &this->current_info_[i].network_events_; delete_event = this->current_info_[i].delete_event_; event_handle = this->current_handles_[i]; } @@ -1255,7 +1367,7 @@ ACE_WFMO_Reactor_Handler_Repository::add_network_events_i (ACE_Reactor_Mask mask !this->current_suspended_info_[i].delete_entry_) { found = 1; - modified_masks = &this->current_suspended_info_[i].network_events_; + modified_network_events = &this->current_suspended_info_[i].network_events_; delete_event = this->current_suspended_info_[i].delete_event_; event_handle = this->current_suspended_info_[i].event_handle_; } @@ -1269,36 +1381,16 @@ ACE_WFMO_Reactor_Handler_Repository::add_network_events_i (ACE_Reactor_Mask mask !this->to_be_added_info_[i].delete_entry_) { found = 1; - modified_masks = &this->to_be_added_info_[i].network_events_; + modified_network_events = &this->to_be_added_info_[i].network_events_; delete_event = this->to_be_added_info_[i].delete_event_; event_handle = this->to_be_added_info_[i].event_handle_; } - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)) - { - ACE_SET_BITS (*modified_masks, FD_READ); - ACE_SET_BITS (*modified_masks, FD_CLOSE); - } - - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK)) - ACE_SET_BITS (*modified_masks, FD_WRITE); - - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)) - ACE_SET_BITS (*modified_masks, FD_OOB); - - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)) - ACE_SET_BITS (*modified_masks, FD_ACCEPT); - - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)) - ACE_SET_BITS (*modified_masks, FD_CONNECT); - - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::QOS_MASK)) - ACE_SET_BITS (*modified_masks, FD_QOS); + old_masks = this->bit_ops (*modified_network_events, + new_masks, + operation); - if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::GROUP_QOS_MASK)) - ACE_SET_BITS (*modified_masks, FD_GROUP_QOS); - - new_masks = *modified_masks; + new_network_events = *modified_network_events; return found; } @@ -1410,8 +1502,8 @@ ACE_WFMO_Reactor::poll_remaining_handles (size_t index) { return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - index, this->handler_rep_.handles () + index, - FALSE, - 0); + FALSE, + 0); } int @@ -1508,14 +1600,14 @@ ACE_WFMO_Reactor::dispatch_handles (size_t wait_status) // Dispatch handler if (this->dispatch_handler (dispatch_index, max_handlep1) == -1) return -1; - + // Increment index dispatch_index++; // We're done. if (dispatch_index >= max_handlep1) return number_of_handlers_dispatched; - + // Readjust nCount nCount = max_handlep1 - dispatch_index; @@ -1534,7 +1626,7 @@ ACE_WFMO_Reactor::dispatch_handles (size_t wait_status) } int -ACE_WFMO_Reactor::dispatch_handler (size_t index, +ACE_WFMO_Reactor::dispatch_handler (size_t index, size_t max_handlep1) { // Check if there are window messages that need to be dispatched @@ -1557,7 +1649,7 @@ ACE_WFMO_Reactor::dispatch_handler (size_t index, return this->complex_dispatch_handler (index, event_handle); else return this->simple_dispatch_handler (index, event_handle); - } + } else // The handle was scheduled for deletion, so we will skip it. return 0; @@ -1623,19 +1715,19 @@ ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler, { long actual_events = events.lNetworkEvents; - if ((interested_events & actual_events & FD_READ) && + if ((interested_events & actual_events & FD_READ) && event_handler->handle_input (io_handle) == -1) ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK); - + if ((interested_events & actual_events & FD_CLOSE) && !ACE_BIT_ENABLED (problems, ACE_Event_Handler::READ_MASK) && event_handler->handle_input (io_handle) == -1) ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK); - + if ((interested_events & actual_events & FD_ACCEPT) && event_handler->handle_input (io_handle) == -1) ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK); - + if ((interested_events & actual_events & FD_WRITE) && event_handler->handle_output (io_handle) == -1) ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK); @@ -1739,11 +1831,11 @@ ACE_WFMO_Reactor::dump (void) const ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - ACE_DEBUG ((LM_DEBUG, + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("Count of currently active threads = %d\n"), this->active_threads_)); - ACE_DEBUG ((LM_DEBUG, + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ID of owner thread = %d\n"), this->owner_)); |