summaryrefslogtreecommitdiff
path: root/ace/Proactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r--ace/Proactor.cpp574
1 files changed, 0 insertions, 574 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
deleted file mode 100644
index bfa8ddbd2f6..00000000000
--- a/ace/Proactor.cpp
+++ /dev/null
@@ -1,574 +0,0 @@
-// Proactor.cpp
-// $Id$
-
-#define ACE_BUILD_DLL
-#include "ace/Proactor.h"
-
-#if !defined (__ACE_INLINE__)
-#include "ace/Proactor.i"
-#endif /* __ACE_INLINE__ */
-
-class ACE_Overlapped_IO : public ACE_OVERLAPPED
- // = TITLE
- // A wrapper for Win32 OVERLAPPED.
- //
- // = DESCRIPTION
- // Acts as a magic cookie storing additional state associated
- // with overlapped I/O operations. ReadFile and WriteFile take
- // OVERLAPPED, so we pass in Overlapped_IO. OVERLAPPEDs are
- // returned through GetQueuedCompletionStatus. They are cast
- // back into Overlapped_IOs to get the handler_ etc.
-{
-public:
- // = Initialization and termination methods.
- ACE_Overlapped_IO (ACE_Reactor_Mask mask,
- ACE_Event_Handler *handler,
- ACE_Message_Block *message,
- ACE_Overlapped_File *file,
- ACE_HANDLE event_handle);
-
- ~ACE_Overlapped_IO (void);
- // Death.
-
- int dispatch (u_long bytes_transferred);
- // Callback the appropriate handle_* method on handler_.
-
- int initiate (u_long &bytes_transferred);
- // Call ReadFile or Writefile.
-
- operator ACE_OVERLAPPED * (void);
- // Return this.
-
- void re_init (void);
- // Reset the object to be reused. Calls get_message on the handler_
- // for a new message.
-
- ACE_Reactor_Mask mask_;
- // Reading or writing.
- ACE_Event_Handler *handler_;
- // The IO handler.
- ACE_Message_Block *message_;
- // The current message to send/recv.
- ACE_Overlapped_File *file_;
- // The optional file pointer to update.
-
-private:
- void init (void);
- // Reset everything.
-};
-
-ACE_Overlapped_IO::ACE_Overlapped_IO (ACE_Reactor_Mask mask,
- ACE_Event_Handler *handler,
- ACE_Message_Block *message,
- ACE_Overlapped_File *file,
- ACE_HANDLE event_handle)
- : mask_ (mask),
- handler_ (handler),
- message_ (message),
- file_ (file)
-{
- this->hEvent = event_handle;
- this->init ();
-}
-
-void
-ACE_Overlapped_IO::init (void)
-{
- if (file_ == 0)
- this->Offset = 0;
- else
- this->Offset = file_->offset ();
-
- this->Internal = 0;
- this->InternalHigh = 0;
- this->OffsetHigh = 0;
-}
-
-void
-ACE_Overlapped_IO::re_init (void)
-{
- this->message_ = this->handler_->get_message ();
-
- this->init ();
-}
-
-ACE_Overlapped_IO::~ACE_Overlapped_IO (void)
-{
-}
-
-int
-ACE_Overlapped_IO::dispatch (u_long bytes_transferred)
-{
- if (this->file_ != 0)
- // Move the file pointer forward.
- file_->lseek (bytes_transferred, SEEK_CUR);
-
- switch (this->mask_)
- {
- case ACE_Event_Handler::WRITE_MASK :
- // Update the message length to reflect what was sent.
- this->message_->rd_ptr (bytes_transferred);
- return handler_->handle_output_complete (this->message_,
- bytes_transferred);
-
- case ACE_Event_Handler::READ_MASK :
- // Update the message length to reflect what was received.
- this->message_->wr_ptr (bytes_transferred);
- return this->handler_->handle_input_complete (this->message_,
- bytes_transferred);
-
- default:
- return -1;
- }
-}
-
-// When we port this to use Posix async I/O, these calls will be
-// replace will generic ACE_OS calls.
-
-int
-ACE_Overlapped_IO::initiate (u_long &bytes_transferred)
-{
-#if defined (ACE_WIN32)
- switch (this->mask_)
- {
- case ACE_Event_Handler::WRITE_MASK :
- // Try to write.
- return ::WriteFile (this->handler_->get_handle (),
- this->message_->rd_ptr (),
- this->message_->length (),
- &bytes_transferred,
- this);
-
- case ACE_Event_Handler::READ_MASK :
- // READ_MASK is set, so try to read.
- return ::ReadFile (this->handler_->get_handle (),
- this->message_->wr_ptr (),
- this->message_->size (),
- &bytes_transferred,
- this);
- default:
- return -1;
- }
-#else
- bytes_transferred = bytes_transferred;
- ACE_NOTSUP_RETURN (-1);
-#endif
-}
-
-ACE_Overlapped_IO::operator ACE_OVERLAPPED * (void)
-{
- return (ACE_OVERLAPPED *) this;
-}
-
-ACE_Proactor::ACE_Proactor (size_t number_of_threads, ACE_Timer_Queue *tq)
- : timer_queue_ (tq),
- completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!!
- number_of_threads_ (number_of_threads)
-{
- if (this->timer_queue_ == 0)
- {
- ACE_NEW (this->timer_queue_, ACE_Timer_Queue);
- this->delete_timer_queue_ = 1;
- }
-
-#if defined (ACE_WIN32)
- // Create an IO completion port that is not associated with a file
- // handle. This will allow us to use GetQueuedCompletionStatus as a
- // timer mechanism only.
- ACE_HANDLE cp;
-
- cp = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE,
- this->completion_port_,
- (u_long) 0, // 0 completion key
- this->number_of_threads_);
-
- if (cp != 0)
- // Success.
- this->completion_port_ = cp;
- else // Failure.
- {
- int error = ACE_OS::last_error ();
- // If errno == ERROR_INVALID_PARAMETER, then this handle was
- // already registered.
- if (error != ERROR_INVALID_PARAMETER)
- ACE_ERROR ((LM_ERROR,
- "%p CreateIoCompletionPort failed errno = %d.\n",
- "ACE_Proactor::initiate", error));
- }
-#endif
-}
-
-ACE_Proactor::~ACE_Proactor (void)
-{
- if (this->delete_timer_queue_)
- delete this->timer_queue_;
-}
-
-int
-ACE_Proactor::close (void)
-{
- if (this->completion_port_ != 0)
- ACE_OS::close (this->completion_port_);
-
- // @@ Should we call shared_event_.remove ()?
- return 0;
-}
-
-int
-ACE_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
-{
- ACE_TRACE ("ACE_Proactor::handle_signal");
-
- ACE_Time_Value timeout (0, 0);
-
- // Perform a non-blocking "poll" for all the I/O events that have
- // completed in the I/O completion queue.
-
- int result;
-
- while ((result = this->handle_events (&timeout)) == 1)
- continue;
-
- // If our handle_events failed, we'll report a failure to the
- // ReactorEx.
- return result == -1 ? -1 : 0;
-}
-
-
-int
-ACE_Proactor::schedule_timer (ACE_Event_Handler *handler,
- const void *arg,
- const ACE_Time_Value &delta_time,
- const ACE_Time_Value &interval)
-{
- ACE_TRACE ("ACE_Proactor::schedule_timer");
-
- return this->timer_queue_->schedule
- (handler, arg, ACE_OS::gettimeofday () + delta_time, interval);
-}
-
-#define ACE_TIMEOUT_OCCURRED 258
-
-int
-ACE_Proactor::handle_events (ACE_Time_Value *max_wait_time)
-{
- // Stash the current time -- the destructor of this object will
- // automatically compute how much time elapsed since this method was
- // called.
- ACE_Countdown_Time countdown (max_wait_time);
-
- max_wait_time = timer_queue_->calculate_timeout (max_wait_time);
-
- ACE_Overlapped_IO *overlapped = 0;
- u_long bytes_transferred = 0;
-
- int error = 0;
-#if defined (ACE_WIN32)
- ACE_HANDLE io_handle = ACE_INVALID_HANDLE;
- int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
-
- BOOL result = 0;
-
- // When we port this to use Posix async I/O, this call will be
- // replace will a generic ACE_OS call.
- result = ::GetQueuedCompletionStatus (completion_port_,
- &bytes_transferred,
- (u_long *) &io_handle,
- (ACE_OVERLAPPED **) &overlapped,
- timeout);
-
- // Check for a failed dequeue. This can happen either because
- // of problems with the IO completion port (in which case
- // overlapped == 0) or due to problems with the completion
- // operation (in which case overlapped != 0). In either case,
- // we'll stash the error value so that we can update errno
- // appropriate later on.
- if (result == FALSE)
- error = ACE_OS::last_error ();
-#endif /* ACE_WIN32 */
-
- // Check for any timers that can be handled before we dispatch the
- // dequeued event. Note that this is done irrespective of whether
- // an error occurred.
- this->timer_queue_->expire ();
-
- // @@ Need to make sure that if GetQueuedCompletionStatus fails due
- // to a time out that this information is propagated correctly to
- // the caller!
-
- // GetQueued returned because of a error or timer.
- if (error != 0 && overlapped == 0)
- {
- // @@ What's the WIN32 constant for 258?!?!?!
- if (error == ACE_TIMEOUT_OCCURRED)
- // Returning because of timeout.
- return 0;
- // Returning because of error.
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p GetQueuedCompletionStatus failed errno = %d.\n",
- "ACE_Proactor::handle_events", error), -1);
- }
-
- // Dequeued a failed or successful operation. Dispatch the
- // Event_Handler. Note that GetQueuedCompletionStatus returns false
- // when operations fail, but they still need to be dispatched.
- int dispatch_result = this->dispatch (overlapped, bytes_transferred, error);
-
- // Return -1 (failure), or return 1. Remember that 0 is reserved
- // for timeouts only, so we have to turn dispatch_results to 1. So,
- // if this->dispatch() returns a 1 or 0, we return 1. Otherwise,
- // we return -1.
- return dispatch_result == -1 ? -1 : 1;
-}
-
-// Returns 0 or 1 on success, -1 on failure.
-int
-ACE_Proactor::dispatch (ACE_Overlapped_IO *overlapped,
- u_long bytes_transferred,
- int error)
-{
- // We propagate the error status to the callee by setting errno =
- // error (which is the value returned by ::GetLastError().
- errno = error;
-
- // Call back the Event_Handler and do what it wants based on the
- // return value.
- int dispatch_result = overlapped->dispatch (bytes_transferred);
-
- switch (dispatch_result)
- {
- case 1: // Start another operation.
- // Reinitialize by getting a new message and resetting the
- // overlapped offset.
- overlapped->re_init ();
- return this->initiate (overlapped);
- case -1: // Handler is closing.
- overlapped->handler_->handle_close
- (overlapped->handler_->get_handle (), overlapped->mask_);
- // Fallthrough.
- default:
- // Handler succeeded, but does not want another operation
- // started.
-
- delete overlapped;
- return 0;
- }
-}
-
-// Returns 0 or 1 on success, -1 on failure.
-int
-ACE_Proactor::initiate (ACE_Event_Handler *handler,
- ACE_Reactor_Mask mask,
- ACE_Message_Block *msg,
- ACE_Overlapped_File *file)
-{
- if (msg == 0)
- msg = handler->get_message ();
-
- // Create the state for this operation. This object "is-a"
- // OVERLAPPED structure, and holds other data and methods for this
- // operation.
- ACE_Overlapped_IO *overlapped = 0;
-
- ACE_NEW_RETURN (overlapped,
- ACE_Overlapped_IO (mask, handler, msg,
- file, this->get_handle ()),
- -1);
-
- // Tell the handler that *this* <Proactor> is dispatching it.
- handler->proactor (this);
- return this->initiate (overlapped);
-}
-
-// Returns 0 or 1 on success, -1 on failure.
-// Returns 1 when initiate succeeded immediately.
-int
-ACE_Proactor::initiate (ACE_Overlapped_IO *overlapped)
-{
-#if defined (ACE_WIN32)
- u_long bytes_transferred = 0;
- ACE_HANDLE io_handle = overlapped->handler_->get_handle ();
- ACE_HANDLE cp = 0;
- cp = ::CreateIoCompletionPort (io_handle,
- this->completion_port_,
- (u_long) io_handle,
- this->number_of_threads_);
-
- if (cp != 0)
- // Success.
- this->completion_port_ = cp;
- else // Failure.
- {
- int error = ACE_OS::last_error ();
- // If errno == ERROR_INVALID_PARAMETER, then this handle was
- // already registered.
- if (error != ERROR_INVALID_PARAMETER)
- ACE_ERROR_RETURN ((LM_ERROR,
- "%p CreateIoCompletionPort failed errno = %d.\n",
- "ACE_Proactor::initiate", error), -1);
- }
-
- // Initiate a WriteFile/ReadFile. If it succeeds, dispatch the
- // handler.
- int initiate_result = overlapped->initiate (bytes_transferred);
-
- if (initiate_result)
- // Return 1; the OVERLAPPED will still get queued.
- return 1;
-
- // If initiate failed, check for a bad error.
- int err = ACE_OS::last_error ();
- switch (err)
- {
- case ERROR_HANDLE_EOF:
- case ERROR_NETNAME_DELETED:
- // The OVERLAPPED will *not* get queued for this case. Thus, we
- // have to dispatch immediately.
- return this->dispatch (overlapped, bytes_transferred, err);
-
- case ERROR_IO_PENDING:
- // The IO will complete proactively.
- return 0;
- default:
- // This is a *bad* error.
- ACE_ERROR ((LM_ERROR, "I/O error %d\n", err));
- return -1;
- }
-#else
- overlapped = overlapped;
- ACE_NOTSUP_RETURN (-1);
-#endif /* ACE_WIN32 */
-}
-
-int
-ACE_Proactor::cancel_io (ACE_Event_Handler *handler)
-{
-#if defined (ACE_WIN32) && defined (ACE_HAS_CANCEL_IO)
- return ::CancelIO (handler->get_handle ()) ? -1 : 0;
-#else
- ACE_UNUSED_ARG(handler);
- return 0;
-#endif /* ACE_WIN32 */
-}
-
-// ************************************************************
-// ************************************************************
-
-ACE_Overlapped_File::ACE_Overlapped_File (const ACE_Overlapped_File &file)
- : offset_ (file.offset ()),
- file_size_ (0),
- handle_ (file.get_handle ()),
- delete_handle_ (0)
-{
-}
-
-ACE_Overlapped_File::ACE_Overlapped_File (void)
- : offset_ (0),
- file_size_ (0),
- handle_ (ACE_INVALID_HANDLE),
- delete_handle_ (0)
-{
-}
-
-ACE_Overlapped_File::ACE_Overlapped_File (const char *file_name,
- int mode,
- int perms)
- : delete_handle_ (1)
-{
- this->open (file_name, mode, perms);
-}
-
-ACE_Overlapped_File::~ACE_Overlapped_File (void)
-{
- this->close ();
-}
-
-void
-ACE_Overlapped_File::close (void)
-{
- if (this->handle_ != ACE_INVALID_HANDLE
- && this->delete_handle_ != 0)
- {
- ACE_OS::close (this->handle_);
- this->handle_ = ACE_INVALID_HANDLE;
- }
-}
-
-int
-ACE_Overlapped_File::open (ACE_HANDLE handle)
-{
- this->handle_ = handle;
- this->delete_handle_ = 0;
-
- if (this->handle_ == ACE_INVALID_HANDLE)
- return -1;
- else
- return 0;
-}
-
-int
-ACE_Overlapped_File::open (const char *file_name,
- int access,
- int share,
- LPSECURITY_ATTRIBUTES security,
- int creation,
- int flags,
- ACE_HANDLE template_file)
-{
-#if defined (ACE_WIN32)
- if (file_name != 0)
- this->handle_ = ::CreateFile (file_name, access, share,
- security, creation, flags,
- template_file);
-
- if (this->handle_ == ACE_INVALID_HANDLE)
- {
- errno = ENOENT;
- return -1;
- }
- else
- {
- this->delete_handle_ = 1;
- return 0;
- }
-#else
- file_name = file_name;
- access = access;
- share = share;
- security = security;
- creation = creation;
- flags = flags;
- template_file = template_file;
- ACE_NOTSUP_RETURN (-1);
-#endif /* ACE_WIN32 */
-}
-
-off_t
-ACE_Overlapped_File::lseek (off_t offset,
- int whence)
-{
- switch (whence)
- {
- case SEEK_SET:
- this->offset_ = offset;
- break;
- case SEEK_CUR:
- this->offset_ += offset;
- break;
- case SEEK_END:
- if (handle_ == ACE_INVALID_HANDLE)
- {
- errno = ENFILE;
- return -1;
- }
- else
- this->offset_ = ACE_OS::filesize (handle_) + offset;
- break;
- default :
- errno = EINVAL;
- ACE_ERROR_RETURN ((LM_ERROR, "ACE_Overlapped_File::lseek"
- "Invalid whence = %d.\n"), -1);
- }
-
- return this->offset_;
-}