diff options
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r-- | ace/Proactor.cpp | 574 |
1 files changed, 0 insertions, 574 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp deleted file mode 100644 index bfa8ddbd2f6..00000000000 --- a/ace/Proactor.cpp +++ /dev/null @@ -1,574 +0,0 @@ -// Proactor.cpp -// $Id$ - -#define ACE_BUILD_DLL -#include "ace/Proactor.h" - -#if !defined (__ACE_INLINE__) -#include "ace/Proactor.i" -#endif /* __ACE_INLINE__ */ - -class ACE_Overlapped_IO : public ACE_OVERLAPPED - // = TITLE - // A wrapper for Win32 OVERLAPPED. - // - // = DESCRIPTION - // Acts as a magic cookie storing additional state associated - // with overlapped I/O operations. ReadFile and WriteFile take - // OVERLAPPED, so we pass in Overlapped_IO. OVERLAPPEDs are - // returned through GetQueuedCompletionStatus. They are cast - // back into Overlapped_IOs to get the handler_ etc. -{ -public: - // = Initialization and termination methods. - ACE_Overlapped_IO (ACE_Reactor_Mask mask, - ACE_Event_Handler *handler, - ACE_Message_Block *message, - ACE_Overlapped_File *file, - ACE_HANDLE event_handle); - - ~ACE_Overlapped_IO (void); - // Death. - - int dispatch (u_long bytes_transferred); - // Callback the appropriate handle_* method on handler_. - - int initiate (u_long &bytes_transferred); - // Call ReadFile or Writefile. - - operator ACE_OVERLAPPED * (void); - // Return this. - - void re_init (void); - // Reset the object to be reused. Calls get_message on the handler_ - // for a new message. - - ACE_Reactor_Mask mask_; - // Reading or writing. - ACE_Event_Handler *handler_; - // The IO handler. - ACE_Message_Block *message_; - // The current message to send/recv. - ACE_Overlapped_File *file_; - // The optional file pointer to update. - -private: - void init (void); - // Reset everything. -}; - -ACE_Overlapped_IO::ACE_Overlapped_IO (ACE_Reactor_Mask mask, - ACE_Event_Handler *handler, - ACE_Message_Block *message, - ACE_Overlapped_File *file, - ACE_HANDLE event_handle) - : mask_ (mask), - handler_ (handler), - message_ (message), - file_ (file) -{ - this->hEvent = event_handle; - this->init (); -} - -void -ACE_Overlapped_IO::init (void) -{ - if (file_ == 0) - this->Offset = 0; - else - this->Offset = file_->offset (); - - this->Internal = 0; - this->InternalHigh = 0; - this->OffsetHigh = 0; -} - -void -ACE_Overlapped_IO::re_init (void) -{ - this->message_ = this->handler_->get_message (); - - this->init (); -} - -ACE_Overlapped_IO::~ACE_Overlapped_IO (void) -{ -} - -int -ACE_Overlapped_IO::dispatch (u_long bytes_transferred) -{ - if (this->file_ != 0) - // Move the file pointer forward. - file_->lseek (bytes_transferred, SEEK_CUR); - - switch (this->mask_) - { - case ACE_Event_Handler::WRITE_MASK : - // Update the message length to reflect what was sent. - this->message_->rd_ptr (bytes_transferred); - return handler_->handle_output_complete (this->message_, - bytes_transferred); - - case ACE_Event_Handler::READ_MASK : - // Update the message length to reflect what was received. - this->message_->wr_ptr (bytes_transferred); - return this->handler_->handle_input_complete (this->message_, - bytes_transferred); - - default: - return -1; - } -} - -// When we port this to use Posix async I/O, these calls will be -// replace will generic ACE_OS calls. - -int -ACE_Overlapped_IO::initiate (u_long &bytes_transferred) -{ -#if defined (ACE_WIN32) - switch (this->mask_) - { - case ACE_Event_Handler::WRITE_MASK : - // Try to write. - return ::WriteFile (this->handler_->get_handle (), - this->message_->rd_ptr (), - this->message_->length (), - &bytes_transferred, - this); - - case ACE_Event_Handler::READ_MASK : - // READ_MASK is set, so try to read. - return ::ReadFile (this->handler_->get_handle (), - this->message_->wr_ptr (), - this->message_->size (), - &bytes_transferred, - this); - default: - return -1; - } -#else - bytes_transferred = bytes_transferred; - ACE_NOTSUP_RETURN (-1); -#endif -} - -ACE_Overlapped_IO::operator ACE_OVERLAPPED * (void) -{ - return (ACE_OVERLAPPED *) this; -} - -ACE_Proactor::ACE_Proactor (size_t number_of_threads, ACE_Timer_Queue *tq) - : timer_queue_ (tq), - completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!! - number_of_threads_ (number_of_threads) -{ - if (this->timer_queue_ == 0) - { - ACE_NEW (this->timer_queue_, ACE_Timer_Queue); - this->delete_timer_queue_ = 1; - } - -#if defined (ACE_WIN32) - // Create an IO completion port that is not associated with a file - // handle. This will allow us to use GetQueuedCompletionStatus as a - // timer mechanism only. - ACE_HANDLE cp; - - cp = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE, - this->completion_port_, - (u_long) 0, // 0 completion key - this->number_of_threads_); - - if (cp != 0) - // Success. - this->completion_port_ = cp; - else // Failure. - { - int error = ACE_OS::last_error (); - // If errno == ERROR_INVALID_PARAMETER, then this handle was - // already registered. - if (error != ERROR_INVALID_PARAMETER) - ACE_ERROR ((LM_ERROR, - "%p CreateIoCompletionPort failed errno = %d.\n", - "ACE_Proactor::initiate", error)); - } -#endif -} - -ACE_Proactor::~ACE_Proactor (void) -{ - if (this->delete_timer_queue_) - delete this->timer_queue_; -} - -int -ACE_Proactor::close (void) -{ - if (this->completion_port_ != 0) - ACE_OS::close (this->completion_port_); - - // @@ Should we call shared_event_.remove ()? - return 0; -} - -int -ACE_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) -{ - ACE_TRACE ("ACE_Proactor::handle_signal"); - - ACE_Time_Value timeout (0, 0); - - // Perform a non-blocking "poll" for all the I/O events that have - // completed in the I/O completion queue. - - int result; - - while ((result = this->handle_events (&timeout)) == 1) - continue; - - // If our handle_events failed, we'll report a failure to the - // ReactorEx. - return result == -1 ? -1 : 0; -} - - -int -ACE_Proactor::schedule_timer (ACE_Event_Handler *handler, - const void *arg, - const ACE_Time_Value &delta_time, - const ACE_Time_Value &interval) -{ - ACE_TRACE ("ACE_Proactor::schedule_timer"); - - return this->timer_queue_->schedule - (handler, arg, ACE_OS::gettimeofday () + delta_time, interval); -} - -#define ACE_TIMEOUT_OCCURRED 258 - -int -ACE_Proactor::handle_events (ACE_Time_Value *max_wait_time) -{ - // Stash the current time -- the destructor of this object will - // automatically compute how much time elapsed since this method was - // called. - ACE_Countdown_Time countdown (max_wait_time); - - max_wait_time = timer_queue_->calculate_timeout (max_wait_time); - - ACE_Overlapped_IO *overlapped = 0; - u_long bytes_transferred = 0; - - int error = 0; -#if defined (ACE_WIN32) - ACE_HANDLE io_handle = ACE_INVALID_HANDLE; - int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec (); - - BOOL result = 0; - - // When we port this to use Posix async I/O, this call will be - // replace will a generic ACE_OS call. - result = ::GetQueuedCompletionStatus (completion_port_, - &bytes_transferred, - (u_long *) &io_handle, - (ACE_OVERLAPPED **) &overlapped, - timeout); - - // Check for a failed dequeue. This can happen either because - // of problems with the IO completion port (in which case - // overlapped == 0) or due to problems with the completion - // operation (in which case overlapped != 0). In either case, - // we'll stash the error value so that we can update errno - // appropriate later on. - if (result == FALSE) - error = ACE_OS::last_error (); -#endif /* ACE_WIN32 */ - - // Check for any timers that can be handled before we dispatch the - // dequeued event. Note that this is done irrespective of whether - // an error occurred. - this->timer_queue_->expire (); - - // @@ Need to make sure that if GetQueuedCompletionStatus fails due - // to a time out that this information is propagated correctly to - // the caller! - - // GetQueued returned because of a error or timer. - if (error != 0 && overlapped == 0) - { - // @@ What's the WIN32 constant for 258?!?!?! - if (error == ACE_TIMEOUT_OCCURRED) - // Returning because of timeout. - return 0; - // Returning because of error. - ACE_ERROR_RETURN ((LM_ERROR, - "%p GetQueuedCompletionStatus failed errno = %d.\n", - "ACE_Proactor::handle_events", error), -1); - } - - // Dequeued a failed or successful operation. Dispatch the - // Event_Handler. Note that GetQueuedCompletionStatus returns false - // when operations fail, but they still need to be dispatched. - int dispatch_result = this->dispatch (overlapped, bytes_transferred, error); - - // Return -1 (failure), or return 1. Remember that 0 is reserved - // for timeouts only, so we have to turn dispatch_results to 1. So, - // if this->dispatch() returns a 1 or 0, we return 1. Otherwise, - // we return -1. - return dispatch_result == -1 ? -1 : 1; -} - -// Returns 0 or 1 on success, -1 on failure. -int -ACE_Proactor::dispatch (ACE_Overlapped_IO *overlapped, - u_long bytes_transferred, - int error) -{ - // We propagate the error status to the callee by setting errno = - // error (which is the value returned by ::GetLastError(). - errno = error; - - // Call back the Event_Handler and do what it wants based on the - // return value. - int dispatch_result = overlapped->dispatch (bytes_transferred); - - switch (dispatch_result) - { - case 1: // Start another operation. - // Reinitialize by getting a new message and resetting the - // overlapped offset. - overlapped->re_init (); - return this->initiate (overlapped); - case -1: // Handler is closing. - overlapped->handler_->handle_close - (overlapped->handler_->get_handle (), overlapped->mask_); - // Fallthrough. - default: - // Handler succeeded, but does not want another operation - // started. - - delete overlapped; - return 0; - } -} - -// Returns 0 or 1 on success, -1 on failure. -int -ACE_Proactor::initiate (ACE_Event_Handler *handler, - ACE_Reactor_Mask mask, - ACE_Message_Block *msg, - ACE_Overlapped_File *file) -{ - if (msg == 0) - msg = handler->get_message (); - - // Create the state for this operation. This object "is-a" - // OVERLAPPED structure, and holds other data and methods for this - // operation. - ACE_Overlapped_IO *overlapped = 0; - - ACE_NEW_RETURN (overlapped, - ACE_Overlapped_IO (mask, handler, msg, - file, this->get_handle ()), - -1); - - // Tell the handler that *this* <Proactor> is dispatching it. - handler->proactor (this); - return this->initiate (overlapped); -} - -// Returns 0 or 1 on success, -1 on failure. -// Returns 1 when initiate succeeded immediately. -int -ACE_Proactor::initiate (ACE_Overlapped_IO *overlapped) -{ -#if defined (ACE_WIN32) - u_long bytes_transferred = 0; - ACE_HANDLE io_handle = overlapped->handler_->get_handle (); - ACE_HANDLE cp = 0; - cp = ::CreateIoCompletionPort (io_handle, - this->completion_port_, - (u_long) io_handle, - this->number_of_threads_); - - if (cp != 0) - // Success. - this->completion_port_ = cp; - else // Failure. - { - int error = ACE_OS::last_error (); - // If errno == ERROR_INVALID_PARAMETER, then this handle was - // already registered. - if (error != ERROR_INVALID_PARAMETER) - ACE_ERROR_RETURN ((LM_ERROR, - "%p CreateIoCompletionPort failed errno = %d.\n", - "ACE_Proactor::initiate", error), -1); - } - - // Initiate a WriteFile/ReadFile. If it succeeds, dispatch the - // handler. - int initiate_result = overlapped->initiate (bytes_transferred); - - if (initiate_result) - // Return 1; the OVERLAPPED will still get queued. - return 1; - - // If initiate failed, check for a bad error. - int err = ACE_OS::last_error (); - switch (err) - { - case ERROR_HANDLE_EOF: - case ERROR_NETNAME_DELETED: - // The OVERLAPPED will *not* get queued for this case. Thus, we - // have to dispatch immediately. - return this->dispatch (overlapped, bytes_transferred, err); - - case ERROR_IO_PENDING: - // The IO will complete proactively. - return 0; - default: - // This is a *bad* error. - ACE_ERROR ((LM_ERROR, "I/O error %d\n", err)); - return -1; - } -#else - overlapped = overlapped; - ACE_NOTSUP_RETURN (-1); -#endif /* ACE_WIN32 */ -} - -int -ACE_Proactor::cancel_io (ACE_Event_Handler *handler) -{ -#if defined (ACE_WIN32) && defined (ACE_HAS_CANCEL_IO) - return ::CancelIO (handler->get_handle ()) ? -1 : 0; -#else - ACE_UNUSED_ARG(handler); - return 0; -#endif /* ACE_WIN32 */ -} - -// ************************************************************ -// ************************************************************ - -ACE_Overlapped_File::ACE_Overlapped_File (const ACE_Overlapped_File &file) - : offset_ (file.offset ()), - file_size_ (0), - handle_ (file.get_handle ()), - delete_handle_ (0) -{ -} - -ACE_Overlapped_File::ACE_Overlapped_File (void) - : offset_ (0), - file_size_ (0), - handle_ (ACE_INVALID_HANDLE), - delete_handle_ (0) -{ -} - -ACE_Overlapped_File::ACE_Overlapped_File (const char *file_name, - int mode, - int perms) - : delete_handle_ (1) -{ - this->open (file_name, mode, perms); -} - -ACE_Overlapped_File::~ACE_Overlapped_File (void) -{ - this->close (); -} - -void -ACE_Overlapped_File::close (void) -{ - if (this->handle_ != ACE_INVALID_HANDLE - && this->delete_handle_ != 0) - { - ACE_OS::close (this->handle_); - this->handle_ = ACE_INVALID_HANDLE; - } -} - -int -ACE_Overlapped_File::open (ACE_HANDLE handle) -{ - this->handle_ = handle; - this->delete_handle_ = 0; - - if (this->handle_ == ACE_INVALID_HANDLE) - return -1; - else - return 0; -} - -int -ACE_Overlapped_File::open (const char *file_name, - int access, - int share, - LPSECURITY_ATTRIBUTES security, - int creation, - int flags, - ACE_HANDLE template_file) -{ -#if defined (ACE_WIN32) - if (file_name != 0) - this->handle_ = ::CreateFile (file_name, access, share, - security, creation, flags, - template_file); - - if (this->handle_ == ACE_INVALID_HANDLE) - { - errno = ENOENT; - return -1; - } - else - { - this->delete_handle_ = 1; - return 0; - } -#else - file_name = file_name; - access = access; - share = share; - security = security; - creation = creation; - flags = flags; - template_file = template_file; - ACE_NOTSUP_RETURN (-1); -#endif /* ACE_WIN32 */ -} - -off_t -ACE_Overlapped_File::lseek (off_t offset, - int whence) -{ - switch (whence) - { - case SEEK_SET: - this->offset_ = offset; - break; - case SEEK_CUR: - this->offset_ += offset; - break; - case SEEK_END: - if (handle_ == ACE_INVALID_HANDLE) - { - errno = ENFILE; - return -1; - } - else - this->offset_ = ACE_OS::filesize (handle_) + offset; - break; - default : - errno = EINVAL; - ACE_ERROR_RETURN ((LM_ERROR, "ACE_Overlapped_File::lseek" - "Invalid whence = %d.\n"), -1); - } - - return this->offset_; -} |