summaryrefslogtreecommitdiff
path: root/ace/POSIX_Proactor.cpp
diff options
context:
space:
mode:
authorSteve Huston <shuston@riverace.com>2002-04-25 19:50:17 +0000
committerSteve Huston <shuston@riverace.com>2002-04-25 19:50:17 +0000
commit8286d1808e4e4369bbe90c214b28a13ab8e9c009 (patch)
tree23892818bb314bea6233b2cf61449e8777f800fa /ace/POSIX_Proactor.cpp
parent90c994f617ee3cb95a7b2c379602dd822a0fb9be (diff)
downloadATCD-8286d1808e4e4369bbe90c214b28a13ab8e9c009.tar.gz
ChangeLogTag:Thu Apr 25 15:46:39 2002 Steve Huston <shuston@riverace.com>
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r--ace/POSIX_Proactor.cpp356
1 files changed, 244 insertions, 112 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 381d7521303..c3fba575a62 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -562,23 +562,51 @@ ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Pr
ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
{
+ // 1. try to cancel pending aio
+ this->read_stream_.cancel ();
+
+ // 2. close both handles
+ // Destuctor of ACE_Pipe does not close handles.
+ // We can not use ACE_Pipe::close() as it
+ // closes read_handle and than write_handle.
+ // In some systems close() may wait for
+ // completion for all asynch. pending requests.
+ // So we should close write_handle firstly
+ // to force read completion ( if 1. does not help )
+ // and then read_handle and not vice versa
+
+ ACE_HANDLE h = this->pipe_.write_handle ();
+ if (h != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (h);
+
+ h = this->pipe_.read_handle ();
+ if ( h != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (h);
+
}
+
int
ACE_AIOCB_Notify_Pipe_Manager::notify ()
{
// Send the result pointer through the pipe.
char char_send = 0;
- int ret_val = ACE::send (this->pipe_.write_handle (),
- & char_send ,
- sizeof (char_send));
+ int ret_val = ACE::send (this->pipe_.write_handle (),
+ &char_send,
+ sizeof (char_send));
+
+ if (ret_val < 0)
+ {
+ if (errno != EWOULDBLOCK)
+#if 0
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("(%P %t):%p\n"),
+ ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
+ ACE_LIB_TEXT ("Error:Writing on to notify pipe failed")));
+#endif /* 0 */
+ return -1;
+ }
- if (ret_val < 0 && errno != EWOULDBLOCK)
- ACE_ERROR_RETURN ((LM_ERROR,
- "(%P %t):%p\n",
- "ACE_AIOCB_Notify_Pipe_Manager::notify"
- "Error:Writing on to notify pipe failed"),
- -1);
return 0;
}
@@ -595,15 +623,14 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
// One accept has completed. Issue a read to handle any
// <post_completion>s in the future.
- if (this->read_stream_.read (this->message_block_,
- 1, // enough to read 1 byte
- 0, // ACT
- 0) // Priority
- == -1)
+ if (-1 == this->read_stream_.read (this->message_block_,
+ 1, // enough to read 1 byte
+ 0, // ACT
+ 0)) // Priority
ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t):%p\n",
- "ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:"
- "Read from pipe failed"));
+ ACE_LIB_TEXT ("%N:%l:(%P | %t):%p\n"),
+ ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
+ ACE_LIB_TEXT ("Read from pipe failed")));
// 2. Do the upcalls
@@ -621,22 +648,12 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
num_deferred_aiocb_ (0),
num_started_aio_ (0)
{
- //check for correct value for max_aio_operations
+ // Check for correct value for max_aio_operations
check_max_aio_num ();
- ACE_NEW (aiocb_list_,
- aiocb *[aiocb_list_max_size_]);
- ACE_NEW (result_list_,
- ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]);
+ this->create_result_aiocb_list ();
- // Initialize the array.
- for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
- {
- aiocb_list_[ai] = 0;
- result_list_[ai] = 0;
- }
-
- create_notify_manager ();
+ this->create_notify_manager ();
// start pseudo-asynchronous accept task
// one per all future acceptors
@@ -646,7 +663,7 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
// Special protected constructor for ACE_SUN_Proactor
ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
- ACE_POSIX_Proactor::Proactor_Type ptype)
+ ACE_POSIX_Proactor::Proactor_Type)
: aiocb_notify_pipe_manager_ (0),
aiocb_list_ (0),
result_list_ (0),
@@ -656,15 +673,49 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
num_deferred_aiocb_ (0),
num_started_aio_ (0)
{
- ACE_UNUSED_ARG (ptype);
-
//check for correct value for max_aio_operations
- check_max_aio_num ();
+ this->check_max_aio_num ();
+
+ this->create_result_aiocb_list ();
+
+ // @@ We should create Notify_Pipe_Manager in the derived class to
+ // provide correct calls for virtual functions !!!
+}
+
+// Destructor.
+ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
+{
+ this->close();
+}
+
+int
+ACE_POSIX_AIOCB_Proactor::close (void)
+{
+ // stop asynch accept task
+ this->get_asynch_pseudo_task().stop ();
+
+ this->delete_notify_manager ();
+
+ this->clear_result_queue ();
+
+ return this->delete_result_aiocb_list ();
+}
+
+void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
+{
+ notify_pipe_read_handle_ = h;
+}
- ACE_NEW (aiocb_list_,
- aiocb *[aiocb_list_max_size_]);
- ACE_NEW (result_list_,
- ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]);
+int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
+{
+ if (aiocb_list_ != 0)
+ return 0;
+
+ ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
+
+ ACE_NEW_RETURN (result_list_,
+ ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
+ -1);
// Initialize the array.
for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
@@ -673,54 +724,99 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
result_list_[ai] = 0;
}
-
- // @@ We should create Notify_Pipe_Manager in the derived class to
- // provide correct calls for virtual functions !!!
+ return 0;
}
-// Destructor.
-ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
+int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
{
- // stop asynch accept task
- this->get_asynch_pseudo_task().stop ();
+ if (aiocb_list_ == 0) // already deleted
+ return 0;
+
+ // try to cancel all uncomlpeted operarion
+ // POSIX systems may have hidden system threads
+ // that still can work with our aiocb's!
+ for (size_t ai = 0; ai < aiocb_list_max_size_; ai++)
+ {
+ if (this->aiocb_list_[ai] != 0) // active operation
+ this->cancel_aiocb (result_list_[ai]);
+ }
- delete_notify_manager ();
+ int num_pending = 0;
- // delete all uncomlpeted operarion
- // as nobody will notify client since now
for (size_t ai = 0; ai < aiocb_list_max_size_; ai++)
{
- delete result_list_[ai];
- result_list_[ai] = 0;
- aiocb_list_[ai] = 0;
+ if (this->aiocb_list_[ai] == 0 ) // not active operation
+ continue;
+
+ // Get the error and return status of the aio_ operation.
+ int error_status = 0;
+ int return_status = 0;
+ int flg_completed = this->get_result_status (result_list_[ai],
+ error_status,
+ return_status);
+
+ //don't delete uncompleted AIOCB's
+ if (flg_completed == 0) // not completed !!!
+ {
+ num_pending++;
+#if 0
+ char * errtxt = ACE_OS::strerror (error_status);
+ if (errtxt == 0)
+ errtxt ="?????????";
+
+ char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
+ "WRITE":"READ" ;
+
+
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT("slot=%d op=%s status=%d return=%d %s\n"),
+ ai,
+ op,
+ error_status,
+ return_status,
+ errtxt ));
+#endif /* 0 */
+ }
+ else // completed , OK
+ {
+ delete this->result_list_[ai];
+ this->result_list_[ai] = 0;
+ this->aiocb_list_[ai] = 0;
+ }
}
- delete [] aiocb_list_;
- aiocb_list_ = 0;
+ //if it is not possible cancel some operation (num_pending > 0 ),
+ //we can we do only one thing -report about this
+ //and complain about POSIX implementation
+ //we know that we have memory leaks,
+ //but it is better than segmentation fault!
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
+ ACE_LIB_TEXT(" number pending AIO=%d\n"),
+ num_pending
+ ));
- delete [] result_list_;
- result_list_ = 0;
+ delete [] this->aiocb_list_;
+ this->aiocb_list_ = 0;
- clear_result_queue ();
-}
+ delete [] this->result_list_;
+ this->result_list_ = 0;
-void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
-{
- notify_pipe_read_handle_ = h;
+ return (num_pending == 0 ? 0 : -1);
+ // ?? or just always return 0;
}
void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
{
- long max_os_aio_num = ACE_OS ::sysconf (_SC_AIO_MAX);
+ long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
// Define max limit AIO's for concrete OS
// -1 means that there is no limit, but it is not true
// (example, SunOS 5.6)
- if (max_os_aio_num > 0
- && aiocb_list_max_size_ > (unsigned long) max_os_aio_num
- )
+ if (max_os_aio_num > 0 &&
+ aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
aiocb_list_max_size_ = max_os_aio_num;
#if defined (HPUX)
@@ -729,7 +825,7 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
// it has a limit 256 max elements for aio_suspend ()
// It is a pity, but ...
- long max_os_listio_num = ACE_OS ::sysconf (_SC_AIO_LISTIO_MAX);
+ long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
if (max_os_listio_num > 0
&& aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
aiocb_list_max_size_ = max_os_listio_num;
@@ -762,6 +858,39 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
"(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
aiocb_list_max_size_));
+#if defined(__sgi)
+
+ ACE_DEBUG((LM_DEBUG,
+ ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n")));
+
+//typedef struct aioinit {
+// int aio_threads; /* The number of aio threads to start (5) */
+// int aio_locks; /* Initial number of preallocated locks (3) */
+// int aio_num; /* estimated total simultanious aiobc structs (1000) */
+// int aio_usedba; /* Try to use DBA for raw I/O in lio_listio (0) */
+// int aio_debug; /* turn on debugging (0) */
+// int aio_numusers; /* max number of user sprocs making aio_* calls (5) */
+// int aio_reserved[3];
+//} aioinit_t;
+
+ aioinit_t aioinit;
+
+ aioinit.aio_threads = 10; /* The number of aio threads to start (5) */
+ aioinit.aio_locks = 20; /* Initial number of preallocated locks (3) */
+ /* estimated total simultaneous aiobc structs (1000) */
+ aioinit.aio_num = aiocb_list_max_size_;
+ aioinit.aio_usedba = 0; /* Try to use DBA for raw IO in lio_listio (0) */
+ aioinit.aio_debug = 0; /* turn on debugging (0) */
+ aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */
+ aioinit.aio_reserved[0] = 0;
+ aioinit.aio_reserved[1] = 0;
+ aioinit.aio_reserved[2] = 0;
+
+ aio_sgi_init (&aioinit);
+
+#endif
+
+ return;
}
void
@@ -834,7 +963,10 @@ ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
"%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
-1);
- this->notify_completion (sig_num);
+ // let try not to overflow signal queue or notification pipe
+
+ if (this->result_queue_.size () == 1)
+ this->notify_completion (sig_num);
return 0;
}
@@ -1054,6 +1186,47 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds)
return retval > 0 ? 1 : 0;
}
+int
+ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
+ int &error_status,
+ int &return_status)
+{
+ return_status = 0;
+
+ // Get the error status of the aio_ operation.
+ error_status = aio_error (asynch_result);
+
+#if 0
+ if (error_status == -1) // <aio_error> itself has failed.
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_AIOCB_Proactor::get_result_status:"
+ "<aio_error> has failed\n"));
+#endif /* 0 */
+
+ if (error_status == EINPROGRESS)
+ {
+ return_status = 0;
+ return 0; // not completed
+ }
+
+ return_status = aio_return (asynch_result);
+
+ if (return_status < 0)
+ {
+ return_status = 0; // zero bytes transferred
+#if 0
+ if (error_status == 0) // nonsense
+ ACE_ERROR ((LM_ERROR,
+ "%N:%l:(%P | %t)::%p\n",
+ "ACE_POSIX_AIOCB_Proactor::get_result_status:"
+ "<aio_return> failed\n"));
+#endif /* 0 */
+ }
+
+ return 1; // completed
+}
+
ACE_POSIX_Asynch_Result *
ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
int &return_status,
@@ -1082,30 +1255,9 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
if (aiocb_list_[index] == 0) // Dont process null blocks.
continue;
- // Get the error status of the aio_ operation.
- error_status = aio_error (aiocb_list_[index]);
-
- if (error_status == -1) // <aio_error> itself has failed.
- {
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::find_completed_aio:"
- "<aio_error> has failed\n"));
-
- break;
-
- // we should notify user, otherwise :
- // memory leak for result and "hanging" user
- // what was before skip this operation
-
- //aiocb_list_[index] = 0;
- //result_list_[index] = 0;
- //aiocb_list_cur_size_--;
- //continue;
- }
-
- // Continue the loop if <aio_> operation is still in progress.
- if (error_status != EINPROGRESS)
+ if (0 != this->get_result_status (result_list_[index],
+ error_status,
+ return_status)) // completed
break;
} // end for
@@ -1113,25 +1265,6 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
if (count == 0) // all processed , nothing found
return asynch_result;
- if (error_status == ECANCELED)
- return_status = 0;
- else if (error_status == -1)
- return_status = 0;
- else
- return_status = aio_return (aiocb_list_[index]);
-
- if (return_status == -1)
- {
- return_status = 0; // zero bytes transferred
-
- if (error_status == 0) // nonsense
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::find_completed_aio:"
- "<aio_return> failed\n"));
- }
-
-
asynch_result = result_list_[index];
aiocb_list_[index] = 0;
@@ -1196,7 +1329,7 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul
if (ret_val != 0) // No free slot
{
- errno = EAGAIN;
+ errno = EAGAIN;
ACE_ERROR_RETURN ((LM_ERROR,
"%N:%l:(%P | %t)::\n"
"register_and_start_aio: "
@@ -1570,8 +1703,7 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
{
- // stop asynch accept task
- this->get_asynch_pseudo_task().stop ();
+ this->close ();
// @@ Enable the masked signals again.
}