// $Id$ // ACE_RCSID(ace, Proactor, "$Id$") #include "ace/WIN32_Proactor.h" #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) // WIN implemenatation of the Proactor. #include "ace/Log_Msg.h" #include "ace/Object_Manager.h" class ACE_Export ACE_WIN32_Wakeup_Completion : public ACE_WIN32_Asynch_Result { // = TITLE // This is result object is used by the of the // ACE_Proactor interface to wake up all the threads blocking // for completions. public: ACE_WIN32_Wakeup_Completion (ACE_Handler &handler, const void *act = 0, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); // Constructor. virtual ~ACE_WIN32_Wakeup_Completion (void); // Destructor. virtual void complete (u_long bytes_transferred = 0, int success = 1, const void *completion_key = 0, u_long error = 0); // This method calls the 's method. }; ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads, int used_with_reactor_event_loop) : completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE !!! number_of_threads_ (number_of_threads), used_with_reactor_event_loop_ (used_with_reactor_event_loop) { // Create the completion port. this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE, 0, 0, this->number_of_threads_); if (this->completion_port_ == 0) ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("CreateIoCompletionPort"))); } ACE_WIN32_Proactor::~ACE_WIN32_Proactor (void) { this->close (); } int ACE_WIN32_Proactor::close (void) { // Close the completion port if (this->completion_port_ != 0) { // To avoid memory leaks we should delete all results from queue. for (;;) { ACE_OVERLAPPED *overlapped = 0; u_long bytes_transferred = 0; u_long completion_key = 0; // Get the next asynchronous operation that completes BOOL res = ::GetQueuedCompletionStatus (this->completion_port_, &bytes_transferred, &completion_key, &overlapped, 0); // poll if (overlapped == 0) break; ACE_WIN32_Asynch_Result *asynch_result = (ACE_WIN32_Asynch_Result *) overlapped; delete asynch_result; } int result = ACE_OS::close (this->completion_port_); this->completion_port_ = 0; return result; } return 0; } int ACE_WIN32_Proactor::register_handle (ACE_HANDLE handle, const void *completion_key) { // No locking is needed here as no state changes. ACE_HANDLE cp = ::CreateIoCompletionPort (handle, this->completion_port_, (u_long) completion_key, this->number_of_threads_); if (cp == 0) { ACE_OS::set_errno_to_last_error (); // If errno == ERROR_INVALID_PARAMETER, then this handle was // already registered. if (errno != ERROR_INVALID_PARAMETER) { if (ACE::debug ()) { ACE_DEBUG ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("CreateIoCompletionPort"))); } return -1; } } return 0; } ACE_Asynch_Read_Stream_Impl * ACE_WIN32_Proactor::create_asynch_read_stream (void) { ACE_Asynch_Read_Stream_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Read_Stream (this), 0); return implementation; } ACE_Asynch_Write_Stream_Impl * ACE_WIN32_Proactor::create_asynch_write_stream (void) { ACE_Asynch_Write_Stream_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Write_Stream (this), 0); return implementation; } ACE_Asynch_Read_Dgram_Impl * ACE_WIN32_Proactor::create_asynch_read_dgram (void) { ACE_Asynch_Read_Dgram_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Read_Dgram (this), 0); return implementation; } ACE_Asynch_Write_Dgram_Impl * ACE_WIN32_Proactor::create_asynch_write_dgram (void) { ACE_Asynch_Write_Dgram_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Write_Dgram (this), 0); return implementation; } ACE_Asynch_Read_File_Impl * ACE_WIN32_Proactor::create_asynch_read_file (void) { ACE_Asynch_Read_File_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Read_File (this), 0); return implementation; } ACE_Asynch_Write_File_Impl * ACE_WIN32_Proactor::create_asynch_write_file (void) { ACE_Asynch_Write_File_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Write_File (this), 0); return implementation; } ACE_Asynch_Accept_Impl * ACE_WIN32_Proactor::create_asynch_accept (void) { ACE_Asynch_Accept_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Accept (this), 0); return implementation; } ACE_Asynch_Transmit_File_Impl * ACE_WIN32_Proactor::create_asynch_transmit_file (void) { ACE_Asynch_Transmit_File_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Transmit_File (this), 0); return implementation; } ACE_Asynch_Read_Stream_Result_Impl * ACE_WIN32_Proactor::create_asynch_read_stream_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Read_Stream_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Read_Stream_Result (handler, handle, message_block, bytes_to_read, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Write_Stream_Result_Impl * ACE_WIN32_Proactor::create_asynch_write_stream_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_write, const void* act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Write_Stream_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Write_Stream_Result (handler, handle, message_block, bytes_to_write, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Read_File_Result_Impl * ACE_WIN32_Proactor::create_asynch_read_file_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, u_long offset, u_long offset_high, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Read_File_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Read_File_Result (handler, handle, message_block, bytes_to_read, act, offset, offset_high, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Write_File_Result_Impl * ACE_WIN32_Proactor::create_asynch_write_file_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_write, const void* act, u_long offset, u_long offset_high, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Write_File_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Write_File_Result (handler, handle, message_block, bytes_to_write, act, offset, offset_high, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Read_Dgram_Result_Impl * ACE_WIN32_Proactor::create_asynch_read_dgram_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block *message_block, size_t bytes_to_read, int flags, int protocol_family, const void* act, ACE_HANDLE event , int priority , int signal_number) { ACE_Asynch_Read_Dgram_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Read_Dgram_Result (handler, handle, message_block, bytes_to_read, flags, protocol_family, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Write_Dgram_Result_Impl * ACE_WIN32_Proactor::create_asynch_write_dgram_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block *message_block, size_t bytes_to_read, int flags, const void* act, ACE_HANDLE event , int priority , int signal_number) { ACE_Asynch_Write_Dgram_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Write_Dgram_Result(handler, handle, message_block, bytes_to_read, flags, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Accept_Result_Impl * ACE_WIN32_Proactor::create_asynch_accept_result (ACE_Handler &handler, ACE_HANDLE listen_handle, ACE_HANDLE accept_handle, ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Accept_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Accept_Result (handler, listen_handle, accept_handle, message_block, bytes_to_read, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Transmit_File_Result_Impl * ACE_WIN32_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, ACE_HANDLE file, ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer, u_long bytes_to_write, u_long offset, u_long offset_high, u_long bytes_per_send, u_long flags, const void *act, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Transmit_File_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Transmit_File_Result (handler, socket, file, header_and_trailer, bytes_to_write, offset, offset_high, bytes_per_send, flags, act, event, priority, signal_number), 0); return implementation; } ACE_Asynch_Result_Impl * ACE_WIN32_Proactor::create_asynch_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event, int priority, int signal_number) { ACE_Asynch_Result_Impl *implementation = 0; ACE_NEW_RETURN (implementation, ACE_WIN32_Asynch_Timer (handler, act, tv, event, priority, signal_number), 0); return implementation; } int ACE_WIN32_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) { // Perform a non-blocking "poll" for all the I/O events that have // completed in the I/O completion queue. int result = 0; for (ACE_Time_Value timeout (0, 0); ; ) { result = this->handle_events (timeout); if (result != 0 || errno == ETIME) break; } // If our handle_events failed, we'll report a failure to the // Reactor. return result == -1 ? -1 : 0; } int ACE_WIN32_Proactor::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { ACE_UNUSED_ARG (close_mask); ACE_UNUSED_ARG (handle); return this->close (); } ACE_HANDLE ACE_WIN32_Proactor::get_handle (void) const { if (this->used_with_reactor_event_loop_) return this->event_.handle (); else return 0; } int ACE_WIN32_Proactor::handle_events (ACE_Time_Value &wait_time) { // Decrement with the amount of time spent in the method ACE_Countdown_Time countdown (&wait_time); return this->handle_events (wait_time.msec ()); } int ACE_WIN32_Proactor::handle_events (void) { return this->handle_events (ACE_INFINITE); } int ACE_WIN32_Proactor::handle_events (unsigned long milli_seconds) { ACE_OVERLAPPED *overlapped = 0; u_long bytes_transferred = 0; u_long completion_key = 0; // Get the next asynchronous operation that completes BOOL result = ::GetQueuedCompletionStatus (this->completion_port_, &bytes_transferred, &completion_key, &overlapped, milli_seconds); if (result == FALSE && overlapped == 0) { ACE_OS::set_errno_to_last_error (); if (errno == WAIT_TIMEOUT) { errno = ETIME; return 0; } else { if (ACE::debug ()) { ACE_DEBUG ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("GetQueuedCompletionStatus"))); } return -1; } } else { // Narrow the result. ACE_WIN32_Asynch_Result *asynch_result = (ACE_WIN32_Asynch_Result *) overlapped; // If errors happen, grab the error. if (result == FALSE) ACE_OS::set_errno_to_last_error (); else errno = 0; u_long result_err = asynch_result->error (); // if "result_err" is 0 than // It is normal OS/WIN32 AIO completion. // We have cleared asynch_result->error_ // during shared_read/shared_write. // The real error code is already stored in "errno", // so copy "errno" value to the "result_err" // and pass this "result_err" code // to the application_specific_code () // else // "result_err" non zero // it means we have "post_completed" result // so pass this "result_err" code // to the application_specific_code () if ( result_err == 0 ) result_err = errno ; this->application_specific_code (asynch_result, bytes_transferred, result, (void *) completion_key, result_err); } return 1; } void ACE_WIN32_Proactor::application_specific_code (ACE_WIN32_Asynch_Result *asynch_result, u_long bytes_transferred, int success, const void *completion_key, u_long error) { ACE_SEH_TRY { // Call completion hook asynch_result->complete (bytes_transferred, success, (void *) completion_key, error); } ACE_SEH_FINALLY { // This is crucial to prevent memory leaks delete asynch_result; } } int ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result *result) { // Grab the event associated with the Proactor HANDLE handle = this->get_handle (); // If Proactor event is valid, signal it if (handle != ACE_INVALID_HANDLE && handle != 0) ACE_OS::event_signal (&handle); // pass // bytes_transferred // completion_key // to the ::PostQueuedCompletionStatus() // error will be extracted later in handle_events() u_long bytes_transferred = 0; const void * completion_key = 0 ; if ( result != 0 ) { bytes_transferred = result->bytes_transferred (); completion_key = result->completion_key(); } // Post a completion if (::PostQueuedCompletionStatus (this->completion_port_, // completion port bytes_transferred, // number of bytes transferred (ULONG) completion_key, // completion key result // overlapped ) == FALSE) { delete result; if (ACE::debug ()) { ACE_DEBUG ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("PostQueuedCompletionStatus failed"))); } return -1; } return 0; } int ACE_WIN32_Proactor::post_wakeup_completions (int how_many) { ACE_WIN32_Wakeup_Completion *wakeup_completion = 0; for (ssize_t ci = 0; ci < how_many; ci++) { ACE_NEW_RETURN (wakeup_completion, ACE_WIN32_Wakeup_Completion (this->wakeup_handler_), -1); if (wakeup_completion->post_completion (this) == -1) return -1; } return 0; } int ACE_WIN32_Proactor::wake_up_dispatch_threads (void) { return 0; } int ACE_WIN32_Proactor::close_dispatch_threads (int) { return 0; } size_t ACE_WIN32_Proactor::number_of_threads (void) const { return this->number_of_threads_; } void ACE_WIN32_Proactor::number_of_threads (size_t threads) { this->number_of_threads_ = threads; } ACE_WIN32_Asynch_Timer::ACE_WIN32_Asynch_Timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event, int priority, int signal_number) : ACE_Asynch_Result_Impl (), ACE_WIN32_Asynch_Result (handler, act, event, 0, 0, priority, signal_number), time_ (tv) { } void ACE_WIN32_Asynch_Timer::complete (u_long bytes_transferred, int success, const void *completion_key, u_long error) { ACE_UNUSED_ARG (error); ACE_UNUSED_ARG (completion_key); ACE_UNUSED_ARG (success); ACE_UNUSED_ARG (bytes_transferred); this->handler_.handle_time_out (this->time_, this->act ()); } ACE_WIN32_Wakeup_Completion::ACE_WIN32_Wakeup_Completion (ACE_Handler &handler, const void *act, ACE_HANDLE event, int priority, int signal_number) : ACE_Asynch_Result_Impl (), ACE_WIN32_Asynch_Result (handler, act, event, 0, 0, priority, signal_number) { } ACE_WIN32_Wakeup_Completion::~ACE_WIN32_Wakeup_Completion (void) { } void ACE_WIN32_Wakeup_Completion::complete (u_long /* bytes_transferred */, int /* success */, const void * /* completion_key */, u_long /* error */) { this->handler_.handle_wakeup (); } #endif /* ACE_WIN32 */