summaryrefslogtreecommitdiff
path: root/ace/POSIX_Asynch_IO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/POSIX_Asynch_IO.cpp')
-rw-r--r--ace/POSIX_Asynch_IO.cpp114
1 files changed, 63 insertions, 51 deletions
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp
index 6c3683ed4fb..f789eb9cdec 100644
--- a/ace/POSIX_Asynch_IO.cpp
+++ b/ace/POSIX_Asynch_IO.cpp
@@ -78,7 +78,6 @@ ACE_POSIX_Asynch_Result::signal_number (void) const
{
return this->aio_sigevent.sigev_signo;
}
-
int
ACE_POSIX_Asynch_Result::post_completion (ACE_Proactor_Impl *proactor_impl)
{
@@ -237,6 +236,12 @@ ACE_POSIX_SIG_Asynch_Operation::~ACE_POSIX_SIG_Asynch_Operation (void)
{
}
+int
+ACE_POSIX_SIG_Asynch_Operation::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result)
+{
+ return this->posix_proactor ()->register_aio_with_proactor (result);
+}
+
// *********************************************************************
u_long
@@ -534,13 +539,12 @@ ACE_POSIX_SIG_Asynch_Read_Stream::shared_read (ACE_POSIX_Asynch_Read_Stream_Resu
// We want queuing of RT signal to notify completion.
result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
result->aio_sigevent.sigev_signo = result->signal_number ();
-
- // Keep ACE_POSIX_Asynch_Result, the base class pointer in the
- // signal value.
- ACE_POSIX_Asynch_Result *base_result = result;
- result->aio_sigevent.sigev_value.sival_ptr = ACE_reinterpret_cast (void *,
- base_result);
+ result->aio_sigevent.sigev_value.sival_ptr = (void *) result;
+ // Register the real-time signal with the Proactor.
+ if (this->register_aio_with_proactor (result) == -1)
+ return -1;
+
// Fire off the aio read.
if (aio_read (result) == -1)
// Queueing failed.
@@ -867,13 +871,12 @@ ACE_POSIX_SIG_Asynch_Write_Stream::shared_write (ACE_POSIX_Asynch_Write_Stream_R
// We want queuing of RT signal to notify completion.
result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
result->aio_sigevent.sigev_signo = result->signal_number ();
+ result->aio_sigevent.sigev_value.sival_ptr = (void *) result;
- // Keep ACE_POSIX_Asynch_Result, the base class pointer in the
- // signal value.
- ACE_POSIX_Asynch_Result *base_result = result;
- result->aio_sigevent.sigev_value.sival_ptr = ACE_reinterpret_cast (void *,
- base_result);
-
+ // Register the real-time signal with the Proactor.
+ if (this->register_aio_with_proactor (result) == -1)
+ return -1;
+
// Fire off the aio write.
if (aio_write (result) == -1)
// Queueing failed.
@@ -1725,12 +1728,12 @@ protected:
ACE_POSIX_Proactor *posix_proactor_;
// POSIX_Proactor.
-
+
ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result*> result_queue_;
// Queue of Result pointers that correspond to all the <accept>'s
// pending.
- ACE_SYNCH_MUTEX lock_;
+ ACE_Thread_Mutex lock_;
// The lock to protect the result queue which is shared. The queue
// is updated by main thread in the register function call and
// through the auxillary thread in the deregister fun. So let us
@@ -1836,7 +1839,7 @@ ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void)
// The queue is updated by main thread in the register function call and
// thru the auxillary thread in the deregister fun. So let us mutex
// it.
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0);
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0);
// Get the first item (result ptr) from the Queue.
ACE_POSIX_Asynch_Accept_Result* result = 0;
@@ -1882,13 +1885,13 @@ ACE_POSIX_AIOCB_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Ac
// The queue is updated by main thread in the register function call
// and thru the auxillary thread in the deregister fun. So let us
// mutex it.
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
return register_accept_call_i (result);
}
int
-ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */)
+ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd)
{
// An <accept> has been sensed on the <listen_handle>. We should be
// able to just go ahead and do the <accept> now on this <fd>. This
@@ -1949,17 +1952,18 @@ ACE_POSIX_SIG_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Acce
// The queue is updated by main thread in the register function call
// and thru the auxillary thread in the deregister fun. So let us
// mutex it.
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
// Do the work.
if (this->register_accept_call_i (result) == -1)
return -1;
-
- return 0;
+
+ // Also register the real-time signal.
+ return this->posix_proactor_->register_aio_with_proactor (result);
}
int
-ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */)
+ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd)
{
// An <accept> has been sensed on the <listen_handle>. We should be
// able to just go ahead and do the <accept> now on this <fd>. This
@@ -2090,7 +2094,7 @@ ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler,
// Spawn the thread. It is the only thread we are going to have. It
// will do the <handle_events> on the reactor.
return_val = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_AIOCB_Asynch_Accept::thread_function,
- ACE_reinterpret_cast (void *, &this->reactor_));
+ (void *) &this->reactor_);
if (return_val == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%N:%l:Thread_Manager::spawn failed\n"),
@@ -2106,9 +2110,10 @@ ACE_POSIX_AIOCB_Asynch_Accept::~ACE_POSIX_AIOCB_Asynch_Accept (void)
void*
ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor)
{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::thread_function called\n"));
+
// Retrieve the reactor pointer from the argument.
- ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *,
- arg_reactor);
+ ACE_Reactor* reactor = (ACE_Reactor *) arg_reactor;
// It should be valid Reactor, since we have a reactor_ ,e,ner we
// are passing only that one here.
@@ -2125,7 +2130,13 @@ ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor)
while (result != -1)
{
result = reactor->handle_events ();
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Accept::Thread_Function : handle_events : result = [%d]\n",
+ result));
}
+
+ ACE_DEBUG ((LM_DEBUG, "Exiting ACE_Asynch_Accept::thread_function \n"));
+
return 0;
}
@@ -2251,8 +2262,7 @@ void*
ACE_POSIX_SIG_Asynch_Accept::thread_function (void* arg_reactor)
{
// Retrieve the reactor pointer from the argument.
- ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *,
- arg_reactor);
+ ACE_Reactor* reactor = (ACE_Reactor *) arg_reactor;
if (reactor == 0)
reactor = ACE_Reactor::instance ();
@@ -2466,9 +2476,7 @@ class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler
// = DESCRIPTION
//
// This is a helper class for implementing
- // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. This class
- // abstracts out all the commonalities in the two different
- // POSIX Transmit Handler implementations.
+ // <ACE_POSIX_Asynch_Transmit_File> in Unix systems.
public:
virtual ~ACE_POSIX_Asynch_Transmit_Handler (void);
@@ -2670,7 +2678,7 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::transmit (void)
// Transmit the header.
if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
this->result_->header_and_trailer ()->header_bytes (),
- ACE_reinterpret_cast (void *, &this->header_act_),
+ (void *) &this->header_act_,
0) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
@@ -2687,25 +2695,32 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_W
// Check the success parameter.
if (result.success () == 0)
{
- // Failure.
ACE_ERROR ((LM_ERROR,
"Asynch_Transmit_File failed.\n"));
-
- ACE_SEH_TRY
- {
- this->result_->complete (this->bytes_transferred_,
- 0, // Failure.
- 0, // @@ Completion key.
- 0); // @@ Error no.
- }
- ACE_SEH_FINALLY
+
+ // Check the success parameter.
+ if (result.success () == 0)
{
- // This is crucial to prevent memory leaks. This deletes
- // the result pointer also.
- delete this;
+ // Failure.
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_File failed.\n"));
+
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ 0); // @@ Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ // This is crucial to prevent memory leaks. This deletes
+ // the result pointer also.
+ delete this;
+ }
}
}
-
+
// Write stream successful.
// Partial write to socket.
@@ -2739,7 +2754,7 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_W
// Not a partial write. A full write.
// Check ACT to see what was sent.
- ACT act = * (ACT *) result.act ();
+ ACT act = *(ACT *) result.act ();
switch (act)
{
@@ -2801,10 +2816,8 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read
if (result.bytes_transferred () == 0)
return;
- // Increment offset.
+ // Increment offset and write data to network.
this->file_offset_ += result.bytes_transferred ();
-
- // Write data to network.
if (this->ws_.write (result.message_block (),
result.bytes_transferred (),
(void *)&this->data_act_,
@@ -2905,8 +2918,7 @@ ACE_POSIX_SIG_Asynch_Transmit_Handler::transmit (void)
// Transmit the header.
if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
this->result_->header_and_trailer ()->header_bytes (),
- ACE_reinterpret_cast (void *,
- &this->header_act_),
+ (void *) &this->header_act_,
this->result_->priority (),
this->result_->signal_number ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,