summaryrefslogtreecommitdiff
path: root/ace/Proactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r--ace/Proactor.cpp536
1 files changed, 536 insertions, 0 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
new file mode 100644
index 00000000000..54379d8a28c
--- /dev/null
+++ b/ace/Proactor.cpp
@@ -0,0 +1,536 @@
+// Proactor.cpp
+// $Id$
+
+#define ACE_BUILD_DLL
+#include "ace/Proactor.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/Proactor.i"
+#endif /* __ACE_INLINE__ */
+
+class ACE_Overlapped_IO : public ACE_OVERLAPPED
+ // = TITLE
+ // A wrapper for Win32 OVERLAPPED.
+ //
+ // = DESCRIPTION
+ // Acts as a magic cookie storing additional state associated
+ // with overlapped I/O operations. ReadFile and WriteFile take
+ // OVERLAPPED, so we pass in Overlapped_IO. OVERLAPPEDs are
+ // returned through GetQueuedCompletionStatus. They are cast
+ // back into Overlapped_IOs to get the handler_ etc.
+{
+public:
+ // = Initialization and termination methods.
+ ACE_Overlapped_IO (ACE_Reactor_Mask mask,
+ ACE_Event_Handler *handler,
+ ACE_Message_Block *message,
+ ACE_Overlapped_File *file,
+ ACE_HANDLE event_handle);
+
+ ~ACE_Overlapped_IO (void);
+ // Death.
+
+ int dispatch (u_long bytes_transfered);
+ // Callback the appropriate handle_* method on handler_.
+
+ int initiate (u_long &bytes_transfered);
+ // Call ReadFile or Writefile.
+
+ operator ACE_OVERLAPPED * (void);
+ // Return this.
+
+ void re_init (void);
+ // Reset the object to be reused. Calls get_message on the handler_
+ // for a new message.
+
+ ACE_Reactor_Mask mask_;
+ // Reading or writing.
+ ACE_Event_Handler *handler_;
+ // The IO handler.
+ ACE_Message_Block *message_;
+ // The current message to send/recv.
+ ACE_Overlapped_File *file_;
+ // The optional file pointer to update.
+
+private:
+ void init (void);
+ // Reset everything.
+};
+
+ACE_Overlapped_IO::ACE_Overlapped_IO (ACE_Reactor_Mask mask,
+ ACE_Event_Handler *handler,
+ ACE_Message_Block *message,
+ ACE_Overlapped_File *file,
+ ACE_HANDLE event_handle)
+ : mask_ (mask),
+ handler_ (handler),
+ message_ (message),
+ file_ (file)
+{
+ this->hEvent = event_handle;
+ this->init ();
+}
+
+void
+ACE_Overlapped_IO::init (void)
+{
+ if (file_ == 0)
+ this->Offset = 0;
+ else
+ this->Offset = file_->offset ();
+
+ this->Internal = 0;
+ this->InternalHigh = 0;
+ this->OffsetHigh = 0;
+}
+
+void
+ACE_Overlapped_IO::re_init (void)
+{
+ this->message_ = this->handler_->get_message ();
+
+ this->init ();
+}
+
+ACE_Overlapped_IO::~ACE_Overlapped_IO (void)
+{
+}
+
+int
+ACE_Overlapped_IO::dispatch (u_long bytes_transfered)
+{
+ if (this->file_ != 0)
+ // Move the file pointer forward.
+ file_->lseek (bytes_transfered, SEEK_CUR);
+
+ if (this->mask_ == ACE_Event_Handler::WRITE_MASK)
+ {
+ // Update the message length to reflect what was sent.
+ this->message_->rd_ptr (bytes_transfered);
+ return handler_->handle_output_complete (this->message_,
+ bytes_transfered);
+ }
+ else // this->mask_ == ACE_Event_Handler::READ_MASK
+ {
+ // Update the message length to reflect what was received.
+ this->message_->wr_ptr (bytes_transfered);
+ return this->handler_->handle_input_complete (this->message_,
+ bytes_transfered);
+ }
+}
+
+// When we port this to use Posix async I/O, these calls will be
+// replace will generic ACE_OS calls.
+
+int
+ACE_Overlapped_IO::initiate (u_long &bytes_transfered)
+{
+#if defined (ACE_WIN32)
+ if (this->mask_ == ACE_Event_Handler::WRITE_MASK)
+ {
+ // Try to write.
+ return ::WriteFile (this->handler_->get_handle (),
+ this->message_->rd_ptr (),
+ this->message_->length (),
+ &bytes_transfered,
+ this);
+ }
+ else
+ {
+ // READ_MASK is set, so try to read.
+ return ::ReadFile (this->handler_->get_handle (),
+ this->message_->wr_ptr (),
+ this->message_->size (),
+ &bytes_transfered,
+ this);
+ }
+#else
+ ACE_NOTSUP_RETURN (-1);
+#endif
+}
+
+ACE_Overlapped_IO::operator ACE_OVERLAPPED * (void)
+{
+ return (ACE_OVERLAPPED *) this;
+}
+
+ACE_Proactor::ACE_Proactor (size_t number_of_threads)
+ : completion_port_ (0),
+ number_of_threads_ (number_of_threads),
+ timer_skew_ (0, ACE_TIMER_SKEW)
+{
+#if defined (ACE_WIN32)
+ // Create an "auto-reset" event to indicate that one or more I/O
+ // overlapped events have completed.
+ this->global_handle_ = ::CreateEvent (NULL, TRUE, FALSE, NULL);
+#endif /* ACE_HAS_WIN32 */
+}
+
+int
+ACE_Proactor::close (void)
+{
+ if (this->completion_port_ != 0)
+ ACE_OS::close (this->completion_port_);
+
+ ACE_OS::close (this->global_handle_);
+ return 0;
+}
+
+int
+ACE_Proactor::handle_signal (int index, siginfo_t *, ucontext_t *)
+{
+ ACE_TRACE ("ACE_Proactor::handle_signal");
+
+ ACE_Time_Value timeout (0, 0);
+
+#if defined (ACE_WIN32)
+ // Reset the handle to a non-signaled state.
+ if (::ResetEvent (global_handle_) == FALSE)
+ ACE_ERROR_RETURN ((LM_ERROR, "ResetEvent failed.\n"), -1);
+#endif /* ACE_HAS_WIN32 */
+
+ // Perform a non-blocking "poll" for all the I/O events that have
+ // completed in the I/O completion queue.
+
+ int result;
+
+ while ((result = this->handle_events (&timeout)) == 1)
+ continue;
+
+ // If our handle_events failed, we'll report a failure to the
+ // ReactorEx.
+ return result == -1 ? -1 : 0;
+}
+
+
+int
+ACE_Proactor::schedule_timer (ACE_Event_Handler *handler,
+ const void *arg,
+ const ACE_Time_Value &delta_time,
+ const ACE_Time_Value &interval)
+{
+ ACE_TRACE ("ACE_Proactor::schedule_timer");
+
+ return this->timer_queue_.schedule
+ (handler, arg, ACE_OS::gettimeofday () + delta_time, interval);
+}
+
+int
+ACE_Proactor::handle_events (ACE_Time_Value *how_long)
+{
+ // Stash the current time.
+ ACE_Time_Value prev_time = ACE_OS::gettimeofday ();
+
+ how_long = timer_queue_.calculate_timeout (how_long);
+
+ ACE_Overlapped_IO *overlapped = 0;
+ u_long bytes_transfered = 0;
+
+#if defined (ACE_WIN32)
+ int error = 0;
+ ACE_HANDLE io_handle = ACE_INVALID_HANDLE;
+
+ // When we port this to use Posix async I/O, this call will be
+ // replace will a generic ACE_OS call.
+ BOOL result;
+
+ result = ::GetQueuedCompletionStatus (this->completion_port_,
+ &bytes_transfered,
+ (u_long *) &io_handle,
+ (ACE_OVERLAPPED **) &overlapped,
+ how_long == 0 ? INFINITE : how_long->msec ());
+
+ // Check for a failed dequeue. Stash the error value.
+ if (result == FALSE && overlapped == 0)
+ error = ::GetLastError ();
+
+ // Check for any timers that can be handled before we dispatch the
+ // dequeued event. Note that this is done irrespective of whether
+ // an error occurred.
+ if (!this->timer_queue_.is_empty ())
+ // Fudge factor accounts for problems with Solaris timers...
+ this->timer_queue_.expire (ACE_OS::gettimeofday () + this->timer_skew_);
+
+ // @@ Need to make sure that if GetQueuedCompletionStatus fails due
+ // to a time out that this information is propagated correctly to
+ // the caller!
+
+ // GetQueued returned because of a error or timer.
+ if (error != 0)
+ {
+ // Compute the time while the Proactor is processing.
+ ACE_Time_Value elapsed_time = ACE_OS::gettimeofday () - prev_time;
+
+ // Update -how_long- to reflect the amount of time since
+ // handle_events was called.
+ if (how_long != 0)
+ {
+ if (*how_long > elapsed_time)
+ *how_long = *how_long - elapsed_time;
+ else
+ *how_long = ACE_Time_Value::zero; // Used all of timeout.
+ }
+
+ // @@ What's the WIN32 constant for 258?!?!?!
+ if (error == 258)
+ // Returning because of timeout.
+ return 0;
+ // Returning because of error.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p GetQueuedCompletionStatus failed errno = %d.\n"
+ "ACE_Proactor::handle_events", error), -1);
+ }
+
+#endif /* ACE_WIN32 */
+
+ // Dequeued a failed or successful operation. Dispatch the
+ // Event_Handler. Note that GetQueuedCompletionStatus returns false
+ // when operations fail, but they still need to be dispatched.
+ // Should we propogate this to the handler somehow? Maybe an extra
+ // failed/succeeded flag in the dispatch call?
+ int dispatch_result = this->dispatch (overlapped, bytes_transfered);
+
+ // Compute the time while the Proactor is processing.
+ ACE_Time_Value elapsed_time = ACE_OS::gettimeofday () - prev_time;
+
+ // Update <how_long> to reflect the amount of time since
+ // <handle_events> was called.
+ if (how_long != 0)
+ {
+ if (*how_long > elapsed_time)
+ *how_long = *how_long - elapsed_time;
+ else
+ *how_long = ACE_Time_Value::zero; // Used all of timeout.
+ }
+
+ // Return -1 (failure), or return 1. Remember that 0 is reserved
+ // for timeouts only, so we have to turn dispatch_results to 1. So,
+ // if this->dispatch() returns a 1 or 0, we return 1. Otherwise,
+ // we return -1.
+ return dispatch_result == -1 ? -1 : 1;
+}
+
+// Returns 0 or 1 on success, -1 on failure.
+int
+ACE_Proactor::dispatch (ACE_Overlapped_IO *overlapped,
+ u_long bytes_transfered)
+{
+ // Call back the Event_Handler and do what it wants based on the
+ // return value.
+ int dispatch_result = overlapped->dispatch (bytes_transfered);
+
+ switch (dispatch_result)
+ {
+ case 1: // Start another operation.
+ // Reinitialize by getting a new message and resetting the
+ // overlapped offset.
+ overlapped->re_init ();
+ return this->initiate (overlapped);
+ case -1: // Handler is closing.
+ overlapped->handler_->
+ handle_close (overlapped->handler_->get_handle (),
+ overlapped->mask_);
+ // Fallthrough.
+ default:
+ // Handler succeeded, but does not want another operation
+ // started.
+ delete overlapped;
+ return 0;
+ }
+}
+
+// Returns 0 or 1 on success, -1 on failure.
+int
+ACE_Proactor::initiate (ACE_Event_Handler *handler,
+ ACE_Reactor_Mask mask,
+ ACE_Message_Block *msg,
+ ACE_Overlapped_File *file)
+{
+ if (msg == 0)
+ msg = handler->get_message ();
+
+ // Create the state for this operation. This object "is-a"
+ // OVERLAPPED structure, and holds other data and methods for this
+ // operation.
+ ACE_Overlapped_IO *overlapped;
+
+ ACE_NEW_RETURN (overlapped,
+ ACE_Overlapped_IO (mask, handler, msg,
+ file, this->global_handle_),
+ -1);
+
+ return this->initiate (overlapped);
+}
+
+// Returns 0 or 1 on success, -1 on failure.
+// Returns 1 when initiate succeeded immediately.
+int
+ACE_Proactor::initiate (ACE_Overlapped_IO *overlapped)
+{
+#if defined (ACE_WIN32)
+ u_long bytes_transfered = 0;
+
+ ACE_HANDLE io_handle = overlapped->handler_->get_handle ();
+ ACE_HANDLE cp;
+
+ cp = ::CreateIoCompletionPort (io_handle,
+ this->completion_port_,
+ (u_long) io_handle,
+ this->number_of_threads_);
+
+ if (cp != 0)
+ // Success.
+ this->completion_port_ = cp;
+ else // Failure.
+ {
+ int error = ::GetLastError ();
+ // If errno == ERROR_INVALID_PARAMETER, then this handle was
+ // already registered.
+ if (error != ERROR_INVALID_PARAMETER)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%p CreateIoCompletionPort failed errno = %d.\n",
+ "ACE_Proactor::initiate", error), -1);
+ }
+
+ // Initiate a WriteFile/ReadFile. If it succeeds, dispatch the
+ // handler.
+ int initiate_result = overlapped->initiate (bytes_transfered);
+
+ if (initiate_result)
+ // Return 1; the OVERLAPPED will still get queued.
+ return 1;
+
+ // If initiate failed, check for a bad error.
+ int err = ::GetLastError ();
+ switch (err)
+ {
+ case ERROR_HANDLE_EOF:
+ case ERROR_NETNAME_DELETED:
+ // The OVERLAPPED will *not* get queued for this case. Thus, we
+ // have to dispatch immediately.
+ return this->dispatch (overlapped, bytes_transfered);
+
+ case ERROR_IO_PENDING:
+ // The IO will complete proactively.
+ return 0;
+ default:
+ // This is a *bad* error.
+ ACE_ERROR ((LM_ERROR, "I/O error %d\n", err));
+ return -1;
+ }
+#else
+ ACE_NOTSUP_RETURN (-1);
+#endif /* ACE_WIN32 */
+}
+
+ACE_Overlapped_File::ACE_Overlapped_File (const ACE_Overlapped_File &file)
+ : offset_ (file.offset ()),
+ handle_ (file.get_handle ()),
+ file_size_ (0),
+ delete_handle_ (0)
+{
+}
+
+ACE_Overlapped_File::ACE_Overlapped_File (void)
+ : offset_ (0),
+ handle_ (ACE_INVALID_HANDLE),
+ file_size_ (0),
+ delete_handle_ (0)
+{
+}
+
+ACE_Overlapped_File::ACE_Overlapped_File (const char *file_name,
+ int mode,
+ int perms)
+ : delete_handle_ (1)
+{
+ this->open (file_name, mode, perms);
+}
+
+ACE_Overlapped_File::~ACE_Overlapped_File (void)
+{
+ this->close ();
+}
+
+void
+ACE_Overlapped_File::close (void)
+{
+ if (this->handle_ != ACE_INVALID_HANDLE
+ && this->delete_handle_ != 0)
+ {
+ ACE_OS::close (this->handle_);
+ this->handle_ = ACE_INVALID_HANDLE;
+ }
+}
+
+int
+ACE_Overlapped_File::open (ACE_HANDLE handle)
+{
+ this->handle_ = handle;
+ this->delete_handle_ = 0;
+
+ if (this->handle_ == ACE_INVALID_HANDLE)
+ return -1;
+ else
+ return 0;
+}
+
+int
+ACE_Overlapped_File::open (const char *file_name,
+ int access,
+ int share,
+ LPSECURITY_ATTRIBUTES security,
+ int creation,
+ int flags,
+ ACE_HANDLE template_file)
+{
+#if defined (ACE_WIN32)
+ if (file_name != 0)
+ this->handle_ = ::CreateFile (file_name, access, share,
+ security, creation, flags,
+ template_file);
+
+ if (this->handle_ == ACE_INVALID_HANDLE)
+ {
+ errno = ENOENT;
+ return -1;
+ }
+ else
+ {
+ this->delete_handle_ = 1;
+ return 0;
+ }
+#else
+ ACE_NOTSUP_RETURN (-1);
+#endif /* ACE_WIN32 */
+}
+
+off_t
+ACE_Overlapped_File::lseek (off_t offset,
+ int whence)
+{
+ switch (whence)
+ {
+ case SEEK_SET:
+ this->offset_ = offset;
+ break;
+ case SEEK_CUR:
+ this->offset_ += offset;
+ break;
+ case SEEK_END:
+ if (handle_ == ACE_INVALID_HANDLE)
+ {
+ errno = ENFILE;
+ return -1;
+ }
+ else
+ this->offset_ = ACE_OS::filesize (handle_) + offset;
+ break;
+ default :
+ errno = EINVAL;
+ ACE_ERROR_RETURN ((LM_ERROR, "ACE_Overlapped_File::lseek"
+ "Invalid whence = %d.\n"), -1);
+ }
+
+ return this->offset_;
+}