summaryrefslogtreecommitdiff
path: root/ace/Asynch_IO.cpp
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-20 04:39:53 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-20 04:39:53 +0000
commitb4ebb79135ee1f842324272a577e84be193ee1db (patch)
treeed66db1705649c490bdb5c366b610443c5b4c422 /ace/Asynch_IO.cpp
parentfc0a5b99470b3eb66d099105f91a0c61047395c9 (diff)
downloadATCD-b4ebb79135ee1f842324272a577e84be193ee1db.tar.gz
*** empty log message ***
Diffstat (limited to 'ace/Asynch_IO.cpp')
-rw-r--r--ace/Asynch_IO.cpp568
1 files changed, 504 insertions, 64 deletions
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp
index 4186dae7e2b..e6619e7b515 100644
--- a/ace/Asynch_IO.cpp
+++ b/ace/Asynch_IO.cpp
@@ -104,9 +104,6 @@ ACE_Asynch_Operation::open (ACE_Handler &handler,
this->handler_ = &handler;
this->handle_ = handle;
- // @@
- ACE_UNUSED_ARG (completion_key);
-
// Grab the handle from the <handler> if <handle> is invalid
if (this->handle_ == ACE_INVALID_HANDLE)
this->handle_ = this->handler_->handle ();
@@ -123,31 +120,93 @@ ACE_Asynch_Operation::open (ACE_Handler &handler,
this->proactor_ = ACE_Proactor::instance();
}
-#if !defined (ACE_HAS_AIO_CALLS)
- // Register with the <proactor>
+#if defined (ACE_HAS_AIO_CALLS)
+ // AIO stuff is present. So no registering.
+ // @@ But something has to be done to associate completion key with
+ // the handle provided. How about a HashTable of size "the number of
+ // file descriptors that are possible in the system".
+ // @@
+ ACE_UNUSED_ARG (completion_key);
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
+ // Register with the <proactor>.
return this->proactor_->register_handle (this->handle_,
completion_key);
-#else /* ACE_HAS_AIO_CALLS */
- // AIO stuff is present. So no registering.
- return 1;
#endif /* ACE_HAS_AIO_CALLS */
}
int
ACE_Asynch_Operation::cancel (void)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ // @@ aio_cancel will come here soon.
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
// All I/O operations that are canceled will complete with the error
// ERROR_OPERATION_ABORTED. All completion notifications for the I/O
// operations will occur normally.
#if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && (defined (_MSC_VER) && (_MSC_VER > 1020))
return (int) ::CancelIo (this->handle_);
#else
- // @@ Alex, there should be an API for cancelling this stuff on
- // POSIX!
ACE_NOTSUP_RETURN (-1);
#endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && (defined (_MSC_VER) && (_MSC_VER > 1020)) */
+#endif /* ACE_HAS_AIO_CALLS */
}
+#if defined (ACE_HAS_AIO_CALLS)
+// If the ptr is o, just check whether there is any slot free and
+// return 0 if yes, else return -1. If a valid ptr is passed, keep it
+// in a free slot.
+int
+ACE_Asynch_Operation::register_aio_with_proactor (aiocb *aiocb_ptr)
+{
+ ACE_DEBUG ((LM_DEBUG, "register_aio_with_proactor\n"));
+ if (aiocb_ptr == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Status check max %d cur %d\n",
+ this->proactor_->aiocb_list_max_size_,
+ this->proactor_->aiocb_list_cur_size_));
+
+ // Just check the status of the list.
+ if (this->proactor_->aiocb_list_cur_size_ >=
+ this->proactor_->aiocb_list_max_size_)
+ return -1;
+ else
+ return 0;
+ }
+
+ // Non-zero ptr. Find a free slot and store.
+
+ // Make sure again.
+ if (this->proactor_->aiocb_list_cur_size_ >=
+ this->proactor_->aiocb_list_max_size_)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Asynch_Operation:No space to store the <aio> info.\n"),
+ -1);
+
+ // Slot(s) available. Find a free one.
+ size_t ai;
+ for (ai = 0;
+ ai < this->proactor_->aiocb_list_max_size_;
+ ai++)
+ if (this->proactor_->aiocb_list_[ai] == 0)
+ break;
+
+ // Check again.
+ if (ai == this->proactor_->aiocb_list_max_size_)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Asynch_Operation:No space to store the <aio> info.\n"),
+ -1);
+
+ // Store the pointers.
+ this->proactor_->aiocb_list_[ai] = aiocb_ptr;
+ this->proactor_->aiocb_list_cur_size_ ++;
+ return 0;
+}
+#endif /* ACE_HAS_AIO_CALLS */
+
+
// ************************************************************
ACE_Asynch_Read_Stream::ACE_Asynch_Read_Stream (void)
@@ -162,15 +221,18 @@ ACE_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
// Create the Asynch_Result
Result *result = 0;
ACE_NEW_RETURN (result,
- Result (*this->handler_,
- this->handle_,
- message_block,
- bytes_to_read,
- act,
- this->proactor_->get_handle ()),
- -1);
+ Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_read,
+ act,
+ this->proactor_->get_handle ()),
+ -1);
- return this->shared_read (result);
+ ssize_t return_val = this->shared_read (result);
+ if (return_val == -1)
+ delete result;
+ return return_val;
}
int
@@ -180,11 +242,30 @@ ACE_Asynch_Read_Stream::shared_read (ACE_Asynch_Read_Stream::Result *result)
// Make a new AIOCB and issue aio_read, if queueing is possible
// store this with the Proactor, so that that can be used for
// <aio_return> and <aio_error>.
- aiocb *aiocb_ptr;
+
+ // Allocate aiocb block.
+ aiocb *aiocb_ptr = 0;
ACE_NEW_RETURN (aiocb_ptr,
aiocb,
-1);
+ // Make sure there is space in the aiocb list.
+ if (this->register_aio_with_proactor (0) == -1)
+ {
+ // No space.
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Read_Stream:No space to queue aio_read\n"));
+
+ // Clean up the memory allocated.
+ delete aiocb_ptr;
+
+ // @@ Set errno to EAGAIN so that applications will know this as
+ // a queueing failure and try again this aio_read it they want.
+ errno = EAGAIN;
+
+ return -1;
+ }
+
// Setup AIOCB.
// @@ Priority always 0?
// @@ Signal no, always?
@@ -198,20 +279,31 @@ ACE_Asynch_Read_Stream::shared_read (ACE_Asynch_Read_Stream::Result *result)
aiocb_ptr->aio_sigevent.sigev_value.sival_ptr =
(void *) result;
- // Fire off the aio write. @@ Alex, should this be < 0 or -1? In
- // general, please try to use -1 for checking all return values if
- // that's what the manual says will be returned...
- if (aio_read (aiocb_ptr) < 0)
- // Queuing failed.
- ACE_ERROR_RETURN ((LM_ERROR,
- "Queueing faile for aio_read"),
- -1);
+ // Fire off the aio write.
+ if (aio_read (aiocb_ptr) == -1)
+ {
+ // Queueing failed.
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Read_Stream:aio_read queueing failed\n"));
+
+ // Clean up the memory allocated.
+ delete aiocb_ptr;
+
+ return -1;
+ }
// Success. Store the aiocb_ptr with Proactor.
- if (this->proactor_->insert_to_aiocb_list (aiocb_ptr) < 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Fatal error : aio_read queuing suceeded, no place at aiocb_list"),
- -1);
+ if (this->register_aio_with_proactor (aiocb_ptr) == -1)
+ {
+ // Couldnt store the aiocb.
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Read_Stream:Fatal error\n"));
+
+ // Clean up the memory allocated.
+ delete aiocb_ptr;
+
+ return -1;
+ }
// Aio successfully issued and ptr stored.
return 0;
@@ -315,23 +407,93 @@ ACE_Asynch_Write_Stream::write (ACE_Message_Block &message_block,
{
Result *result = 0;
ACE_NEW_RETURN (result,
- Result (*this->handler_,
- this->handle_,
- message_block,
- bytes_to_write,
- act,
- this->proactor_->get_handle ()),
- -1);
+ Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_write,
+ act,
+ this->proactor_->get_handle ()),
+ -1);
- return this->shared_write (result);
+ ssize_t return_val = this->shared_write (result);
+ if (return_val == -1)
+ delete result;
+ return return_val;
}
int
ACE_Asynch_Write_Stream::shared_write (ACE_Asynch_Write_Stream::Result *result)
{
#if defined (ACE_HAS_AIO_CALLS)
- ACE_UNUSED_ARG (result);
+ // Make a new AIOCB and issue aio_write, if queueing is possible
+ // store this with the Proactor, so that that can be used for
+ // <aio_return> and <aio_error>.
+
+ // Allocate aiocb block.
+ aiocb *aiocb_ptr = 0;
+ ACE_NEW_RETURN (aiocb_ptr,
+ aiocb,
+ -1);
+
+ // Make sure there is space in the aiocb list.
+ if (this->register_aio_with_proactor (0) == -1)
+ {
+ // No space.
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Write_Stream:No space to queue aio_read\n"));
+
+ // Clean up the memory allocated.
+ delete aiocb_ptr;
+
+ // @@ Set errno to EAGAIN so that applications will know this as
+ // a queueing failure and try again this aio_read it they want.
+ errno = EAGAIN;
+
+ return -1;
+ }
+
+ // Setup AIOCB.
+ // @@ Priority always 0?
+ // @@ Signal no, always?
+ aiocb_ptr->aio_fildes = result->handle ();
+ aiocb_ptr->aio_offset = result->Offset;
+ aiocb_ptr->aio_buf = result->message_block ().rd_ptr ();
+ aiocb_ptr->aio_nbytes = result->bytes_to_write ();
+ aiocb_ptr->aio_reqprio = 0;
+ aiocb_ptr->aio_sigevent.sigev_notify = SIGEV_NONE;
+ //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX;
+ aiocb_ptr->aio_sigevent.sigev_value.sival_ptr =
+ (void *) result;
+
+ // Fire off the aio write.
+ if (aio_write (aiocb_ptr) == -1)
+ {
+ // Queueing failed.
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Write_Stream:aio_write queueing failed\n"));
+
+ // Clean up the memory allocated.
+ delete aiocb_ptr;
+
+ return -1;
+ }
+
+ // Success. Store the aiocb_ptr with Proactor.
+ if (this->register_aio_with_proactor (aiocb_ptr) == -1)
+ {
+ // Couldnt store the aiocb.
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Write_Stream:Fatal error\n"));
+
+ // Clean up the memory allocated.
+ delete aiocb_ptr;
+
+ return -1;
+ }
+
+ // Aio successfully issued and ptr stored.
return 0;
+
#else /* ACE_HAS_AIO_CALLS */
u_long bytes_written;
@@ -594,7 +756,7 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
u_long bytes_read;
- // Initiate the accept
+ // Initiate the accept.
int initiate_result = ::AcceptEx ((SOCKET) result->listen_handle (),
(SOCKET) result->accept_handle (),
result->message_block ().wr_ptr (),
@@ -712,16 +874,57 @@ ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
const void *act)
{
#if defined (ACE_HAS_AIO_CALLS)
- ACE_UNUSED_ARG (file);
- ACE_UNUSED_ARG (header_and_trailer);
- ACE_UNUSED_ARG (bytes_to_write);
- ACE_UNUSED_ARG (offset);
- ACE_UNUSED_ARG (offset_high);
- ACE_UNUSED_ARG (bytes_per_send);
- ACE_UNUSED_ARG (flags);
- ACE_UNUSED_ARG (act);
+ // Adjust these parameters if there are default values specified.
+ ssize_t file_size = ACE_OS::filesize (file);
+
+ if (file_size == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p) Asynch_Transmit_File:Couldnt know the file size\n"),
+ -1);
+
+ if (bytes_to_write == 0)
+ bytes_to_write = file_size;
+
+ if (offset > (size_t) file_size)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p %t)Asynch_Transmit_File:File size is less than offset\n"),
+ -1);
+
+ if (offset != 0)
+ bytes_to_write = file_size - offset + 1;
+
+ if (bytes_per_send == 0)
+ bytes_per_send = bytes_to_write;
+
+ // Configure the result parameter.
+ Result *result = 0;
+ ACE_NEW_RETURN (result,
+ Result (*this->handler_,
+ this->handle_,
+ file,
+ header_and_trailer,
+ bytes_to_write,
+ offset,
+ offset_high,
+ bytes_per_send,
+ flags,
+ act,
+ this->proactor_->get_handle ()),
+ -1);
+
+ // Make the auxillary handler and initiate transmit.
+ ACE_Asynch_Transmit_Handler *transmit_handler = 0;
+ ACE_NEW_RETURN (transmit_handler,
+ ::ACE_Asynch_Transmit_Handler (result),
+ -1);
+
+ ssize_t return_val = transmit_handler->transmit ();
+ if (return_val == -1)
+ // This deletes the result in it.
+ delete transmit_handler;
+
+ return return_val;
- return 0;
#else /* ACE_HAS_AIO_CALLS */
#if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
Result *result = 0;
@@ -955,33 +1158,270 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void)
// If header is valid, set the fields
if (this->header_ != 0)
+ {
+ this->transmit_buffers_.Head = this->header_->rd_ptr ();
+ this->transmit_buffers_.HeadLength = this->header_bytes_;
+ }
+ else
+ {
+ this->transmit_buffers_.Head = 0;
+ this->transmit_buffers_.HeadLength = 0;
+ }
+
+ // If trailer is valid, set the fields
+ if (this->trailer_ != 0)
+ {
+ this->transmit_buffers_.Tail = this->trailer_->rd_ptr ();
+ this->transmit_buffers_.TailLength = this->trailer_bytes_;
+ }
+ else
+ {
+ this->transmit_buffers_.Tail = 0;
+ this->transmit_buffers_.TailLength = 0;
+ }
+
+ // Return the transmit buffers
+ return &this->transmit_buffers_;
+ }
+}
+
+#if defined (ACE_HAS_AIO_CALLS)
+// Constructor.
+ACE_Asynch_Transmit_Handler::ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result)
+ : result_ (result),
+ mb_ (0),
+ header_act_ (0),
+ data_act_ (0),
+ trailer_act_ (0),
+ file_offset_ (result->offset ()),
+ file_size_ (0),
+ bytes_transferred_ (0)
+{
+ // Allocate memory for the message block.
+ ACE_NEW (this->mb_,
+ ACE_Message_Block (this->result_->bytes_per_send ()
+ + 1));
+
+ // Memory for the ACTs.
+ ACE_NEW (this->header_act_,
+ ACT);
+
+ ACE_NEW (this->data_act_,
+ ACT);
+
+ ACE_NEW (this->trailer_act_,
+ ACT);
+
+ *this->header_act_ = this->HEADER_ACT;
+ *this->data_act_ = this->DATA_ACT;
+ *this->trailer_act_ = this->TRAILER_ACT;
+
+ // Init the file size.
+ file_size_ = ACE_OS::filesize (this->result_->file ());
+}
+
+// Destructor.
+ACE_Asynch_Transmit_Handler::~ACE_Asynch_Transmit_Handler (void)
+{
+ delete result_;
+ delete mb_;
+ delete this->header_act_;
+ delete this->data_act_;
+ delete this->trailer_act_;
+}
+
+// Do the transmission.
+// Initiate transmitting the header. When that completes
+// handle_write_stream will be called, there start transmitting the file.
+int
+ACE_Asynch_Transmit_Handler::transmit (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "Asynch_Transmit_Handler::transmit\n"));
+
+ // Open Asynch_Read_File.
+ if (this->rf_.open (*this, this->result_->file ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p %t):ACE_Asynch_Transmit_Handler:read_file open failed\n"),
+ -1);
+
+ // Open Asynch_Write_Stream.
+ if (this->ws_.open (*this, this->result_->socket ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
+ -1);
+
+ // Transmit the header.
+ if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
+ this->result_->header_and_trailer ()->header_bytes (),
+ (void *)this->header_act_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p %t):Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
+ -1);
+
+ return 0;
+}
+
+void
+ACE_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "Asynch_Transmit_Handler:handle_write_stream called\n"));
+
+ // Update bytes transferred so far.
+ this->bytes_transferred_ += result.bytes_transferred ();
+
+ // Check the success parameter.
+ if (result.success () == 0)
+ {
+ ACE_ERROR ((LM_ERROR, "Asynch_Transmit_File failed.\n"));
+
+ ACE_SEH_TRY
{
- this->transmit_buffers_.Head = this->header_->rd_ptr ();
- this->transmit_buffers_.HeadLength = this->header_bytes_;
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ errno); // Error no.
}
- else
+ ACE_SEH_FINALLY
{
- this->transmit_buffers_.Head = 0;
- this->transmit_buffers_.HeadLength = 0;
+ // This is crucial to prevent memory leaks
+ delete this;
}
+ }
- // If trailer is valid, set the fields
- if (this->trailer_ != 0)
+ // Write stream successful.
+
+ // Partial write to socket.
+ int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
+ if (unsent_data != 0)
+ {
+ // Reset pointers
+ result.message_block ().rd_ptr (result.bytes_transferred ());
+
+ // Duplicate the message block and retry remaining data
+ if (this->ws_.write (*result.message_block ().duplicate (),
+ unsent_data,
+ result.act ()) == -1)
{
- this->transmit_buffers_.Tail = this->trailer_->rd_ptr ();
- this->transmit_buffers_.TailLength = this->trailer_bytes_;
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Transmit_Handler:write_stream failed\n"));
+ return;
}
- else
+
+ // @@ Handling *partial write* to a socket.
+ // Let us not continue further before this write
+ // finishes. Because proceeding with another read and then write
+ // might change the order of the file transmission, because
+ // partial write to the stream is always possible.
+ return;
+ }
+
+ // Not a partial write.
+
+ // Check ACT to see what was sent.
+ ACT act = *(ACT *)result.act ();
+ switch (act)
+ {
+ // If it is the "trailer" that is just sent, then transmit
+ // file is complete.
+ case TRAILER_ACT:
+ ACE_SEH_TRY
{
- this->transmit_buffers_.Tail = 0;
- this->transmit_buffers_.TailLength = 0;
+ this->result_->complete (this->bytes_transferred_,
+ 1, // @@ Success.
+ 0, // @@ Completion key.
+ errno); // @@ Errno.
}
+ ACE_SEH_FINALLY
+ {
+ delete this;
+ }
+ break;
- // Return the transmit buffers
- return &this->transmit_buffers_;
+ // If header/data was sent, initiate the file data
+ // transmission.
+ case HEADER_ACT:
+ case DATA_ACT:
+ if (initiate_read_file () == -1)
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
+ break;
+
+ default:
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):ACE_Asynch_Transmit_File::Aux:handle_write_stream::Unexpected act\n"));
}
}
+void
+ACE_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
+{
+ // Failure.
+ if (result.success () == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):Asynch_Transmit_Handler : read from the file failed\n"));
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ errno); // Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ delete this;
+ }
+ return;
+ }
+
+ // Read successful.
+ if (result.bytes_transferred () == 0)
+ return;
+
+ // Increment offset and write data to network.
+ this->file_offset_ += result.bytes_transferred ();
+ if (this->ws_.write (result.message_block (),
+ result.bytes_transferred (),
+ (void *) this->data_act_) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "(%p %t):ACE_Asynch_Transmit_File : write to the stream failed\n"));
+ return;
+ }
+}
+
+int
+ACE_Asynch_Transmit_Handler::initiate_read_file (void)
+{
+ // Is there something to read.
+ if (this->file_offset_ >= this->file_size_)
+ {
+ // File is sent. Send the trailer.
+ ACE_DEBUG ((LM_DEBUG,
+ "Trailer %s\n",
+ this->result_->header_and_trailer ()->trailer ()->rd_ptr ()));
+ if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (),
+ this->result_->header_and_trailer ()->trailer_bytes (),
+ (void *)this->trailer_act_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p %t):Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
+ -1);
+ return 0;
+ }
+ else
+ {
+ // Inititiate an asynchronous read from the file.
+ if (this->rf_.read (*this->mb_,
+ this->mb_->size () - 1,
+ this->file_offset_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p %t) Asynch_Transmit_Handler::read from file failed\n"),
+ -1);
+ return 0;
+ }
+}
+#endif /* ACE_HAS_AIO_CALLS */
+
// ************************************************************
ACE_Handler::~ACE_Handler (void)