diff options
author | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-20 04:39:53 +0000 |
---|---|---|
committer | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-20 04:39:53 +0000 |
commit | b4ebb79135ee1f842324272a577e84be193ee1db (patch) | |
tree | ed66db1705649c490bdb5c366b610443c5b4c422 /ace/Asynch_IO.cpp | |
parent | fc0a5b99470b3eb66d099105f91a0c61047395c9 (diff) | |
download | ATCD-b4ebb79135ee1f842324272a577e84be193ee1db.tar.gz |
*** empty log message ***
Diffstat (limited to 'ace/Asynch_IO.cpp')
-rw-r--r-- | ace/Asynch_IO.cpp | 568 |
1 files changed, 504 insertions, 64 deletions
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp index 4186dae7e2b..e6619e7b515 100644 --- a/ace/Asynch_IO.cpp +++ b/ace/Asynch_IO.cpp @@ -104,9 +104,6 @@ ACE_Asynch_Operation::open (ACE_Handler &handler, this->handler_ = &handler; this->handle_ = handle; - // @@ - ACE_UNUSED_ARG (completion_key); - // Grab the handle from the <handler> if <handle> is invalid if (this->handle_ == ACE_INVALID_HANDLE) this->handle_ = this->handler_->handle (); @@ -123,31 +120,93 @@ ACE_Asynch_Operation::open (ACE_Handler &handler, this->proactor_ = ACE_Proactor::instance(); } -#if !defined (ACE_HAS_AIO_CALLS) - // Register with the <proactor> +#if defined (ACE_HAS_AIO_CALLS) + // AIO stuff is present. So no registering. + // @@ But something has to be done to associate completion key with + // the handle provided. How about a HashTable of size "the number of + // file descriptors that are possible in the system". + // @@ + ACE_UNUSED_ARG (completion_key); + return 0; +#else /* ACE_HAS_AIO_CALLS */ + // Register with the <proactor>. return this->proactor_->register_handle (this->handle_, completion_key); -#else /* ACE_HAS_AIO_CALLS */ - // AIO stuff is present. So no registering. - return 1; #endif /* ACE_HAS_AIO_CALLS */ } int ACE_Asynch_Operation::cancel (void) { +#if defined (ACE_HAS_AIO_CALLS) + // @@ aio_cancel will come here soon. + return 0; +#else /* ACE_HAS_AIO_CALLS */ // All I/O operations that are canceled will complete with the error // ERROR_OPERATION_ABORTED. All completion notifications for the I/O // operations will occur normally. #if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && (defined (_MSC_VER) && (_MSC_VER > 1020)) return (int) ::CancelIo (this->handle_); #else - // @@ Alex, there should be an API for cancelling this stuff on - // POSIX! ACE_NOTSUP_RETURN (-1); #endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && (defined (_MSC_VER) && (_MSC_VER > 1020)) */ +#endif /* ACE_HAS_AIO_CALLS */ } +#if defined (ACE_HAS_AIO_CALLS) +// If the ptr is o, just check whether there is any slot free and +// return 0 if yes, else return -1. If a valid ptr is passed, keep it +// in a free slot. +int +ACE_Asynch_Operation::register_aio_with_proactor (aiocb *aiocb_ptr) +{ + ACE_DEBUG ((LM_DEBUG, "register_aio_with_proactor\n")); + if (aiocb_ptr == 0) + { + ACE_DEBUG ((LM_DEBUG, + "Status check max %d cur %d\n", + this->proactor_->aiocb_list_max_size_, + this->proactor_->aiocb_list_cur_size_)); + + // Just check the status of the list. + if (this->proactor_->aiocb_list_cur_size_ >= + this->proactor_->aiocb_list_max_size_) + return -1; + else + return 0; + } + + // Non-zero ptr. Find a free slot and store. + + // Make sure again. + if (this->proactor_->aiocb_list_cur_size_ >= + this->proactor_->aiocb_list_max_size_) + ACE_ERROR_RETURN ((LM_ERROR, + "Asynch_Operation:No space to store the <aio> info.\n"), + -1); + + // Slot(s) available. Find a free one. + size_t ai; + for (ai = 0; + ai < this->proactor_->aiocb_list_max_size_; + ai++) + if (this->proactor_->aiocb_list_[ai] == 0) + break; + + // Check again. + if (ai == this->proactor_->aiocb_list_max_size_) + ACE_ERROR_RETURN ((LM_ERROR, + "Asynch_Operation:No space to store the <aio> info.\n"), + -1); + + // Store the pointers. + this->proactor_->aiocb_list_[ai] = aiocb_ptr; + this->proactor_->aiocb_list_cur_size_ ++; + return 0; +} +#endif /* ACE_HAS_AIO_CALLS */ + + // ************************************************************ ACE_Asynch_Read_Stream::ACE_Asynch_Read_Stream (void) @@ -162,15 +221,18 @@ ACE_Asynch_Read_Stream::read (ACE_Message_Block &message_block, // Create the Asynch_Result Result *result = 0; ACE_NEW_RETURN (result, - Result (*this->handler_, - this->handle_, - message_block, - bytes_to_read, - act, - this->proactor_->get_handle ()), - -1); + Result (*this->handler_, + this->handle_, + message_block, + bytes_to_read, + act, + this->proactor_->get_handle ()), + -1); - return this->shared_read (result); + ssize_t return_val = this->shared_read (result); + if (return_val == -1) + delete result; + return return_val; } int @@ -180,11 +242,30 @@ ACE_Asynch_Read_Stream::shared_read (ACE_Asynch_Read_Stream::Result *result) // Make a new AIOCB and issue aio_read, if queueing is possible // store this with the Proactor, so that that can be used for // <aio_return> and <aio_error>. - aiocb *aiocb_ptr; + + // Allocate aiocb block. + aiocb *aiocb_ptr = 0; ACE_NEW_RETURN (aiocb_ptr, aiocb, -1); + // Make sure there is space in the aiocb list. + if (this->register_aio_with_proactor (0) == -1) + { + // No space. + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Read_Stream:No space to queue aio_read\n")); + + // Clean up the memory allocated. + delete aiocb_ptr; + + // @@ Set errno to EAGAIN so that applications will know this as + // a queueing failure and try again this aio_read it they want. + errno = EAGAIN; + + return -1; + } + // Setup AIOCB. // @@ Priority always 0? // @@ Signal no, always? @@ -198,20 +279,31 @@ ACE_Asynch_Read_Stream::shared_read (ACE_Asynch_Read_Stream::Result *result) aiocb_ptr->aio_sigevent.sigev_value.sival_ptr = (void *) result; - // Fire off the aio write. @@ Alex, should this be < 0 or -1? In - // general, please try to use -1 for checking all return values if - // that's what the manual says will be returned... - if (aio_read (aiocb_ptr) < 0) - // Queuing failed. - ACE_ERROR_RETURN ((LM_ERROR, - "Queueing faile for aio_read"), - -1); + // Fire off the aio write. + if (aio_read (aiocb_ptr) == -1) + { + // Queueing failed. + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Read_Stream:aio_read queueing failed\n")); + + // Clean up the memory allocated. + delete aiocb_ptr; + + return -1; + } // Success. Store the aiocb_ptr with Proactor. - if (this->proactor_->insert_to_aiocb_list (aiocb_ptr) < 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Fatal error : aio_read queuing suceeded, no place at aiocb_list"), - -1); + if (this->register_aio_with_proactor (aiocb_ptr) == -1) + { + // Couldnt store the aiocb. + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Read_Stream:Fatal error\n")); + + // Clean up the memory allocated. + delete aiocb_ptr; + + return -1; + } // Aio successfully issued and ptr stored. return 0; @@ -315,23 +407,93 @@ ACE_Asynch_Write_Stream::write (ACE_Message_Block &message_block, { Result *result = 0; ACE_NEW_RETURN (result, - Result (*this->handler_, - this->handle_, - message_block, - bytes_to_write, - act, - this->proactor_->get_handle ()), - -1); + Result (*this->handler_, + this->handle_, + message_block, + bytes_to_write, + act, + this->proactor_->get_handle ()), + -1); - return this->shared_write (result); + ssize_t return_val = this->shared_write (result); + if (return_val == -1) + delete result; + return return_val; } int ACE_Asynch_Write_Stream::shared_write (ACE_Asynch_Write_Stream::Result *result) { #if defined (ACE_HAS_AIO_CALLS) - ACE_UNUSED_ARG (result); + // Make a new AIOCB and issue aio_write, if queueing is possible + // store this with the Proactor, so that that can be used for + // <aio_return> and <aio_error>. + + // Allocate aiocb block. + aiocb *aiocb_ptr = 0; + ACE_NEW_RETURN (aiocb_ptr, + aiocb, + -1); + + // Make sure there is space in the aiocb list. + if (this->register_aio_with_proactor (0) == -1) + { + // No space. + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Write_Stream:No space to queue aio_read\n")); + + // Clean up the memory allocated. + delete aiocb_ptr; + + // @@ Set errno to EAGAIN so that applications will know this as + // a queueing failure and try again this aio_read it they want. + errno = EAGAIN; + + return -1; + } + + // Setup AIOCB. + // @@ Priority always 0? + // @@ Signal no, always? + aiocb_ptr->aio_fildes = result->handle (); + aiocb_ptr->aio_offset = result->Offset; + aiocb_ptr->aio_buf = result->message_block ().rd_ptr (); + aiocb_ptr->aio_nbytes = result->bytes_to_write (); + aiocb_ptr->aio_reqprio = 0; + aiocb_ptr->aio_sigevent.sigev_notify = SIGEV_NONE; + //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX; + aiocb_ptr->aio_sigevent.sigev_value.sival_ptr = + (void *) result; + + // Fire off the aio write. + if (aio_write (aiocb_ptr) == -1) + { + // Queueing failed. + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Write_Stream:aio_write queueing failed\n")); + + // Clean up the memory allocated. + delete aiocb_ptr; + + return -1; + } + + // Success. Store the aiocb_ptr with Proactor. + if (this->register_aio_with_proactor (aiocb_ptr) == -1) + { + // Couldnt store the aiocb. + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Write_Stream:Fatal error\n")); + + // Clean up the memory allocated. + delete aiocb_ptr; + + return -1; + } + + // Aio successfully issued and ptr stored. return 0; + #else /* ACE_HAS_AIO_CALLS */ u_long bytes_written; @@ -594,7 +756,7 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, u_long bytes_read; - // Initiate the accept + // Initiate the accept. int initiate_result = ::AcceptEx ((SOCKET) result->listen_handle (), (SOCKET) result->accept_handle (), result->message_block ().wr_ptr (), @@ -712,16 +874,57 @@ ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, const void *act) { #if defined (ACE_HAS_AIO_CALLS) - ACE_UNUSED_ARG (file); - ACE_UNUSED_ARG (header_and_trailer); - ACE_UNUSED_ARG (bytes_to_write); - ACE_UNUSED_ARG (offset); - ACE_UNUSED_ARG (offset_high); - ACE_UNUSED_ARG (bytes_per_send); - ACE_UNUSED_ARG (flags); - ACE_UNUSED_ARG (act); + // Adjust these parameters if there are default values specified. + ssize_t file_size = ACE_OS::filesize (file); + + if (file_size == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p) Asynch_Transmit_File:Couldnt know the file size\n"), + -1); + + if (bytes_to_write == 0) + bytes_to_write = file_size; + + if (offset > (size_t) file_size) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p %t)Asynch_Transmit_File:File size is less than offset\n"), + -1); + + if (offset != 0) + bytes_to_write = file_size - offset + 1; + + if (bytes_per_send == 0) + bytes_per_send = bytes_to_write; + + // Configure the result parameter. + Result *result = 0; + ACE_NEW_RETURN (result, + Result (*this->handler_, + this->handle_, + file, + header_and_trailer, + bytes_to_write, + offset, + offset_high, + bytes_per_send, + flags, + act, + this->proactor_->get_handle ()), + -1); + + // Make the auxillary handler and initiate transmit. + ACE_Asynch_Transmit_Handler *transmit_handler = 0; + ACE_NEW_RETURN (transmit_handler, + ::ACE_Asynch_Transmit_Handler (result), + -1); + + ssize_t return_val = transmit_handler->transmit (); + if (return_val == -1) + // This deletes the result in it. + delete transmit_handler; + + return return_val; - return 0; #else /* ACE_HAS_AIO_CALLS */ #if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) Result *result = 0; @@ -955,33 +1158,270 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void) // If header is valid, set the fields if (this->header_ != 0) + { + this->transmit_buffers_.Head = this->header_->rd_ptr (); + this->transmit_buffers_.HeadLength = this->header_bytes_; + } + else + { + this->transmit_buffers_.Head = 0; + this->transmit_buffers_.HeadLength = 0; + } + + // If trailer is valid, set the fields + if (this->trailer_ != 0) + { + this->transmit_buffers_.Tail = this->trailer_->rd_ptr (); + this->transmit_buffers_.TailLength = this->trailer_bytes_; + } + else + { + this->transmit_buffers_.Tail = 0; + this->transmit_buffers_.TailLength = 0; + } + + // Return the transmit buffers + return &this->transmit_buffers_; + } +} + +#if defined (ACE_HAS_AIO_CALLS) +// Constructor. +ACE_Asynch_Transmit_Handler::ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result) + : result_ (result), + mb_ (0), + header_act_ (0), + data_act_ (0), + trailer_act_ (0), + file_offset_ (result->offset ()), + file_size_ (0), + bytes_transferred_ (0) +{ + // Allocate memory for the message block. + ACE_NEW (this->mb_, + ACE_Message_Block (this->result_->bytes_per_send () + + 1)); + + // Memory for the ACTs. + ACE_NEW (this->header_act_, + ACT); + + ACE_NEW (this->data_act_, + ACT); + + ACE_NEW (this->trailer_act_, + ACT); + + *this->header_act_ = this->HEADER_ACT; + *this->data_act_ = this->DATA_ACT; + *this->trailer_act_ = this->TRAILER_ACT; + + // Init the file size. + file_size_ = ACE_OS::filesize (this->result_->file ()); +} + +// Destructor. +ACE_Asynch_Transmit_Handler::~ACE_Asynch_Transmit_Handler (void) +{ + delete result_; + delete mb_; + delete this->header_act_; + delete this->data_act_; + delete this->trailer_act_; +} + +// Do the transmission. +// Initiate transmitting the header. When that completes +// handle_write_stream will be called, there start transmitting the file. +int +ACE_Asynch_Transmit_Handler::transmit (void) +{ + ACE_DEBUG ((LM_DEBUG, "Asynch_Transmit_Handler::transmit\n")); + + // Open Asynch_Read_File. + if (this->rf_.open (*this, this->result_->file ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p %t):ACE_Asynch_Transmit_Handler:read_file open failed\n"), + -1); + + // Open Asynch_Write_Stream. + if (this->ws_.open (*this, this->result_->socket ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "ACE_Asynch_Transmit_Handler:write_stream open failed\n"), + -1); + + // Transmit the header. + if (this->ws_.write (*this->result_->header_and_trailer ()->header (), + this->result_->header_and_trailer ()->header_bytes (), + (void *)this->header_act_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p %t):Asynch_Transmit_Handler:transmitting header:write_stream failed\n"), + -1); + + return 0; +} + +void +ACE_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "Asynch_Transmit_Handler:handle_write_stream called\n")); + + // Update bytes transferred so far. + this->bytes_transferred_ += result.bytes_transferred (); + + // Check the success parameter. + if (result.success () == 0) + { + ACE_ERROR ((LM_ERROR, "Asynch_Transmit_File failed.\n")); + + ACE_SEH_TRY { - this->transmit_buffers_.Head = this->header_->rd_ptr (); - this->transmit_buffers_.HeadLength = this->header_bytes_; + this->result_->complete (this->bytes_transferred_, + 0, // Failure. + 0, // @@ Completion key. + errno); // Error no. } - else + ACE_SEH_FINALLY { - this->transmit_buffers_.Head = 0; - this->transmit_buffers_.HeadLength = 0; + // This is crucial to prevent memory leaks + delete this; } + } - // If trailer is valid, set the fields - if (this->trailer_ != 0) + // Write stream successful. + + // Partial write to socket. + int unsent_data = result.bytes_to_write () - result.bytes_transferred (); + if (unsent_data != 0) + { + // Reset pointers + result.message_block ().rd_ptr (result.bytes_transferred ()); + + // Duplicate the message block and retry remaining data + if (this->ws_.write (*result.message_block ().duplicate (), + unsent_data, + result.act ()) == -1) { - this->transmit_buffers_.Tail = this->trailer_->rd_ptr (); - this->transmit_buffers_.TailLength = this->trailer_bytes_; + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Transmit_Handler:write_stream failed\n")); + return; } - else + + // @@ Handling *partial write* to a socket. + // Let us not continue further before this write + // finishes. Because proceeding with another read and then write + // might change the order of the file transmission, because + // partial write to the stream is always possible. + return; + } + + // Not a partial write. + + // Check ACT to see what was sent. + ACT act = *(ACT *)result.act (); + switch (act) + { + // If it is the "trailer" that is just sent, then transmit + // file is complete. + case TRAILER_ACT: + ACE_SEH_TRY { - this->transmit_buffers_.Tail = 0; - this->transmit_buffers_.TailLength = 0; + this->result_->complete (this->bytes_transferred_, + 1, // @@ Success. + 0, // @@ Completion key. + errno); // @@ Errno. } + ACE_SEH_FINALLY + { + delete this; + } + break; - // Return the transmit buffers - return &this->transmit_buffers_; + // If header/data was sent, initiate the file data + // transmission. + case HEADER_ACT: + case DATA_ACT: + if (initiate_read_file () == -1) + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Transmit_Handler:read_file couldnt be initiated\n")); + break; + + default: + ACE_ERROR ((LM_ERROR, + "(%p %t):ACE_Asynch_Transmit_File::Aux:handle_write_stream::Unexpected act\n")); } } +void +ACE_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result) +{ + // Failure. + if (result.success () == 0) + { + ACE_ERROR ((LM_ERROR, + "(%p %t):Asynch_Transmit_Handler : read from the file failed\n")); + ACE_SEH_TRY + { + this->result_->complete (this->bytes_transferred_, + 0, // Failure. + 0, // @@ Completion key. + errno); // Error no. + } + ACE_SEH_FINALLY + { + delete this; + } + return; + } + + // Read successful. + if (result.bytes_transferred () == 0) + return; + + // Increment offset and write data to network. + this->file_offset_ += result.bytes_transferred (); + if (this->ws_.write (result.message_block (), + result.bytes_transferred (), + (void *) this->data_act_) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%p %t):ACE_Asynch_Transmit_File : write to the stream failed\n")); + return; + } +} + +int +ACE_Asynch_Transmit_Handler::initiate_read_file (void) +{ + // Is there something to read. + if (this->file_offset_ >= this->file_size_) + { + // File is sent. Send the trailer. + ACE_DEBUG ((LM_DEBUG, + "Trailer %s\n", + this->result_->header_and_trailer ()->trailer ()->rd_ptr ())); + if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (), + this->result_->header_and_trailer ()->trailer_bytes (), + (void *)this->trailer_act_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p %t):Asynch_Transmit_Handler:write_stream writing trailer failed\n"), + -1); + return 0; + } + else + { + // Inititiate an asynchronous read from the file. + if (this->rf_.read (*this->mb_, + this->mb_->size () - 1, + this->file_offset_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p %t) Asynch_Transmit_Handler::read from file failed\n"), + -1); + return 0; + } +} +#endif /* ACE_HAS_AIO_CALLS */ + // ************************************************************ ACE_Handler::~ACE_Handler (void) |